RocketMQ事务消息
什么是事务消息?
事务消息可以认为是两阶段提交消息的实现,用来确保分布式系统中的最终一致性。事务消息保证执行本地事务的执行和消息发送的原子性。
使用约束
- 消息事务不支持定时和批量。
- 为了避免一个消息被多次检查,导致半数队列消息堆积,我们限制单个消息的默认检查次数为15次,但用户可以改变这个限制通过修改broker的配置文件中的 transactionCheckMax参数。如果一个消息检查次数超过transactionCheckMax,默认情况下,broker将会丢弃这个消息并同时打印错误日志。用户可以改变这种行为通过覆盖 AbstractTransactionCheckListener 类。
- 由broker的配置文件中参数 transactionTimeou t决定的特点时间段之后检查事务消息。当发送事务消息时,通过设置用户配置CHECK_IMMUNITY_TIME_IN_SECONDS,用户也可以改变这个限制。这个参数优先于 transactionMsgTimeout 参数。
- 一个事务消息可能被检查或消费多次。
- 提交过的消息重新放到用户目标主题可能会失败。目前,它依赖日志记录。通过RocketMQ自身高可用机制确保高可用。如果你想确保事务消息不丢失并且保证事务完整性,建议使用同步双写机制。
- 事务消息的生产者ID不能与其他类型消息的生产者ID共享。不像其他类型消息,事务消息允许回查。MQ server通过生产者ID查询客户端。
应用
事务状态
三种事务消息状态:
- TransactionStatus.CommitTransaction:提交事务,允许消费者消费这个消息。
- TransactionStatus.RollbackTransaction:回滚事务,消息将会被删除或不再允许消费。
- TransactionStatus.Unknown:中间状态,MQ需要重新检查来确定状态。
发送事务消息
创建事务生产者
使用TransactionMQProducer类创建producer客户端,指定唯一producerGroup,你可以设置一个自定义线程池来处理检查请求。执行本地事务后,你需要根据执行结果恢复MQ,并回复上面描述的状态。
实现事务监听器接口
当发送半消息成功时,使用 executeLocalTransaction 方法执行本地事务。它返回三种事务状态的一种。
使用 checkLocalTransaction 方法检查本地事务状态和响应MQ检查请求。它同样返回三种事务状态的一种。
代码
@RequestMapping(value = "/sendTransaction") public String sendTransaction(HttpServletRequest request)throws Exception{ TransactionMQProducer producer=new TransactionMQProducer("sendTransaction_producer_group"); try{ String mesage=request.getParameter("message"); producer.setNamesrvAddr(NAMESERVER_ADDR); //设置线程池 producer.setExecutorService(new ThreadPoolExecutor(2,5,100, TimeUnit.SECONDS,new ArrayBlockingQueue(2000),new ThreadFactory(){ @Override public Thread newThread(Runnable r) { Thread thread=new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } })); //事务监听器 producer.setTransactionListener(new TransactionListener() { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { int value=transactionIndex.getAndIncrement(); Integer status=value%3; localTrans.put(msg.getTransactionId(),status); return LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { Integer status =localTrans.get(msg.getTransactionId()); if (null != status) { switch (status) { //case 0: //return LocalTransactionState.UNKNOW; case 1: return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; default: return LocalTransactionState.UNKNOW; } } return LocalTransactionState.COMMIT_MESSAGE; } }); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for(int i=0;i<10;i++){ Message msg=new Message("Test_filter",tags[i % tags.length],"KEY" + i,(mesage+i).getBytes()); msg.putUserProperty("a",String.valueOf(i)); SendResult sendResult = producer.sendMessageInTransaction(msg,null); logger.error("返回结果:"+sendResult); Thread.sleep(10); } return "发送成功"; }catch (Exception e){ logger.error(e.getMessage()); return "发送失败"; }finally { for (int i = 0; i < 100000; i++) { Thread.sleep(1000); } producer.shutdown(); } }
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
微服务设计指南
2018年,每个人都听说过微服务。但你知道怎么设计吗? 微服务是当今软件工程师的一个热门话题。让我们了解如何使用微服务架构风格构建真正模块化、业务敏捷的IT系统。 一、微服务概念 微服务体系结构由轻量级、松散耦合的服务集合组成。每个服务都实现了单个业务功能。理想情况下,这些服务应该是具有足够的内聚性,可以独立地开发、测试、发布、部署、扩展、集成和维护。 正式定义 “微服务架构风格是一种将单个应用程序开发为一组小型服务的方法,每个小服务运行在自己的进程中,并且以轻量级机制(通常是HTTP REST API)通信。这些服务是围绕业务能力建立的,并且可以由完全自动化的部署机构独立部署。这些服务的集中管理只有最低限度,可以用不同的编程语言编写并使用不同的数据存储技术。” —— James Lewis and Martin Fowler 定义微服务的特性 ●每个服务都是一个轻量级、独立和松散耦合的业务单元。 ●每个服务都有自己的代码库,由一个小团队管理和开发(主要是用于敏捷环境中)。 ●每个服务负责一部分功能或者说业务能力,并且做得很好。 ●每个服务都可以为其用例选择最佳的技术栈(无需将整个应用...
- 下一篇
2018收官蓉城,探秘多媒体开发新趋势
2018年收官在即,音视频技术生态在这一年中也并不平静,Codec的争夺愈加激烈,AV1的持续优化以及国产AVS2在工业界的探索对HEVC的挑战更加紧迫;WebRTC的定稿打通了浏览器、移动端乃至IoT的多媒体通信;人工智能的快速发展,在计算机视觉、Codec、网络传输等多方面与多媒体技术发生着化学反应,成为推动生态发展的新力量。与此同时,在线教育、医疗、金融、新零售的新业务场景探索,赋予了新技术最好的实践环境。 12月15日·成都 | LiveVideoStack联合FCC、小样青年社区,邀请三体云、七牛云、即构科技、TeeVid、企鹅医生等多位技术大咖,共同探索新机技术在音视频领域的实践,以及新兴应用场景和传统行业的突破。 讲师与话题 时杰 三体云系统架构师 拥有超过10年的广电网络工作经验,曾参与CCTV、青岛、昌平等多家电视台播出项目的音视频相关工作的设计、研发,并获得多项专利;在多家上市公司担任过研发项目经理,系统分析师,系统架构师,现场项目管理等重要职位,积累了丰富的研发、现场实操经验。 Topic:直播系统中编码之后插入自定义SEI的方法 众所周知,在直播系统中,除了音视...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Hadoop3单机部署,实现最简伪集群
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS8安装Docker,最新的服务器搭配容器使用
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Windows10,CentOS7,CentOS8安装Nodejs环境
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- CentOS7安装Docker,走上虚拟化容器引擎之路