python调用阿里云产品接口
因为有些人使用爬虫访问web,浪费服务器资源不说还会影响正常用户,所以需要限制爬虫ip,本可以通过nginx或者防火墙限制,但需要重新编译nginx,开启防火墙怕又不知道会影响到哪些程序,所以
想用/etc/hosts.deny文件限制某些ip访问web的,于是配置了http:124.90.. 发现根本没用,但我之前使用该文件是可以限制ip ssh的,为什么现在不能限制http了呢,
于是上网搜了下,发现如下内容:
hosts.allow和hosts.deny规则的执行者为TCP wrappers,对应守护进程为tcpd;而tcpd执行依赖于程序使用了libwrap库。
也就是说:hosts.allow和hosts.deny支持且只支持使用了libwrap库的服务。
那如何查看程序是否使用libwarp?有俩种方法:
方法一、查看hosts_access字段串
查看应用程序是否支持 wrapper,可以使用 strings 程序然后 grep 字符串 hosts_access:
方法二、使用ldd,
ldd /usr/sbin/sshd | grep libwrap
查测发现使用xinetd的都可以、sshd可以、vsftpd可以,nginx httpd则不可以。
继续寻找方法时发现可以调用负载均衡相关api结果,于是乎采用了此方法
'''
先安装模块,主要有两个,一个是核心模块,必须安装 pip install aliyun-python-sdk-core
另一个是你要修改哪个产品,本次是更改负载均衡,所以安装 pip install aliyun-python-sdk-slb ,其他产品参考 https://developer.aliyun.com/tools/sdk#/python
功能:每隔10分钟分析各个服务器日志,查看是否有异常IP调用短信接口(单个IP一分钟内调用短信接口10次视为异常IP),如有则把IP加入黑名单,2个小时后解封
'''
!/usr/bin/python
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkslb.request.v20140515.AddAccessControlListEntryRequest import AddAccessControlListEntryRequest
from aliyunsdkslb.request.v20140515.RemoveAccessControlListEntryRequest import RemoveAccessControlListEntryRequest
import smtplib
from email.mime.text import MIMEText
from email.header import Header
import paramiko,re,logging,os
import time,datetime,json
过滤日志,最近一分钟调用短信接口超过10次的ip
def check_log(host,file):
# 创建SSH对象 ssh = paramiko.SSHClient() # 允许连接不在know_hosts文件中的主机 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 连接服务器 try: ssh.connect(host, port=22, username='root', password='******$%^') # 执行命令, 过滤日志,最近一分钟调用短信接口超过10次的ip one_min_time = (datetime.datetime.now()-datetime.timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M") #一分钟前时间 shell = "sed -n '/%s/,$p' %s | grep regip | awk '{print $5}' | cut -d : -f2 | sort | uniq -c | sort -r | head -5 | awk '{if ($1 >9)print $2}'" % (one_min_time,file) #print(shell) stdin, stdout, stderr = ssh.exec_command(shell) # 获取命令结果 result = stdout.read().decode(encoding = "utf-8") deny_host_re = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}') #待拒绝ip的正则表达式 deny_host_list = deny_host_re.findall(result) if deny_host_list: for deny_host in deny_host_list: now_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M") shell = 'cat %s | grep "%s" | wc -l' % (deny_file,deny_host) if int(os.popen(shell).read()) == 0: #判断当前IP是否已存在黑名单,避免重复操作 f = open(deny_file, "a", encoding='utf-8') str_info = now_time + " " + deny_host + " " + "服务器地址:" + host + "\n" #文本格式:封禁时间 待封IP 待封IP所在服务器地址 logging.info("str_info: %s" % str_info) f.write(str_info) f.close() slb_add_host(deny_host,accessKeyId,accessSecret,acl_id) #执行添加黑名单动作 else: logging.info("IP:%s 已存在黑名单中" % deny_host) else: logging.info("主机:%s 本次检测无异常~" % host) except Exception as e: logging.error("主机:%s 连接失败,原因:%s" % ( host,e ) ) # 关闭连接 ssh.close()
ip加入黑名单时发送邮件提醒
这种发邮件是使用服务器上邮件服务器,需要在服务器/etc/mail.rc里进行相应配置
def send_mail(host,str):
try:
shell = '''
echo -e "短信接口异常调用IP:%sn状态:%s"|mail -s "短信接口异常调用" zhangpan@tan66.com
''' % (host,str)
os.system(shell)
logging.info("封禁通知邮件发送成功")
except smtplib.SMTPException as e:
logging.error("发送邮件失败,原因:%s" % e)
这种更灵活方便,无需在服务器上配置
def send_mail(host,str):
# 第三方 SMTP 服务 mail_host = "smtp.exmail.qq.com" # 设置服务器 mail_user = "********@tan66.com" # 用户名 mail_pass = "********" # 密码 sender = '******@tan66.com' receivers = ['*******@tan66.com'] # 接收邮件,可设置为你的QQ邮箱或者其他邮箱 message = MIMEText('短信接口异常调用IP:%s\n状态:%s!'% (host,str), 'plain', 'utf-8') message['From'] = Header("短信接口", 'utf-8') message['To'] = Header("研发组", 'utf-8') subject = '短信接口异常调用' message['Subject'] = Header(subject, 'utf-8') try: smtpObj = smtplib.SMTP_SSL(mail_host,465) # 465 为腾讯企业邮箱SMTP SSL端口号 smtpObj.login(mail_user, mail_pass) smtpObj.sendmail(sender, receivers, message.as_string()) logging.info("邮件发送成功") except smtplib.SMTPException as e: logging.error("发送邮件失败,原因:%s" % e)
阿里云slb访问控制里添加ip
def slb_add_host(host,accessKeyId,accessSecret,acl_id):
client = AcsClient(accessKeyId, accessSecret, 'cn-hangzhou') # 阿里云账号的key request = AddAccessControlListEntryRequest() # 本次api调用的接口,接口有很多,可参考https://help.aliyun.com/document_detail/27570.html?spm=a2c4g.11186623.6.622.3b735682J2ldXT request.set_accept_format('json') try: host = host + "/32" request.set_AclEntrys([{"entry": host, "comment": "deny"}]) # 需要拒绝的ip request.set_AclId(acl_id) # 负载均衡安全组id response = client.do_action_with_exception(request) # 执行操作 logging.info(str(response, encoding='utf-8')) # 查看调用接口结果 logging.info("slb添加异常IP:%s成功!" % host) send_mail(host, str="添加至黑名单") except BaseException as e: logging.error("添加黑名单失败,原因:%s" % e)
slb删除ip
def slb_del_host(host,accessKeyId,accessSecret,acl_id):
host = str(host) + "/32" logging.info("正在解封IP:%s" % host) try: client = AcsClient(accessKeyId, accessSecret, 'cn-hangzhou') request = RemoveAccessControlListEntryRequest() request.set_accept_format('json') request.set_AclEntrys([{"entry": host, "comment": "deny"}]) request.set_AclId(acl_id) client.do_action_with_exception(request) logging.info("slb删除IP:%s成功" % host) # 查看调用接口结果 send_mail(host, str="移出黑名单") except BaseException as e: logging.error("移出黑名单失败,原因:%s" % e)
删除slb里黑名单ip
def del_text(deny_file):
with open(deny_file, "r", encoding='utf-8') as f: data = f.read() logging.info("黑名单文件:%s" % deny_file) if data: d_list = [] # 解封ip清单 with open(deny_file, "r", encoding='utf-8') as f: lines = f.readlines() # logging.info("lines:%s" % lines) for line in lines: if line.strip().lstrip(): time1 = line.split(" ")[0] # 获取文本中被封ip的时间 time2 = time.mktime(time.strptime(time1, '%Y-%m-%d %H:%M')) # 将时间转换给时间戳 time3 = int(now_time - time2) # 解封倒计时 host = line.split(" ")[1].strip().lstrip() if time3 > 7200: slb_del_host(host, accessKeyId, accessSecret, acl_id) d_list.append(host) else: logging.info("%-15s 预计%s 秒解封" % (host, 7200 - time3)) for deny in d_list: shell = "sed -i '/%s/d' %s" % (deny, deny_file) os.system(shell) logging.info("文本中删除黑名单ip:%s" % deny)
if name == "__main__":
accessKeyId = '*********' #key 需要登录阿里云账号获取 accessSecret = '********' acl_id = 'acl-**********' #负载均衡安全组id log_file = '/var/log/msg-api.log' BASE_DIR = os.path.dirname(os.path.abspath(__file__)) deny_file = BASE_DIR + "/deny.hosts" if not os.path.exists(log_file): os.mknod(log_file) if not os.path.exists(deny_file): os.mknod(deny_file) now_time = time.time() logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s', filename=log_file, filemode='a+') logging.info("starting") host_dict = {"10.253.168.*":"api", "10.253.208.*":"api", "10.253.210.*":"cs", "10.253.210.*":"cs", } api_log_file = '/data1/application/api/tomcat/logs/catalina.out' cs_log_file = '/data1/application/cs/tomcat-1/logs/catalina.out' oss_log_file = '/data1/application/server/tomcat-2/logs/catalina.out' for host in host_dict: if host_dict[host] == "api": check_log(host,api_log_file) else: check_log(host,cs_log_file) check_log(host,oss_log_file) del_text(deny_file)
如果日志使用了es收集,则更方便操作
'''
先安装模块,主要有两个,一个是核心模块,必须安装 pip install aliyun-python-sdk-core
另一个是你要修改哪个产品,本次是更改负载均衡,所以安装 pip install aliyun-python-sdk-slb ,其他产品参考 https://developer.aliyun.com/tools/sdk#/python
功能:每隔10分钟分析各个服务器日志,查看是否有异常IP调用短信接口(单个IP一分钟内调用短信接口10次视为异常IP),如有则把IP加入黑名单,2个小时后解封
'''
!/usr/bin/python
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkslb.request.v20140515.AddAccessControlListEntryRequest import AddAccessControlListEntryRequest
from aliyunsdkslb.request.v20140515.RemoveAccessControlListEntryRequest import RemoveAccessControlListEntryRequest
from elasticsearch import Elasticsearch
import smtplib
from email.mime.text import MIMEText
from email.header import Header
import paramiko,re,logging,os
import time,datetime,json
class slb:
accessKeyId = '********' # key 需要登录阿里云账号获取 accessSecret = '********' acl_id = 'acl-****' # 负载均衡安全组id log_file = '/var/log/msg-api.log' BASE_DIR = os.path.dirname(os.path.abspath(__file__)) deny_file = BASE_DIR + "/deny.hosts" logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s', filename=log_file, filemode='a+') logging.info("starting") def __init__(self,index): self.index = index def start(self): if not os.path.exists(slb.log_file): os.mknod(slb.log_file) if not os.path.exists(slb.deny_file): os.mknod(slb.deny_file)
def chack_log(self): self.start() es=Elasticsearch("10.253.**.**",port=9200) body = { "query": { "bool": { "must": [ { "match": { "message": "regip" } }, { "range": { "@timestamp": { "gte": "now-1m", "lte": "now", "format": "epoch_millis" } } } ], "must_not": [] } } } logging.info(f"开始检查{index}日志") result = es.search(index=self.index, body=body) #返回字典形式 result1 = json.dumps(result, indent=2, ensure_ascii=False) #返回格式化形式,方便查看,但不是字典 if result["hits"]["total"] > 9: #es查询结果大于9 表示可能有用户异常调用短信接口 ip_list = [] ip_dict = {} try: #获取一分钟内调用短信接口的所有IP for info in result["hits"]["hits"]: log_info = info["_source"]["log_info"] deny_host_re = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}') # ip的正则表达式 deny_host = re.search(deny_host_re,log_info).group() ip_list.append(deny_host) # 统计各个IP调用结果次数 for deny_ip in ip_list: if deny_ip in ip_dict: ip_dict[deny_ip] += 1 else: ip_dict[deny_ip] = 1 #判断异常IP是否已存在,不存在则执行添加黑名单操作 for deny_host,count in ip_dict.items(): #遍历字典,deny_host为调用短信接口的ip count为调用次数 if count > 2: now_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M") shell = f'cat {slb.deny_file} | grep "{deny_host}" | wc -l' if int(os.popen(shell).read()) == 0: # 判断当前IP是否已存在黑名单,避免重复操作 f = open(slb.deny_file, "a", encoding='utf-8') str_info = now_time + " " + deny_host + " " + f"web:{index}" + "\n" # 文本格式:封禁时间 待封IP web服务器 logging.info(f"{str_info}str_info: %s") f.write(str_info) f.close() self.slb_add_host(deny_host) # 执行添加黑名单动作 logging.info("执行添加黑名单操作") else: logging.info(f"IP:{deny_host} 已存在黑名单中") except BaseException as e: logging.error(f"检查{index}日志时发现错误,错误信息:{e}") else: logging.info(f"{index} 本次检测无异常")
#添加ip至黑名单 def slb_add_host(self,host): client = AcsClient(self.accessKeyId, self.accessSecret, 'cn-hangzhou') # 阿里云账号的key request = AddAccessControlListEntryRequest() # 本次api调用的接口,接口有很多,可参考https://help.aliyun.com/document_detail/27570.html?spm=a2c4g.11186623.6.622.3b735682J2ldXT request.set_accept_format('json') try: host = host + "/32" request.set_AclEntrys([{"entry": host, "comment": "deny"}]) # 需要拒绝的ip request.set_AclId(self.acl_id) # 负载均衡安全组id response = client.do_action_with_exception(request) # 执行操作 logging.info(str(response, encoding='utf-8')) # 查看调用接口结果 self.send_mail(host, str="添加至黑名单") logging.info(f"slb添加异常IP:{host}成功!") except BaseException as e: logging.error(f"添加黑名单失败,原因:{e}" )
#slb黑名单里移出ip def slb_del_host(self,host): host = str(host) + "/32" logging.info(f"正在解封IP:{host}" ) try: client = AcsClient(self.accessKeyId, self.accessSecret, 'cn-hangzhou') request = RemoveAccessControlListEntryRequest() request.set_accept_format('json') request.set_AclEntrys([{"entry": host, "comment": "deny"}]) request.set_AclId(self.acl_id) client.do_action_with_exception(request) self.send_mail(host, str="移出黑名单") logging.info(f"slb删除IP:{host}成功" ) # 查看调用接口结果 except BaseException as e: logging.error("移出黑名单失败,原因:%s" % e) #发送邮件 def send_mail(self,host, str): # 第三方 SMTP 服务 mail_host = "smtp.exmail.qq.com" # 设置服务器 mail_user = "****@tan66.com" # 用户名 mail_pass = "*****" # 密码 sender = '******@tan66.com' receivers = ['*****@tan66.com'] # 接收邮件,可设置为你的QQ邮箱或者其他邮箱 message = MIMEText(f'短信接口异常调用IP:{host}\n状态:{str}!', 'plain', 'utf-8') message['From'] = Header("短信接口", 'utf-8') message['To'] = Header("研发组", 'utf-8') subject = '短信接口异常调用' message['Subject'] = Header(subject, 'utf-8') try: smtpObj = smtplib.SMTP_SSL(mail_host, 465) # 465 为腾讯企业邮箱SMTP SSL端口号 smtpObj.login(mail_user, mail_pass) smtpObj.sendmail(sender, receivers, message.as_string()) logging.info("邮件发送成功") except smtplib.SMTPException as e: logging.error("发送邮件失败,原因:%s" % e)
# 删除slb文本里黑名单ip def del_text(self): with open(self.deny_file, "r", encoding='utf-8') as f: data = f.read() #logging.info("黑名单文件:%s" % self.deny_file) if data: d_list = [] # 解封ip清单 with open(self.deny_file, "r", encoding='utf-8') as f: lines = f.readlines() # logging.info("lines:%s" % lines) for line in lines: if line.strip().lstrip(): now_time = time.time() time1 = line.split(" ")[0] # 获取文本中被封ip的时间 time2 = time.mktime(time.strptime(time1, '%Y-%m-%d %H:%M')) # 将时间转换给时间戳 time3 = int(now_time - time2) # 解封倒计时 host = line.split(" ")[1].strip().lstrip() if time3 > 7200: self.slb_del_host(host) d_list.append(host) else: logging.info("%-15s 预计%s 秒解封" % (host, 7200 - time3)) for deny in d_list: shell = "sed -i '/%s/d' %s" % (deny, self.deny_file) os.system(shell) logging.info("文本中删除黑名单ip:%s" % deny)
if name == "__main__":
index_list = ["**-tomcat","**-tomcat","*****"] for index in index_list: s = slb(index) s.chack_log() s.del_text()
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
uni-app 中保持用户登录状态
在应用中保持登录状态是一个应用常见的需求,本文简单介绍下在 uni-app 中如何保存用户登录状态。 简介 uni-app 中不支持读写 cookie,所以不能如传统的应用那样通过读取 cookie 来判断是否是登录状态。 在 uni-app 进行登录操作时,可以将需要校验的数据放在 uni.request 的 data 中,也可以在 header 里设置 token,使用 token 进行登录状态校验。 流程:首页为未登录状态 => 进行登录 => 首页状态改变 => 退出应用再次进入仍然是已登录状态。 vuex 使用 vuex 进行管理登陆状态和保存用户信息,下面是部分代码。 const store = new Vuex.Store({ state: { uerInfo: {}, hasLogin: false }, mutations: { login(state, provider) {//改变登录状态 state.hasLogin = true state.uerInfo.token = provider.token state.uerInfo.userNa...
- 下一篇
微信小程序转换uni-app详细指南
小程序转换uni-app的步骤 微信小程序的语法,其实是vue.js语法的裁剪定制版,在数据绑定、自定义组件等很多方面都有相似之处。以下是一个小程序源码转换步骤指南: 客户端代码转换 新建一个uni-app项目,把之前的app.js、app.wxss的代码,挪到app.vue里,分别放到script和style节点下面 转换app.json为pages.json,把每个小程序page目录下的index.json(或页面名称对应的json)里的配置取出来,放到pages.json的style下 pages下每个页面目录放一个vue空文件模板 把之前页面的wxss内容复制到vue文件的style中,无需改动 把之前页面的js内容复制到vue文件的script中,然后执行如下改动 5.1 之前js里面的data,放到新的data return下之前 Page({ data: { show1: false } }) 现在 <script> export default { data() { return { show1: false } } } </script> - ...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- MySQL8.0.19开启GTID主从同步CentOS8
- 设置Eclipse缩进为4个空格,增强代码规范
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Hadoop3单机部署,实现最简伪集群
- CentOS8安装Docker,最新的服务器搭配容器使用
- Docker快速安装Oracle11G,搭建oracle11g学习环境