smart-socket实战:玩转心跳消息
一、背景
在通信中设计的心跳消息,通常是为了检查网络链路是否正常。虽然TCP协议提供keep-alive机制,但需要在链路空闲2小时后才触发检测,这显然对业务非常不友好。当存在大量连接异常,而服务端却需要等2个小时后才感知到的时候,有限的系统资源会被逐渐耗尽,最终无法为新连接请求继续提供服务。
二、原理
要解决此类问题,业界的普遍做法是在应用层加入心跳机制。心跳消息可以是单向心跳也可以是双向心跳,所谓单向心跳表示由服务端或者客户端的其中一方主动发送心跳请求消息,而另一方返回响应消息(如下图)。双向心跳表示服务端与客户端相互发送心跳请求和响应。因为无论何种类型,实现方案都是一样的,本文以单向心跳为例给大家做讲解。
三、方案
心跳消息通常是周期性的发送,或者是在链路空闲一定时长后触发。如果经历几个周期后都未收到响应,则可以视为链路异常。此时可以继续尝试发送心跳,也可以执行告警并断开连接。
在 smart-socket 中我们提供了现成的心跳插件 HeartPlugin,可以很方便的实现心跳。本文是假定读者朋友对 smart-socket 已有了初步的了解,所以不会涉及 smart-socket 的基础使用,重点描述如何在服务中集成心跳插件。
3.1 HeartPlugin插件概述
3.1.1 心跳策略
在HeartPlugin中有三种心跳策略可供选择,通过选择不同的构造方案确定。
- HeartPlugin(int heartRate, TimeUnit timeUnit)
heartRate 表示心跳消息的发送频率;timeUnit 表示 heartRate 的数值单位。例如:heartRate=3,timeUnit=TimeUnit.SECONDS,表示每 3秒钟发送一次心跳。heartRate=2000,timeUnit=TimeUnit.MILLISECONDS,表示每 2秒钟发送一次心跳。该策略为周期性发送心跳消息,无论对方是否返回响应。 - HeartPlugin(int heartRate, int timeout, TimeUnit unit)
该构造方法相较前一个多出一个参数:timeout(过期时间),必须大于heartRate。如果在timeout时长内发送的心跳消息都没有收到响应消息,则视为链路异常并且该链路会被关闭,释放资源。 - HeartPlugin(int heartRate, int timeout, TimeUnit timeUnit, TimeoutCallback timeoutCallback)
该构造方法支持指定超时回调策略 timeoutCallback,其实上一个构造方法就是设置了超时断链策略。如果不满足业务所需,用户可按需定义。
3.1.2 心跳的识别与触发
心跳策略确定好后,下一步就是如何去发送心跳消息,以及如何识别收到的消息是否为响应消息。在 HeartPlugin 中已经定义了这两个接口,需要开发人员去实现处理逻辑:
- sendHeartRequest
发送心跳。HeartPlugin 在判断某个连接需要触发心跳后,会执行该方法。用户需要在该方法中实现心跳消息的编码并输出数据。public void sendHeartRequest(AioSession session) throws IOException{ WriteBuffer writeBuffer = session.writeBuffer(); byte[] heartBytes = "heart_req".getBytes(); writeBuffer.writeInt(heartBytes.length); writeBuffer.write(heartBytes); writeBuffer.flush(); }
- isHeartMessage
请求消息识别。true:表示本次收到的是心跳消息(请求/响应);false:其他业务消息,交由MessageProcessor#processor处理。public boolean isHeartMessage(AioSession session, String msg) { //心跳请求消息,返回响应 if("heart_req".equals(msg)){ try { WriteBuffer writeBuffer = session.writeBuffer(); byte[] heartBytes = "heart_rsp".getBytes(); writeBuffer.writeInt(heartBytes.length); writeBuffer.write(heartBytes); writeBuffer.flush(); }catch (Exception e){ } return true; } //是否为心跳响应消息 return "heart_rsp".equals(msg); }
3.2 代码演示
3.2.1 服务端
public class HeartServer { private static final Logger LOGGER = LoggerFactory.getLogger(HeartServer.class); public static void main(String[] args) throws IOException { //定义消息处理器 AbstractMessageProcessor<String> processor = new AbstractMessageProcessor<String>() { @Override public void process0(AioSession<String> session, String msg) { LOGGER.info("收到客户端:{}消息:{}", session.getSessionID(), msg); } @Override public void stateEvent0(AioSession<String> session, StateMachineEnum stateMachineEnum, Throwable throwable) { switch (stateMachineEnum) { case SESSION_CLOSED: LOGGER.info("客户端:{} 断开连接", session.getSessionID()); break; } } }; //注册心跳插件:每隔1秒发送一次心跳请求,5秒内未收到消息超时关闭连接 processor.addPlugin(new HeartPlugin<String>(1, 5, TimeUnit.SECONDS) { @Override public void sendHeartRequest(AioSession session) throws IOException { WriteBuffer writeBuffer = session.writeBuffer(); byte[] heartBytes = "heart_req".getBytes(); writeBuffer.writeInt(heartBytes.length); writeBuffer.write(heartBytes); writeBuffer.flush(); } @Override public boolean isHeartMessage(AioSession session, String msg) { //心跳请求消息,返回响应 if ("heart_req".equals(msg)) { try { WriteBuffer writeBuffer = session.writeBuffer(); byte[] heartBytes = "heart_rsp".getBytes(); writeBuffer.writeInt(heartBytes.length); writeBuffer.write(heartBytes); writeBuffer.flush(); } catch (Exception e) { } return true; } //是否为心跳响应消息 if ("heart_rsp".equals(msg)) { LOGGER.info("收到来自客户端:{} 的心跳响应消息", session.getSessionID()); return true; } return false; } }); //启动服务 AioQuickServer<String> server = new AioQuickServer<>(8888, new StringProtocol(), processor); server.start(); } }
3.2.2 客户端
- client_1:接受服务端的心跳消息,不做任何回应
- client_2:及时响应服务端的心跳消息
public class HeartClient { private static final Logger LOGGER = LoggerFactory.getLogger(HeartClient.class); public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { AbstractMessageProcessor<String> client_1_processor = new AbstractMessageProcessor<String>() { @Override public void process0(AioSession<String> session, String msg) { LOGGER.info("client_1 收到服务端消息:" + msg); } @Override public void stateEvent0(AioSession<String> session, StateMachineEnum stateMachineEnum, Throwable throwable) { LOGGER.info("stateMachineEnum:{}", stateMachineEnum); } }; AioQuickClient<String> client_1 = new AioQuickClient<>("localhost", 8888, new StringProtocol(), client_1_processor); client_1.start(); AbstractMessageProcessor<String> client_2_processor = new AbstractMessageProcessor<String>() { @Override public void process0(AioSession<String> session, String msg) { LOGGER.info("client_2 收到服务端消息:" + msg); try { if ("heart_req".equals(msg)) { WriteBuffer writeBuffer = session.writeBuffer(); byte[] heartBytes = "heart_rsp".getBytes(); writeBuffer.writeInt(heartBytes.length); writeBuffer.write(heartBytes); LOGGER.info("client_2 发送心跳响应消息"); } } catch (Exception e) { e.printStackTrace(); } } @Override public void stateEvent0(AioSession<String> session, StateMachineEnum stateMachineEnum, Throwable throwable) { LOGGER.info("stateMachineEnum:{}", stateMachineEnum); } }; AioQuickClient<String> client_2 = new AioQuickClient<>("localhost", 8888, new StringProtocol(), client_2_processor); client_2.start(); } }
3.2.3观察控制台
服务端
客户端
总结
本文围绕着心跳原理作了简单的实践分享。现实场景中如果对接的设备数量高达几万,甚至十几万,本文的心跳方案是否依旧适用,欢迎一起交流讨论。
本文涉及到的示例代码可从smart-socket仓库中下载

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
近期国内外重大数据泄露事件
即将开播:4月29日,民生银行郭庆谈商业银行金融科技赋能的探索与实践 2020年还未过半,“数据泄露”这样的字眼却显得异常活跃。全球各地深受数据泄露事件的困扰,同时也造成了重大损失。 根据IBM最新的数据泄露年度成本研究,平均数据泄露成本现在高达392万美元。这些费用在过去五年里增加了12%。据FireEye估计,不到一半的组织准备好面对网络攻击或数据泄露。 下面,我们来看看近期国内外发生的数据泄露和网络攻击事件。 一、国内数据泄露事件 高校学生信息泄露事件 近日,河南财经政法大学、西北工业大学明德学院、重庆大学城市科技学院等高校的数千名学生发现,自己的个人所得税App上有陌生公司的就职记录。税务人员称,很可能是学生信息被企业冒用,以达到偷税的目的。 此外,有类似遭遇的还包括湖北武汉、山东青岛、安徽滁州等多地的高校学生。企业冒用大学生信息偷税俨然成为行业潜规则,而受害的大学生因无就业经验,往往对此难以察觉,维权更是困难重重。 医院名单泄露事件 4月16日11时17分,有当地市民在胶州政务网反应,微信朋友圈中流传着出入胶州中心医院的数千人名单,涉及相关人员个人信息,已严重影响个人生活,并...
- 下一篇
JVM 对象分配过程
对象分配过程 1)依据逃逸分析,判断是否能栈上分配? 如果可以,使用标量替换方式,把对象分配到VM Stack中。如果 线程销毁或方法调用结束后,自动销毁,不需要 GC 回收器 介入。 否则,继续下一步。 2)判断是否大对象? 如果是,直接分配到堆上 Old Generation 老年代上。如果对象变为垃圾后,由老年代GC 收集器(比如 Parallel Old, CMS, G1)回收。 否则,继续下一步。 3)判断是否可以在 TLAB中分配? 如果是,在 TLAB中分配堆上Eden区。 否则,在 TLAB外堆上的Eden区分配。 栈上分配 本质上是JVM提供的一个优化技术。 基本思想:将线程私有的对象打散分配在栈 VM Stack上 优点: 可以在函数调用结束后自行销毁对象,不需要垃圾回收器的介入,有效避免垃圾回收带来的负面影响 栈上分配速度快,提高系统性能 局限性: 栈空间小,对于大对象无法实现栈上分配 技术基础: 逃逸分析、标量替换 什么是逃逸分析? 关于 Java 逃逸分析的定义: 逃逸分析(Escape Analysis)简单来讲就是,Java Hotspot 虚拟机可以分析...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS8编译安装MySQL8.0.19
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- Hadoop3单机部署,实现最简伪集群