您现在的位置是:首页 > 文章详情

阿里云物联网平台设备数据转发到消息队列RocketMQ全链路测试

日期:2019-12-05点击:470

概述

您可以使用规则引擎,将物联网平台数据转发到消息队列(RocketMQ)中存储。从而实现消息从设备、物联网平台、RocketMQ到应用服务器之间的全链路高可靠传输能力。文本从物联网平台的产品及设备的创建开始,逐步介绍整个链路的完整实现。

操作步骤


1、创建物联网产品及设备

参考 阿里云物联网平台Qucik Start 快速创建产品和设备。

2、RocketMQ控制台 创建实例、Topic和Group,这个为了方便本地测试消费MQ的消息,选择在公网区域创建相关资源

_

3、配置规则引擎

a、配置总览
_

b、处理数据
_

c、转发数据
_

d、配置完开启规则引擎

_

相关参考:
SQL语句参考
数据转发到消息队列RocketMQ

设备上报属性消息
import com.alibaba.taro.AliyunIoTSignUtil; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import java.util.HashMap; import java.util.Map; public class IoTDemoPubSubDemo1 { public static String productKey = "*********"; public static String deviceName = "*********"; public static String deviceSecret = "*********"; public static String regionId = "cn-shanghai"; // 物模型-属性上报topic private static String pubTopic = "/sys/" + productKey + "/" + deviceName + "/thing/event/property/post"; private static MqttClient mqttClient; public static void main(String [] args){ // 初始化Mqtt Client对象 initAliyunIoTClient(); // 汇报属性 postDeviceProperties(); } /** * 初始化 Client 对象 */ private static void initAliyunIoTClient() { try { // 构造连接需要的参数 String clientId = "java" + System.currentTimeMillis(); Map<String, String> params = new HashMap<>(16); params.put("productKey", productKey); params.put("deviceName", deviceName); params.put("clientId", clientId); String timestamp = String.valueOf(System.currentTimeMillis()); params.put("timestamp", timestamp); // cn-shanghai String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883"; String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|"; String mqttUsername = deviceName + "&" + productKey; String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1"); connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword); } catch (Exception e) { System.out.println("initAliyunIoTClient error " + e.getMessage()); } } public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception { MemoryPersistence persistence = new MemoryPersistence(); mqttClient = new MqttClient(url, clientId, persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); // MQTT 3.1.1 connOpts.setMqttVersion(4); connOpts.setAutomaticReconnect(false); // connOpts.setCleanSession(true); connOpts.setCleanSession(false); connOpts.setUserName(mqttUsername); connOpts.setPassword(mqttPassword.toCharArray()); connOpts.setKeepAliveInterval(60); mqttClient.connect(connOpts); } /** * 汇报属性 */ private static void postDeviceProperties() { try { //上报数据 //高级版 物模型-属性上报payload System.out.println("上报属性值"); String payloadJson = "{\"params\":{\"Status\":1,\"Data\":\"33\"}}"; MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8")); message.setQos(1); mqttClient.publish(pubTopic, message); } catch (Exception e) { System.out.println(e.getMessage()); } } }
运行状态显示

_

参考链接:基于开源JAVA MQTT Client连接阿里云IoT

MQ Topic消息订阅

1、pom.xml

 <dependencies> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.8.0.Final</version> </dependency> </dependencies>

2、Code Sample

import com.aliyun.openservices.ons.api.*; import java.util.Properties; public class ConsumerTest { public static void main(String[] args) { Properties properties = new Properties(); // 您在控制台创建的 Group ID properties.put(PropertyKeyConst.GROUP_ID, "GID_****"); // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建 properties.put(PropertyKeyConst.AccessKey,"********"); // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建 properties.put(PropertyKeyConst.SecretKey, "********"); properties.put(PropertyKeyConst.ConsumeThreadNums,50); // 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看 properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://MQ_INST_184821781661****_BaQU****.mq-internet-access.mq-internet.aliyuncs.com:80"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("Topic_MQDemo", "*", new MessageListener() { //订阅多个 Tag public Action consume(Message message, ConsumeContext context) { System.out.println("Receive: " + message); System.out.println("Message : " + new String(message.getBody()));//打印输出消息 return Action.CommitMessage; } }); consumer.start(); System.out.println("Consumer Started"); } }

3、测试结果

Receive: Message [topic=Topic_MQDemo, systemProperties={KEYS=1202850299555940352, __KEY=1202850299555940352, __RECONSUMETIMES=0, __BORNHOST=/11.115.104.187:51935, __MSGID=0B7368BB19AF531D72CA1D0A9B540077, MIN_OFFSET=520, __BORNTIMESTAMP=1575616834388, MAX_OFFSET=525}, userProperties={UNIQ_KEY=0B7368BB19AF531D72CA1D0A9B540077, MSG_REGION=cn-qingdao-publictest, eagleTraceId=0bc5f2c215756168339094214d0a1b, TRACE_ON=true, eagleData=s3bef5fb6, CONSUME_START_TIME=1575616834470, eagleRpcId=0.1.11.10.10.1.1.1.1}, body=38] Message : {"data1":"33","deviceName":"MQDevice"}

日志查询(方便对问题进行跟踪定位排查)

1、物联网平台 -》 运维监控 -》 日志服务 -》 上行日志
_

2、消息队列RocketMQ -》 消息查询
_

AMQP服务端订阅

1、管理门户配置

_
_

2、代码订阅,参考链接

3、Code Sample

import java.net.URI; import java.util.Hashtable; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.naming.Context; import javax.naming.InitialContext; import org.apache.commons.codec.binary.Base64; import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsConnectionListener; import org.apache.qpid.jms.message.JmsInboundMessageDispatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class AmqpJavaClientDemo { private final static Logger logger = LoggerFactory.getLogger(AmqpJavaClientDemo.class); public static void main(String[] args) throws Exception { //参数说明,请参见上一篇文档:AMQP客户端接入说明。 String accessKey = "******"; String accessSecret = "******"; String consumerGroupId = "******"; long timeStamp = System.currentTimeMillis(); //签名方法:支持hmacmd5,hmacsha1和hmacsha256 String signMethod = "hmacsha1"; //控制台服务端订阅中消费组状态页客户端ID一栏将显示clientId参数。 //建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。 String clientId = "yutaodemotest"; logger.debug("demo"); logger.error("error","this is my test error info."); logger.info("info"); logger.error("error"); //UserName组装方法,请参见上一篇文档:AMQP客户端接入说明。 String userName = clientId + "|authMode=aksign" + ",signMethod=" + signMethod + ",timestamp=" + timeStamp + ",authId=" + accessKey + ",consumerGroupId=" + consumerGroupId + "|"; //password组装方法,请参见上一篇文档:AMQP客户端接入说明。 String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp; String password = doSign(signContent,accessSecret, signMethod); //按照qpid-jms的规范,组装连接URL。 String connectionUrl = "failover:(amqps://******.iot-amqp.cn-shanghai.aliyuncs.com?amqp.idleTimeout=80000)" + "?failover.maxReconnectAttempts=10&failover.reconnectDelay=30"; Hashtable<String, String> hashtable = new Hashtable<>(); hashtable.put("connectionfactory.SBCF",connectionUrl); hashtable.put("queue.QUEUE", "default"); hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory"); Context context = new InitialContext(hashtable); ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF"); Destination queue = (Destination)context.lookup("QUEUE"); // Create Connection Connection connection = cf.createConnection(userName, password); ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener); // Create Session // Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge() // Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); connection.start(); // Create Receiver Link MessageConsumer consumer = session.createConsumer(queue); consumer.setMessageListener(messageListener); } private static MessageListener messageListener = new MessageListener() { @Override public void onMessage(Message message) { try { byte[] body = message.getBody(byte[].class); String content = new String(body); String topic = message.getStringProperty("topic"); String messageId = message.getStringProperty("messageId"); System.out.println("Content:" + content); logger.info("receive message" + ", topic = " + topic + ", messageId = " + messageId + ", content = " + content); //如果创建Session选择的是Session.CLIENT_ACKNOWLEDGE,这里需要手动ACK。 //message.acknowledge(); //如果要对收到的消息做耗时的处理,请异步处理,确保这里不要有耗时逻辑。 } catch (Exception e) { e.printStackTrace(); } } }; private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() { /** * 连接成功建立。 */ @Override public void onConnectionEstablished(URI remoteURI) { logger.info("onConnectionEstablished, remoteUri:{}", remoteURI); } /** * 尝试过最大重试次数之后,最终连接失败。 */ @Override public void onConnectionFailure(Throwable error) { logger.error("onConnectionFailure, {}", error.getMessage()); } /** * 连接中断。 */ @Override public void onConnectionInterrupted(URI remoteURI) { logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI); } /** * 连接中断后又自动重连上。 */ @Override public void onConnectionRestored(URI remoteURI) { logger.info("onConnectionRestored, remoteUri:{}", remoteURI); } @Override public void onInboundMessage(JmsInboundMessageDispatch envelope) {} @Override public void onSessionClosed(Session session, Throwable cause) {} @Override public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {} @Override public void onProducerClosed(MessageProducer producer, Throwable cause) {} }; /** * password签名计算方法,请参见上一篇文档:AMQP客户端接入说明。 */ private static String doSign(String toSignString, String secret, String signMethod) throws Exception { SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod); Mac mac = Mac.getInstance(signMethod); mac.init(signingKey); byte[] rawHmac = mac.doFinal(toSignString.getBytes()); return Base64.encodeBase64String(rawHmac); } } 

4、测试结果

 Content:{"deviceType":"None","iotId":"QyLOC9FsRiJePV*********","requestId":"null","productKey":"a1QVZRPkS5g","gmtCreate":1575632021248,"deviceName":"MQDevice","items":{"Status":{"value":1,"time":1575632021255},"Data":{"value":"33","time":1575632021255}}}

5、服务端订阅门户监控
_

更多参考

控制台配置AMQP服务端订阅

Rocket MQ订阅消息

原文链接:https://yq.aliyun.com/articles/738228
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章