Nornir 网络自动化框架 代码解析

Nornir 一种类似ansible的python网络设备管理框架; 官方定义: ```text Nornir is an automation framework written in python to be used with python 基于python开发的、同时也是通过ptyhon方式调用的 (网络)自动化框架 ``` 官方参考文档:https://nornir.readthedocs.io/en/latest/index.html   本文目的:介绍nornir底层代码,以便后续进行二次开发。   ### 1.初识nornir 先根据官网的tutorial 跑通一下nornir的整个流程 * a.目录结构 ```language nornir_ssh % ls -R config.yaml inventory nornir.log nornir_01.py ./inventory: defaults.yaml groups.yaml hosts.yaml ``` * b.文件内容 ```yaml # ./inventory/hosts.yaml --- h3c_sw: hostname: 192.168.31.103 port: 22 username: admin password: admin platform: hp_comware # 这里根据使用的connectionPlugin的不同,选择对应厂商的名字, # 本次使用netmiko,所以netmiko连接h3c的设备可以用hp_comware的设备类型(device_type); # 如果是napalm的话 名字可能会不一样。 group: - H3C_Device data: cmds: - display cur ``` ```yaml # ./inventory/groups.yaml --- H3C_Device: data: - vsr1000 Cisco_Device: data: - ios ``` ```yaml # ./inventory/defaults.yaml 为空 # nornir.log 为空 ``` ```yaml # config.yaml --- inventory: plugin: SimpleInventory options: host_file: "inventory/hosts.yaml" group_file: "inventory/groups.yaml" defaults_file: "inventory/defaults.yaml" runner: plugin: threaded options: num_workers: 100 ``` ```python # nornir01.py 可以随便取,无所谓 from nornir import InitNornir from nornir_utils.plugins.functions import print_result from nornir_netmiko import netmiko_send_command def show_cmds(task): outputs = [] cmds = task.host.data['cmds'] for cmd in cmds: result = task.run(netmiko_send_command, command_string=cmd) output = result.result outputs += output return outputs nr = InitNornir(config_file='config.yaml', dry_run=True) results = nr.run(task=show_cmds) print_result(results['h3c_sw'][0]) ``` 运行上面的python文件: ```python python nornir01.py ```   ### 2.理解nornir组件概念和python包的知识 ```python # 1.最底层的task是函数,后面函数作为Task对象的参数,封装一个Task对象task # 2.processor 本质是一个接口,所谓接口就是要实现特定方法的类 # 3.理解pkg_resource 包的作用,其实就是将一些目录下的文件所定义的对象 加载到一个字典中来;下面的连接插件注册和运行插件注册都是用来这个原理 # 4.connection plugin 是什么? netmiko/napalm ? # 5.runner plugin 是什么? threaded ? # 6.插件加载完后,会根据输入的内容判断 是否 插件字典available有对应名字的插件对象 ```   ### 3.结合nornir底层代码片段来分析整个流程 下面会重点讲诉如下几行代码: ```python ''' ''' def show_cmds(task): outputs = [] cmds = task.host.data['cmds'] for cmd in cmds: result = task.run(netmiko_send_command, command_string=cmd) output = result.result outputs += output return outputs nr = InitNornir(config_file='config.yaml', dry_run=True) results = nr.run(task=show_cmds) ''' ''' ``` * 3.1.自定义函数show_comds中 必须要有1个参数,这个参数到时会被传递一个Task实例对象: ```python # nornir.core.task.py -> Task.start() r = self.task(self, **self.params) # 这个self.task是自定义函数show_cmds ``` * 3.2.InitNornir()根据配置文件或关键字参数进行 配置项初始化 ```python def InitNornir(config_file: str = "", dry_run: bool = False, **kwargs: Any,) -> Nornir: """ Arguments: config_file(str): Path to the configuration file (optional) dry_run(bool): Whether to simulate changes or not configure_logging: Whether to configure logging or not. This argument is being deprecated. Please use logging.enabled parameter in the configuration instead. **kwargs: Extra information to pass to the :obj:`nornir.core.configuration.Config` object Returns: :obj:`nornir.core.Nornir`: fully instantiated and configured """ ConnectionPluginRegister.auto_register() # 注册/加载 连接插件 比如netmiko、napalm等;通过pkg_resource包来注册 # 有传递配置文件的话就根据配置文件初始化,否则根据关键字参数初始化 if config_file: config = Config.from_file(config_file, **kwargs) else: config = Config.from_dict(**kwargs) data = GlobalState(dry_run=dry_run) config.logging.configure() # 返回一个Nornir实例对象 return Nornir( inventory=load_inventory(config), # 加载主机插件,比如SimpleInventory,将主机资产从配置文件 放到python对象中 runner=load_runner(config), # 加载runner插件,比如threaded、serial config=config, data=data, ) ``` * 3.3 下面看一下Nornir.runner.run这个方法 重点是这行代码:result = self.runner.run(task, run_on),其实是进入runner插件的run()方法里面 ```python # nornir.core.__init__.py -> Nornir.run() def run( self, task, raise_on_error=None, on_good=True, on_failed=False, name: Optional[str] = None, **kwargs, ): """ Run task over all the hosts in the inventory. Arguments: task (``callable``): function or callable that will be run against each device in the inventory raise_on_error (``bool``): Override raise_on_error behavior on_good(``bool``): Whether to run or not this task on hosts marked as good on_failed(``bool``): Whether to run or not this task on hosts marked as failed **kwargs: additional argument to pass to ``task`` when calling it Raises: :obj:`nornir.core.exceptions.NornirExecutionError`: if at least a task fails and self.config.core.raise_on_error is set to ``True`` Returns: :obj:`nornir.core.task.AggregatedResult`: results of each execution """ task = Task( task, # task 是一个函数 self, # nornir 实例 global_dry_run=self.data.dry_run, name=name, processors=self.processors, # 将包含processor的列表传递进去;processor是实现来一些方法的类,这些方法在任务执行过程的某个时间点会被执行 **kwargs, # 这个参数应该是放入到task函数里面的 ) self.processors.task_started(task) # 运行任务启动前的processor 的task_task_started方法;processor是针对所有设备而言的,不是针对特定的设备 run_on = [] # 存放需要运行的task函数的所有设备 if on_good: for name, host in self.inventory.hosts.items(): if name not in self.data.failed_hosts: run_on.append(host) if on_failed: for name, host in self.inventory.hosts.items(): if name in self.data.failed_hosts: run_on.append(host) # 打印日志,日志输出一共有多少台设备 num_hosts = len(self.inventory.hosts) if num_hosts: logger.info( "Running task %r with args %s on %d hosts", task.name, kwargs, num_hosts, ) else: logger.warning("Task %r has not been run – 0 hosts selected", task.name) # 将封装好的Task对象task 和所有主机 放入runner.run()中执行 result = self.runner.run(task, run_on) # runner.run()异常处理 raise_on_error = ( raise_on_error if raise_on_error is not None else self.config.core.raise_on_error ) # noqa if raise_on_error: result.raise_on_error() else: self.data.failed_hosts.update(result.failed_hosts.keys()) # runner.run()运行完成是调用processor.task_completed()函数 self.processors.task_completed(task, result) return result ``` * 3.3.1 进入runner插件的run方法看一下 如果直接点击代码引用的话,是直接跳到runner接口定义的代码中,我们要看具体的runner插件实现. 看如下代码,其实是将Task.start方法放入线程池里面(因为我们的runner插件在config.yaml中配置的是threaded) ```python # nornir/plugins/runners/__init__.py -> ThreadedRunner.run() def run(self, task: Task, hosts: List[Host]) -> AggregatedResult: result = AggregatedResult(task.name) futures = [] with ThreadPoolExecutor(self.num_workers) as pool: for host in hosts: future = pool.submit(task.copy().start, host) futures.append(future) ``` * 3.3.2 再进上面代码中的重点代码看看 future = pool.submit(task.copy().start, host) ```python # copy比较简单 直接把Task实例对象的属性 用来再实例化Task def copy(self) -> "Task": return Task( self.task, self.nornir, self.global_dry_run, self.processors, self.name, self.severity_level, self.parent_task, **self.params ) def __repr__(self) -> str: return self.name # 这个start方法是重点 def start(self, host: "Host") -> "MultiResult": """ Run the task for the given host. Arguments: host (:obj:`nornir.core.inventory.Host`): Host we are operating with. Populated right before calling the ``task`` nornir(:obj:`nornir.core.Nornir`): Populated right before calling the ``task`` Returns: host (:obj:`nornir.core.task.MultiResult`): Results of the task and its subtasks """ self.host = host if self.parent_task is not None: self.processors.subtask_instance_started(self, host) else: self.processors.task_instance_started(self, host) try: logger.debug("Host %r: running task %r", self.host.name, self.name) # 重点,self.task是自定义函数来,不是Task对象,看第一个参数是self,跟代码分析时说的一样,我们自定义函数时必须要给函数设定一个参数 r = self.task(self, **self.params) if not isinstance(r, Result): r = Result(host=host, result=r) except NornirSubTaskError as e: tb = traceback.format_exc() logger.error( "Host %r: task %r failed with traceback:\n%s", self.host.name, self.name, tb, ) r = Result(host, exception=e, result=str(e), failed=True) except Exception as e: tb = traceback.format_exc() logger.error( "Host %r: task %r failed with traceback:\n%s", self.host.name, self.name, tb, ) r = Result(host, exception=e, result=tb, failed=True) r.name = self.name if r.severity_level == DEFAULT_SEVERITY_LEVEL: if r.failed: r.severity_level = logging.ERROR else: r.severity_level = self.severity_level self.results.insert(0, r) if self.parent_task is not None: self.processors.subtask_instance_completed(self, host, self.results) else: self.processors.task_instance_completed(self, host, self.results) return self.results ``` * 3.4 下面在看一下Task.run()这个方法 注意跟Nornir.runner.run()区分,不要搞混淆 ```python # nornir/core/task.py -> Task.run() # 这个Task里面的run 是自定义函数调用 其他task函数时用的(有点嵌套的感觉) def run(self, task: Callable[..., Any], **kwargs: Any) -> "MultiResult": """ This is a utility method to call a task from within a task. For instance: def grouped_tasks(task): task.run(my_first_task) task.run(my_second_task) nornir.run(grouped_tasks) This method will ensure the subtask is run only for the host in the current thread. """ if not self.host: msg = ( "You have to call this after setting host and nornir attributes. ", "You probably called this from outside a nested task", ) raise Exception(msg) if "severity_level" not in kwargs: kwargs["severity_level"] = self.severity_level # 重点1: 将内嵌的task方法封装成Task对象 run_task = Task( task, self.nornir, global_dry_run=self.global_dry_run, processors=self.processors, parent_task=self, **kwargs ) # 重点2: 调用start方法,上面“3.3.2”中说过的Task.start r = run_task.start(self.host) self.results.append(r[0] if len(r) == 1 else cast("Result", r)) if r.failed: # Without this we will keep running the grouped task raise NornirSubTaskError(task=run_task, result=r) return r ``` ### 4.总结 通过上述代码的演示,应该可以看出Nornir底层代码的大概实现了。 如果要看懂,要自己亲自动手一起调试结合断点 才能看明白,不然会看的很晕。   ps: 其实还有个processor的东西还可以讲,其实就是在Task运行到某个状态执行processor类中的某个方法,其实在上面代码分析过程中也是可以大概看出一些眉目来的。因为篇幅的原因就说到这,如果有什么不对的地方,望批评指正,谢谢!
优秀的个人博客,低调大师

