您现在的位置是:首页 > 文章详情

物联网通讯协议 iot-modbus V3.2.8 发布

日期:2022-12-14点击:287
关注“腾飞开源”,一起共同成长!
1、更新内容
iot-modbus本次发布的V3.2.8版本主要优化服务端上线、掉线监听处理,以及对客户端心跳检测(超时没有接收到客户端上传的心跳则自动断开连接),请看下面的源码解读。
2、控制台日志输出效果

编辑切换为居中

添加图片注释,不超过 140 字(可选)

3、服务端连接管理器
(1)服务端增加连接管理器MiiServerConnect,重写Channel的channelActive和channelInactive方法,监听Channel活跃状态情况进行处理,如下图所示:

编辑切换为居中

添加图片注释,不超过 140 字(可选)

(2)发布连接监听事件,主要通过spring的发布时间监听来处理,增加连接监听器ChannelConnectListener。

编辑切换为居中

添加图片注释,不超过 140 字(可选)

4、对客户端心跳检测
(1)增加心跳检测超时时间配置,如下图所示:

编辑切换为居中

添加图片注释,不超过 140 字(可选)

(2)服务端心跳检测超时时间,超时则主动断开链接。

编辑切换为居中

添加图片注释,不超过 140 字(可选)

5、源码解读
(1)服务端连接管理器源码
 package com.takeoff.iot.modbus.server.connect;  import com.takeoff.iot.modbus.common.entity.ChannelConnectData; import com.takeoff.iot.modbus.common.enums.DeviceConnectEnum; import com.takeoff.iot.modbus.common.utils.CacheUtils; import com.takeoff.iot.modbus.common.utils.JudgeEmptyUtils; import com.takeoff.iot.modbus.common.utils.SpringContextUtil; import com.takeoff.iot.modbus.netty.channel.MiiChannel; import com.takeoff.iot.modbus.netty.device.MiiDeviceChannel; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.ChannelHandler.Sharable; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationContext; import org.springframework.util.ObjectUtils;  import java.net.SocketAddress; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit;  /**  * 类功能说明:客户端链接管理器<br/>  * 公司名称:TF(腾飞)开源 <br/>  * 作者:luorongxi <br/>  */ @Slf4j @Sharable public class MiiServerConnect extends ChannelInboundHandlerAdapter { private ApplicationContext getApplicationContext = SpringContextUtil.applicationContext;   private static int TIMEOUT = 5000;   /**  * 连接成功次数  */  private Map<String, Integer> onLineMap = new HashMap<>();   /**  * 连接断开次数  */  private Map<String, Integer> breakOffMap = new HashMap<>();   public MiiServerConnect(){ } @Override  public void channelActive(ChannelHandlerContext ctx) throws Exception { //成功后,重连失败次数清零  Channel channel = ctx.channel();  ctx.fireChannelActive();  if(!JudgeEmptyUtils.isEmpty(channel.remoteAddress())){ String address = channel.remoteAddress().toString().substring(1,channel.remoteAddress().toString().length());  MiiChannel miiChannel = new MiiDeviceChannel(channel);  Integer onLine = (ObjectUtils.isEmpty(onLineMap.get(miiChannel.name())) ? 0 : onLineMap.get(miiChannel.name())) + 1;  onLineMap.put(miiChannel.name(), onLine);  ChannelConnectData connectServerData = new ChannelConnectData(this, DeviceConnectEnum.ON_LINE.getKey(), address, onLine);  if(!JudgeEmptyUtils.isEmpty(connectServerData) && !JudgeEmptyUtils.isEmpty(getApplicationContext)){ getApplicationContext.publishEvent(connectServerData);  //将柜地址与通讯管道的绑定关系写入缓存  CacheUtils.put(miiChannel.name(), miiChannel);  } } } @Override  public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelInactive();  Channel channel = ctx.channel();  if(!JudgeEmptyUtils.isEmpty(channel) && !JudgeEmptyUtils.isEmpty(channel.remoteAddress())){ String address = channel.remoteAddress().toString().substring(1,channel.remoteAddress().toString().length());  MiiChannel miiChannel = new MiiDeviceChannel(channel);  Integer breakOff = (ObjectUtils.isEmpty(breakOffMap.get(miiChannel.name())) ? 0 : breakOffMap.get(miiChannel.name())) + 1;  breakOffMap.put(miiChannel.name(), breakOff);  ChannelConnectData connectServerData = new ChannelConnectData(this, DeviceConnectEnum.BREAK_OFF.getKey(), address, breakOff);  if(!JudgeEmptyUtils.isEmpty(connectServerData) && !JudgeEmptyUtils.isEmpty(getApplicationContext)){ getApplicationContext.publishEvent(connectServerData);  } //将通讯管道的绑定关系从缓存中删除  CacheUtils.remove(miiChannel.name());  //连接断开后的最后处理  ctx.pipeline().remove(ctx.handler());  ctx.deregister();  ctx.close();  } } } 
(2)连接监听器发布事件源码
 package com.takeoff.iot.modbus.common.entity;  import com.takeoff.iot.modbus.common.enums.DeviceConnectEnum; import com.takeoff.iot.modbus.common.utils.JudgeEmptyUtils; import org.springframework.context.ApplicationEvent;  import lombok.Getter;  @Getter public class ChannelConnectData extends ApplicationEvent { /**  * 描述: TODO <br/>  * Fields serialVersionUID : TODO <br/>  */  private static final long serialVersionUID = 2111432846029949421L;   private String deviceAddress = null;   private Integer deviceConnect = null;   private String connectMsg = null;   public ChannelConnectData(Object source, Integer deviceConnect, String deviceAddress, int count) { super(source);  if(!JudgeEmptyUtils.isEmpty(deviceAddress)){ this.deviceConnect = deviceConnect;  this.deviceAddress = deviceAddress;  this.connectMsg = "设备:"+ deviceAddress + DeviceConnectEnum.getName(deviceConnect) + ",累计:"+ count + "";  } } } 
(3)连接监听器源码
 package com.takeoff.iot.modbus.test.listener;  import com.takeoff.iot.modbus.common.entity.ChannelConnectData; import com.takeoff.iot.modbus.common.utils.JudgeEmptyUtils; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component;  import lombok.extern.slf4j.Slf4j;  @Slf4j @Component public class ChannelConnectListener { @EventListener  public void handleReceiveDataEvent(ChannelConnectData data) { if(JudgeEmptyUtils.isEmpty(data.getDeviceConnect())){ log.info("设备连接状态码:"+data.getDeviceConnect()+" ---> "+data.getConnectMsg());  } } } 
(4)服务端心跳检测超时时间源码
 package com.takeoff.iot.modbus.server;  import java.util.List; import java.util.concurrent.TimeUnit;  import com.takeoff.iot.modbus.common.utils.CacheUtils; import com.takeoff.iot.modbus.netty.device.MiiDeviceChannel; import com.takeoff.iot.modbus.netty.device.MiiDeviceGroup; import com.takeoff.iot.modbus.netty.device.MiiControlCentre; import com.takeoff.iot.modbus.common.bytes.factory.MiiDataFactory; import com.takeoff.iot.modbus.common.data.MiiHeartBeatData; import com.takeoff.iot.modbus.common.message.MiiMessage; import com.takeoff.iot.modbus.netty.channel.MiiChannel; import com.takeoff.iot.modbus.netty.channel.MiiChannelGroup; import com.takeoff.iot.modbus.netty.data.factory.MiiServerDataFactory; import com.takeoff.iot.modbus.netty.handle.*; import com.takeoff.iot.modbus.netty.listener.MiiListener; import com.takeoff.iot.modbus.server.connect.MiiServerConnect; import com.takeoff.iot.modbus.server.message.sender.MiiServerMessageSender; import com.takeoff.iot.modbus.server.message.sender.ServerMessageSender; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import lombok.Getter; import lombok.extern.slf4j.Slf4j;  /**  * 类功能说明:设备通讯服务端<br/>  * 公司名称:TF(腾飞)开源 <br/>  * 作者:luorongxi <br/>  */ @Slf4j public class MiiServer extends ChannelInitializer<SocketChannel> implements MiiControlCentre { private static int IDLE_TIMEOUT = 60000;   private EventLoopGroup bossGroup;  private EventLoopGroup workerGroup;  private ChannelFuture future;  private int port,nThread;  @Getter  private MiiChannelGroup groups;  private MiiServerConnect connect;  private ServerMessageSender sender;  private MiiListenerHandler handler;  private MiiDataFactory dataFactory;   /**  * 创建指定服务端口,默认线程数的服务端  * @param port 服务端口  */  public MiiServer(int port){ this(port, 0, IDLE_TIMEOUT);  } /**  * 创建指定服务端口,指定线程数的服务端  * @param port 服务端口  * @param nThread 执行线程池线程数  * @param heartBeatTime 心跳检测超时时间(单位:毫秒)  */  public MiiServer(int port, int nThread, int heartBeatTime){ this.port = port;  this.nThread = nThread;  this.IDLE_TIMEOUT = heartBeatTime;  this.groups = new MiiChannelGroup();  this.connect = new MiiServerConnect();  this.sender = new MiiServerMessageSender();  this.handler = new MiiListenerHandler(this.groups);  this.handler.addListener(MiiMessage.HEARTBEAT, new MiiListener() { @Override  public void receive(MiiChannel channel, MiiMessage message) { //通讯通道绑定设备IP  groups.get(channel.name()).name(message.deviceGroup());  log.info("Netty通讯已绑定设备IP"+ message.deviceGroup());  } });  this.dataFactory = new MiiServerDataFactory();  } /**  * 启动服务  */  public void start(){ bossGroup = new NioEventLoopGroup(1);  workerGroup = new NioEventLoopGroup(nThread);  ServerBootstrap b = new ServerBootstrap();  b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(this);  future = b.bind(port);  } /**  * 停止服务  */  public void stop(){ future.channel().closeFuture();  workerGroup.shutdownGracefully();  bossGroup.shutdownGracefully();  } /**  * 根据名称/地址找已连接设备组  * 名称/地址不存在或者未连接时返回null * @param name 名称/地址  * @return 设备组  */  public MiiChannel group(String name) { return get(name);  } /**  * 列出所有已连接设备组清单  * @return 所有已连接身边组清单  */  public List<MiiChannel> groups() { return groups.list();  } public ServerMessageSender sender(){ return sender;  } /**  * 添加接收指定指令的消息监听器  * @param command 指令类型 {@link MiiMessage}  * @param listener 消息监听器  * @return 上一个消息监听器,如果没有返回null  */  public MiiListener addListener(int command, MiiListener listener){ return handler.addListener(command, listener);  } /**  * 移除接收指定指令的消息监听器  * @param command 指令类型 {@link MiiMessage}  * @return 移除消息监听器,如果没有返回null  */  public MiiListener removeListener(int command){ return handler.removeListener(command);  } @Override  protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline();  MiiDeviceGroup group = new MiiDeviceChannel(ch);  add(group);  //服务端心跳检测超时时间,超时则主动断开链接  p.addLast(new IdleStateHandler(0, 0, IDLE_TIMEOUT, TimeUnit.MILLISECONDS));  p.addLast(new ChannelInboundHandlerAdapter(){ @Override  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if(evt instanceof IdleStateEvent){ ctx.disconnect();  } else { super.userEventTriggered(ctx, evt);  } } });  p.addLast(new MiiMessageEncoder());  p.addLast(new MiiBasedFrameDecoder());  p.addLast(new MiiMessageDecoder(dataFactory));  p.addLast(connect);  p.addLast(handler);  p.addLast(new MiiExceptionHandler());  } @Override  public boolean add(MiiChannel channel) { return groups.add(channel);  } @Override  public MiiChannel remove(String name) { return groups.remove(name);  } @Override  public MiiChannel get(String name) { return groups.get(name);  } } 
(4)更详细的内容请查看“腾飞开源”物联网通讯协议 iot-modbus V3.2.8版本,gitee地址:https://gitee.com/takeoff/iot-modbus
原文链接:https://www.oschina.net/news/221504
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章