深入协议层:tlmqtt 如何通过自定义编解码器实现高性能 MQTT Broker
tlmqtt 是一款基于 Java 开发、底层依赖 Netty 和 Project Reactor 的完全开源免费的高性能 MQTT Broker。它提供完整的 MQTT 协议解析、QoS 0/1/2 消息支持、自定义消息存储、可扩展的认证机制以及数据桥接功能。
MQTT 编解码:深入控制与理解
在分析众多开源 MQTT Broker
实现时,发现绝大多数(约 99%)都直接使用 Netty
提供的编解码器,如下所示:
pipeline.addLast(MqttEncoder.INSTANCE); pipeline.addLast(new MqttDecoder(maxBytesInMessage));
这种方式让开发者无需关注协议解析细节,专注于业务逻辑开发,是其显著优势。然而,它也带来了两个关键限制:
- 高度依赖
Netty
: 扩展性和灵活性受限于Netty
的实现 - 协议理解不足: 开发者容易停留在“知其然”层面,对
CONNECT
、PUBLISH
等报文的具体结构和解析过程缺乏深入理解
tlmqtt 选择了自定义编解码器的实现路径,对 MQTT
消息报文进行逐步解析。这为我们提供了更深入的控制、灵活性和扩展性,同时也是深入理解MQTT
协议细节的实践。
Netty 基础与 MQTT 协议的挑战
Java
高性能网络开发离不开 Netty
。它提供了多种开箱即用的编解码器,如固定长度、分隔符和基于长度域的帧解码器。在实现自定义编解码器之前,必须理解 MQTT
协议对消息长度的独特定义方式 剩余长度编码规则:
- 单个字节可表示 0 到 127 的值。
- 大于 127 的值处理如下: - 每个字节的 低 7 位 (bits 0-6) 用于编码数据。 - 最高位 (bit 7) 作为标识位:1 表示还有后续字节,0 表示结束。
- 剩余长度最多由 四个字节 表示。
显然, MQTT
的这种变长编码方式与 Netty
内置的标准长度域解码器(通常是固定字节数表示长度)并不完全匹配。因此,自定义编解码器成为必然选择。
解码器实现:从字节流到消息对象
自定义解码器需继承 Netty
的 ByteToMessageDecoder
类,核心任务是将接收到的 ByteBuf
字节流转换为业务逻辑所需的 AbstractTlMessage
对象(及其各种具体子类,如 TlMqttConnectReq
)。 核心 decode
方法流程如下:
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // 1. 检查基本长度:可读字节数小于2(固定头最小长度),等待更多数据 if (in.readableBytes() < MIN_LENGTH) { return; } // 2. 标记当前读指针位置,以便后续数据不足时回退 in.markReaderIndex(); // 3. 读取第1字节:包含消息类型(高4位)和标志位(低4位) short firstByte = in.readUnsignedByte(); // 4. 解码剩余长度 (变长编码) int remainingLength = decodeRemainingLength(in); // 5. 检查载荷数据是否完整到达 (剩余长度指的就是载荷长度) if (in.readableBytes() < remainingLength) { in.resetReaderIndex(); // 数据不足,重置读指针,等待后续数据 return; } // 6. 数据完整:读取载荷部分到临时ByteBuf ByteBuf payloadBuf = in.readBytes(remainingLength); // 可选:打印原始报文(十六进制) - TlLog.logger("mqtt raw hex", payloadBuf); try { // 7. 提取消息类型 (右移4位取高4位) int messageType = firstByte >> Constant.MESSAGE_BIT; MqttMessageType mqttType = MqttMessageType.valueOf(messageType); // 8. 根据消息类型,分派给对应的具体解码器构建请求对象 AbstractTlMessage req = switch (mqttType) { case CONNECT -> connectDecoder.build(payloadBuf, firstByte, remainingLength); case DISCONNECT -> disConnectDecoder.build(payloadBuf, firstByte, remainingLength); case PUBLISH -> publishDecoder.build(payloadBuf, firstByte, remainingLength); case PUBACK -> pubAckDecoder.build(payloadBuf, firstByte, remainingLength); case PUBREC -> pubRecDecoder.build(payloadBuf, firstByte, remainingLength); case PUBREL -> pubRelDecoder.build(payloadBuf, firstByte, remainingLength); case PUBCOMP -> pubCompDecoder.build(payloadBuf, firstByte, remainingLength); case SUBSCRIBE -> subscribeDecoder.build(payloadBuf, firstByte, remainingLength); case UNSUBSCRIBE -> unSubscribeDecoder.build(payloadBuf, firstByte, remainingLength); case PINGREQ -> heartBeatDecoder.build(payloadBuf, firstByte, remainingLength); default -> throw new IllegalArgumentException("Unknown MQTT message type: " + mqttType); }; out.add(req); // 9. 将解析好的消息对象加入输出列表,传递给后续Handler } finally { payloadBuf.release(); // 10. 确保临时ByteBuf资源释放 } }
关键辅助方法:剩余长度解码 ( decodeRemainingLength
)
private int decodeRemainingLength(ByteBuf in) { int multiplier = 1; // 乘数因子 (128^0, 128^1, ...) int value = 0; // 累积计算出的剩余长度值 byte encodedByte; do { encodedByte = in.readByte(); // 读取一个编码字节 value += (encodedByte & 0x7F) * multiplier; // 取低7位数据并乘以当前乘数 multiplier *= 128; // 乘数递增 (128^1, 128^2, ...) } while ((encodedByte & 0x80) != 0); // 检查最高位(标识位)是否为1 (还有后续字节) return value; }
根据解析出的消息类型,数据会被分派给对应的具体解码器(如 TlMqttConnectDecoder
)。这些解码器通常采用模块化设计,包含 decodeFixedHeader
(固定头)、 decodeVariableHeader
(可变头)和 decodePayload
(载荷)三个核心方法。 以 CONNECT
报文解码 ( TlMqttConnectDecoder
) 为例:
// 解码固定头 (相对简单,主要是类型和长度) TlMqttFixedHead decodeFixedHeader(int remainingLength) { TlMqttFixedHead fixedHead = new TlMqttFixedHead(); fixedHead.setMessageType(MqttMessageType.CONNECT); fixedHead.setLength(remainingLength); // 设置整个报文剩余长度 return fixedHead; } // 解码可变头 (包含协议名、版本、连接标志和保活时间) TlMqttConnectVariableHead decodeVariableHeader(ByteBuf buf) { TlMqttConnectVariableHead variableHead = new TlMqttConnectVariableHead(); // 1. 协议名 (通常是"MQTT") int protocolNameLen = buf.readUnsignedShort(); // 长度域 (2字节) variableHead.setProtocolNameLength(protocolNameLen); byte[] protocolNameBytes = new byte[protocolNameLen]; buf.readBytes(protocolNameBytes); String protocolName = new String(protocolNameBytes, StandardCharsets.UTF_8); // 显式指定字符集 // 2. 协议版本 (e.g., 4 for MQTT 3.1.1) short protocolVersion = buf.readUnsignedByte(); variableHead.setProtocolVersion(protocolVersion); // 3. 连接标志字节 (Connect Flags) - 关键! int connectFlags = buf.readUnsignedByte(); // 位运算解析各个标志位 variableHead.setReserved(connectFlags & 0x01); // Bit 0 (保留位,必须为0) variableHead.setCleanSession((connectFlags >> 1) & 0x01); // Bit 1 (Clean Session) int willFlag = (connectFlags >> 2) & 0x01; // Bit 2 (Will Flag) variableHead.setWillFlag(willFlag); variableHead.setWillQos((connectFlags >> 3) & 0x03); // Bits 3-4 (Will QoS: 0, 1, 2) variableHead.setWillRetain((connectFlags >> 5) & 0x01); // Bit 5 (Will Retain) variableHead.setPasswordFlag(((connectFlags >> 6) & 0x01) > 0); // Bit 6 (Password Flag) variableHead.setUsernameFlag(((connectFlags >> 7) & 0x01) > 0); // Bit 7 (Username Flag) // 4. 保活时间 (Keep Alive Timer - 秒) short keepAlive = buf.readShort(); variableHead.setKeepAlive(keepAlive); log.debug("解析【CONNECT】可变头: 协议名=[{}], 版本=[{}], CleanSession=[{}], " + "WillFlag=[{}], WillQos=[{}], WillRetain=[{}], 用户名标志=[{}], 密码标志=[{}], KeepAlive=[{}]", protocolName, protocolVersion, variableHead.getCleanSession(), willFlag, variableHead.getWillQos(), variableHead.getWillRetain(), variableHead.isUsernameFlag(), variableHead.isPasswordFlag(), keepAlive); return variableHead; } // 解码载荷 (内容由可变头中的标志位决定) TlMqttConnectPayload decodePayload(ByteBuf buf, TlMqttConnectVariableHead variableHead) { TlMqttConnectPayload payload = new TlMqttConnectPayload(); // 1. Client Identifier (必选) int clientIdLen = buf.readUnsignedShort(); byte[] clientIdBytes = new byte[clientIdLen]; buf.readBytes(clientIdBytes); payload.setClientId(new String(clientIdBytes, StandardCharsets.UTF_8)); // 2. Will Topic & Will Message (如果 Will Flag = 1) if (variableHead.getWillFlag() == 1) { int willTopicLen = buf.readUnsignedShort(); byte[] willTopicBytes = new byte[willTopicLen]; buf.readBytes(willTopicBytes); payload.setWillTopic(new String(willTopicBytes, StandardCharsets.UTF_8)); int willMessageLen = buf.readUnsignedShort(); byte[] willMessageBytes = new byte[willMessageLen]; buf.readBytes(willMessageBytes); payload.setWillMessage(new String(willMessageBytes, StandardCharsets.UTF_8)); } // 3. Username (如果 Username Flag = true) if (variableHead.isUsernameFlag()) { int usernameLen = buf.readUnsignedShort(); byte[] usernameBytes = new byte[usernameLen]; buf.readBytes(usernameBytes); payload.setUsername(new String(usernameBytes, StandardCharsets.UTF_8)); } // 4. Password (如果 Password Flag = true) if (variableHead.isPasswordFlag()) { // 使用VariableHead中的标志位判断 int passwordLen = buf.readUnsignedShort(); byte[] passwordBytes = new byte[passwordLen]; buf.readBytes(passwordBytes); payload.setPassword(new String(passwordBytes, StandardCharsets.UTF_8)); } log.debug("解析【CONNECT】载荷: clientId=[{}], willFlag=[{}], willQos=[{}], willTopic=[{}], username=[{}]", payload.getClientId(), variableHead.getWillFlag(), variableHead.getWillQos(), payload.getWillTopic(), payload.getUsername()); return payload; }
其他 MQTT
报文类型( PUBLISH
, SUBSCRIBE
, PUBACK
等)的解码逻辑遵循类似模式,具体实现可参考对应的解码器类。
解码完成后,会得到一个具体的请求对象(如 TlMqttConnectReq
)。该对象随后会被传递给专门处理该类型消息的 ChannelInboundHandler
,例如 TlMqttConnectHandler
:
public class TlMqttConnectHandler extends SimpleChannelInboundHandler<TlMqttConnectReq> { @Override protected void channelRead0(ChannelHandlerContext ctx, TlMqttConnectReq req) throws Exception { // 在此处实现CONNECT请求的核心业务逻辑: // 1. 认证 (用户名/密码校验) // 2. 会话管理 (新建或复用会话) // 3. 遗嘱消息处理 // 4. 构建并发送CONNACK响应 } }
编码器实现:从对象到网络字节流
tlmqtt 的编码器负责将业务逻辑中需要发送给客户端的消息对象(如 TlMqttConnack
, TlMqttPublish
等)序列化为符合 MQTT
协议规范的二进制数据。开发者只需操作这些对象即可:
// 业务逻辑中创建CONNACK响应对象 TlMqttConnack connack = TlMqttConnack.build(cleanSessionPresent, MqttConnectReturnCode.CONNECTION_ACCEPTED); // 通过通道管理器发送 channelManager.writeAndFlush(clientId, connack);
编码器(继承 Netty
的 MessageToByteEncoder
)则透明地处理对象到字节流的转换。 以 CONNACK
报文编码 ( TlMqttConnackEncoder
) 为例:
@ChannelHandler.Sharable // 标记为可共享,通常无状态 @Slf4j // 日志注解 public class TlMqttConnackEncoder extends MessageToByteEncoder<TlMqttConnack> { @Override protected void encode(ChannelHandlerContext ctx, TlMqttConnack connack, ByteBuf out) throws Exception { TlMqttFixedHead fixedHead = connack.getFixedHead(); TlMqttConnackVariableHead variableHead = connack.getVariableHead(); // 1. 固定头编码 byte fixedHeaderByte = (byte) (fixedHead.getMessageType().value() << 4); // 消息类型(高4位) + 保留位(低4位=0) out.writeByte(fixedHeaderByte); // 2. 剩余长度编码 (CONNACK固定为2字节) out.writeByte(2); // Remaining Length = 2 // 3. 可变头编码 out.writeByte(variableHead.getSessionPresent()); // Byte 1: Session Present Flag (0 或 1) out.writeByte(variableHead.getConnectReturnCode().getValue()); // Byte 2: Connect Return Code } }
贡献与反馈
欢迎通过以下方式参与项目共建:
- 提交 Issue:反馈 Bug 或提出功能建议
- 提交 PR:优化代码或新增功能(建议先创建 Issue 沟通方案)
- Star/Fork:支持项目持续发展
联系方式:
- 邮箱:2534798858@qq.com
- 项目地址:https://github.com/ZHSQJM/tlmqtt
- 项目地址:https://gitee.com/PiQiHenHaoDeGangTieXia/tlmqtt
tlmqtt致力于为物联网开发者提供轻量、高效的 MQTT 消息服务,期待您的加入! 🚀

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
三星计划投资 Perplexity AI,将其搜索技术深度整合至 Galaxy 设备
彭博社报道称,三星电子即将达成一项涉及面广泛的协议,其将投资 Perplexity AI Inc.,并将这家人工智能初创公司的搜索技术置于其设备的核心位置。 据知情人士透露,两家公司正在洽谈在即将推出的三星设备上预装 Perplexity 的应用程序和助手,并将 Perplexity 的搜索功能集成到三星网络浏览器中。 由于谈判是私下进行的,这些知情人士不愿透露姓名。知情人士表示,两家公司还讨论了将 Perplexity 的技术融入三星的 Bixby 虚拟助手。 三星计划最早在今年宣布与 Perplexity 的集成,目标是将该服务作为 Galaxy S26 系列手机的默认助手选项,该系列手机将于 2026 年上半年推出。不过,具体细节尚未最终确定,仍有可能发生变化。 知情人士表示,这家科技巨头预计还将是 Perplexity 新一轮融资的最大投资者之一。据彭博社报道,这家初创公司正就融资 5 亿美元、估值 140 亿美元进行深入洽谈。 此次广泛的合作可能有助于三星减少对谷歌的依赖,并为其与多家人工智能开发商合作铺平道路,类似于苹果公司针对其设备和服务的战略。对于 Perplexity...
- 下一篇
腾讯跨端框架 Kuikly 鸿蒙版正式开源
Kuikly 是腾讯开源的跨端开发框架,基于 Kotlin Multiplatform 技术构建,为开发者提供了技术栈更统一的跨端开发体验。 在 Android、iOS 开源基础上,本次开源鸿蒙平台支持和 Compose DSL 支持,进一步提升业务多端适配和鸿蒙开发效率。 据介绍,目前 Kuikly 鸿蒙版已接入腾讯多款业务,开发并上架鸿蒙 App,如QQ浏览器、腾讯新闻、搜狗输入法、全民K歌、自选股等。 在鸿蒙平台上,Kuiky 打开页面速度比 RN 快 6 倍: Kuikly 基于 Kotlin MultiPlatform(KMP)技术,它利用了 KMP 逻辑跨平台的能力,并抽象出通用的跨平台 UI 渲染接口,复用平台的 UI 组件,从而达到 UI 跨平台,具有轻量、高性能、可动态化等优点; Kuikly 包括“KuiklyUI”和“KuiklyBase”两部分。其中,KuiklyUI 支持业务使用自研 DSL 和 Compose DSL 进行 UI 跨端开发,采用轻量、原生渲染方式,支持页面级动态化;KuiklyBase 支持 UI 和 KMP 逻辑全面跨端的基础能力和设施,包...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Mario游戏-低调大师作品
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- 2048小游戏-低调大师作品
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS8安装Docker,最新的服务器搭配容器使用
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- CentOS7,CentOS8安装Elasticsearch6.8.6
- Docker安装Oracle12C,快速搭建Oracle学习环境