微信关注我们

原文链接:https://blog.51cto.com/jackor/2977342

转载内容版权归作者及来源网站所有!

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

相关文章

发表评论

资源下载

更多资源
优质分享Android(本站安卓app)

优质分享Android(本站安卓app)

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

Oracle Database,又名Oracle RDBMS

Oracle Database,又名Oracle RDBMS

Oracle Database,又名Oracle RDBMS,或简称Oracle。是甲骨文公司的一款关系数据库管理系统。它是在数据库领域一直处于领先地位的产品。可以说Oracle数据库系统是目前世界上流行的关系数据库管理系统,系统可移植性好、使用方便、功能强,适用于各类大、中、小、微机环境。它是一种高效率、可靠性好的、适应高吞吐量的数据库方案。

Apache Tomcat7、8、9(Java Web服务器)

Apache Tomcat7、8、9(Java Web服务器)

Tomcat是Apache 软件基金会(Apache Software Foundation)的Jakarta 项目中的一个核心项目,由Apache、Sun 和其他一些公司及个人共同开发而成。因为Tomcat 技术先进、性能稳定,而且免费,因而深受Java 爱好者的喜爱并得到了部分软件开发商的认可,成为目前比较流行的Web 应用服务器。

Sublime Text 一个代码编辑器

Sublime Text 一个代码编辑器

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。