当我遇见Python——工作记事四则
4月14日的中午,偶然看到51CTO技术博客大赛,有奖品,就想参加一下,于是翻开日志整理出我实际工作中使用Python的四件事。本人在生产制造业担任运维工程师一职,平时的工作内容多而杂,维护业务系统的可用性和稳定性,数据的完整性和可靠性以及整个网络系统的安全性,总结一点就是大量的重复的苦逼运维工作,枯燥乏味,但是强烈的责任心告诉我少做或不做是不行的。于是,在最火的Python流行时,利用“摸鱼”时间稍微学习了一下Python,并使用Python处理了工作中的四件事,算是把学到的应用到实际工作中吧,在此感谢51CTO提供了非常好的学习平台,人生就是不断进步的过程,而51CTO就是助推前进的帮手。
1、利用Python批量修改图片名称。此案例是利用导入os模块进行文件名称的处理,大量的名称格式不统一的图片文件作为输入,经过Python生成的可执行工具进行快速处理后,输出为名称格式统一的图片文件,大量的复杂的重复的工作交给Python,这个得力“助手”处理快速又准确。有此期间帮我处理了大约八千张图片,可以说相当给力。
2、PyQt5实现每日自动备份交换机配置。网络设备配置备份工作对于网络工程而言是又爱又恨,手里有备份,心里就不慌,但是面对数十上百台设备的配置备份,就心里只有恨了。此案例利用Pyqt5实现配置自动备份,突出特点是工具界面可视化。用了最简单的if判断语句实现,此案例需要优化的地方很多但还算是能跑起来,现在我还用的不亦乐乎。
3、PyQt5实现设备连通性测试并实现不通的IP地址发送邮件通知。虽然有五花八门的监控平台来实现异常报警通知,但对于功能性适用性小的低的设备,监控平台并不能很好的监控处理,于是利用Python自定义一些IP地址来监控很方便,也很实用。只是此工具如果24小时开着,会让人半夜睡不好觉(偷笑),半夜收到报警的举手一下。
4、用PyQt5实现Linux系统自动巡检功能之PyQt5实现服务器性能监控。在Linux系统大行其道的今天,Linux运维工作时使用Python简直如鱼得水。此案例介绍利用Python自定义监控项目阀值,和邮件信息,超过阀值时会自动发通知邮件。
下面对每个实际案例进行介绍:
一、利用Python批量修改图片名称
2020年注定是不平凡的一年。疫情打乱了原本的工作计划,2019年底讨论的智慧园区项目,在2020年3月正式上班才被正式启动,项目时间紧迫可想而知,到了3月底,最关重的图片收集上传任务最为棘手,员工提交上来的照片如何按“标准格式”快速上传至服务器呢?
1、员工提交的图片名称带姓名。
2、服务器不识别中文名称。
3、服务器只识别小写jpg格式的图片。
基于以上需求,一个个修改图片名称是不现实的。于是想到了使用Python脚本批量操作,快速修改。
代码如下:
import os
def rename(path, all_files):
filelist = os.listdir(path) # 该文件夹下所有的文件(包括文件夹)
for files in filelist: # 遍历所有文件
Olddir = os.path.join(path, files) # 原来的文件路径
if os.path.isdir(Olddir): # 如果是文件夹则递归
rename(Olddir, all_files)
else:
filename = os.path.splitext(files)[0] # 文件名
#print(filename)
filetype = os.path.splitext(files)[1] # 文件扩展名
#print(filetype)
filetypejpg = '.jpg'
Newdir = os.path.join(path, files[-10:-4] + filetypejpg) # 新的文件路径
os.rename(Olddir, Newdir) # 重命名
print(filename+'已完成')
# 传入空的list接收文件名
contents = rename("D:\\sad", [])
# 循环打印show_files函数返回的文件名列表
print('全部结束')
input()
注:执行脚本时,当看到“全部结束”,才为全部修改完成,否则中间肯定会有没修改的。
温馨提示一下打包命令:(有打包才叫真正的完成)
1、cd命令切换到与.py源文件相同目录。
2、输入:pyinstaller -F Test.py
二、PyQt5实现每日自动备份交换机配置
可以实现每天中午12点自动备份交换机配置文件,把周期性、重复性、规律性的工作都交给Python去做,实现系统维护自动化 。
特点:
1、每日中午12点开始执行备份,备份设备登录地址、密码需要事先定义。
2、备份文件自动在D盘按日期生成文件夹。
3、备份工具同目录下必须有swlist.txt文件(保存着待备份的交换机信息)。
详细如下图:
效果实例图:
代码如下:
# -*- coding: utf-8 -*-
# Form implementation generated from reading ui file 'X0601.ui'
#
# Created by: PyQt5 UI code generator 5.13.0
#
# WARNING! All changes made in this file will be lost!
import sys,os
if hasattr(sys, 'frozen'):
os.environ['PATH'] = sys._MEIPASS + ";" + os.environ['PATH']
from PyQt5.QtCore import QThread, pyqtSignal, QDateTime, QObject
from PyQt5.QtWidgets import QApplication, QDialog, QLineEdit
import time
from PyQt5 import QtCore, QtGui, QtWidgets
import datetime
import telnetlib
import time
import re
class BackendThread0(QObject):
# 通过类成员对象定义信号
update_date0 = pyqtSignal(str)
# 处理业务逻辑
def run0(self):
while True:
data0 = QDateTime.currentDateTime()
currTime = data0.toString("yyyy-MM-dd hh:mm:ss")
self.update_date0.emit(str(currTime))
time.sleep(1)
class BackendThread1(QObject):
# 通过类成员对象定义信号
update_date1 = pyqtSignal(str)
# 处理业务逻辑
def run1(self):
b = 0
ts_sum = ''
while True:
time.sleep(7200)
#######################################################################################################################
#判断是否是每天的12点或13点,需要12点或13点执行备份
time_now = int(time.time()) # unix时间
time_local = time.localtime(time_now) # 转换为win_time
dt = time.strftime("%H:%M:%S", time_local) # 转换成新的时间格式(18:59:20)
a = dt.split(':')
bb = []
for a in dt.split(':'):
bb.append(a)
c = ''.join(bb)
cc=c[0:2]
if cc == '12'or cc =='13':
#print('是12点')
b += 1
######################################################################################################################
now = datetime.datetime.now()
ts = now.strftime('%Y-%m-%d %H:%M:%S')
ts_sum = ts_sum + ts
ts = '开始备份' + '第' + str(b) + '次。'
ts_sum = ts_sum + ts + '\n'
# print(ts_sum)
# 在E盘back文件夹下创建以时间命名的文件夹
now = datetime.datetime.now()
tdate = now.date()
tdate = str(tdate)
backdir = 'E:\\back\\' + tdate
if (os.path.exists(backdir)):
# print("今日文件备份已存在")
ts_sum = ts_sum + '今日文件备份已存在' + '\n'
else:
os.mkdir(backdir)
# 循环读取需要备份的交换机列表
path = 'swlist.txt'
fpath = open(path, 'r')
flen = fpath.readlines()
for i in range(len(flen)):
temp1 = flen[i].split(':')
# 判断字符串是否包含指定字符串
zimushow = "show"
zimudis = "dis"
#######################################################################################################################
if zimushow in temp1[4]:
tnip = temp1[0]
username = temp1[1]
password = temp1[2]
enpassword = temp1[3]
try:
tn = telnetlib.Telnet(tnip, port=23, timeout=5)
tnip = tnip + '成功连接交换机23端口'
ts_sum = ts_sum + str(tnip) + '\n'
# print(ts_sum)
try:
if (username):
# 输用户名
tn.write(username.encode('ascii') + b'\n')
# time.sleep(2)
# 输密码
tn.read_until(b'Password:', timeout=5)
tn.write(password.encode('ascii') + b'\n')
time.sleep(5)
command_result = tn.read_very_eager().decode('ascii')
if 'Login invalid' in command_result:
# print(tn.read_very_eager(), '用户名或密码错误')
ts_sum = ts_sum + tnip + '用户名或密码错误' + '\n'
else:
# print(tnip, '成功登录交换机')
ts_sum = ts_sum + tnip + '成功登录交换机' + '\n'
# 进入enable模式:
# 备份配置
confComplete = ''
cmd = 'show run'
tn.write(cmd.encode('ascii') + b'\n')
time.sleep(5)
msg = tn.read_very_eager()
temsg = str(msg)
msg = temsg[:-11]
confComplete = msg
# 判断配置文件是否加载完毕
moreRge = '--More--'
moreFlag = re.search(moreRge, temsg)
while (moreFlag):
tn.write(' '.encode('ascii'))
time.sleep(2)
temsg = tn.read_very_eager()
temsg = str(temsg)
msg = temsg[:-11]
msg = msg[46:]
confComplete = re.sub(moreRge, '', confComplete)
confComplete = confComplete + msg
moreFlag = re.search(moreRge, temsg)
# print('正在打印配置')
# ts_sum = ts_sum + '正在打印配置' + '\n'
time.sleep(1)
# 退出telnet
tn.write(b"exit\n")
tn.write('exit'.encode('ascii'))
# 对全文被转义的符号处理一下
regr = '\r'
regn = '\n'
reg08 = '\x08'
confComplete = re.sub(r'\\r', regr, confComplete)
confComplete = re.sub(r'\\n', regn, confComplete)
confComplete = re.sub(r'\\x08', regr, confComplete)
# 保存配置
nameReg = r'hostname [\S]*[\r\r\n]'
name = re.search(nameReg, confComplete)
name = str(name)
name = name.split('hostname ')
name = name[1].split('\\')
name = name[0]
print(name + '开始写入文件')
ts_sum = ts_sum + name + '开始写入文件' + '\n'
filename = name + '.txt'
backdirfile = os.path.join(backdir, filename)
file1 = open(backdirfile, 'a+')
file1.write(confComplete)
file1.close()
print('成功保存配置')
ts_sum = ts_sum + name + '成功保存配置' + '\n'
else:
if (password):
# 使用密码登录交换机
tn.read_until(b'Password')
tn.write(password.encode('ascii') + b'\n')
time.sleep(1)
if b'Password' in tn.read_very_eager():
print(tnip, "交换机登录密码错误")
tnip = tnip + "交换机登录密码错误"
ts_sum = ts_sum + str(tnip) + '\n'
# self.update_date1.emit(str(ts_sum))
else:
print(tnip, '成功登录交换机')
tnip = tnip + '成功登录交换机'
ts_sum = ts_sum + str(tnip) + '\n'
# 进入enable模式:
try:
if (enpassword):
tn.write(b'\n')
tn.write(b'enable\n')
tn.read_until(b'Password:')
enPass = enpassword
tn.write(enPass.encode('ascii') + b'\n')
time.sleep(1)
# tn.read_very_eager()
if b'Password' in tn.read_very_eager():
print(tnip, "交换机特权密码错误")
tnip = tnip + "交换机特权密码错误"
ts_sum = ts_sum + str(tnip) + '\n'
else:
print(tnip, '成功进入特权模式')
tnip = tnip + '成功进入特权模式'
ts_sum = ts_sum + str(tnip) + '\n'
# 备份配置
confComplete = ''
cmd = 'show run'
tn.write(cmd.encode('ascii') + b'\n')
time.sleep(5)
msg = tn.read_very_eager()
temsg = str(msg)
msg = temsg[:-11]
confComplete = msg
# print(msg)
# 判断配置文件是否加载完毕
moreRge = '--More--'
moreFlag = re.search(moreRge, temsg)
# print(moreFlag)
while (moreFlag):
tn.write(' '.encode('ascii'))
time.sleep(2)
temsg = tn.read_very_eager()
temsg = str(temsg)
msg = temsg[:-11]
msg = msg[46:]
confComplete = re.sub(moreRge, '', confComplete)
confComplete = confComplete + msg
moreFlag = re.search(moreRge, temsg)
# print('正在打印配置')
# ts_sum = ts_sum + '正在打印配置' + '\n'
time.sleep(1)
# 退出telnet
tn.write(b"exit\n")
tn.write('exit'.encode('ascii'))
# 对全文被转义的符号处理一下
regr = '\r'
regn = '\n'
reg08 = '\x08'
confComplete = re.sub(r'\\r', regr, confComplete)
confComplete = re.sub(r'\\n', regn, confComplete)
confComplete = re.sub(r'\\x08', regr, confComplete)
# 保存配置
nameReg = r'hostname [\S]*[\r\r\n]'
name = re.search(nameReg, confComplete)
name = str(name)
name = name.split('hostname ')
name = name[1].split('\\')
name = name[0]
print(name + '开始写入文件')
ts_sum = ts_sum + name + '开始写入文件' + '\n'
filename = name + '.txt'
backdirfile = os.path.join(backdir, filename)
file1 = open(backdirfile, 'a+')
file1.write(confComplete)
file1.close()
print(name, '成功保存配置')
ts_sum = ts_sum + name + '成功保存配置' + '\n'
except Exception as e:
# print(e)
ts_sum = ts_sum + str(e) + '\n'
except Exception as e:
# print(e)
ts_sum = ts_sum + str(e) + '\n'
# print(ts_sum)
except Exception as e:
# print(e)
ts_sum = ts_sum + tnip + str(e) + '\n'
# print(ts_sum)
self.update_date1.emit(str(ts_sum))
######################################################################################################################
elif zimudis in temp1[4]:
tnip = temp1[0]
username = temp1[1]
password = temp1[2]
enpassword = temp1[3]
try:
tn = telnetlib.Telnet(tnip, port=23, timeout=5)
# print(tnip, '成功连接交换机23端口')
tnip = tnip + '成功连接交换机23端口'
ts_sum = ts_sum + str(tnip) + '\n'
try:
if (username):
# 输用户名
tn.write(username.encode('ascii') + b'\n')
time.sleep(1)
# 密码登录交换机
tn.read_until(b'Password:')
tn.write(password.encode('ascii') + b'\n')
time.sleep(1)
if b'login:' in tn.read_very_eager():
# print(tn.read_very_eager(), '交换机登录密码错误')
ts_sum = ts_sum + '用户名或密码错误' + '\n'
else:
# print(tnip, '成功登录交换机')
ts_sum = ts_sum + tnip + '用户名密码正确' + '\n'
# 进入enable模式:
# 备份配置
confComplete = ''
cmd = 'dis curr'
tn.write(cmd.encode('ascii') + b'\n')
time.sleep(3)
msg = tn.read_very_eager()
temsg = str(msg)
msg = temsg[:-15]
confComplete = msg
# 判断配置文件是否加载完毕
moreRge = '---- More ----'
moreFlag = re.search(moreRge, temsg)
# print(moreFlag)
while (moreFlag):
tn.write(' '.encode('ascii'))
time.sleep(2)
temsg = tn.read_very_eager()
temsg = str(temsg)
msg = temsg[:-15]
msg = msg[23:]
confComplete = re.sub(moreRge, '', confComplete)
confComplete = confComplete + msg
moreFlag = re.search(moreRge, temsg)
# print('正在打印配置')
time.sleep(1)
# 退出telnet
tn.write('quit'.encode('ascii'))
# 对全文被转义的符号处理一下
regr = '\r'
regn = '\n'
reg08 = '\x08'
confComplete = re.sub(r'\\r', regr, confComplete)
confComplete = re.sub(r'\\n', regn, confComplete)
confComplete = re.sub(r'\\x08', regr, confComplete)
# 保存配置
nameReg = r'sysname [\S]*[\r\r\n]'
name = re.search(nameReg, confComplete)
# print(name)
name = str(name)
name = name.split('sysname ')
name = name[1].split('\\')
name = name[0]
name = name[:-1]
# print(name + '开始写入文件')
ts_sum = ts_sum + name + '开始写入文件' + '\n'
filename = name + '.txt'
backdirfile = os.path.join(backdir, filename)
file1 = open(backdirfile, 'a+')
file1.write(confComplete)
# print(confComplete)
file1.close()
# print('成功保存配置文件')
ts_sum = ts_sum + tnip + '成功保存配置文件' + '\n'
else:
# print("there's something wrong!!")
ts_sum = ts_sum + "there's something wrong!!" + '\n'
except Exception as e:
# print(e)
ts_sum = ts_sum + str(e) + '\n'
# print(ts_sum)
except Exception as e:
# print(e)
ts_sum = ts_sum + tnip + str(e) + '\n'
# print(ts_sum)
else:
print('仅支持思科和华三交换机')
ts_sum = ts_sum + '仅支持思科和华三交换机' + '\n'
# print(tdate, ' 全部备份完成')
ts_sum = ts_sum + '全部备份完成' + '\n'
fpath.close()
now = datetime.datetime.now()
ts = now.strftime('%Y-%m-%d %H:%M:%S')
ts_sum = ts_sum + ts
ts = '结束备份' + '第' + str(b) + '次。'
ts_sum = ts_sum + ts + '\n' + '\n'
self.update_date1.emit(str(ts_sum))
#######################################################################################################################
class Window(QDialog):
def __init__(self):
QDialog.__init__(self)
self.setWindowTitle('PyQt 5自动化巡检')
self.resize(800,600)
self.input0=QLineEdit(self)
self.input0.resize(799,50)
self.input1=QtWidgets.QTextEdit(self)
self.input1.resize(799,549)
self.input1.setGeometry(QtCore.QRect(0,49,799,549))
#self.input1.verticalScrollBar().setValue(self.input1.verticalScrollBar().maximum())
self.initUI()
def initUI(self):
# 创建线程
self.backend = BackendThread0()
self.backend1 = BackendThread1()
# 连接信号
self.backend.update_date0.connect(self.handleDisplay0)
self.backend1.update_date1.connect(self.handleDisplay1)
self.thread = QThread()
self.thread1 = QThread()
self.backend.moveToThread(self.thread)
self.backend1.moveToThread(self.thread1)
# 开始线程
self.thread.started.connect(self.backend.run0)
self.thread1.started.connect(self.backend1.run1)
self.thread.start()
self.thread1.start()
# 将当前时间输出到文本框
def handleDisplay0(self, data0):
self.input0.setText(data0)
def handleDisplay1(self, data1):
self.input1.setText(data1)
self.input1.verticalScrollBar().setValue(self.input1.verticalScrollBar().maximum())
if __name__ == '__main__':
app = QApplication(sys.argv)
win = Window()
win.show()
sys.exit(app.exec_())
温馨提示一下打包命令:(有打包才叫真正的完成)
1、cd命令切换到与.py源文件相同目录。
2、输入:pyinstaller -F Test.py
三、PyQt5实现设备连通性测试—不通的IP地址发送邮件通知
特点:1、自动化运维,不通的IP地址邮件通知。
2、IP地址、邮件通知地址为自定义方式。
3、工具相同目录下需要建立doip.txt和youjian.txt文件。
详细如下图:
效果实例图:
代码如下:
# -*- coding: utf-8 -*-
# Form implementation generated from reading ui file 'X0601.ui'
#
# Created by: PyQt5 UI code generator 5.13.0
#
# WARNING! All changes made in this file will be lost!
import sys,os
if hasattr(sys, 'frozen'):
os.environ['PATH'] = sys._MEIPASS + ";" + os.environ['PATH']
from PyQt5.QtCore import QThread, pyqtSignal, QDateTime, QObject
from PyQt5.QtWidgets import QApplication, QDialog, QLineEdit
import time
from PyQt5 import QtCore, QtGui, QtWidgets
import datetime
import re
from email.header import Header
from email.mime.text import MIMEText
from email.utils import parseaddr, formataddr
import smtplib
class BackendThread0(QObject):
# 通过类成员对象定义信号
update_date0 = pyqtSignal(str)
# 处理业务逻辑
def run0(self):
while True:
data0 = QDateTime.currentDateTime()
currTime = data0.toString("yyyy-MM-dd hh:mm:ss")
self.update_date0.emit(str(currTime))
time.sleep(1)
class BackendThread1(QObject):
# 通过类成员对象定义信号
update_date1 = pyqtSignal(str)
# 处理业务逻辑
def run1(self):
b = 0
ts_sum = ''
while True:
b += 1
time.sleep(5)
now = datetime.datetime.now()
ts = now.strftime('%Y-%m-%d %H:%M:%S')
ts_sum = ts_sum + ts
ts = '开始测试'+'第'+str(b)+'次。'
ts_sum = ts_sum + ts + '\n'
#print(ts, '开始测试', '第%d次。' % (b))
pathip = open('doip.txt', 'r') # 以中读方式打开目标文件
ipline = pathip.readlines() # 读取文件中所有内容到列表ipline中
# print(ipline)
for i in range(len(ipline)): # 循环次数为IP地址个数
ip = ipline[i].split() # 赋值列表文件
ip = str(ip) # 转换成字符串
# print(ip)
ip = ip[2:-2] # 截取字符串
# print(ip,type(ip))
jieguo = ''
mingling = 'ping ' + ip + ' -n 4' # 形成ping命令
ping = os.popen(mingling).read() # 执行1次ping命令
if re.compile(r'来自..+', re.M).findall(ping): # 判断是否通
jieguo = '通'
else:
jieguo = '不通'
ipline[i] = ip + jieguo
#print(ipline)
ts_sum = ts_sum + str(ipline) +'\n'
a = []
for j in range(len(ipline)):
if ipline[j].find('不') > 1:
a.append(ipline[j])
if a != []:
# sendmail(a)
file = open(r'youjian.txt', 'r')
lenfile = file.readlines()
from_addr = lenfile[0].strip()
password = lenfile[1].strip()
to_addr = lenfile[2].strip()
smtp_server = lenfile[3].strip()
file.close()
try:
def _format_addr(s):
name, addr = parseaddr(s)
return formataddr((Header(name, 'utf-8').encode(), addr))
msg = MIMEText('IP地址为'+str(a)+'设备报警', 'plain', 'utf-8')
msg['From'] = _format_addr('管理员 <%s>' % from_addr)
msg['To'] = _format_addr('管理员 <%s>' % to_addr)
msg['Subject'] = Header('报警提醒', 'utf-8').encode()
server = smtplib.SMTP(smtp_server, 25)
server.login(from_addr, password)
#print('发送成功')
server.sendmail(from_addr, [to_addr], msg.as_string())
server.quit()
except Exception as e:
print(e)
now = datetime.datetime.now()
ts = now.strftime('%Y-%m-%d %H:%M:%S')
ts_sum = ts_sum + ts
#print(ts, '结束测试', '第%d次。' % (b))
ts = '结束测试' + '第' + str(b) + '次。'
ts_sum = ts_sum + ts + '\n' + '\n'
pathip.close()
self.update_date1.emit(str(ts_sum))
class Window(QDialog):
def __init__(self):
QDialog.__init__(self)
self.setWindowTitle('PyQt 5自动化巡检')
self.resize(800,600)
self.input0=QLineEdit(self)
self.input0.resize(799,50)
self.input1=QtWidgets.QTextEdit(self)
self.input1.resize(799,549)
self.input1.setGeometry(QtCore.QRect(0,49,799,549))
#self.input1.verticalScrollBar().setValue(self.input1.verticalScrollBar().maximum())
self.initUI()
def initUI(self):
# 创建线程
self.backend = BackendThread0()
self.backend1 = BackendThread1()
# 连接信号
self.backend.update_date0.connect(self.handleDisplay0)
self.backend1.update_date1.connect(self.handleDisplay1)
self.thread = QThread()
self.thread1 = QThread()
self.backend.moveToThread(self.thread)
self.backend1.moveToThread(self.thread1)
# 开始线程
self.thread.started.connect(self.backend.run0)
self.thread1.started.connect(self.backend1.run1)
self.thread.start()
self.thread1.start()
# 将当前时间输出到文本框
def handleDisplay0(self, data0):
self.input0.setText(data0)
def handleDisplay1(self, data1):
self.input1.setText(data1)
self.input1.verticalScrollBar().setValue(self.input1.verticalScrollBar().maximum())
if __name__ == '__main__':
app = QApplication(sys.argv)
win = Window()
win.show()
sys.exit(app.exec_())
温馨提示一下打包命令:(有打包才叫真正的完成)
1、cd命令切换到与.py源文件相同目录。
2、输入:pyinstaller -F Test.py
四、用PyQt实现linux系统自动巡检功能
重点来了。
Linux系统在企业中的部署应用日渐重要,那么怎么用python自动巡检linux系统,做到超过阀值时报警呢?
实现条件:
1、在可执行文件.exe的同目录下建立server_list.txt(待巡检的服务器ssh登录信息)、fazhi.txt(自定义参数阀值)、youjian.txt(超过阀值后发送邮件的信息)
2、server_list.txt的内容信息必须用空格分开,fazhi.txt的信息为第行一个(分别为:IP地址、CPU利用率、内存利用率、网速Mb/s)
效果实例图:
代码如下:
# -*- coding: utf-8 -*-
# Form implementation generated from reading ui file 'X0601.ui'
#
# Created by: PyQt5 UI code generator 5.13.0
#
# WARNING! All changes made in this file will be lost!
import sys,os
if hasattr(sys, 'frozen'):
os.environ['PATH'] = sys._MEIPASS + ";" + os.environ['PATH']
from PyQt5.QtCore import QThread, pyqtSignal, QDateTime, QObject
from PyQt5.QtWidgets import QApplication, QDialog, QLineEdit
import time
from PyQt5 import QtCore, QtGui, QtWidgets
import datetime
import paramiko
import re
import psutil
from email.header import Header
from email.mime.text import MIMEText
from email.utils import parseaddr, formataddr
import smtplib
class BackendThread0(QObject):
# 通过类成员对象定义信号
update_date0 = pyqtSignal(str)
# 处理业务逻辑
def run0(self):
while True:
data0 = QDateTime.currentDateTime()
currTime = data0.toString("yyyy-MM-dd hh:mm:ss")
self.update_date0.emit(str(currTime))
time.sleep(1)
class BackendThread1(QObject):
# 通过类成员对象定义信号
update_date1 = pyqtSignal(str)
# 处理业务逻辑
def run1(self):
b = 0
ts_sum = ''
filefazhi = open(r'fazhi.txt', 'r')
lenfile = filefazhi.readlines()
fazhi_ip = lenfile[0].strip()
fazhi_cpu = lenfile[1].strip()
fazhi_mem = lenfile[2].strip()
fazhi_net = lenfile[3].strip()
filefazhi.close()
while True:
b += 1
time.sleep(5)
now = datetime.datetime.now()
ts = now.strftime('%Y-%m-%d %H:%M:%S')
ts_sum = ts_sum + ts+'开始' +'\n'
# 读取服务器列表文件
serverlist = open(r"server_list.txt", "r")
for line in serverlist:
# 取IP、用户名、密码
ip = line.split()[0]
username = line.split()[1]
password = line.split()[2]
try:
# 创建一个SSH客户端client对象
sshClient = paramiko.SSHClient()
# 获取客户端host_keys,默认~/.ssh/known_hosts,非默认路径需指定
sshClient.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 创建SSH连接
sshClient.connect(hostname=ip, username=username, password=password, port=22)
###################################################################################################
mem_command = 'free'
stdin, stdout, stderr = sshClient.exec_command(mem_command)
ts_mem = ''
for std in stdout.readlines():
#print(std,)
ts_mem = ts_mem + str(std,)
ts_sum=ts_sum+ts_mem
stdin, stdout, stderr = sshClient.exec_command(mem_command)
sshRes = stdout.readlines()
mem_values = re.findall("(\d+)\ ", ",".join(sshRes))
MemTotal = mem_values[0]
MemUsed = mem_values[1]
Rate_Mem = 100 * float(MemUsed) / float(MemTotal)
memper=int(Rate_Mem)
Rate_Mem = "%.2f" % Rate_Mem
Rate_Mem = str(Rate_Mem) + '%'
#print(Rate_Mem)
ts_sum = ts_sum +'内存利用率:'+Rate_Mem+ '\n\n'
if memper > int(fazhi_mem):
# sendmail(a)
file = open(r'youjian.txt', 'r')
lenfile = file.readlines()
from_addr = lenfile[0].strip()
password = lenfile[1].strip()
to_addr = lenfile[2].strip()
smtp_server = lenfile[3].strip()
file.close()
try:
def _format_addr(s):
name, addr = parseaddr(s)
return formataddr((Header(name, 'utf-8').encode(), addr))
msg = MIMEText('服务器' + str(fazhi_ip) + '内存报警' + str(memper), 'plain', 'utf-8')
msg['From'] = _format_addr('管理员 <%s>' % from_addr)
msg['To'] = _format_addr('管理员 <%s>' % to_addr)
msg['Subject'] = Header('报警提醒', 'utf-8').encode()
server = smtplib.SMTP(smtp_server, 25)
server.login(from_addr, password)
# print('发送成功')
server.sendmail(from_addr, [to_addr], msg.as_string())
server.quit()
except Exception as e:
# print(e)
ts_sum = ts_sum + str(e)
###################################################################################################
uptime_command = 'uptime'
stdin, stdout, stderr = sshClient.exec_command(uptime_command)
ts_uptime = ''
for std in stdout.readlines():
#print(std, )
ts_uptime = ts_uptime + str(std, )
stdin, stdout, stderr = sshClient.exec_command(uptime_command)
sshRes_uptime = stdout.readlines()
uptime_values = re.findall(r'-?\d+\.?\d*e?-?\d*?', ",".join(sshRes_uptime))
uptime_values0 = uptime_values[-3::]
sshRes_uptime1 = uptime_values0[0]
sshRes_uptime5 = uptime_values0[1]
sshRes_uptime15 = uptime_values0[2]
if float(sshRes_uptime1)>5 or float(sshRes_uptime5)>5 or float(sshRes_uptime15)>5 :
# sendmail(a)
file = open(r'youjian.txt', 'r')
lenfile = file.readlines()
from_addr = lenfile[0].strip()
password = lenfile[1].strip()
to_addr = lenfile[2].strip()
smtp_server = lenfile[3].strip()
file.close()
try:
def _format_addr(s):
name, addr = parseaddr(s)
return formataddr((Header(name, 'utf-8').encode(), addr))
msg = MIMEText('服务器' + str(fazhi_ip) + '系统平均负荷太大' + str(sshRes_uptime15), 'plain', 'utf-8')
msg['From'] = _format_addr('管理员 <%s>' % from_addr)
msg['To'] = _format_addr('管理员 <%s>' % to_addr)
msg['Subject'] = Header('报警提醒', 'utf-8').encode()
server = smtplib.SMTP(smtp_server, 25)
server.login(from_addr, password)
# print('发送成功')
server.sendmail(from_addr, [to_addr], msg.as_string())
server.quit()
except Exception as e:
# print(e)
ts_sum = ts_sum + str(e)
###################################################################################################
cpu_command = 'cat /proc/stat |grep -w cpu'
stdin, stdout, stderr = sshClient.exec_command(cpu_command)
ts_cpu = ''
for std in stdout.readlines():
# print(std, )
ts_cpu = ts_cpu + str(std, )
ts_sum = ts_sum + ts_cpu
stdin, stdout, stderr = sshClient.exec_command(cpu_command)
cpu_values_temp0 = stdout.readlines()
# print(cpu_values_temp0)
cpu_values0 = re.findall(r'-?\d+\.?\d*e?-?\d*?', ",".join(cpu_values_temp0))
#print(cpu_values0)
cpu_user0 = cpu_values0[0]
cpu_nice0 = cpu_values0[1]
cpu_system0 = cpu_values0[2]
cpu_idle0 = cpu_values0[3]
cpu_iowait0 = cpu_values0[4]
cpu_irq0 = cpu_values0[5]
cpu_softirq0 = cpu_values0[6]
cpu_total0 = int(cpu_user0) + int(cpu_nice0) + int(cpu_system0) + int(cpu_idle0) + int(
cpu_iowait0) + int(cpu_irq0) + int(cpu_softirq0)
# print(cpu_total0,type(cpu_total0))
time.sleep(1)
stdin, stdout, stderr = sshClient.exec_command(cpu_command)
cpu_values_temp1 = stdout.readlines()
# print(cpu_values_temp1)
cpu_values1 = re.findall(r'-?\d+\.?\d*e?-?\d*?', ",".join(cpu_values_temp1))
#print(cpu_values1)
cpu_user1 = cpu_values1[0]
cpu_nice1 = cpu_values1[1]
cpu_system1 = cpu_values1[2]
cpu_idle1 = cpu_values1[3]
cpu_iowait1 = cpu_values1[4]
cpu_irq1 = cpu_values1[5]
cpu_softirq1 = cpu_values1[6]
cpu_total1 = int(cpu_user1) + int(cpu_nice1) + int(cpu_system1) + int(cpu_idle1) + int(
cpu_iowait1) + int(cpu_irq1) + int(cpu_softirq1)
cpu_total = cpu_total1 - cpu_total0
cpu_user = int(cpu_user1) - int(cpu_user0)
cpu_nice = int(cpu_nice1) - int(cpu_nice0)
cpu_system = int(cpu_system1) - int(cpu_system0)
cpu_usage = (cpu_user + cpu_nice + cpu_system) / cpu_total * 100
# print(cpu_total1,type(cpu_total1))
#print(cpu_usage)
# 除去cpu空闲时间外的其他所有cpu使用率,也可粗略计算为cpu利用率
cpu_idel = int(cpu_idle1) - int(cpu_idle0)
cpu_usage = (1 - cpu_idel / cpu_total) * 100
cpu_per = int(cpu_usage)
cpu_usage = "%.2f" % cpu_usage
cpu_usage = str(cpu_usage) + '%'
#print(cpu_usage)
ts_sum = ts_sum + 'CPU利用率:' + cpu_usage + '\n\n '
if cpu_per>int(fazhi_cpu):
# sendmail(a)
file = open(r'youjian.txt', 'r')
lenfile = file.readlines()
from_addr = lenfile[0].strip()
password = lenfile[1].strip()
to_addr = lenfile[2].strip()
smtp_server = lenfile[3].strip()
file.close()
try:
def _format_addr(s):
name, addr = parseaddr(s)
return formataddr((Header(name, 'utf-8').encode(), addr))
msg = MIMEText('服务器' + str(fazhi_ip) + 'CPU报警' + str(cpu_usage), 'plain', 'utf-8')
msg['From'] = _format_addr('管理员 <%s>' % from_addr)
msg['To'] = _format_addr('管理员 <%s>' % to_addr)
msg['Subject'] = Header('报警提醒', 'utf-8').encode()
server = smtplib.SMTP(smtp_server, 25)
server.login(from_addr, password)
# print('发送成功')
server.sendmail(from_addr, [to_addr], msg.as_string())
server.quit()
except Exception as e:
# print(e)
ts_sum = ts_sum + str(e)
###################################################################################################
net_command = 'cat /proc/net/dev'
stdin, stdout, stderr = sshClient.exec_command(net_command)
ts_net = ''
for std in stdout.readlines():
# print(std, )
ts_net = ts_net + str(std, )
ts_sum = ts_sum+ts_net
stdin, stdout, stderr = sshClient.exec_command(net_command)
net_values_temp0 = stdout.readlines()
result0 = {}
for line in net_values_temp0:
# 过滤前两行
if line.startswith('Inter-') or line.startswith(' face '):
continue
# print(line)
# 获取网卡名称与数据
head, data = line.split(':')
listd = data.split()
lens = len(listd)
# 获取收发字节数
rdata, sdata = listd[0], listd[int(lens / 2)]
vals = {}
vals['recv'] = rdata
vals['send'] = sdata
result0[head] = vals
# print(result0)
sum_speed0 = []
for i in result0.keys():
sum0 = 0
for j in result0[i].keys():
# print(j,d[i][j])
sum0 = sum0 + int(result0[i][j])
# print(i, sum0)
sum_speed0.append(sum0)
#print(sum_speed0)
time.sleep(1)
stdin, stdout, stderr = sshClient.exec_command(net_command)
net_values_temp1 = stdout.readlines()
result1 = {}
for line in net_values_temp1:
# 过滤前两行
if line.startswith('Inter-') or line.startswith(' face '):
continue
# print(line)
# 获取网卡名称与数据
head, data = line.split(':')
listd = data.split()
lens = len(listd)
# 获取收发字节数
rdata, sdata = listd[0], listd[int(lens / 2)]
vals = {}
vals['recv'] = rdata
vals['send'] = sdata
result1[head] = vals
# print(result1)
sum_speed1 = []
for i in result1.keys():
sum1 = 0
for j in result1[i].keys():
# print(j,d[i][j])
sum1 = sum1 + int(result1[i][j])
sum_speed1.append(sum1)
# print(i, sum1)
#print(sum_speed1)
sum_temp0 = 0
for i in result1.keys():
sum_speed = int(sum_speed1[sum_temp0] - sum_speed0[sum_temp0]) / 1024
net_speed=sum_speed
sum_speed = "%.2f" % sum_speed
sum_speed = str(sum_speed) + 'KB/s'
#print(i, ':', sum_speed, 'KB/s')
sum_speed=str(i)+str(':')+str(sum_speed)
ts_sum = ts_sum+sum_speed + '\n'
sum_temp0 += 1
ts_sum = ts_sum+'\n'
if float(net_speed/1024)>int(fazhi_net) :
# sendmail(a)
file = open(r'youjian.txt', 'r')
lenfile = file.readlines()
from_addr = lenfile[0].strip()
password = lenfile[1].strip()
to_addr = lenfile[2].strip()
smtp_server = lenfile[3].strip()
file.close()
try:
def _format_addr(s):
name, addr = parseaddr(s)
return formataddr((Header(name, 'utf-8').encode(), addr))
msg = MIMEText('服务器' + str(fazhi_ip) + '网速超过阀值' + str(net_speed), 'plain', 'utf-8')
msg['From'] = _format_addr('管理员 <%s>' % from_addr)
msg['To'] = _format_addr('管理员 <%s>' % to_addr)
msg['Subject'] = Header('报警提醒', 'utf-8').encode()
server = smtplib.SMTP(smtp_server, 25)
server.login(from_addr, password)
# print('发送成功')
server.sendmail(from_addr, [to_addr], msg.as_string())
server.quit()
except Exception as e:
# print(e)
ts_sum = ts_sum + str(e)
sshClient.close()
except Exception as e:
#print("SSH链接失败:[hostname:%s];[username:%s];[error:%s]" % (ssh_ip, ssh_name, e))
exit()
ts_sum=ts_sum+("SSH链接失败:[hostname:%s];[username:%s];[error:%s]" % (ip, username, e))
serverlist.close()
time.sleep(3000)
now = datetime.datetime.now()
ts = now.strftime('%Y-%m-%d %H:%M:%S')
ts_sum = ts_sum + ts + '结束' + '\n\n\n\n'
self.update_date1.emit(str(ts_sum))
class Window(QDialog):
def __init__(self):
QDialog.__init__(self)
self.setWindowTitle('PyQt5自动化巡检-linux')
self.resize(800,600)
self.input0=QLineEdit(self)
self.input0.resize(799,50)
self.input1=QtWidgets.QTextEdit(self)
self.input1.resize(799,549)
self.input1.setGeometry(QtCore.QRect(0,49,799,549))
#self.input1.verticalScrollBar().setValue(self.input1.verticalScrollBar().maximum())
self.initUI()
def initUI(self):
# 创建线程
self.backend = BackendThread0()
self.backend1 = BackendThread1()
# 连接信号
self.backend.update_date0.connect(self.handleDisplay0)
self.backend1.update_date1.connect(self.handleDisplay1)
self.thread = QThread()
self.thread1 = QThread()
self.backend.moveToThread(self.thread)
self.backend1.moveToThread(self.thread1)
# 开始线程
self.thread.started.connect(self.backend.run0)
self.thread1.started.connect(self.backend1.run1)
self.thread.start()
self.thread1.start()
# 将当前时间输出到文本框
def handleDisplay0(self, data0):
self.input0.setText(data0)
def handleDisplay1(self, data1):
self.input1.setText(data1)
self.input1.verticalScrollBar().setValue(self.input1.verticalScrollBar().maximum())
if __name__ == '__main__':
app = QApplication(sys.argv)
win = Window()
win.show()
sys.exit(app.exec_())
温馨提示一下打包命令:(有打包才叫真正的完成)
1、cd命令切换到与.py源文件相同目录。
2、输入:pyinstaller -F Test.py
工作是永远做不完,“钱”进的道路没有尽头,Python工具是“高铁”,带我快速前进。相信在工作中,我的Python应用会越来越多,工具会越来越多好。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
全网最全最简单使用easypoi导入导出Excel的操作手册
@[TOC] 概况 今天做Excel导出时,发现了一款非常好用的POI框架EasyPoi,其 使用起来简洁明了。现在我们就来介绍下EasyPoi,首先感谢EasyPoi 的开发者 Lemur开源 easypoi 简介 easypoi 是为了让开发者快速的实现excel,word,pdf的导入导出,基于Apache poi基础上的一个工具包。 特性 基于注解的导入导出,修改注解就可以修改Excel 支持常用的样式自定义 基于map可以灵活定义的表头字段 支持一对多的导出,导入 支持模板的导出,一些常见的标签,自定义标签 支持HTML/Excel转换 支持word的导出,支持图片,Excel 常用注解 @Excel注解 @Excel 注解是作用到Filed 上面,是对Excel一列的一个描述,这个注解是必须要的注解,其部分属性如下: 其使用如下,其中orderNum是指定该字段在Excel中的位置,name与Excel中对应的表头单元格的名称 @Excel(name = "主讲老", orderNum = "1") private String name; @ExcelCollection...
- 下一篇
Java实现多附件的邮件发送
叙:本文主要记述了 Springboot 中如何集成并实现多附件的邮件发送,以QQ邮箱的服务器为发送邮件的邮件服务器; Java实现多附件的邮件发送 1、前期准备 1.1、获取收发服务器地址及其端口号 1.2、 授权码的获取 2、代码 2.1、pom文件 2.1、Controller层 Utils工具类 3、 测试接口 4、上传文件管控 1、前期准备 主要需要什么呢? 第一,你要知道你选择的作为发送邮件的邮箱账号所在服务器所提供的发送邮件服务器(一般都会有一个发送邮件服务器 [SMTP] 和接收邮件服务器 [POP3]); 第二,发送邮件服务器的端口号; 第三,你要针对发送邮件的邮箱设定 授权码,这个授权码就是第三方使用邮箱进行一系列操作时所需要登陆的密码; 1.1、获取收发服务器地址及其端口号 收发邮件服务器和端口号的整理都在此篇《各邮箱服务商的接收、发送邮件服务器地址、端口号》文章中,有兴趣的大家可以看看尝试一下不同的邮箱发送短信,想要学习一下自己去找这些的可以继续往下看; 第一步、第二步的获悉信息位置如下查找(QQ邮箱,其他的邮箱也差不多,有需要的可以去翻一下): 往下翻,找到下...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS6,CentOS7官方镜像安装Oracle11G
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- CentOS关闭SELinux安全模块
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- Hadoop3单机部署,实现最简伪集群
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长