您现在的位置是:首页 > 文章详情

RocketMQ事务消息

日期:2018-11-08点击:347

什么是事务消息?

事务消息可以认为是两阶段提交消息的实现,用来确保分布式系统中的最终一致性。事务消息保证执行本地事务的执行和消息发送的原子性。

使用约束

  1. 消息事务不支持定时和批量。
  2. 为了避免一个消息被多次检查,导致半数队列消息堆积,我们限制单个消息的默认检查次数为15次,但用户可以改变这个限制通过修改broker的配置文件中的 transactionCheckMax参数。如果一个消息检查次数超过transactionCheckMax,默认情况下,broker将会丢弃这个消息并同时打印错误日志。用户可以改变这种行为通过覆盖 AbstractTransactionCheckListener 类。
  3. 由broker的配置文件中参数 transactionTimeou t决定的特点时间段之后检查事务消息。当发送事务消息时,通过设置用户配置CHECK_IMMUNITY_TIME_IN_SECONDS,用户也可以改变这个限制。这个参数优先于 transactionMsgTimeout 参数。
  4. 一个事务消息可能被检查或消费多次。
  5. 提交过的消息重新放到用户目标主题可能会失败。目前,它依赖日志记录。通过RocketMQ自身高可用机制确保高可用。如果你想确保事务消息不丢失并且保证事务完整性,建议使用同步双写机制。
  6. 事务消息的生产者ID不能与其他类型消息的生产者ID共享。不像其他类型消息,事务消息允许回查。MQ server通过生产者ID查询客户端。

应用

事务状态

三种事务消息状态:

  • TransactionStatus.CommitTransaction:提交事务,允许消费者消费这个消息。
  • TransactionStatus.RollbackTransaction:回滚事务,消息将会被删除或不再允许消费。
  • TransactionStatus.Unknown:中间状态,MQ需要重新检查来确定状态。

发送事务消息

创建事务生产者

使用TransactionMQProducer类创建producer客户端,指定唯一producerGroup,你可以设置一个自定义线程池来处理检查请求。执行本地事务后,你需要根据执行结果恢复MQ,并回复上面描述的状态。

实现事务监听器接口

当发送半消息成功时,使用 executeLocalTransaction 方法执行本地事务。它返回三种事务状态的一种。
使用 checkLocalTransaction 方法检查本地事务状态和响应MQ检查请求。它同样返回三种事务状态的一种。

代码

@RequestMapping(value = "/sendTransaction") public String sendTransaction(HttpServletRequest request)throws Exception{ TransactionMQProducer producer=new TransactionMQProducer("sendTransaction_producer_group"); try{ String mesage=request.getParameter("message"); producer.setNamesrvAddr(NAMESERVER_ADDR); //设置线程池 producer.setExecutorService(new ThreadPoolExecutor(2,5,100, TimeUnit.SECONDS,new ArrayBlockingQueue(2000),new ThreadFactory(){ @Override public Thread newThread(Runnable r) { Thread thread=new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } })); //事务监听器 producer.setTransactionListener(new TransactionListener() { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { int value=transactionIndex.getAndIncrement(); Integer status=value%3; localTrans.put(msg.getTransactionId(),status); return LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { Integer status =localTrans.get(msg.getTransactionId()); if (null != status) { switch (status) { //case 0: //return LocalTransactionState.UNKNOW; case 1: return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; default: return LocalTransactionState.UNKNOW; } } return LocalTransactionState.COMMIT_MESSAGE; } }); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for(int i=0;i<10;i++){ Message msg=new Message("Test_filter",tags[i % tags.length],"KEY" + i,(mesage+i).getBytes()); msg.putUserProperty("a",String.valueOf(i)); SendResult sendResult = producer.sendMessageInTransaction(msg,null); logger.error("返回结果:"+sendResult); Thread.sleep(10); } return "发送成功"; }catch (Exception e){ logger.error(e.getMessage()); return "发送失败"; }finally { for (int i = 0; i < 100000; i++) { Thread.sleep(1000); } producer.shutdown(); } }
原文链接:https://yq.aliyun.com/articles/666606
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章