详解RocketMQ 顺序消费机制
摘要:顺序消息是指对于一个指定的 Topic ,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。
本文分享自华为云社区《RocketMQ 顺序消费机制》,作者: 勇哥java实战分享 。
顺序消息是指对于一个指定的 Topic ,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。
顺序消息分为分区顺序消息和全局顺序消息。
1、分区顺序消息
对于指定的一个 Topic ,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。
- 适用场景:适用于性能要求高,以 Sharding Key 作为分区字段,在同一个区块中严格地按照先进先出(FIFO)原则进行消息发布和消费的场景。
- 示例:电商的订单创建,以订单 ID 作为 Sharding Key ,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。
2、全局顺序消息
对于指定的一个 Topic ,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。
- 适用场景:适用于性能要求不高,所有的消息严格按照 FIFO 原则来发布和消费的场景。
- 示例:在证券处理中,以人民币兑换美元为 Topic,在价格相同的情况下,先出价者优先处理,则可以按照 FIFO 的方式发布和消费全局顺序消息。
全局顺序消息实际上是一种特殊的分区顺序消息,即 Topic 中只有一个分区,因此全局顺序和分区顺序的实现原理相同。
因为分区顺序消息有多个分区,所以分区顺序消息比全局顺序消息的并发度和性能更高。
消息的顺序需要由两个阶段保证:
- 消息发送
如上图所示,A1、B1、A2、A3、B2、B3 是订单 A 和订单 B 的消息产生的顺序,业务上要求同一订单的消息保持顺序,例如订单 A 的消息发送和消费都按照 A1、A2、A3 的顺序。
如果是普通消息,订单A 的消息可能会被轮询发送到不同的队列中,不同队列的消息将无法保持顺序,而顺序消息发送时 RocketMQ 支持将 Sharding Key 相同(例如同一订单号)的消息序路由到一个队列中。
- 消息消费
消费者消费消息时,需要保证消息消费顺序和存储顺序一致,最终实现消费顺序和发布顺序的一致。
我们知道负载均衡服务是客户端开始消费的起点。在负载均衡阶段,并发消费和顺序消费并没有什么大的差别,最大的差别在于:向 Borker 申请锁 。
消费者根据分配的队列 messageQueue ,向 Borker 申请锁 ,如果申请成功,则会拉取消息,如果失败,则定时任务每隔20秒会重新尝试。
见上图,顺序消费核心流程如下:
1、 组装成消费对象
2、 将请求对象提交到消费线程池
和并发消费不同的是,这里的消费请求包含消费快照 processQueue ,消息队列 messageQueue 两个对象,并不对消息列表做任何处理。
3、 消费线程内,对消费队列加锁
4、 从消费快照中取得待消费的消息列表
消费快照 processQueue 对象里,创建了一个红黑树对象 consumingMsgOrderlyTreeMap 用于临时存储的待消费的消息。
5、 执行消息监听器
执行监听器逻辑容易理解,消费快照的消费锁 consumeLock的作用是:防止 Rebalance 线程把当前消费的 MessageQueue 对象移除掉。
6、 处理消费结果
消费成功时,首先计算需要提交的偏移量,然后更新本地消费进度。
消费失败时,分两种场景:
- 假如已消费次数小于最大重试次数,则将放入对象 consumingMsgOrderlyTreeMap 用例临时存储的待消费的消息,重新加入到消费快照红黑树msgTreeMap中,然后使用定时任务尝试重新消费。
- 假如已消费次数大于等于最大重试次数,则将失败消息发送到 Broker ,Broker 接收到消息后,会加入到死信队列里 , 最后计算需要提交的偏移量,然后更新本地消费进度。
我们做一个关于顺序消费的总结:
- 顺序消费需要由两个阶段消息发送和消息消费协同配合,底层支撑依靠的是 RocketMQ 的存储模型;
- 顺序消费服务启动后,通过三把锁的机制,消息队列 messageQueue 的数据都会被消费者实例单线程的执行消费;
- 假如消费者扩容,消费者重启,或者 Broker 宕机 ,顺序消费也会有一定几率较短时间内乱序,所以消费者的业务逻辑还是要保障幂等。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
如何用ReadWriteLock实现一个通用的缓存中心?
摘要:在并发场景中,Java SDK中提供了ReadWriteLock来满足读多写少的场景。 本文分享自华为云社区《【高并发】基于ReadWriteLock开了个一款高性能缓存》,作者:冰 河。 写在前面 在实际工作中,有一种非常普遍的并发场景:那就是读多写少的场景。在这种场景下,为了优化程序的性能,我们经常使用缓存来提高应用的访问性能。因为缓存非常适合使用在读多写少的场景中。而在并发场景中,Java SDK中提供了ReadWriteLock来满足读多写少的场景。本文我们就来说说使用ReadWriteLock如何实现一个通用的缓存中心。 本文涉及的知识点有: 读写锁 说起读写锁,相信小伙伴们并不陌生。总体来说,读写锁需要遵循以下原则: 一个共享变量允许同时被多个读线程读取到。 一个共享变量在同一时刻只能被一个写线程进行写操作。 一个共享变量在被写线程执行写操作时,此时这个共享变量不能被读线程执行读操作。 这里,需要小伙伴们注意的是:读写锁和互斥锁的一个重要的区别就是:读写锁允许多个线程同时读共享变量,而互斥锁不允许。所以,在高并发场景下,读写锁的性能要高于互斥锁。但是,读写锁的写操作是...
- 下一篇
倒计时5天|快来开源之夏 2023 递上你的项目申请!
时至5月底,开源之夏 2023 学生报名也进入了倒计时阶段!还未提交申请书的你赶紧行动起来吧,一起加入今年的开源之旅! 学生报名将于6月3日本周六15点截止! 项目申请书提交将于6月4日本周日18点截止! 报名地址:https://summer-ospp.ac.cn 报名要求:年满 18 周岁在校学生。 报名方式:点击官网右上角【学生登录】按钮,注册并提交个人资料,资料通过组委会审核后可进行项目申请,申请及审核流程均须登录后在系统内完成! 学生指南: https://summer-ospp.ac.cn/help/student/ #项目申请 1. 查看官网项目列表,选择自己感兴趣的项目任务; 2. 通过项目详情中的联络方式与导师沟通,进一步了解项目内容和要求; 3. 按要求准备项目申请材料; 4. 在系统内提交项目申请并排序。 注意事项: 1. 一名学生最多可以提交 3 个项目的申请书,但最终最多只能中选承担 1 个项目,每个项目最多由1名学生承担。 2. 为避免截止日当天因集中提交而导致网络拥堵,建议提前一天完成。 项目列表: https://summer-ospp.ac.cn/o...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2整合Redis,开启缓存,提高访问速度
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- MySQL8.0.19开启GTID主从同步CentOS8
- Mario游戏-低调大师作品
- Linux系统CentOS6、CentOS7手动修改IP地址
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS7安装Docker,走上虚拟化容器引擎之路
- Docker快速安装Oracle11G,搭建oracle11g学习环境