每日一博 | RocketMQ 事务消息初体验
事务消息是 RocketMQ 的高级特性之一 。这篇文章,笔者会从应用场景、功能原理、实战例子三个模块慢慢为你揭开事务消息的神秘面纱。
1 应用场景
举一个电商场景的例子:用户购物车结算时,系统会创建支付订单。
用户支付成功后支付订单的状态会由未支付修改为支付成功,然后系统给用户增加积分。
通常我们会使用普通消费方案,该方案能够发挥 MQ 的优势:异步和解耦 , 同时架构设计非常简单。
- 用户购物车结算时,系统创建支付订单;
- 支付成功后,更新订单的状态从未支付修改为支付成功;
- 发送一条普通消息到消息队列服务端;
- 积分服务消费消息,添加积分记录。
但该方案有个非常直观的缺点:容易出现不一致的现象。
-
假如先发送消息,后修改订单状态,消息发送成功,订单没有执行成功,需要回滚整个事务(订单数据事务回滚,积分服务消费时,需要先反查事务状态,若事务提交,才插入积分记录)。
-
假如先修改订单状态,后发送消息,订单状态修改成功,但消息发送失败,需要补偿操作才能保持最终一致。
-
假如先修改订单,后发送消息,订单状态修改成功,但消息发送超时,此时无法判断需要回滚订单还是提交订单变更。
我们看到,为了完善普通消费方案,业务层还需要做到两点:补偿机制和提供事务状态查询接口。
要做到这两点,难不难呢?
不难,但是业务层代码会比较混乱,更优的方案还是得从中间件层面解决。
2 功能原理
RocketMQ 事务消息是支持在分布式场景下保障消息生产和本地事务的最终一致性。交互流程如下图所示:
1、生产者将消息发送至 Broker 。
2、Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
3、生产者开始执行本地事务逻辑。
4、生产者根据本地事务执行结果向服务端提交二次确认结果( Commit 或是 Rollback ),Broker 收到确认结果后处理逻辑如下:
- 二次确认结果为 Commit :Broker 将半事务消息标记为可投递,并投递给消费者。
- 二次确认结果为 Rollback :Broker 将回滚事务,不会将半事务消息投递给消费者。
5、在断网或者是生产者应用重启的特殊情况下,若 Broker 未收到发送者提交的二次确认结果,或 Broker 收到的二次确认结果为 Unknown 未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
- 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
笔者认为事务消息的精髓在于:
- 本地事务执行成功,消费者才能消费事务消息;
- 消息回查本身就是补偿机制的实现,事务生产者需提供了事务状态查询接口。
3 实战例子
为了便于大家理解事务消息 ,笔者新建一个工程用于模拟支付订单创建、支付成功、赠送积分的流程。
首先,我们创建一个真实的订单主题:order-topic 。
然后在数据库中创建三张表 订单表、事务日志表、积分表。
最后我们创建一个 Demo 工程,生产者模块用于创建支付订单、修改支付订单成功,消费者模块用于新增积分记录。
接下来,我们展示事务消息的实现流程。
<strong style="font-size: 15px;line-height: inherit;color: black;">1、创建支付订单</strong>
调用订单生产者服务创建订单接口 ,在 t_order 表中插入一条支付订单记录。
<strong style="font-size: 15px;line-height: inherit;color: black;">2、调用生产者服务修改订单状态接口</strong>
接口的逻辑就是执行事务生产者的 sendMessageInTransaction
方法。
生产者端需要配置事务生产者和事务监听器。
发送事务消息的方法内部包含三个步骤 :
事务生产者首先发送半事务消息,发送成功后,生产者才开始执行本地事务逻辑。
事务监听器实现了两个功能:执行本地事务和供 Broker 回查事务状态 。
执行本地事务的逻辑内部就是执行 orderService.updateOrder
方法。
方法执行成功则返回 LocalTransactionState.COMMIT_MESSAGE
, 若执行失败则返回 LocalTransactionState.ROLLBACK_MESSAGE
。
需要注意的是: orderService.updateOrder
方法添加了事务注解,并将修改订单状态和插入事务日志表放进一个事务内,避免订单状态和事务日志表的数据不一致。
最后,生产者根据本地事务执行结果向 Broker 提交二次确认结果。
Broker 收到生产者确认结果后处理逻辑如下:
- 二次确认结果为 Commit :Broker 将半事务消息标记为可投递,并投递给消费者。
- 二次确认结果为 Rollback :Broker 将回滚事务,不会将半事务消息投递给消费者。
<strong style="font-size: 15px;line-height: inherit;color: black;">3、积分消费者消费消息,添加积分记录</strong >
当 Broker 将半事务消息标记为可投递时,积分消费者就可以开始消费主题 order-topic 的消息了。
积分消费者服务,我们定义了消费者组名,以及订阅主题和消费监听器。
在消费监听器逻辑里,幂等非常重要
。当收到订单信息后,首先判断该订单是否有积分记录,若没有记录,才插入积分记录。
而且我们在创建积分表时,订单编号也是唯一键,数据库中也必然不会存在相同订单的多条积分记录。
4 总结
RocketMQ 事务消息是支持在分布式场景下保障消息生产和本地事务的最终一致性。
编写一个实战例子并不复杂,但使用事务消息时需要注意如下三点:
1、事务生产者和消费者共同协作才能保证业务数据的最终一致性;
2、事务生产者需要实现事务监听器,并且保存事务的执行结果(比如事务日志表) ;
3、消费者要保证幂等。消费失败时,通过重试、告警+人工介入等手段保证消费结果正确。
本文涉及到的工程源码,笔者已上传到 Github ,感兴趣的同学可以了解一下,若有疑问直接加笔者好友,一起交流技术,一起成长。
笔者会在后续的文章里,详细解析事务消息的实现原理,敬请期待。
实战代码地址:
如果我的文章对你有所帮助,还请帮忙点赞、在看、转发一下,你的支持会激励我输出更高质量的文章,非常感谢!

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
推动开源人才培养,openKylin 高校站授牌仪式成功举行!
7月5日,由麒麟软件主办的2023操作系统产业大会正式拉开帷幕。在大会开源建设与人才生态发展分论坛上,openKylin高校站授牌仪式成功举行!开放原子开源基金会运营部部长李博,openKylin社区理事长朱晨、秘书长余杰,上海交通大学网络空间学院副院长邱卫东等20+高校领导出席会议。 此次参与授牌的高校有: \ 授牌现场 openKylin高校站是什么 openKylin高校站是openKylin社区在高校建立的合作站点,通过推进高校开源技术的产学研融合,构建起学生Linux的基础知识架构,再通过开源活动+项目实践的方式,为学生积累实践经验,并对职业规划等方面进行详细讲解,通过理论+实践的形式,培养卓越创新能力的开源人才。 当前,我国已成为开源参与者数量全球排名第二、增长速度最快的国家,下一步就是需要加快推进开源进校园,积极宣传和推广开源文化,探索开源教学模式。夯实好开源发展的人才基础,对提升软件源头创新和供给能力具有重要意义和现实意义。因此,围绕人才培养、联合研究、学术交流三条主线,openKylin社区启动了开源高校站项目。 openKylin高校站发展现状 截止目前,已有北...
- 下一篇
WeMQ —— 物联网调试管理管理平台
WeMQ是一款基于SpringBoot开发的一款物联网设备调试管理平台,其功能主要是对客户MQTT调试页面进行集中管理(连接信息、发送信息),系统管理员可在后台添加客户和调试页面,并设置调试页面的连接信息、发送消息和对应的发送按钮文字,并设置分享链接以及页面的开启状态,用户可通过分享链接打开配置好对应信息的页面,实现对自己设备的管理调试。 技术选型 1. 系统环境 Java 8 Servlet 3.0 Apache Maven 3 2. 主框架 Spring Boot 2.7.x Spring Framework 5.3.x Spring MVC 5.3.x 3. 持久层 Mybatis 3.5.x Alibaba Druid 1.2.x Hibernate Validation 6.0.x Java MySQL Connector 8.0.x 4. 视图层 Thymeleaf 3.x Bootstrap 5.x 5. 工具类 Apache Commons Hutool 5.x 主要功能 系统管理员管理 客户管理 调试页面管理 对接Nmqs(NicholasLD's Message Q...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Red5直播服务器,属于Java语言的直播服务器
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS7安装Docker,走上虚拟化容器引擎之路
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- CentOS关闭SELinux安全模块
- CentOS7,CentOS8安装Elasticsearch6.8.6
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- Linux系统CentOS6、CentOS7手动修改IP地址