腾讯和 StreamNative 开源 RoP:Apache Pulsar 支持原生 RocketMQ 协议
我们很高兴地宣布 StreamNative 与腾讯云中间件团队联合宣布开源 RoP!
RoP 将 RocketMQ 协议处理插件引入 Pulsar broker,这样 Pulsar 能支持原生 RocketMQ 协议。
什么是 RoP?
与 KoP、MoP 和 AoP 相似,RoP 是一种可插拔的协议处理插件。
将 RoP 协议处理插件添加到现有 Pulsar 集群后,用户无需修改代码,便能将现有的 RocketMQ 应用程序和服务迁移到 Pulsar,同时还能使用 Pulsar 的强大功能,例如:
•计算与存储分离
•多租户
•跨地域复制
•分层分片
•轻量化计算框架 -- Pulsar Functions
•...
为什么开发 RoP?
Apache Pulsar 是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体。自 2016 年开源以来,Pulsar 已被广泛采用,并于 2018 年被指定为 Apache 顶级项目。
RocketMQ 是一款强大的开源分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。
Pulsar 和 RocketMQ 拥有广泛的用户群体和强劲的开发支持,全球许多头部公司都在使用这两种消息服务。同时,我们也收到了用户的需求,希望能在 Pulsar 与 RocketMQ 之间传输数据,并充分利用这两种消息系统的优势。
Apache Pulsar 通过对 Consumer 层的抽象,提供了队列和流两种消费模型的统一抽象。在 Client 与 Broker 的交互中,Pulsar 基于 Protobuf 的二进制协议,提供更高的性能和更低的延迟。除此之外,通过 Protobuf 协议,Pulsar 可以更容易地支持并实现多语言的客户端,比如:Java、CPP、Python 和 Go语言等客户端。
但是,对于使用其他消息传输协议编写的应用程序(例如,RocketMQ),由于使用的消息处理协议和 Pulsar 不同,如果 Pulsar 想要兼容 RocketMQ 协议,为了将 RocketMQ 的协议适配到 Pulsar 的消息协议层中,用户需要重写整个协议层,这给用户的迁移和切换带来了很大的成本。
为了解决这个问题,最直观的处理方式是使用类似 Pulsar Connector 的形式,将用户在 RocketMQ 中的现存数据通过 RocketMQ Wrapper 的方式导入到 Pulsar 集群,但是这需要业务端更改自己的业务代码逻辑,同时需要确保两边的数据能够保证一致,这给使用 RocketMQ 的用户带来了很大的技术挑战。所以,能否给用户提供一个开箱即用的迁移策略和方案并且用户无需做任何代码修改呢?这便是 RoP 诞生的最初目的。
怎样开发 RoP?
Apache Pulsar 在 PIP-41 中介绍了一种全新的接入方式。通过在 Broker 端暴露 Protocol Handler 插件,将 Netty 的 channel 和 Pulsar 的 Broker Service 对象暴露给用户。这允许用户直接操作和调用 Pulsar 中比较低阶的 API(例如:PersistentTopic 和 ManagerLedger)。基于这个协议,用户无需更改代码,只需将服务请求转发到 RoP 中,RoP 利用 Protocol Handler 的插件将用户的请求转发到 Pulsar 中即可。
RoP 架构
通过对比 Pulsar 和 RocketMQ 之间的协议可以发现,二者在消息处理的思路上有不少相似之处,比如这两种协议都包含如下操作:
•Topic Lookup: 所有 Clients 与任意 Broker 建立连接之前,会先去查找当前 Topic 的 Owner Broker。获取到对应的 metadata 之后,Clients 会与 Owner Broker 之间建立 TCP 连接进行数据的交互。
•Produce: Clients 与 Topic 所在的所有 Owner Broker 之间进行通信并将消息 append 到对应的分布式日志中。•Consume: Clients 与 Topic 所在的所有 Owner Broker 之间进行通信并从分布式日志中读取指定的消息。
•Offset: Producer 生产到 topic 中的消息会分配一个唯一的 offset,Pulsar 中使用 MessageID 来标识 offset。消费者可以通过 offset 去日志中获取指定位置的消息。
Apache Pulsar 的存储层使用了 Apache BookKeeper,Pulsar 相当于 BookKeeper 的 Client,通过调用 ManagerLedger 对象能够很容易的达到为分布式日志操作的目的。基于此,RoP 可以很好的将 RocketMQ 中对 commitLog 和 queueLog 的操作映射到 BookKeeper 中来。
RoP 概念
Offset 和 MessageID
在 RocketMQ 中,使用 offset 来标识消息的位置,当消息被生产到指定的 Topic 之后,会为每一个消息分配一个唯一的 offset;在 Pulsar 中,使用 MessageID 来唯一标识每条消息,每一个 MessageID 由三部分组成,ledgerID
、entryID
和 partitionID
。我们通过合理的划分将 messageID 和 offset 进行映射,来唯一标识 Topic 中的每一条消息。
Message
对于一条消息,RocketMQ 和 Pulsar 都包含消息的 headers 和 payload 等字段,通过对消息协议的解析,我们可以轻松的将 RocketMQ message 转换为 Pulsar 的 message 格式。为了更好的兼容 Tag 消息的功能,在消息协议的处理方面增加了 8 字节的特殊字段,用来区分该消息是否属于 tag 消息。
Topic Lookup
在 Pulsar 中,client 与 broker 建立连接之前,会根据当前传入的 Topic 执行 Lookup 操作,在 Broker 集群中寻找当前 Topic 所在的 Owner Broker,然后将该 Owner Broker 的地址返回并与 client 建立 TCP 连接,再进行数据交互。在 RocketMQ 中,client 与 broker 建立连接之前,会先处理 GET_ROUTEINTO_BY_TOPIC
命令,获取 topic 所在的路由信息后,建立对应的 TCP 连接,再进行数据交互。
如何使用 RoP?
目前,RoP 发布了 0.1.0 版本,你可以用过以下任一方式参与该项目:
•想上手试试?
如需下载 RoP 和查阅用户指南,点击查看 RoP README。无论是快速启动 standalone RoP 或在现有 Pulsar 集群中部署 RoP,都可轻松实现。
另外,为了方便快速使用并验证 RoP, RoP 示例 提供了 RocketMQ 的常见使用场景和用例,你可以直接使用这些代码示例验证服务。
•想解决问题?
如有任何问题,可以在 RoP GitHub repo 中 创建 issue 或添加机器人 StreamNative_BJ 加入微信群进行讨论。无论哪种方式,RoP 资深专家都随时在线。
•想参与贡献?
RoP 源码开放并托管在 GitHub 上。如需改进功能或修复 bug,欢迎提交 PR。
腾讯与 StreamNative 的合作
在此特别鸣谢腾讯云中间件团队张勇华、冉小龙、韩明泽、夏子承等同学的支持,以及 StreamNative 在架构设计以及方案的良好建议,共同推进了 RoP 项目的顺利落地。希望今后双方继续携手并进、砥砺前行,为消息服务贡献更多力量!

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Apache Kafka 2.7.1 发布
Apache Kafka 2.7.1 已发布,这是一个 bugfix 版本,其中包括来自 45 个 JIRA 的修复和改进,还修复了部分严重的错误。 改进 [KAFKA-10852] - 优化 AlterIsr 使用方式,不再被限制 [KAFKA-12310] - 将 zookeeper 升级至3.5.9 [KAFKA-12400] - 升级 jetty 版本以修复 CVE-2020-27223 [KAFKA-12409] - ReplicaManager 中的 Leaking gauge Bugfix [KAFKA-10021] - 当读取到配置日志的末端时,现在会检查 fetch.max.wait.ms 是否大于 worker.sync.timeout.ms [KAFKA-10192] - Flaky test BlockingConnectorTest#testBlockInConnectorStop [KAFKA-10340] - 修复尝试为不存在的 topic 生成记录而不是永久挂起时,源连接器没有报告错误的问题 [KAFKA-10417] - 修复与 cogroup() 搭...
- 下一篇
每日一博 | 从一段 Dubbo 源码到 CPU 分支预测的一次探险之旅
每个时代,都不会亏待会学习的人。 大家好,我是 yes。 这次本来是打算写一篇 RocketMQ 相关文章的,但是被插队了,我也是没想到的,对了本号也有留言了哟。 说来也是巧最近在看 Dubbo 源码,然后发现了一处很奇怪的代码,于是就有了这篇文章,让我们来看一下这段代码,它属于 ChannelEventRunnable,这个 runnable 是 Dubbo IO 线程创建,将此任务扔到业务线程池中处理。 看到没,把 state == ChannelState.RECEIVED 拎出来独立一个 if,而其他的 state 还是放在 switch 里面判断。 我当时脑子里就来回扫描,想想这个到底有什么花头,奈何知识浅薄一脸懵逼。 于是就开始了一波探险之旅! 原来是 CPU 分支预测 遇到问题当然是问搜索引擎了,一般而言我会同时搜索各大引擎,咱这也不说谁比谁好,反正有些时候度娘还是不错的,比如这次搜索度娘给的结果比较靠前,google 较靠后。 一般搜索东西我都喜欢先在官网上搜,找不到了再放开搜,所以先这么搜 site:xxx.com key。 你看这就有了,完美啊! 我们先来看看官网的...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8安装Docker,最新的服务器搭配容器使用
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7安装Docker,走上虚拟化容器引擎之路
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS8编译安装MySQL8.0.19