[雪峰磁针石博客]python工具库介绍-dubbo:通过telnet接口访问dubbo服务
dubbo服务发布之后,我们可以利用telnet命令进行调试、管理。更多资料参见 [Telnet命令参考手册](http://alibaba.github.io/dubbo-doc-static/Telnet+Command+Reference-zh-showComments=true&showCommentArea=true.htm)
telnet 调用示例:
```python
$ telnet 172.17.103.110 9097
Trying 172.17.103.110...
Connected to 172.17.103.110.
Escape character is '^]'.
dubbo>ls com.oppo.sso.service.onekey.IOnekeyRegister
register
dubbo>invoke com.oppo.sso.service.onekey.IOnekeyRegister.register({"applicationKey":"mac","imei":"","mobile":"13244448888","createIP":"127.0.0.1","createBy":"172.17.0.1"})
{"configCountry":null,"userIdLong":0,"appPackage":null,"appVersion":null,"accountName":null,"romVersion":null,"resultCode":3001,"thirdStatus":null,"registerType":0,"sendChannel":null,"operator":null,"manufacturer":null,"password":null,"osVersion":null,"lock":false,"model":null,"visitorLocked":false,"OK":false,"brand":null,"email":null,"createIP":null,"deny":false,"encryptEmail":null,"sessionKey":null,"thirdId":null,"passwordOriginal":null,"mobile":null,"applicationKey":null,"thirdpartyOk":false,"userAgent":null,"userName":null,"resultDesc":"应用不存在","userId":0,"encryptMobile":null,"emailStatus":null,"createBy":null,"freePwd":false,"changeTimes":0,"createTime":null,"mobileStatus":null,"oldLock":false,"codeTimeout":null,"lastUpdate":null,"imei":null,"sessionTimeout":0,"sdkVersion":null,"networkID":0,"status":null}
elapsed: 98 ms.
dubbo>
```
## Python实现
源码:
```python
"""
Name: dubbo.py
Tesed in python3.5
"""
import json
import telnetlib
import socket
class Dubbo(telnetlib.Telnet):
prompt = 'dubbo>'
coding = 'utf-8'
def __init__(self, host=None, port=0,
timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
super().__init__(host, port)
self.write(b'\n')
def command(self, flag, str_=""):
data = self.read_until(flag.encode())
self.write(str_.encode() + b"\n")
return data
def invoke(self, service_name, method_name, arg):
command_str = "invoke {0}.{1}({2})".format(
service_name, method_name, json.dumps(arg))
self.command(Dubbo.prompt, command_str)
data = self.command(Dubbo.prompt, "")
data = json.loads(data.decode(Dubbo.coding,
errors='ignore').split('\n')[0].strip())
return data
if __name__ == '__main__':
conn = Dubbo('172.17.103.110', 9097)
json_data = {
"applicationKey":"mac",
"imei":"",
"mobile":"13244448888",
"createIP":"127.0.0.1",
"createBy":"172.17.0.1"
}
result = conn.invoke(
"com.oppo.sso.service.onekey.IOnekeyRegister",
"register",
json_data
)
print(result)
```
执行结果:
```python
# python3 dubbo.py
{'manufacturer': None, 'applicationKey': None, 'OK': False, 'codeTimeout': None, 'password': None, 'encryptEmail': None, 'passwordOriginal': None, 'thirdId': None, 'emailStatus': None, 'freePwd': False, 'sessionTimeout': 0, 'createTime': None, 'osVersion': None, 'lastUpdate': None, 'email': None, 'sdkVersion': None, 'registerType': 0, 'sendChannel': None, 'visitorLocked': False, 'createIP': None, 'thirdStatus': None, 'encryptMobile': None, 'networkID': 0, 'resultCode': 3001, 'brand': None, 'changeTimes': 0, 'userAgent': None, 'imei': None, 'operator': None, 'romVersion': None, 'model': None, 'lock': False, 'sessionKey': None, 'configCountry': None, 'deny': False, 'userIdLong': 0, 'resultDesc': '应用不存在', 'status': None, 'createBy': None, 'thirdpartyOk': False, 'appPackage': None, 'appVersion': None, 'accountName': None, 'userId': 0, 'oldLock': False, 'userName': None, 'mobile': None, 'mobileStatus': None}
```
更复杂的例子。
源码:
```python
import json
import telnetlib
import socket
class Dubbo(telnetlib.Telnet):
prompt = 'dubbo>'
coding = 'gbk'
def __init__(self, host=None, port=0,
timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
super().__init__(host, port)
self.write(b'\n')
def command(self, flag, str_=""):
data = self.read_until(flag.encode())
self.write(str_.encode() + b"\n")
return data
def invoke(self, service_name, method_name, arg):
command_str = "invoke {0}.{1}({2})".format(
service_name, method_name, json.dumps(arg))
self.command(Dubbo.prompt, command_str)
data = self.command(Dubbo.prompt, "")
data = json.loads(data.decode(Dubbo.coding,
errors='ignore').split('\n')[0].strip())
return data
if __name__ == '__main__':
conn = Dubbo('183.131.22.99', 21881)
content = {
"sign": "FKeKnMEPybHujjBTzz11BrulB5av7pLhJpk=",
"partnerOrder": "0511e0d38f334319a96920fa02be02a7",
"productDesc": "hello",
"paySuccessTime": "2016-08-25 18:33:04",
"price": "1",
"count": "1",
"attach": "from_pay_demo",
"date": "20160825",
}
content_json = json.dumps(content).replace('"', '\\"')
json_data = {
"requestId": "0511e0d38f334319a96920fa02be02a7",
"reqUrl": 'http://www.oppo.com',
"httpReqType": "POST",
"headerMap": None,
"reqContent": content_json,
"appId": "10001",
"productName": "test",
"timeStamp": "1472121184957",
"partnerId": "2031",
"signType": "MD5",
"sign": "23B621FBBF887373C65E553C2222258F",
"successResult": "OK",
}
result = conn.invoke(
"com.oppo.pay.notify.api.facade.NotifyApplyService",
"httpNotify",
json_data
)
print(result)
```
执行结果:
```python
$ python3 dubbo.py
{'resMsg': 'ǩ', 'data': None, 'errorDetail': 'sign check fail!', 'resCode': '100003', 'success': False}
```
## 待改进
* 进行多个项目的实验,增加异常处理
## 参考资料
- [本文最新版本地址](https://china-testing.github.io/python3_lib_dubbo.html)
- [本文涉及的python测试开发库](https://github.com/china-testing/python-api-tesing) 谢谢点赞!
- [本文相关海量书籍下载](https://github.com/china-testing/python-api-tesing/blob/master/books.md)
- [本文源码地址](https://github.com/china-testing/python-api-tesing/tree/master/python3_libraries/_dubbo)
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
phpmyadmin 503错误无法访问的解决方法
昨天ytkah更新了一些服务器软件,今天访问数据库居然出现503错误,主要提示如下。点开phpmyadmin设置,查看了一下端口,没有改动;重启了一下phpmyadmin也不能运行;再看了一下php版本,发现默认值是“纯静态”,这个应该跟php版本有关,因为phpmyadmin也是运行在php上,纯静态是错误的 1 The server is temporarily unable to service your request due to maintenance downtime or capacity problems. Please try again later. phpmyadmin设置 phpmyadmin管理的php版本修改,选择php-71保存后,再访问phpmyadmin控制面板,没问题了!如果还是访问不了,多试几次。 更多详情请访问503错误。
- 下一篇
[雪峰磁针石博客]python机器学习、web开发等书籍汇总
Learning Python Web Penetration - Christian Martorella - 2018.pdf 下载地址 Leverage the simplicity of Python and available libraries to build web security testing tools for your application Key Features Understand the web application penetration testing methodology and toolkit using Python Write a web crawler/spider with the Scrapy library Detect and exploit SQL injection vulnerabilities by creating a script all by yourself Book Description Web penetration testing is the use of tools and code to att...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- CentOS8安装Docker,最新的服务器搭配容器使用
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- 设置Eclipse缩进为4个空格,增强代码规范
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS8编译安装MySQL8.0.19
- SpringBoot2整合Redis,开启缓存,提高访问速度
- SpringBoot2配置默认Tomcat设置,开启更多高级功能