实例讲解Python 解析JSON实现主机管理
本文分享自华为云社区《Python 解析JSON实现主机管理》,作者: LyShark。
JSON(JavaScript Object Notation)是一种轻量级的数据交换格式,它以易于阅读和编写的文本形式表示数据。JSON 是一种独立于编程语言的数据格式,因此在不同的编程语言中都有对应的解析器和生成器。JSON 格式的设计目标是易于理解、支持复杂数据结构和具有良好的可扩展性。
JSON 数据是以键值对的形式存在的,而且易于阅读和编写。以下是一个简单的 JSON 示例:
{ "name": "John Doe", "age": 30, "city": "New York", "isStudent": false, "grades": [95, 88, 75, 92], "address": { "street": "123 Main St", "zipCode": "10001" } }
在这个例子中,JSON 对象包含了一些属性,包括字符串、数字、布尔值、数组和嵌套的对象。
"name": "John Doe"
:字符串键值对。"age": 30
:数字键值对。"city": "New York"
:字符串键值对。"isStudent": false
:布尔键值对。"grades": [95, 88, 75, 92]
:数组键值对。"address": {...}
:嵌套对象。
在实际应用中,JSON 数据通常用于前后端之间的数据交换,或者配置文件的存储。各种编程语言都提供了处理 JSON
数据的库或模块。
很早之前大概是两年前,当时为了实现批量管理SSH
账号密码并实现自动巡检功能,写过一个简单的命令行工具,通过使用JSON
实现对特定主机账号密码与组的管理,如下代码,通过定义AdminDataBase()
类,传如数据库文件名database.json
实现对特定JSON
文件的增删改查功能,在编写该案例后我对JSON
的使用变得更加深刻了。
# -*- coding: utf-8 -*- import os,json class AdminDataBase(object): def __init__(self, database_path): self.database_path = database_path # 判断数据库文件是否存在,不存则则创建一个. def InitDatabase(self): if os.path.exists(self.database_path) != None : init_database = \ { "HostList": [["1000", "127.0.0.1", "username", "password", "22"]], "HostGroup": [{"DefaultGroup": ["1000"]}, ], } with open(self.database_path, "w", encoding="utf-8") as fp: fp.write(json.dumps(init_database)) print("[+] {} 结构已被初始化.".format(self.database_path)) # table 接收一个列表名,根据列表名输出列表中的数据 def ShowHostList(self): with open(self.database_path, "r", encoding="utf-8") as Read_Pointer: load_json = json.loads(Read_Pointer.read()) base = load_json.get("HostList") print("-" * 80) print("UUID \t 主机地址 \t\t\t 登录用户 \t\t 登录密码 \t 端口") print("-" * 80) for each in range(0, len(base)): print("{0:4} \t {1:15} \t {2:10} \t {3:10} \t {4:10}" .format(base[each][0], base[each][1], base[each][2], base[each][3], base[each][4])) print() # 在原来的基础上添加一台新的主机,添加到HostList表中 def AddHost(self, uuid, address, username, password, port): with open(self.database_path, "r", encoding="utf-8") as Read_Pointer: # 读取 database.json 中的数据到内存中 load_json = json.loads(Read_Pointer.read()) # 先读入内存,修改后全部替换进去 host_list = load_json.get("HostList") # 获取到最后一次循环的元素列表 for each in range(len(host_list) - 1, len(host_list)): # 判断如果UUID不重复则添加,否则跳过添加 if (host_list[each][0] != uuid): host_list.append([uuid, address, username, password, port]) load_json["HostList"] = host_list # 最后再次将改好的数据,回写到文件保存 with open(self.database_path, "w", encoding="utf-8") as Write_Pointer: dump_json = json.dumps(load_json) Write_Pointer.write(dump_json) print("[+] UUID {} 添加成功.".format(uuid)) else: print("[-] UUID {} 列表中存在,添加失败.".format(uuid)) return 0 # 根据传入UUID号修改特定主机数据 def ModifyHost(self,uuid,modify_address,modify_username,modify_password,modify_port): with open(self.database_path, "r", encoding="utf-8") as Read_Pointer: # 读取 database.json 中的数据到内存中 load_json = json.loads(Read_Pointer.read()) host_list = load_json.get("HostList") # 根据uuid寻找一维数组的下标位置 index = -1 for index, value in enumerate(host_list): if value[0] == uuid: print("[*] 已找到UUID {} 所在下标为 {}".format(uuid,index)) break else: index = -1 # 判断是否找到了,找到了则修改 if index != -1: # 修改指定下标数值,并将修改后的数据放入原始JSON文件中 host_list[index] = [uuid,modify_address,modify_username,modify_password,modify_port] load_json["HostList"] = host_list # 最后再次将改好的数据,回写到文件保存 with open(self.database_path, "w", encoding="utf-8") as Write_Pointer: dump_json = json.dumps(load_json) Write_Pointer.write(dump_json) print("[+] UUID {} 修改完成.".format(uuid)) return 0 else: print("[-] UUID {} 不存在.".format(uuid)) return 0 # 根据传入的UUID号删除主机数据,先删除所对的组中的数据,然后在删除主机数据 def DeleteHost(self,uuid): with open(self.database_path, "r", encoding="utf-8") as Read_Pointer: load_json = json.loads(Read_Pointer.read()) host_group = load_json.get("HostGroup") # 首先在HostGroup中寻找并弹出相应UUID for each in range(0,len(host_group)): try: # 循环每个字典 for k,v in host_group[each].items(): # 判断UUID是否存在于每个字典中 if v.count(uuid) != 0: # 移除并放入原始JSON中 v.remove(uuid) load_json["HostGroup"] = v # 最后再次将改好的数据,回写到文件保存 with open(self.database_path, "w", encoding="utf-8") as Write_Pointer: dump_json = json.dumps(load_json) Write_Pointer.write(dump_json) #print("[+] UUID {} 修改完成.".format(uuid)) else: #print("[-] 当前组 {} 不能存在: {}".format(k,uuid)) break except Exception: print("[-] 当前组不能存在: {}".format(uuid)) return 0 # ---------------------------------------------- # 其次寻找HostList中的数据并弹出 host_list = load_json.get("HostList") try: # 根据uuid寻找一维数组的下标位置 index = -1 for index, value in enumerate(host_list): if value[0] == uuid: #print("[*] 已找到UUID {} 所在下标为 {}".format(uuid,index)) break else: index = -1 # 如果找到了,则直接根据下标弹出数据 if index != -1: host_list.pop(index) load_json["HostList"] = host_list # 最后再次将改好的数据,回写到文件保存 with open(self.database_path, "w", encoding="utf-8") as Write_Pointer: dump_json = json.dumps(load_json) Write_Pointer.write(dump_json) print("[+] UUID {} 主机: {} 已被移除.".format(uuid,host_list[index][1])) return 0 except Exception: return 0 # 向数据库中的HostGroup字段中添加一个主机组 def AddHostGroup(self,add_group_name): with open(self.database_path, "r", encoding="utf-8") as Read_Pointer: load_json = json.loads( Read_Pointer.read() ) group_obj = load_json.get("HostGroup") # 循环所有字典中的组 for each in range(0, len(group_obj)): list_name = str(list(group_obj[each].keys())[0]) # print("组节点: {}".format(list_name)) # 循环判断表中是否存在指定的组名称 if (list_name == add_group_name): print("[-] {} 存在于组中,无法继续添加.".format(list_name)) return 0 # 读取原始JSON文件,并构建回写参数 tmp = {} tmp[add_group_name] = ["1000"] group_obj.append(tmp) # 回写添加主机组 with open(self.database_path, "w", encoding="utf-8") as Write_Pointer: dump_json = json.dumps(load_json) Write_Pointer.write(dump_json) print("[+] 主机组 {} 已添加".format(add_group_name)) # 在主机组中删除一个主机组 def DeleteHostGroup(self,delete_group_name): with open(self.database_path, "r", encoding="utf-8") as Read_Pointer: load_json = json.loads( Read_Pointer.read() ) group_obj = load_json.get("HostGroup") # 循环所有字典中的组 for each in range(0, len(group_obj)): list_name = str(list(group_obj[each].keys())[0]) # 循环判断表中是否存在指定的组名称 if (list_name == delete_group_name): # 如果存在于组中,那我们就把他的each弹出列表 group_obj.pop(each) # 将弹出后的列表再次写回JSON序列中 with open(self.database_path, "w", encoding="utf-8") as Write_Pointer: dump_json = json.dumps(load_json) Write_Pointer.write(dump_json) print("[-] 主机组 {} 已移除.".format(delete_group_name)) return 0 print("[-] 主机组 {} 不存在.".format(delete_group_name)) return 0 # 向指定主机组中添加一个主机UUID def AddHostGroupOnUUID(self,group_name, uuid): with open(self.database_path, "r", encoding="utf-8") as Read_Pointer: load_json = json.loads( Read_Pointer.read() ) group_obj = load_json.get("HostGroup") # 循环所有字典中的所有组 for each in range(0, len(group_obj)): list_name = str(list(group_obj[each].keys())[0]) # 如果找到了需要添加的组 if (list_name == group_name): tmp = group_obj[each] # 获取到原始列表中的数据 val = list(tmp.values())[0] # 判断输入的UUID号,是否在val列表中,不在则添加 if str(uuid) not in val: val.append(str(uuid)) # 将原始数据赋值到新的表中 group_obj[each][list_name] = val else: print("[-] UUID {} 已存在于 {} 主机组,添加失败.".format(uuid,group_name)) return 0 with open(self.database_path, "w", encoding="utf-8") as Write_Pointer: dump_json = json.dumps(load_json) Write_Pointer.write(dump_json) print("[+] 向主机组 {} 增加UUID {} 完成".format(group_name, uuid)) return 0 # 从指定主机组中删除一个指定的UUID号 def DeleteHostGroupOnUUID(self,group_name, uuid): with open(self.database_path, "r", encoding="utf-8") as Read_Pointer: load_json =json.loads( Read_Pointer.read() ) group_obj = load_json.get("HostGroup") # 循环所有字典中的组 for each in range(0, len(group_obj)): list_name = str(list(group_obj[each].keys())[0]) # 先来寻找到需要删除的主机组 if (list_name == group_name): tmp = group_obj[each] # 寻找指定组中的指定列表 val = list(tmp.values())[0] for x in range(0, len(val)): if (val[x] == uuid): print("[*] 搜索UUID: {} 索引值: {}".format(val[x], x)) # 将要删除的UUID弹出列表 val.pop(x) # 将弹出后的列表重新复制到主机组中 group_obj[each][list_name] = val # 保存弹出后的列表到序列中 with open(self.database_path, "w", encoding="utf-8") as write_fp: dump_json = json.dumps(load_json) write_fp.write(dump_json) print("[+] 从主机组 {} 弹出UUID {} 完成".format(group_name, uuid)) return 0 return 0 # 输出所有主机组和全部节点信息 def ShowAllGroup(self): with open(self.database_path, "r", encoding="utf-8") as Read_Pointer: load_json = json.loads( Read_Pointer.read() ) group_obj = load_json.get("HostGroup") # 循环解析所有组,并解析出UUID所对应的主机地址等信息 for each in range(0, len(group_obj)): for k, v in group_obj[each].items(): print("-" * 140) print("主机组: {}".format(k)) print("-" * 140) for each in range(0, len(v)): # print("--> UUID: {}".format(v[each])) # 循环判断,拿着组内UUID判断是否存在,如果存在则打印出主机详细信息 base_obj = load_json.get("HostList") for x in range(0, len(base_obj)): if (v[each] == base_obj[x][0]): print("UUID: {0:6} \t 主机地址: {1:15} \t 主机账号: {2:15} \t 主机密码: {3:15} \t 端口: {4:5} \t". format(base_obj[x][0], base_obj[x][1], base_obj[x][2], base_obj[x][3], base_obj[x][4])) print("\n") # 只显示所有的主机组,包括组中主机数 def ShowGroup(self): with open(self.database_path, "r", encoding="utf-8") as Read_Pointer: load_json = json.loads( Read_Pointer.read() ) group_obj = load_json.get("HostGroup") print("-" * 80) print("{0:20} \t {1:5} \t {2:100}".format("主机组名","主机数","主机列表")) print("-" * 80) for each in range(0,len(group_obj)): for k,v in group_obj[each].items(): print("{0:20} \t {1:5} \t {2:100}".format(k,str(len(v)), str(v))) print() return 0 # 对指定的主机组进行Ping测试 def PingGroup(self,group_name): with open(self.database_path, "r", encoding="utf-8") as Read_Pointer: load_json = json.loads( Read_Pointer.read() ) group_obj = load_json.get("HostGroup") # 循环遍历主机列表 for each in range(0, len(group_obj)): for k, v in group_obj[each].items(): # 寻找主机组,找到后提取出所有的Value if (k == group_name): item_list = v # 再循环,拿着v的值对base进行循环,提取出其中的用户名密码等 for val in range(0, len(item_list)): # 得到循环列表:print(item_list[val]) base_obj = load_json.get("HostList") # 循环base表中的计数器 for base_count in range(0, len(base_obj)): # 判断base表中是否存在指定UUID,如果存在则输出其值 if (base_obj[base_count][0] == item_list[val]): print("地址: {} \t 用户名: {} [ok]".format(base_obj[base_count][1],base_obj[base_count][2])) if __name__ == "__main__": db = AdminDataBase("database.json") while True: try: cmd = str(input("[LyShell] # ")).split() if (cmd == ""): continue elif (cmd[0] == "exit"): exit(1) elif (cmd[0] == "clear"): os.system("cls") elif(cmd[0] == "Init"): db.InitDatabase() elif(cmd[0] == "ShowHostList"): db.ShowHostList() elif(cmd[0] == "ShowGroup"): db.ShowGroup() elif(cmd[0] == "ShowAllGroup"): db.ShowAllGroup() elif(cmd[0] == "AddHost" or cmd[0] == "ModifyHost"): if (len(cmd) - 1 >= 5): uuid = str(cmd[1]).split("=")[1] address = str(cmd[2]).split("=")[1] username = str(cmd[3]).split("=")[1] password = str(cmd[4]).split("=")[1] port = str(cmd[5]).split("=")[1] if cmd[0] == "AddHost": db.AddHost(uuid,address,username,password,port) elif cmd[0] == "ModifyHost": db.ModifyHost(uuid,address,username,password,port) elif(cmd[0] == "DeleteHost"): if(len(cmd)-1 >= 1): uuid = str(cmd[1]).split("=")[1] db.DeleteHost(uuid) elif(cmd[0] == "AddHostGroup"): group_name = str(cmd[1]).split("=")[1] db.AddHostGroup(group_name) elif(cmd[0] == "DeleteHostGroup"): group_name = str(cmd[1]).split("=")[1] db.DeleteHostGroup(group_name) elif(cmd[0] == "AddHostGroupOnUUID"): group_name = str(cmd[1]).split("=")[1] uuid = str(cmd[2]).split("=")[1] db.AddHostGroupOnUUID(group_name,uuid) elif(cmd[0] == "DelHostGroupOnUUID"): group_name = str(cmd[1]).split("=")[1] uuid = str(cmd[2]).split("=")[1] db.DeleteHostGroupOnUUID(group_name,uuid) elif(cmd[0] == "PingGroup"): group_name = str(cmd[1]).split("=")[1] db.PingGroup(group_name) elif (cmd[0] == "help"): print("By: LyShark") else: print("Not Command") except Exception: continue
使用案例
管理数据库(AdminDataBase
)中的主机信息和主机分组信息。用户通过输入命令来执行不同的操作,如初始化数据库、显示主机列表、添加主机、修改主机信息、删除主机等。
以下是代码的主要功能和命令列表:
- 初始化数据库:
Init
- 显示主机列表:
ShowHostList
- 显示主机分组:
ShowGroup
- 显示所有主机分组:
ShowAllGroup
- 添加主机:
AddHost
- 修改主机信息:
ModifyHost
- 删除主机:
DeleteHost
- 添加主机分组:
AddHostGroup
- 删除主机分组:
DeleteHostGroup
- 将主机添加到指定分组:
AddHostGroupOnUUID
- 从指定分组删除主机:
DelHostGroupOnUUID
- 按组执行 Ping 操作:
PingGroup
- 清屏:
clear
- 退出程序:
exit
- 帮助:
help
Init
初次使用需要执行本命令,对数据库文件进行写出,如下所示;
ShowHostList
用于输出当前主机列表信息,如下图所示;
ShowGroup
用于输出当前主机组,如下图所示;
ShowAllGroup
用于输出所有的主机组以及组内的主机详细信息,如下图所示;
AddHost
添加一个新的主机记录,如下图所示;
ModifyHost
修改一个已有的主机记录,此处以UUID作为修改条件,如下图所示;
DeleteHost
根据一个UUID唯一标识,删除一个已存在的主机记录,如下图所示;
AddHostGroup
新增一个组名,默认会携带1000为初始主机,如下图所示;
DeleteHostGroup
删除一整个主机组,如下图所示;
AddHostGroupOnUUID
根据UUID号将特定主机添加到特定组内,如下图所示;
DelHostGroupOnUUID
根据主机组名,删除特定的UUID,如下图所示;
PingGroup
对特定主机组执行Ping功能测试,此处可以扩展,如下图所示;
总结部分
该案例只是用于学习如何灵活运用JSON实现数据的增删改查,其实在实战中意义不大,因为完全可以使用SQLite这类精简数据库,此案例只是本人为了熟悉JSON的增删改查而写的一个Demo工具。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Apache Doris 在某工商信息商业查询平台的湖仓一体建设实践
本文导读: 信息服务行业可以提供多样化、便捷、高效、安全的信息化服务,为个人及商业决策提供了重要支撑与参考。本文以某工商信息商业查询平台为例,介绍其从传统 Lambda 架构到基于 Doris Multi-Catalog 的湖仓一体架构演进历程。同时通过一系列实践,展示了如何保证数据的准确性和实时性,以及如何高效地处理和分析大规模数据,为信息服务行业提供了有价值的参考思路,有助于推动整个行业的发展和创新。 作者:某工商信息商业查询平台 高级数据研发工程师 李昂 在社会信息化水平不断升高趋势下,人们对信息的依赖程度越来越高,信息服务行业持续发挥着重要的作用。它可以提供多样化、便捷、高效、安全的信息化服务,包括信息咨询、信息传递、信息技术服务、风险预警等,为个人及商业决策提供了重要支撑与参考。 对于行业相关企业来说,虽然数据源的获取并不困难,但如何合理利用这些数据,并转化成更直观、更具参考性的信息,是企业需要持续攻坚的难题。在这样的背景下,数据收集、加工、分析能力的重要性不言而喻。 以某工商信息商业查询平台为例,其面对企业公开信息不断变化的挑战,如注册资本变更、股权结构变更、债务债权转移、...
- 下一篇
「X」Embedding in NLP|Token 和 N-Gram、Bag-of-Words 模型释义
ChatGPT(GPT-3.5)和其他大型语言模型(Pi、Claude、Bard 等)凭何火爆全球?这些语言模型的运作原理是什么?为什么它们在所训练的任务上表现如此出色? 虽然没有人可以给出完整的答案,但了解自然语言处理的一些基本概念有助于我们了解 LLM 内在工作原理。尤其是了解 Token 和 N-gram 对于理解几乎所有当前自回归和自编码模型都十分重要。本文为“「X」Embedding in NLP”的进阶版,将带大家详解 NLP 的核心基础! 01.Token 和 N-gram 在 C/C++ 的入门计算机科学课程中,通常很早就会教授字符串的概念。例如,C 语言中的字符串可以表示为以空字符终止的字符数组: char my_str[128] = "Milvus"; 在这个例子中,每个字符都可以被视为一个离散单位,将它们组合在一起就形成了有意义的文本——在这种情况下,my_str表示了世界上最广泛采用的向量数据库。 简单来说,这就是 N-gram 的定义:一系列字符(或下一段讨论的其他离散单位),当它们连在一起时,具有连贯的意义。在这个实例中,N 对应于字符串中的字符总数(在这个...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS8安装Docker,最新的服务器搭配容器使用
- Linux系统CentOS6、CentOS7手动修改IP地址
- 2048小游戏-低调大师作品
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- CentOS8编译安装MySQL8.0.19
- CentOS6,CentOS7官方镜像安装Oracle11G
- CentOS7,8上快速安装Gitea,搭建Git服务器
- SpringBoot2整合Thymeleaf,官方推荐html解决方案