RabbitMQ 可靠投递
- 背景
- confirmCallback 确认模式
- returnCallback 未投递到 queue 退回模式
- shovel-plugin 跨机房可靠投递
背景
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两个选项用来控制消息的投递可靠性模式。
rabbitmq 整个消息投递的路径为:
producer->rabbitmq broker cluster->exchange->queue->consumer
message 从 producer 到 rabbitmq broker cluster 则会返回一个 confirmCallback 。
message 从 exchange->queue 投递失败则会返回一个 returnCallback 。我们将利用这两个 callback 控制消息的最终一致性和部分纠错能力。
confirmCallback 确认模式
在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启 confirmcallback 。
CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setPublisherConfirms(true);//开启confirm模式
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory); rabbitTemplate.setConfirmCallback((data, ack, cause) -> { if (!ack) { log.error("消息发送失败!" + cause + data.toString()); } else { log.info("消息发送成功,消息ID:" + (data != null ? data.getId() : null)); } });
我们来看下 ConfirmCallback 接口。
public interface ConfirmCallback { /** * Confirmation callback. * @param correlationData correlation data for the callback. * @param ack true for ack, false for nack * @param cause An optional cause, for nack, when available, otherwise null. */ void confirm(CorrelationData correlationData, boolean ack, String cause); }
重点是 CorrelationData 对象,每个发送的消息都需要配备一个 CorrelationData 相关数据对象,CorrelationData 对象内部只有一个 id 属性,用来表示当前消息唯一性。
发送的时候创建一个 CorrelationData 对象。
User user = new User(); user.setID(1010101L); user.setUserName("plen"); rabbitTemplate.convertAndSend(exchange, routing, user, message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); return message; }, new CorrelationData(user.getID().toString()));
这里将 user ID 设置为当前消息 CorrelationData id 。当然这里是纯粹 demo,真实场景是需要做业务无关消息 ID 生成,同时要记录下这个 id 用来纠错和对账。
消息只要被 rabbitmq broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有 broker 接收到才会调用 confirmCallback。
被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里。所以需要用到接下来的 returnCallback 。
returnCallback 未投递到queue退回模式
confrim 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。在有些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到 return 退回模式。
同样创建 ConnectionFactory 到时候需要设置 PublisherReturns(true) 选项。
CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setPublisherReturns(true);//开启return模式
rabbitTemplate.setMandatory(true);//开启强制委托模式 rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info(MessageFormat.format("消息发送ReturnCallback:{0},{1},{2},{3},{4},{5}", message, replyCode, replyText, exchange, routingKey)));
这样如果未能投递到目标 queue 里将调用 returnCallback ,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据。
shovel-plugin 跨机房可靠投递
RabbitMQ 在跨机房集成提供了一个不错的插件 shovel 。使用 shovel-plugin 插件非常方便,shovel 可以接受机房之间的网络断开、机器下线等不稳定因素。
这里有两个 broker :
10.211.55.3 rabbit_node1
10.211.55.4 rabbit_node2
我们希望将发送给 rabbit_node1 plen.queue 的消息传输到 rabbit_node2 plen.queue 中。我们先开启 __rabbit_node1 的 shovel-plugin__。
先看下当前 RabbitMQ 版本是否安装了 shovel-plugin,如果有的话直接开启。
rabbitmq-plugins list rabbitmq-plugins enable rabbitmq_shovel rabbitmq-plugins enable rabbitmq_shovel_management
然后就可以在 Admin 面板里看到这个设置选项,怎么设置这里就不介绍了。主要就是配置下 amqp 协议地址,amqp://user:password@server-name/my-vhost 。
如果配置没有问题的话,应该是这样的一个状态,说明已经顺利连接到 __rabbit_node2 broker__ 。
我们来看下 rabbit_node1 和 rabbit_node2 的 Connections 面板。
__rabbit_node1(10.211.55.3):__
__rabbit_node2(10.211.55.4):__
RabbitMQ shovel-plugin 插件在 rabbit_node1 broker 创建了两个 tcp 连接,端口 39544 连接是用来消费 plen.queue 里的消息,端口 55706 连接是用来推送消息给 rabbit_node2 。
我们来看下 __rabbit_node1 tcp__ 连接状态:
tcp6 0 0 10.211.55.3:5672 10.211.55.3:39544 ESTABLISHED tcp 0 0 10.211.55.3:55706 10.211.55.4:5672 ESTABLISHED
__rabbit_node2 tcp__ 连接状态:
tcp6 0 0 10.211.55.4:5672 10.211.55.3:55706 ESTABLISHED
为了验证 shovel-plugin 稳定性,我们将 __rabbit_node2__ 下线。
然后再发送消息,发现消息会现在 rabbit_node1 plen.queue 里待着,一旦 shovel-plugin 连接恢复将消费 rabbit_node1 plen.queue 消息,然后投递给 __rabbit_node2 plen.queue__ 。
作者:王清培 (沪江集团资深JAVA架构师)
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
从零构建ipa-server.实现ldap+kerberos网络用户验证
redhat7 linux domain identity,authentication,and policy linux 的域标识,认证,和策略 IPA-server服务,通过网络用户和组连接系统。 1,用户信息和认证服务介绍 1.1随着现在网络的发展,在企业中主机也越来越多,主机用户管理变成一件很艰难的任务,一种解决方式,账号信息不存放在本地系统中,而是账号信息存储在一个中心位置,实现用户的集中管理。单点登录(single sign on )简称SSO,是目前比较流行的企业业务整合的解决方案之一,SSO的定义是定义在多个应用系统中,用户只需要登录一次就可以访问所有信任的应用系统。 1.2构建一个集中认证管理系统需要提供:账户信息和认证信息 1.2.1账户信息:包含如,用户名,UID,GID等 存储账号信息流行的解决方案:LADP,NIS,AD或IPA-server 1.2.2认证信息:密码,指纹等。 ldap服务 kerberos 是一种网络认证协议,仅提供SSO认证服务,通常和LDAP一起使用。 典型的实现方案:AD(微软活动目录)和IPA-server 2,从零搭建IPA-se...
- 下一篇
Grafana+Telegraf+Influxdb监控Tomcat集群方案
前言 前一段时间自家养的几只猫经常出问题,由于没有有效的监控预警手段,以至于问题出现或者许久一段时间才会被通知到。凌晨一点这个锅可谁都不想背,为此基于目前的情况搭建了以下这么一套监控预警系统。 相关软件 Nginx:代理访问 Grafana Grafana: 可视化面板(Dashboard),有着非常漂亮的图表和布局展示 Influxdb:开源的时间序列数据库,适用于记录度量,事件及执行分析 Telegraf:收集系统和服务的统计数据 Docker:开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中 监控架构 GTI监控预警系统,架构流程说明: 第一步:数据采集,Telegraf 采集 Tomcat 相关参数数据 第二步:数据存储,Influxdb 存储 Telegraf 采集的数据 第三步:数据可视化,Grafana 配置 Tomcat 监控面板 第四步:预警通知,配置钉钉、邮件等预警 安装配置 这里只对Grafana、Telegraf、Influxdb、Tomcat 做相应的安装说明,Nginx 以及 Docker 请自行查阅资料。 Grafana Gra...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7安装Docker,走上虚拟化容器引擎之路
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- Windows10,CentOS7,CentOS8安装Nodejs环境
- Linux系统CentOS6、CentOS7手动修改IP地址
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- CentOS7,8上快速安装Gitea,搭建Git服务器
- CentOS8编译安装MySQL8.0.19
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装