TDMQ RocketMQ 版事务消息原理解析
引言
在分布式架构系统中,确保跨服务应用的数据一致性始终是系统设计的核心挑战之一。TDMQ RocketMQ 版作为一款基于 Apache RocketMQ 构建的企业级消息中间件,凭借其高可用性和高可靠性特点,通过提供完善的事务消息机制,为这一难题提供了专业的解决方案。本文将结合核心源码,深入解析 RocketMQ 事务消息的实现原理,希望能帮助开发者去构建更健壮的分布式事务系统。
事务消息的概念与应用场景
事务消息是 RocketMQ 提供的一种高级特性消息,通过将二阶段提交的动作和本地事务绑定,来保障分布式场景下消息生产和本地事务的最终一致性,相比普通消息,主要是扩展了二次确认和本地事务状态回查补偿的机制。
在电商平台中,积分兑换商品这一常见功能就涉及到分布式事务,用户发起兑换后,可能涉及创建兑换订单、扣除用户积分、通知发货服务、扣减库存等一系列动作,此时就要保证订单服务和多个下游业务执行结果的最终一致性,如果积分扣除成功但订单创建失败,会导致用户积分被扣但未获得商品,如果订单创建成功但积分扣除失败,会导致用户获得商品但未扣除积分。 我们可以采用 TDMQ RocketMQ 版事务消息来实现这一功能,具体分为以下三个阶段:
1、阶段一:发送事务消息(准备核销积分)
用户提交订单并选择使用积分兑换后,订单服务向 RocketMQ 服务端对应的业务 Topic 发送一条事务消息,内容包含 “用户 user-001 发起订单 Order-001 并使用 1000 积分兑换商品 A”。此时,该消息对下游的积分服务和库存服务等均不可见,避免在订单服务事务完成前,积分服务提前扣减积分,确保积分不会被误扣。
2、阶段二:执行本地事务(创建订单)
事务消息发送成功后,订单服务继续执行本地事务,创建订单并预占积分,若本地事务成功,则提交二次确认 Commit 到 RocketMQ 服务端,消息被继续投递到下游,反之,提交 Rollback,事务结束,积分状态保持不变。
3、阶段三:下游服务消费(扣减积分并更新库存)
积分服务和库存服务预先订阅上面的 Topic,接收到消息后,积分服务扣减积分,库存服务更新库存。若消费过程中因网络异常、服务不可用等问题导致失败,RocketMQ 将自动触发重试机制,若多次重试仍未成功,消息将转入死信队列,后续由人工介入核对,通过补偿流程保障积分和库存数据的最终一致性。
通过以上三个阶段,RocketMQ 事务消息机制在积分核销场景中,保障了订单本地事务和消息发送的同时成功/失败,成功实现了分布式事务的最终一致性。类似的,在金融交易、企业多系统数据同步等场景中,RocketMQ 事务消息都能凭借其可靠的机制,保障跨服务操作的最终一致性。那么,RocketMQ 事务消息究竟是如何在底层实现这些复杂操作,确保最终一致性的呢?接下来,我们深入探究其背后的原理。
事务消息实现原理详解
术语说明
在分析 RocketMQ 事务消息的实现原理之前,有必要先了解一下这些概念和术语:
1、半消息 half message:
生产者发送事务消息到 RocketMQ 服务端后,消息会被持久化并标记为“暂不可投递”的状态,直到本地事务执行完成并确认后,消息才会决定是否对消费者可见,此状态下的消息,称为半消息(Half Message)。
2、二阶段提交
实现事务最终一致性的关键机制,一阶段为发送 Half Message,二阶段为生产者执行本地事务,并根据执行结果向 RocketMQ 服务端提交 Commit(允许投递)或 Rollback(丢弃消息)的确认结果,以此来决定 Half Message 的去留。
3、OP 消息:
用于给消息状态打标记,没有对应 OP 消息的 Half Message,就说明二阶段确认状态未知,需要 RocketMQ 服务端进行本地事务状态主动回查,OP 消息的内容为对应的 Half Message 的存储的 Offset。
4、相关 Topic:
- Real Topic:业务真实 Topic,生产者发消息时指定的 Topic 值。
- Half Topic:系统 Topic,Topic名称为 RMQ_SYS_TRANS_HALF_TOPIC,用于存储 Half Message。
- OP Topic:系统 Topic,Topic名称为 MQ_SYS_TRANS_OP_HALF_TOPIC,用于存储 OP 消息, Half Message 二次状态确认后,不管是 Commit 还是 Rollback,都会写入一条对应的 OP 消息到这个 Topic。
事务消息处理流程
了解完基本概念,结合上面的业务场景,我们来看 RocketMQ 事务消息的实现流程:
步骤说明:
- 生产者发送事务消息到 RocketMQ 服务端。
- 服务端存储这条消息后返回发送成功的响应,此时消息对下游消费者不可见,处于Half Message 状态。
- 生产者收到半消息成功的响应后,继续往下执行本地事务(如更新业务数据库)。
- 根据本地事务的执行结果,生产者会向 RocketMQ 服务端提交最终状态,也就是二次确认。
- 确认结果为 Commit 时,服务端会将事务消息继续向下投递给消费者,确认结果为 Rollback 时,服务端将会丢弃该消息,不再向下投递。
- 确认结果是 Unknown 或一直没有收到确认结果时,一定时间后,将会触发事务状态主动回查。
- 当生产者未提交最终状态或者二次确认的结果为 Unknown 时,RocketMQ 服务端将会主动发起事务结果查询请求到生产者服务。
- 生产者收到请求后提交二次确认结果,逻辑再次回到第5步,此时如果生产者服务暂时不可用,则 RocketMQ 服务端会在指定时间间隔后,继续主动发起回查请求,直到超过最大回查次数后,回滚消息。
如此,不管本地事务是否执行成功,都能实现事务状态的最终一致性。以上步骤,可用时序图直观体现为:
半消息的具体实现
了解了事务消息基本的实现流程后,你可能会有疑问,半消息为什么对消费者不可见?二次确认 Commit 或者 Rollback 后,服务端如何投递或者删除半消息?前面提到,Half Message 在服务端做了持久化,但在消费端却不可见,实现这一效果的方式,就是 Topic 替换:首先将事务消息的 Real Topic 和队列信息作为属性暂存起来,以便后续二阶段提交结果为 Commit 时,能正确地投递到下游消费者,然后将消息的 Topic 改为系统 Topic RMQ_SYS_TRANS_HALF_TOPIC,队列 ID 改为0,用户的消费者正常不会订阅这个系统 Topic,自然也就不能看到 Half Message。 Half Message 被成功投递到上面的系统 Topic 后,开始执行本地事务,如果生产者提交的本地事务二次确认结果为 Commit,则在消息属性中获取消息的 Real Topic、队列等信息,设置 Topic = Real Topic后,再投递下游,最后删除 Half Message(逻辑删除),如果二次确认结果为 Rollback,则只需要逻辑删除对应的 Half Message 即可。这里逻辑删除的实现,就是前面提到的 OP Topic,OP 队列中的消息,记录了 Half Message 对应的二次确认状态,根据这个状态,RocketMQ 服务端会进行第二个核心机制:事务状态主动回查。
事务回查的具体实现
Half Message 写入后,可能会因为种种原因,导致 RocketMQ 服务端一直收不到二次确认结果,比如网络异常、生产者服务暂时不可用、本地事务死锁导致执行时间超长等,此时,就需要 RocketMQ 服务端主动去询问生产者服务本地事务是否执行成功,以决定 Half Message 的最终去留。 RocketMQ 服务端会启动事务检查定时任务,默认每60秒执行一次,最大回查15次,可通过 TransactionCheckInterval 和 TransactionCheckMax 这两项配置按业务实际情况进行定制化调整。回查时,会对比 Half 队列和 OP 队列的偏移量,若发现 Half 消息未在 OP 队列中有对应的记录且 Half Message 的留存时间超过了事务超时时间(前面分析过,Half Message 是否被二次确认过,是根据 OP 队列来判断的),则触发主动回查动作,向生产者服务发起事务状态检查请求,如此,就解决了部分事务消息状态悬而未决的问题,实现了本地事务和消息发送之间的最终一致性。
事务消息核心源码解析
分析完具体的实现原理,接下来我们对照 Half Message 发送、二次确认提交、事务主动回查这三个关键部分的源码实现,来具体看看以上理论在代码中的体现:
发送事务消息
首先看看事务消息的发送的具体实现,核心代码为org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl 下的sendMessageInTransaction 方法:
public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter localTransactionExecuter, Object arg) throws MQClientException { // 检查事务监听器对象是否为空,后续本地事务的执行和回查都依靠它 TransactionListener transactionListener = this.getCheckListener(); if (null == localTransactionExecuter && null == transactionListener) { throw new MQClientException("tranExecutor is null", (Throwable)null); } else { if (msg.getDelayTimeLevel() != 0) { // 延迟消息属性在这里不生效 MessageAccessor.clearProperty(msg, "DELAY"); } // 消息内容必要性检查 Validators.checkMessage(msg, this.defaultMQProducer); SendResult sendResult = null; // 添加事务消息相关属性 MessageAccessor.putProperty(msg, "TRAN_MSG", "true"); MessageAccessor.putProperty(msg, "PGROUP", this.defaultMQProducer.getProducerGroup()); try { // 开始发送消息 sendResult = this.send(msg); } catch (Exception var11) { Exception e = var11; throw new MQClientException("send message Exception", e); } LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable localException = null; // 如果消息发送成功,则继续处理本地事务 switch (sendResult.getSendStatus()) { case SEND_OK: try { if (sendResult.getTransactionId() != null) { msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); } String transactionId = msg.getProperty("UNIQ_KEY"); if (null != transactionId && !"".equals(transactionId)) { msg.setTransactionId(transactionId); } if (null != localTransactionExecuter) { // 开始执行本地事务,并得到一个本地事务状态的结果 localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); } else if (transactionListener != null) { this.log.debug("Used new transaction API"); localTransactionState = transactionListener.executeLocalTransaction(msg, arg); } if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { this.log.info("executeLocalTransactionBranch return {}", localTransactionState); this.log.info(msg.toString()); } } catch (Throwable var10) { Throwable e = var10; this.log.info("executeLocalTransactionBranch exception", e); this.log.info(msg.toString()); localException = e; } break; case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: // 消息发送都未成功,则本地事务状态直接为rollback localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; } try { // 根据本地事务状态结果,进行下一步,决定half message是继续投递还是删除 this.endTransaction(msg, sendResult, localTransactionState, localException); } catch (Exception var9) { Exception e = var9; this.log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); } TransactionSendResult transactionSendResult = new TransactionSendResult(); transactionSendResult.setSendStatus(sendResult.getSendStatus()); transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); transactionSendResult.setMsgId(sendResult.getMsgId()); transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); transactionSendResult.setTransactionId(sendResult.getTransactionId()); transactionSendResult.setLocalTransactionState(localTransactionState); // 返回消息发送的结果 return transactionSendResult; } }
对照以上源码,可以看到,生产者方法发送消息后,首先会对事务监听器做非空校验,因为后面本地事务的执行以及事务状态的主动回查,都需要依赖它来完成,接下来的主要逻辑有四点:
- 给原始消息加一个 TRAN_MSG=true 的属性,这是后面判定一条消息为事务消息的条件。
- 同步发送 Half Message,若发送失败,则不再执行本地事务,保证了“同失败”的事务一致性。
- 若发送成功,则开始执行我们设置的本地事务,并根据执行结果修改本地事务状态值。
- 根据事务状态值,来 endTransaction 做收尾工作,这里包含了下面我们要说的事务回查和 Half Message 的删除。
二次确认提交
Half Message 发送成功后,开始执行本地事务,并根据执行结果,提交二次确认结果Commit/Rollback 到 RocketMQ 服务端。
/** * @param msg: 原始消息对象 * @param sendResult: 消息发送结果(包含事务ID、消息ID等) * @param localTransactionState: 本地事务执行状态(COMMIT/ROLLBACK/UNKNOWN) * @param localException: 本地事务执行异常 */ public void endTransaction( final Message msg, final SendResult sendResult, final LocalTransactionState localTransactionState, final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { final MessageId id; // 优先使用offsetMsgId解析消息ID,获取偏移量 if (sendResult.getOffsetMsgId() != null) { id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId()); } else { id = MessageDecoder.decodeMessageId(sendResult.getMsgId()); } // 事务ID String transactionId = sendResult.getTransactionId(); final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName()); // 构建二次确认的请求头 EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); requestHeader.setTransactionId(transactionId); requestHeader.setCommitLogOffset(id.getOffset()); requestHeader.setBname(sendResult.getMessageQueue().getBrokerName()); // 不同的本地事务执行结果,设置不同的请求头 switch (localTransactionState) { case COMMIT_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); break; case ROLLBACK_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); break; case UNKNOW: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); break; default: break; } // 执行事务结束钩子,设置一些必要的上下文信息 doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false); requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTranStateTableOffset(sendResult.getQueueOffset()); requestHeader.setMsgId(sendResult.getMsgId()); String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null; // 发送请求,请求Command为END_TRANSACTION this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout()); }
从上面源码可以看出:本地事务执行结束后,事务收尾的工作内容主要包括:
- 解析消息 ID,获取 Offset。
- 根据本地事务执行结果,构建不同的请求头。
- 发送请求到 RocketMQ 服务端。
二次确认消息发送到 RocketMQ 服务端后,由核心类 EndTransactionProcessor 的 processRequest 方法来处理 Commit 或者 Rollback 消息,考虑到篇幅问题,这里只分析逻辑删除 Half Message 的部分:
// 二次确认结果是commit if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { // 从RMQ_SYS_TRANS_HALF_TOPIC(Half消息队列)查询待提交的消息,验证消息的有效性 result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader); if (result.getResponseCode() == ResponseCode.SUCCESS) { // 二次确认,判读事务ID一直想、消息存储时间有效性等 RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); if (res.getCode() == ResponseCode.SUCCESS) { // 核心逻辑msgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC)); // 即从属性中恢复消息一开始的真实topic MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage()); msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback())); msgInner.setQueueOffset(requestHeader.getTranStateTableOffset()); msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset()); msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp()); // 清理掉事务消息的标志性属性TRAN_MSG == true,下面要将消息做为普通消息向下投递 MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED); // 投递消息到real topic RemotingCommand sendResult = sendFinalMessage(msgInner); if (sendResult.getCode() == ResponseCode.SUCCESS) { // 删除half message,本质是向op topic写入一条消息,标识这个half message已经确认过了 this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); } return sendResult; } return res; } // 二次确认结果为rollback } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) { result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader); if (result.getResponseCode() == ResponseCode.SUCCESS) { // 同上,必要性检查 RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); if (res.getCode() == ResponseCode.SUCCESS) { // 既然是需要回滚,那就只需要删除对应的half message,同样是向op topic 写入一条消息 this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); } return res; } }
// Delete prepare message when this message has been committed or rolled back. @Override public boolean deletePrepareMessage(MessageExt msgExt) { // 向op topic 写入一条消息,消息内容是给对应的half message打了一个remove标记 if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) { log.debug("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt); return true; } else { log.error("Transaction op message write failed. messageId is {}, queueId is {}", msgExt.getMsgId(), msgExt.getQueueId()); return false; } }
可以看到,不管生产者提交的二次确认结果是 Commit 还是 Rollback,都会执行 deletePrepareMessage 方法,向 OP 队列写入消息,标识这条 Half Message 已经被处理过了,而并不是把这条消息物理删除掉。和 Rollback 不同的是,Commit 时,需要先获取并设置消息的 Real Topic 和 Real QueueId(这个在第一步发送 Half Message 时已经记录在了消息属性中),然后向下投递,此时,消息对下游消费者可见。
事务状态回查
对于始终没有收到二次确认的消息,RocketMQ 服务端会主动发起事务回查,对于事务超时未确认的核心逻辑在org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl 的Check 方法,Check 方法被前面提到的定时任务每隔60秒调用一次,回查时,首先从 Half Topic 下的所有队列开始:
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC; // 获取存储half message的topic下的所有队列 Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic); if (msgQueues == null || msgQueues.size() == 0) { // 没有half message则直接退出 log.warn("The queue of topic is empty :" + topic); return; } log.debug("Check topic={}, queues={}", topic, msgQueues);
然后遍历每个队列,和 op topic 下的数据做对比:
// 获取对应的Op队列(RMQ_SYS_TRANS_OP_HALF_TOPIC) MessageQueue opQueue = getOpQueue(messageQueue); // Half队列消费位点 long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue); // Op队列消费位点 long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
如果 OP Topic 中有 Half Message 的相关记录,就不再回查,否则,判读消息是否需要被跳过回查(消息超过最大检查次数、超过了消息最大保留时间、刚写入的消息),并且对于超出最大检查次数的消息,丢弃操作其实是将消息转移到 TRANS_CHECK_MAX_TIME_TOPIC 这个系统 Topic。
// Half消息已处理过了(存在对应的Op记录) if (removeMap.containsKey(i)) { log.debug("Half offset {} has been committed/rolled back", i); Long removedOpOffset = removeMap.remove(i); doneOpOffset.add(removedOpOffset); } else { // Half消息未被二次确认,根据偏移量查询对应的消息 GetResult getResult = getHalfMsg(messageQueue, i); MessageExt msgExt = getResult.getMsg(); // 消息还存在,判断是否需要丢弃或者跳过,丢弃是满足检查次数超出了最大检查次数,跳过则是满足消息留存时长超过了最大保留时间 if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) { // 执行丢弃逻辑,发往TRANS_CHECK_MAXTIME_TOPIC这个系统Topic listener.resolveDiscardMsg(msgExt); newOffset = i + 1; i++; continue; } // 消息是刚写入不久的,先跳过,一会儿再检查 if (msgExt.getStoreTimestamp() >= startTime) { log.debug("Fresh stored. the miss offset={}, check it later, store={}", i, new Date(msgExt.getStoreTimestamp())); break; }
直到判断出 Half Message 没有对应的 OP 记录,并且消息留存时长超过了事务超时时间,开始组装发送回查请求到生产者端。
// 获取Op消息,查看消息二次确认的标识 List<MessageExt> opMsg = pullResult.getMsgFoundList(); // 没有对应的Op消息,且留存时常超过了事务超时时间,触发回查逻辑 boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime) || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout)) || (valueOfCurrentMinusBorn <= -1); if (isNeedCheck) { // 重新保存half message,避免写放大 if (!putBackHalfMsgQueue(msgExt, i)) { continue; } // 调用sendCheckMessage方法,里面在组装和发送回查请求 listener.resolveHalfMsg(msgExt); }
事务消息实践指南
使用示例
这里以 TDMQ 版 RocketMQ 5.x 版本集群为例,演示事务消息的使用方式和效果。 1、首先登录腾讯云控制台,新建一个消息类型为事务消息的 Topic。 2、以 Java 语言为例,引入 5.x 对应版本的依赖。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client-java</artifactId> <version>5.0.6</version> </dependency>
3、启动生产者。
public class ProducerTransactionMessageDemo { private static final Logger log = LoggerFactory.getLogger(ProducerTransactionMessageDemo.class); private static boolean executeLocalTransaction() { // 模拟本地事务(如数据库插入操作),这里假设执行成功 return true; } private static boolean checkTransactionStatus(String orderId) { // 模拟查询本地事务执行结果,如查询订单ID是否已入库,查到则return true return true; } public static void main(String[] args) throws ClientException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); // 在控制台权限管理页面获取ak和sk String accessKey = "your-ak"; String secretKey = "your-sk"; SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); // 在控制台获取并填写腾讯云提供的接入地址 String endpoints = "https://your-endpoints"; ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .enableSsl(false) .setCredentialProvider(sessionCredentialsProvider) .build(); String topic = "tran_topic"; TransactionChecker checker = messageView -> { log.info("Receive transactional result check request, message={}", messageView); // 服务端主动回查本地事务状态 String orderId = messageView.getProperties().get("orderId"); boolean isSuccess = checkTransactionStatus(orderId); return isSuccess ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK; }; // 创建生产着并设置回查的checker对象 Producer producer = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration) .setTopics(topic) .setTransactionChecker(checker) .build(); // 开启事务 final Transaction transaction = producer.beginTransaction(); byte[] body = "This is a transaction message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8); String tag = "tagA"; final Message message = provider.newMessageBuilder() .setTopic(topic) .setTag(tag) .setKeys("your-key-565ef26f5727") //一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验 .addProperty("orderId", "0001") .setBody(body) .build(); // 发送半消息 try { final SendReceipt sendReceipt = producer.send(message, transaction); log.info("Send transaction message successfully, messageId={}", sendReceipt.getMessageId()); } catch (Throwable t) { log.error("Failed to send message", t); return; } // 执行本地事务 boolean localTxSuccess = executeLocalTransaction(); if (localTxSuccess) { // 本地事务执行成功,二次确认为Commit transaction.commit(); } else { // 本地事务执行失败,二次确认为Rollback transaction.rollback(); } // producer.close(); } }
4、运行代码后,在控制台的消息查询页面,可以看到已经有一条投递完成等待消费的消息。 5、启动消费者,订阅这个 Topic,成功消费消息后,在腾讯云控制台查看消息轨迹:
6、修改代码,假设本地事务执行失败,使处于 Half Message 状态的事务消息回滚。
private static boolean executeLocalTransaction() { // 本地事务执行失败 return false; } private static boolean checkTransactionStatus(String orderId) { // 回查结果自然也是rollback,返回false return false; }
7、此时,可以发现消息发送成功了,但在控制台的消息消息查询页面是不可见的,启动消费者也不能消费到这条消息。
注意事项
使用事务消息过程中,需注意以下几点:
- Topic 类型必须为事务 TRANSACTION,否则生产消息会报错,关键错误信息:current message type not match with topic accept message types。
- 事务消息不支持延迟,若设置了延迟属性,在发送消息前会被清除延迟属性。
- 如果本地事务执行较慢,此时服务端进行事务回查时,应返回 Unknown,且如果确认本地事务执行耗时会很长,应修改第一次事务回查的时间,以避免产生大量结果未知的事务。
总结
本文从理论与源码双视角剖析了 TDMQ RocketMQ 版事务消息的三大核心流程——半消息的发送存储、二阶段提交及事务状态回查的实现机制。在实际生产中,建议开发者通过幂等设计规避重复消费,合理设置事务超时时间,并关注 Topic 类型限制等约束条件,以充分发挥事务消息在分布式场景中的价值。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
AI 体验走查 - 火山引擎存储的 AI UX 探索之路
01 概述 火山引擎存储技术团队驱动 AI 自主完成用户体验走查 / 可用性测试的执行与评价,帮助业务改善交互体验。 立项“故事走查”的背景诉求和 AI 机遇 如何搭建“AI 评价”能力,精准识别交互问题 让交互体验故事走查变为技术产品,讲解系统设计,包括流程、User Story 维护、框架和 AI 模型选型、Midscene.js 的集成技巧等 02 AI 体验走查,究竟是什么? 对于互联网产品,除了满足用户基本需求以外,使用过程中的交互体验也十分重要。为了优化用户体验,业务往往需要投入大量的精力来进行繁琐的走查与优化。 想象一下,如果无需投入人力,完全靠 AI 自身就能全面走查并发现用户体验问题,提升产品质量是怎样的体验。这正是火山引擎存储业务 AI 用户体验走查系统的探索方向。 先给大家看一些真实案例 Demo。 03 案例 1 - 简单任务 名称: 资源管理-创建日志项目 步骤:进入日志服务概览页,创建一个名称为 aiux_project_${uuid} 的日志项目 AI 执行过程 https://www.bilibili.com/video/BV1Lo3LzSE3J/?vd...
- 下一篇
主键分布不均 SeaTunnel CDC 同步卡顿多日?这样优化终于通了!
遇到的问题 我们项目使用seatunnel从业务库抽数到数仓(StarRocks),已经成功使用Mysql-CDC做了大量的实时同步。但最近在抽一个MySQL表的时候遇到了异常情况,作业启动之后,日志显示读写数量一直为0,且长时间不停止,运行6小时之后以checkpoint timeout异常停止。 作业模型如下(已擦除涉密信息): 运行关键日志: 问题背景 场景:使用mysql-cdc进行数据实时抽取到StarRocks seatunnel版本:2.3.9 Mysql版本:8.x starrocks版本:3.2 源表数据量:6000W-7000W 提出疑问 为什么读写数量一直为0? 为什么运行这么长时间才报超时? 分析过程 由于之前已大量使用mysql-cdc进行抽数,模型配置基本一致,没有出现过这种问题,大概率不是seatunnel的问题。 对比之前的表,看源表和之前正常接入的表是否有什么不一样。 对比之下果然发现猫腻: 之前的表基本都是有自增主键的;本次同步的表没有自增主键,仅设有多个唯一索引 疑问就来了:SeaTunnel到底是怎样同步数据的? 根据已有的认知,在同步cdc数据...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8编译安装MySQL8.0.19
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Mario游戏-低调大师作品
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS8安装Docker,最新的服务器搭配容器使用
- CentOS7安装Docker,走上虚拟化容器引擎之路