您现在的位置是:首页 > 文章详情

RocketMQ源码:通信协议设计及编解码

日期:2018-09-07点击:391

    本文主要分析RocketMQ通信协议的设计。RocketMQ设计了自己的一个通信协议,用于消息内容和二进制格式之间的转换。

    RocketMQ的版本为:4.2.0 release。

一.通信协议的格式

200cac06142a29a1dd4b0328043f2a17cbe.jpg

    1.length:4字节整数,二三四部分长度总和;

    2.header length:4字节整数,第三部分header data长度;

    3.header data:存放Json序列化的数据;

    4.body data:应用自定义二进制序列化的数据。


二.消息的编码过程

    消息的编码是在 RemotingCommand 中 encode 方法中完成的:

public ByteBuffer encode() { // 1> header length size int length = 4; // 2> header data length byte[] headerData = this.headerEncode(); length += headerData.length; // 3> body data length if (this.body != null) { length += body.length; } ByteBuffer result = ByteBuffer.allocate(4 + length); // 1.先放入消息的总大小 result.putInt(length); // 2.再放入头部的长度 result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); // 3.接着放入头部数据 result.put(headerData); // 4.最后放入消息体的数据 if (this.body != null) { result.put(this.body); } result.flip(); return result; }
wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw== wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw==

    对于头部数据 - header data 的编码,在 RemotingCommand 中 headerEncode 方法处理:

private byte[] headerEncode() { this.makeCustomHeaderToNet(); if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) { return RocketMQSerializable.rocketMQProtocolEncode(this); } else { return RemotingSerializable.encode(this); } }
wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw== wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw==

    还有一个细节的地方,保存 header length 的时候,经过了一个 markProtocolType 的处理,作用是将RPC类型和headerData长度编码放到一个byte[4]数组中:

 public static byte[] markProtocolType(int source, SerializeType type) { byte[] result = new byte[4]; result[0] = type.getCode(); result[1] = (byte) ((source >> 16) & 0xFF); result[2] = (byte) ((source >> 8) & 0xFF); result[3] = (byte) (source & 0xFF); return result; }
wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw== wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw==

    记住这个方法,后面从消息数据解码消息头长度的时候,就不会看的很迷茫。


三.消息的解码过程

    消息的解码是在类 RemotingCommand 中 decode方法中完成的:

public static RemotingCommand decode(final ByteBuffer byteBuffer) { int length = byteBuffer.limit();// 获取byteBuffer的总长度 int oriHeaderLen = byteBuffer.getInt();// 1.获取前4个字节,组装int类型,该长度为总长度 图中 length int headerLength = getHeaderLength(oriHeaderLen);// length & 0xFFFFFF 获取消息头的长度,与运算,编码时候的长度即为24位 byte[] headerData = new byte[headerLength];// 保存header data byteBuffer.get(headerData);// 2.从缓冲区中读取headerLength个字节的数据,这个数据就是报文头部的数据 RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen)); int bodyLength = length - 4 - headerLength;// 报文体的数据,减去了第二、三部分的长度 byte[] bodyData = null; if (bodyLength > 0) { bodyData = new byte[bodyLength]; byteBuffer.get(bodyData);// 获取消息体的数据 } cmd.body = bodyData; return cmd; }
wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw== wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw==

    对于头部数据 - header data 的解码,在 RemotingCommand 中 headerDecode 方法处理:

private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) { switch (type) { case JSON: RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class); resultJson.setSerializeTypeCurrentRPC(type); return resultJson; case ROCKETMQ: RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData); resultRMQ.setSerializeTypeCurrentRPC(type); return resultRMQ; default: break; } return null; }
wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw== wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw==

    解码出 header length - 消息头长度,在 getHeaderLength 中:

public static int getHeaderLength(int length) { return length & 0xFFFFFF;// 为什么是和高位的24位与,可以参考编码时的方法:markProtocolType }
wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw== wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw==


    通过代码分析,发现RocketMQ的协议设计不是很复杂,只要我们耐心一步步跟进去,看到最里面的代码。"哦,原来是这样!"


原文链接:https://yq.aliyun.com/articles/637255
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章