阿里云物联网平台设备历史属性上报及MNS服务端订阅
概述
阿里云物联网平台不仅支持设备即时属性的上报,也支持因为某些原因,没有及时上报的属性数据,通过历史属性上报的方式进行上报,历史属性上报Topic:/sys/{productKey}/{deviceName}/thing/event/property/history/post。本文结合物联网平台最新推出的独享实例,在新的实例下面创建产品及设备,进行历史属性的上报测试,并进行 MNS 历史属性服务端订阅。
操作步骤
1、创建独享实例,在独享实例下面创建产品和设备
2、获取独享实例设备端MQTT接入点
3、设备端接入,参考链接
import com.alibaba.taro.AliyunIoTSignUtil; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import java.util.HashMap; import java.util.Map; public class IoTDemoPubSubDemo { public static String productKey = "g028S******"; public static String deviceName = "device1"; public static String deviceSecret = "aKEHNK1w0UBvGcA1BA7gf2eHC********"; public static String regionId = "cn-shanghai"; public static String isntanceId = "iot-instc-public-cn-0pp1g*******"; // 物模型 - 历史属性上报topic private static String pubTopic = "/sys/" + productKey + "/" + deviceName + "/thing/event/property/history/post"; private static MqttClient mqttClient; public static void main(String [] args){ initAliyunIoTClient(); postDeviceProperties(); } /** * 初始化 Client 对象 */ private static void initAliyunIoTClient() { try { // 构造连接需要的参数 String clientId = "java" + System.currentTimeMillis(); Map<String, String> params = new HashMap<>(16); params.put("productKey", productKey); params.put("deviceName", deviceName); params.put("clientId", clientId); String timestamp = String.valueOf(System.currentTimeMillis()); params.put("timestamp", timestamp); // MQTT 设备接入 公网终端节点(Endpoint) String targetServer = "tcp://" + isntanceId + ".iot-as-mqtt."+regionId+".iothub.aliyuncs.com:1883"; String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|"; String mqttUsername = deviceName + "&" + productKey; String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1"); connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword); } catch (Exception e) { System.out.println("initAliyunIoTClient error " + e.getMessage()); } } public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception { MemoryPersistence persistence = new MemoryPersistence(); mqttClient = new MqttClient(url, clientId, persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); // MQTT 3.1.1 connOpts.setMqttVersion(4); connOpts.setAutomaticReconnect(false); // connOpts.setCleanSession(true); connOpts.setCleanSession(false); connOpts.setUserName(mqttUsername); connOpts.setPassword(mqttPassword.toCharArray()); connOpts.setKeepAliveInterval(60); mqttClient.connect(connOpts); } /** * 汇报属性 */ private static void postDeviceProperties() { try { //上报数据 //高级版 物模型-属性上报payload System.out.println("历史上报属性值"); String payloadJson = "{ \"id\": 123, \"version\": \"1.0\", \"method\": \"thing.event.property.history.post\", \"params\": [ { \"identity\": { \"productKey\": \"g028S******\", \"deviceName\": \"device1\" }, \"properties\": [ { \"AreaId\": { \"value\": \"history data test 4\", \"time\": 1578198990000 } } ] } ] }"; MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8")); message.setQos(1); mqttClient.publish(pubTopic, message); } catch (Exception e) { System.out.println(e.getMessage()); } } }
payLoad格式参考。
4、设备运行状态查看
5、MNS服务端订阅配置
6、使用 MNS Python SDK 获取MNS订阅到Queue中的消息
- 6.1 安装SDK
- 6.2 Code Sample
#init my_account, my_queue from mns.account import Account import sys import json import base64 endpoint = "http://18482178********.mns.cn-shanghai.aliyuncs.com/" accid = "LTAIOZZg********" acckey = "v7CjUJCMk7j9aKduMAQLjy********" token = "" my_account = Account(endpoint, accid, acckey, token) queue_name = "aliyun-iot-g028S******" my_queue = my_account.get_queue(queue_name) #循环读取删除消息直到队列空 #receive message请求使用long polling方式,通过wait_seconds指定长轮询时间为3秒 ## long polling 解析: ### 当队列中有消息时,请求立即返回; ### 当队列中没有消息时,请求在MNS服务器端挂3秒钟,在这期间,有消息写入队列,请求会立即返回消息,3秒后,请求返回队列没有消息; wait_seconds = 3 print("%sReceive And Delete Message From Queue%s\nQueueName:%s\nWaitSeconds:%s\n" % (10*"=", 10*"=", queue_name, wait_seconds)) while True: #读取消息 try: recv_msg = my_queue.receive_message(wait_seconds) print("Receive Message Succeed! ReceiptHandle:%s MessageBody:%s MessageID:%s" % (recv_msg.receipt_handle, recv_msg.message_body, recv_msg.message_id)) print("Message Body: ",base64.b64decode(json.loads(recv_msg.message_body)['payload'])) # 转发到mns 的messagebody经过了base64编码,获取具体消息的内容需要做base64解码 except Exception as e: #except MNSServerException as e: if e.type == u"QueueNotExist": print("Queue not exist, please create queue before receive message.") sys.exit(0) elif e.type == u"MessageNotExist": print("Queue is empty!") sys.exit(0) print("Receive Message Fail! Exception:%s\n" % e) continue #删除消息 try: my_queue.delete_message(recv_msg.receipt_handle) print("Delete Message Succeed! ReceiptHandle:%s" % recv_msg.receipt_handle) except Exception as e: print("Delete Message Fail! Exception:%s\n" % e) print("Delete Message Fail! Exception:%s\n" % e)
- 6.3 Test Result
Receive Message Succeed! ReceiptHandle:8-2zuDHj20LzZz8zcFz0z6YEzcSFqxyIKefW MessageBody:{"payload":"eyJkZXZpY2VUeXBlIjoiQ3VzdG9tQ2F0ZWdvcnkiLCJpb3RJZCI6IlY1WGJXekluY0EyOW41aWNib3RYZzAyODAwIiwicmVxdWVzdElkIjoiMTIzIiwicHJvZHVjdEtleSI6ImcwMjhTR1o4RlJDIiwiZ210Q3JlYXRlIjoxNTc4MjkxNzI0NjY4LCJkZXZpY2VOYW1lIjoiZGV2aWNlMSIsIml0ZW1zIjp7IkFyZWFJZCI6eyJ0aW1lIjoxNTc4MTk4OTkwMDAwLCJ2YWx1ZSI6Imhpc3RvcnkgZGF0YSB0ZXN0*********","messagetype":"thing_history","topic":"/g028S******/device1/thing/event/property/history/post","messageid":1214069604465248256,"timestamp":1578291724} MessageID:5F8092E53A67656C7F811CD50E19A150 Message Body: b'{"deviceType":"CustomCategory","iotId":"V5XbWzIncA29n5icbo********","requestId":"123","productKey":"g028S******","gmtCreate":1578291724668,"deviceName":"device1","items":{"AreaId":{"time":1578198990000,"value":"history data test 4"}}}' DEBUG: v7CjUJCMk7j9aKduMAQLjyCmb8cmCm hAbKzezvRlqeVXC18CzChL4OZZk= DELETE
更多参考
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
1月9日社区直播【使用Apache SuperSet和EMR Spark打造交互式的数据探索平台】
主题: 使用Apache SuperSet和EMR Spark打造交互式的数据探索平台 点击这里是直播间直达链接(回看链接) 时间1月9日:19:00-20:00 直播介绍:本次分享主要介绍如何结合Apache SuperSet和EMR Spark,利用EMR Spark提供的JindoCube高级特性在SuperSet进行秒级响应,交互式的可视化数据探索。 主讲人:李呈祥,花名司麟,阿里云智能EMR团队高级技术专家,Apache Hive Committer, Apache Flink Committer,目前主要专注于EMR产品中开源计算引擎的优化工作。 请钉钉扫码至群内观看直播,与嘉宾互动有机会获得社区礼物一份。
- 下一篇
图解Go语言的context了解编程语言核心实现源码
基础筑基 基于线程的编程语言中的一些设计 ThreadGroup ThreadGroup是基于线程并发的编程语言中常用的一个概念,当一个线程派生出一个子线程后通常会加入父线程的线程组(未指定线程组的情况下)中, 最后可以通过ThreadGroup来控制一组线程的退出等操作, 然后在go语言中goroutine没有明确的这种parent/children的关系,如果想退出当前调用链上的所有goroutine则需要用到context ThreadLocal 在基于线程的编程语言语言中,通常可以基于ThreadLocal来进行一些线程本地的存储,本质上是通过一个Map来进行key/value的存储,而在go里面并没有ThreadLocal的设计,在key/value传递的时候,除了通过参数来进行传递,也可以通过context来进行上下文信息的传递 context典型应用场景 场景 实现 原理 上下文信息传递 WithValue 通过一个内部的key/value属性来进行键值对的保存,不可修改,只能通过覆盖的方式来进行值得替换 退出通知 WithCancel 通过监听通知的channel来进行...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7,CentOS8安装Elasticsearch6.8.6
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2整合Redis,开启缓存,提高访问速度
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- CentOS8编译安装MySQL8.0.19
- CentOS8安装Docker,最新的服务器搭配容器使用