《RabbitMQ》如何保证消息的可靠性
一条消费成功被消费经历了生产者->MQ->消费者,因此在这三个步骤中都有可能造成消息丢失。
一 消息生产者没有把消息成功发送到MQ
1.1 事务机制
AMQP
协议提供了事务机制,在投递消息时开启事务支持,如果消息投递失败,则回滚事务。
自定义事务管理器
@Configuration
public class RabbitTranscation {
@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){
return new RabbitTransactionManager(connectionFactory);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
return new RabbitTemplate(connectionFactory);
}
}
修改yml
spring:
rabbitmq:
# 消息在未被队列收到的情况下返回
publisher-returns: true
开启事务支持
rabbitTemplate.setChannelTransacted(true);
消息未接收时调用ReturnCallback
rabbitTemplate.setMandatory(true);
生产者投递消息
@Service
public class ProviderTranscation implements RabbitTemplate.ReturnCallback {
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
// 设置channel开启事务
rabbitTemplate.setChannelTransacted(true);
rabbitTemplate.setReturnCallback(this);
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("这条消息发送失败了"+message+",请处理");
}
@Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager")
public void publishMessage(String message) throws Exception {
rabbitTemplate.setMandatory(true);
rabbitTemplate.convertAndSend("javatrip",message);
}
}
但是,很少有人这么干,因为这是同步操作,一条消息发送之后会使发送端阻塞,以等待RabbitMQ-Server的回应,之后才能继续发送下一条消息,生产者生产消息的吞吐量和性能都会大大降低。
1.2 发送方确认机制
发送消息时将信道设置为confirm
模式,消息进入该信道后,都会被指派给一个唯一ID,一旦消息被投递到所匹配的队列后,RabbitMQ
就会发送给生产者一个确认。
开启消息确认机制
spring:
rabbitmq:
# 消息在未被队列收到的情况下返回
publisher-returns: true
# 开启消息确认机制
publisher-confirm-type: correlated
消息未接收时调用ReturnCallback
rabbitTemplate.setMandatory(true);
生产者投递消息
@Service
public class ConfirmProvider implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setReturnCallback(this);
rabbitTemplate.setConfirmCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
System.out.println("确认了这条消息:"+correlationData);
}else{
System.out.println("确认失败了:"+correlationData+";出现异常:"+cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("这条消息发送失败了"+message+",请处理");
}
public void publisMessage(String message){
rabbitTemplate.setMandatory(true);
rabbitTemplate.convertAndSend("javatrip",message);
}
}
如果消息确认失败后,我们可以进行消息补偿,也就是消息的重试机制。当未收到确认信息时进行消息的重新投递。设置如下配置即可完成。
spring:
rabbitmq:
# 支持消息发送失败后重返队列
publisher-returns: true
# 开启消息确认机制
publisher-confirm-type: correlated
listener:
simple:
retry:
# 开启重试
enabled: true
# 最大重试次数
max-attempts: 5
# 重试时间间隔
initial-interval: 3000
二 消息发送到MQ后,MQ宕机导致内存中的消息丢失
消息在MQ中有可能发生丢失,这时候我们就需要将队列和消息都进行持久化。
@Queue注解为我们提供了队列相关的一些属性,具体如下:
name: 队列的名称;
durable: 是否持久化;
exclusive: 是否独享、排外的;
autoDelete: 是否自动删除;
arguments:队列的其他属性参数,有如下可选项,可参看图2的arguments:
x-message-ttl:消息的过期时间,单位:毫秒;
x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒;
x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息;
x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;
x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head;
x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;
x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值
x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)
x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级;
x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;
x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。
持久化队列
创建队列的时候将持久化属性durable设置为true,同时要将autoDelete设置为false
@Queue(value = "javatrip",durable = "true",autoDelete = "false")
持久化消息
发送消息的时候将消息的deliveryMode设置为2,在Spring Boot中消息默认就是持久化的。
三 消费者消费消息的时候,未消费完毕就出现了异常
消费者刚消费了消息,还没有处理业务,结果发生异常。这时候就需要关闭自动确认,改为手动确认消息。
修改yml为手动签收模式
spring:
rabbitmq:
listener:
simple:
# 手动签收模式
acknowledge-mode: manual
# 每次签收一条消息
prefetch: 1
消费者手动签收
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))
public class Consumer {
@RabbitHandler
public void receive(String message, @Headers Map<String,Object> headers, Channel channel) throws Exception{
System.out.println(message);
// 唯一的消息ID
Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// 确认该条消息
if(...){
channel.basicAck(deliverTag,false);
}else{
// 消费失败,消息重返队列
channel.basicNack(deliverTag,false,true);
}
}
}
四 总结
消息丢失的原因?
生产者、MQ、消费者都有可能造成消息丢失
如何保证消息的可靠性?
发送方采取发送者确认模式
MQ进行队列及消息的持久化
消费者消费成功后手动确认消息
本文分享自微信公众号 - Java旅途(Javatrip)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Flutter Dojo设计之道——利用Github打造完善的开源项目
Flutter Dojo从最开始就准备打造成一个专业的GitHub开源项目。 一个好的GitHub开源项目,不仅仅是一个开发者专业技术的体现,更是一个自我展示的平台,专业的GitHub开源项目,可以吸引更多的开发者参与到项目的协同开发中来,让项目能够健康持续的发展。 下面我将根据Flutter Dojo的开发经历,来讲下如何借助GitHub打造完善的开源项目。 个性化个人主页 GitHub主页给了开发者一个公开的个人展示界面,不用搭建服务器,你就可以免费获得一个属于自己的展示页面,不过这也是GitHub的一个彩蛋功能,首先,你需要创建一个新的repository,并将其命名为你的用户名,如图所示。 这时候你就会发现,同名的repository是一个GitHub彩蛋,你只需要在这个同名的Repo中的README.md中创建自己的主页即可,例如我的主页,如图所示。 这个页面实际上就是通过README.md方式来进行展示的,实际上功能比较有限,但是通过 github-readme-stats 这个项目,也可以给简单的md界面创建一些有意思的东西,例如我的界面中的GitHub Stats和项目...
- 下一篇
c++之对象构造顺序和销毁(析构函数)
一、对象的构造顺序: 1、对于局部对象: 当程序执行流到达对象的定义语句时进行构造。下面还是用代码来解析这句话: #include<stdio.h>classTest{private:intmi;public:Test(inti){mi=i;printf("Test(inti)is%d\n",mi);}Test(constTest&obj){mi=obj.mi;printf("Test(constTest&)objis%d\n",mi);}};intmain(){inti=0;Testa1=i;//Test(inti):0while(i<3){Testa2=++i;//Test(inti):1,2,3}if(i<4){Testa=a1;//Test(constTest&objis:0}else{Testa(100);}return0;} 输出结果: Test(inti)is0Test(inti)is1Test(inti)is2Test(inti)is3Test(constTest&obj)is0 这里我们可以看出当程序流执行到相应...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8安装Docker,最新的服务器搭配容器使用
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS7,8上快速安装Gitea,搭建Git服务器
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库