iot-modbus本次发布的V3.2.6版本支持设备上线、掉线、处理业务异常监听处理,请看下面的源码解读。
主要是在MiiListenerHandler重写了channelActive方法
![]()
/**
 * Publishes a device-online connection event when the channel becomes active.
 *
 * <p>Nothing is published when the channel's remote address, the event object,
 * or the application context is judged empty by {@code JudgeEmptyUtils}.
 *
 * @param ctx context of the channel that just became active
 * @throws Exception declared by the overridden handler method
 */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    Channel activeChannel = ctx.channel();
    if (JudgeEmptyUtils.isEmpty(activeChannel.remoteAddress())) {
        return;
    }
    // Drop the first character of the textual remote address
    // (presumably the leading "/" of a socket address -- TODO confirm).
    String remoteText = activeChannel.remoteAddress().toString();
    String address = remoteText.substring(1);
    ChannelConnectData onlineEvent =
            new ChannelConnectData(this, DeviceConnectEnum.ON_LINE.getKey(), address);
    boolean publishable = !JudgeEmptyUtils.isEmpty(onlineEvent)
            && !JudgeEmptyUtils.isEmpty(getApplicationContext);
    if (publishable) {
        getApplicationContext.publishEvent(onlineEvent);
    }
}
主要是在MiiListenerHandler重写了channelInactive方法。
![]()
@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); if(!JudgeEmptyUtils.isEmpty(channel.remoteAddress())){ String address = channel.remoteAddress().toString().substring(1,channel.remoteAddress().toString().length()); ChannelConnectData connectServerData = new ChannelConnectData(this, DeviceConnectEnum.BREAK_OFF.getKey(), address); if(!JudgeEmptyUtils.isEmpty(connectServerData) && !JudgeEmptyUtils.isEmpty(getApplicationContext)){ getApplicationContext.publishEvent(connectServerData); } } //连接断开后的最后处理 ctx.pipeline().remove(this); ctx.deregister(); ctx.close(); }
主要是增加了MiiExceptionHandler处理器,并在服务端MiiServer和客户端MiiClient初始化initChannel时将其添加进去。
![]()
package com.takeoff.iot.modbus.netty.handle;
import com.takeoff.iot.modbus.common.entity.ChannelConnectData;import com.takeoff.iot.modbus.common.enums.DeviceConnectEnum;import com.takeoff.iot.modbus.common.utils.JudgeEmptyUtils;import com.takeoff.iot.modbus.common.utils.SpringContextUtil;import org.springframework.context.ApplicationContext;
import io.netty.channel.Channel;import io.netty.channel.ChannelDuplexHandler;import io.netty.channel.ChannelHandlerContext;import lombok.extern.slf4j.Slf4j;
@Slf4jpublic class MiiExceptionHandler extends ChannelDuplexHandler { private ApplicationContext getApplicationContext = SpringContextUtil.applicationContext; public MiiExceptionHandler(){ }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Channel channel = ctx.channel(); if(!JudgeEmptyUtils.isEmpty(channel.remoteAddress())){ String address = channel.remoteAddress().toString().substring(1,channel.remoteAddress().toString().length()); ChannelConnectData connectServerData = new ChannelConnectData(this, DeviceConnectEnum.ABNORMAL.getKey(), address); if(!JudgeEmptyUtils.isEmpty(connectServerData) && !JudgeEmptyUtils.isEmpty(getApplicationContext)){ getApplicationContext.publishEvent(connectServerData); } //由Tail节点对异常进行统一处理 if(cause instanceof RuntimeException){ log.info("处理业务异常:"+channel.remoteAddress()); } super.exceptionCaught(ctx, cause); } }}
![]()
![]()
主要通过Spring的发布事件监听来处理,增加连接监听器ChannelConnectListener。
![]()
package com.takeoff.iot.modbus.test.listener;
import com.takeoff.iot.modbus.common.entity.ChannelConnectData;import com.takeoff.iot.modbus.common.utils.JudgeEmptyUtils;import org.springframework.context.event.EventListener;import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
/**
 * Spring component that listens for {@code ChannelConnectData} events and logs
 * the device connection status they carry.
 */
@Slf4j
@Component
public class ChannelConnectListener {

    /**
     * Logs the connection status code and message of a received event.
     *
     * <p>Events whose status code is judged empty by {@code JudgeEmptyUtils}
     * are ignored, since there is nothing meaningful to report for them.
     *
     * @param data the connection event that was published
     */
    @EventListener
    public void handleReceiveDataEvent(ChannelConnectData data) {
        // Log only when a status code is present; the previous check was inverted
        // and logged only events that had no status code.
        if (!JudgeEmptyUtils.isEmpty(data.getDeviceConnect())) {
            log.info("设备连接状态码:" + data.getDeviceConnect() + " ---> " + data.getConnectMsg());
        }
    }
}