详细讲解!RabbitMQ防止数据丢失
思维导图
一、分析数据丢失的原因
分析RabbitMQ消息丢失的情况,不妨先看看一条消息从生产者发送到消费者消费的过程:
可以看出,一条消息整个过程要经历两次的网络传输:从生产者发送到RabbitMQ服务器,从RabbitMQ服务器发送到消费者。
在消费者未消费前存储在队列(Queue)中。
所以可以知道,有三个场景下是会发生消息丢失的:
-
存储在队列中,如果队列没有对消息持久化,RabbitMQ服务器宕机重启会丢失数据。 -
生产者发送消息到RabbitMQ服务器过程中,RabbitMQ服务器如果宕机停止服务,消息会丢失。 -
消费者从RabbitMQ服务器获取队列中存储的数据消费,但是消费者程序出错或者宕机而没有正确消费,导致数据丢失。
针对以上三种场景,RabbitMQ提供了三种解决的方式,分别是消息持久化,confirm机制,ACK事务机制。
二、消息持久化
RabbitMQ是支持消息持久化的,消息持久化需要设置:Exchange为持久化和Queue持久化,这样当消息发送到RabbitMQ服务器时,消息就会持久化。
首先看Exchange交换机的类图:
看这个类图其实是要说明上一篇文章介绍的四种交换机都是AbstractExchange抽象类的子类,所以根据java的特性,创建子类的实例会先调用父类的构造器,父类也就是AbstractExchange的构造器是怎么样的呢?
从上面的注释可以看到durable参数表示是否持久化。默认是持久化(true)。创建持久化的Exchange可以这样写:
@Bean
public DirectExchange rabbitmqDemoDirectExchange() {
//Direct交换机
return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
}
接着是Queue队列,我们先看看Queue的构造器是怎么样的:
也是通过durable参数设置是否持久化,默认是true。所以创建时可以不指定:
@Bean
public Queue fanoutExchangeQueueA() {
//只需要指定名称,默认是持久化的
return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A);
}
这就完成了消息持久化的设置,接下来启动项目,发送几条消息,我们可以看到:
怎么证明是已经持久化了呢,实际上可以找到对应的文件:
找到对应磁盘中的目录:
消息持久化可以防止消息在RabbitMQ Server中不会因为宕机重启而丢失。
三、消息确认机制
3.1 confirm机制
在生产者发送到RabbitMQ Server时有可能因为网络问题导致投递失败,从而丢失数据。我们可以使用confirm模式防止数据丢失。工作流程是怎么样的呢,看以下图解:从上图中可以看到是通过两个回调函数**confirm()、returnedMessage()**进行通知。
一条消息从生产者发送到RabbitMQ,首先会发送到Exchange,对应回调函数confirm()。第二步从Exchange路由分配到Queue中,对应回调函数则是returnedMessage()。
代码怎么实现呢,请看演示:
首先在application.yml配置文件中加上如下配置:
spring:
rabbitmq:
publisher-confirms: true
# publisher-returns: true
template:
mandatory: true
# publisher-confirms:设置为true时。当消息投递到Exchange后,会回调confirm()方法进行通知生产者
# publisher-returns:设置为true时。当消息匹配到Queue并且失败时,会通过回调returnedMessage()方法返回消息
# spring.rabbitmq.template.mandatory: 设置为true时。指定消息在没有被队列接收时会通过回调returnedMessage()方法退回。
有个小细节,publisher-returns和mandatory如果都设置的话,优先级是以mandatory优先。可以看源码:接着我们需要定义回调方法:
@Component
public class RabbitmqConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
private Logger logger = LoggerFactory.getLogger(RabbitmqConfirmCallback.class);
/**
* 监听消息是否到达Exchange
*
* @param correlationData 包含消息的唯一标识的对象
* @param ack true 标识 ack,false 标识 nack
* @param cause nack 投递失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
logger.info("消息投递成功~消息Id:{}", correlationData.getId());
} else {
logger.error("消息投递失败,Id:{},错误提示:{}", correlationData.getId(), cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
logger.info("消息没有路由到队列,获得返回的消息");
Map map = byteToObject(message.getBody(), Map.class);
logger.info("message body: {}", map == null ? "" : map.toString());
logger.info("replyCode: {}", replyCode);
logger.info("replyText: {}", replyText);
logger.info("exchange: {}", exchange);
logger.info("routingKey: {}", exchange);
logger.info("------------> end <------------");
}
@SuppressWarnings("unchecked")
private <T> T byteToObject(byte[] bytes, Class<T> clazz) {
T t;
try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bis)) {
t = (T) ois.readObject();
} catch (Exception e) {
e.printStackTrace();
return null;
}
return t;
}
}
我这里就简单地打印回调方法返回的消息,在实际项目中,可以把返回的消息存储到日志表中,使用定时任务进行进一步的处理。
我这里是使用RabbitTemplate进行发送,所以在Service层的RabbitTemplate需要设置一下:
@Service
public class RabbitMQServiceImpl implements RabbitMQService {
@Resource
private RabbitmqConfirmCallback rabbitmqConfirmCallback;
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
//指定 ConfirmCallback
rabbitTemplate.setConfirmCallback(rabbitmqConfirmCallback);
//指定 ReturnCallback
rabbitTemplate.setReturnCallback(rabbitmqConfirmCallback);
}
@Override
public String sendMsg(String msg) throws Exception {
Map<String, Object> message = getMessage(msg);
try {
CorrelationData correlationData = (CorrelationData) message.remove("correlationData");
rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, message, correlationData);
return "ok";
} catch (Exception e) {
e.printStackTrace();
return "error";
}
}
private Map<String, Object> getMessage(String msg) {
String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
CorrelationData correlationData = new CorrelationData(msgId);
String sendTime = sdf.format(new Date());
Map<String, Object> map = new HashMap<>();
map.put("msgId", msgId);
map.put("sendTime", sendTime);
map.put("msg", msg);
map.put("correlationData", correlationData);
return map;
}
}
大功告成!接下来我们进行测试,发送一条消息,我们可以控制台:假设发送一条信息没有路由匹配到队列,可以看到如下信息:
这就是confirm模式。它的作用是为了保障生产者投递消息到RabbitMQ不会出现消息丢失。
3.2 事务机制(ACK)
最开始的那张图已经讲过,消费者从队列中获取到消息后,会直接确认签收,假设消费者宕机或者程序出现异常,数据没有正常消费,这种情况就会出现数据丢失。
所以关键在于把自动签收改成手动签收,正常消费则返回确认签收,如果出现异常,则返回拒绝签收重回队列。代码怎么实现呢,请看演示:
首先在消费者的application.yml文件中设置事务提交为manual手动模式:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 手动ack模式
concurrency: 1 # 最少消费者数量
max-concurrency: 10 # 最大消费者数量
然后编写消费者的监听器:
@Component
public class RabbitDemoConsumer {
enum Action {
//处理成功
SUCCESS,
//可以重试的错误,消息重回队列
RETRY,
//无需重试的错误,拒绝消息,并从队列中删除
REJECT
}
@RabbitHandler
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
public void process(String msg, Message message, Channel channel) {
long tag = message.getMessageProperties().getDeliveryTag();
Action action = Action.SUCCESS;
try {
System.out.println("消费者RabbitDemoConsumer从RabbitMQ服务端消费消息:" + msg);
if ("bad".equals(msg)) {
throw new IllegalArgumentException("测试:抛出可重回队列的异常");
}
if ("error".equals(msg)) {
throw new Exception("测试:抛出无需重回队列的异常");
}
} catch (IllegalArgumentException e1) {
e1.printStackTrace();
//根据异常的类型判断,设置action是可重试的,还是无需重试的
action = Action.RETRY;
} catch (Exception e2) {
//打印异常
e2.printStackTrace();
//根据异常的类型判断,设置action是可重试的,还是无需重试的
action = Action.REJECT;
} finally {
try {
if (action == Action.SUCCESS) {
//multiple 表示是否批量处理。true表示批量ack处理小于tag的所有消息。false则处理当前消息
channel.basicAck(tag, false);
} else if (action == Action.RETRY) {
//Nack,拒绝策略,消息重回队列
channel.basicNack(tag, false, true);
} else {
//Nack,拒绝策略,并且从队列中删除
channel.basicNack(tag, false, false);
}
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
解释一下上面的代码,如果没有异常,则手动确认回复RabbitMQ服务端basicAck(消费成功)。
如果抛出某些可以重回队列的异常,我们就回复basicNack并且设置重回队列。
如果是抛出不可重回队列的异常,就回复basicNack并且设置从RabbitMQ的队列中删除。
接下来进行测试,发送一条普通的消息"hello":解释一下ack返回的三个方法的意思。
①成功确认
void basicAck(long deliveryTag, boolean multiple) throws IOException;
消费者成功处理后调用此方法对消息进行确认。
-
deliveryTag:该消息的index -
multiple:是否批量.。true:将一次性ack所有小于deliveryTag的消息。
②失败确认
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
-
deliveryTag:该消息的index。 -
multiple:是否批量。true:将一次性拒绝所有小于deliveryTag的消息。 -
requeue:被拒绝的是否重新入队列。
③失败确认
void basicReject(long deliveryTag, boolean requeue) throws IOException;
-
deliveryTag:该消息的index。 -
requeue:被拒绝的是否重新入队列。
basicNack()和basicReject()的区别在于:basicNack()可以批量拒绝,basicReject()一次只能拒接一条消息。
四、遇到的坑
4.1 启用nack机制后,导致的死循环
上面的代码我故意写了一个bug。测试发送一条"bad",然后会抛出重回队列的异常。这就有个问题:重回队列后消费者又消费,消费抛出异常又重回队列,就造成了死循环。那怎么避免这种情况呢?
既然nack会造成死循环的话,我提供的一个思路是不使用basicNack(),把抛出异常的消息落库到一张表中,记录抛出的异常,消息体,消息Id。通过定时任务去处理。
如果你有什么好的解决方案,也可以留言讨论~
4.2 double ack
有的时候比较粗心,不小心开启了自动Ack模式,又手动回复了Ack。那就会报这个错误:
消费者RabbitDemoConsumer从RabbitMQ服务端消费消息:java技术爱好者
2020-08-02 22:52:42.148 ERROR 4880 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
2020-08-02 22:52:43.102 INFO 4880 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@f4a3a8d: tags=[{amq.ctag-8MJeQ7el_PNbVJxGOOw7Rw=rabbitmq.demo.topic}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,5), conn: Proxy@782a1679 Shared Rabbit Connection: SimpleConnection@67c5b175 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 56938], acknowledgeMode=AUTO local queue size=0
出现这个错误,可以检查一下yml文件是否添加了以下配置:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
concurrency: 1
max-concurrency: 10
如果上面这个配置已经添加了,还是报错,有可能你使用@Configuration配置了SimpleRabbitListenerContainerFactory,根据SpringBoot的特性,代码优于配置,代码的配置覆盖了yml的配置,并且忘记设置手动manual模式:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//设置手动ack模式
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
如果你还是有报错,那可能是写错地方了,写在生产者的项目了。以上的配置应该配置在消费者的项目。因为ack模式是针对消费者而言的。我就是写错了,写在生产者,折腾了几个小时,泪目~
4.3 性能问题
其实手动ACK相对于自动ACK肯定是会慢很多,我在网上查了一些资料,性能相差大概有10倍。所以一般在实际应用中不太建议开手动ACK模式。不过也不是绝对不可以开,具体情况具体分析,看并发量,还有数据的重要性等等。
所以在实际项目中还需要权衡一下并发量和数据的重要性,再决定具体的方案。
4.4 启用手动ack模式,如果没有及时回复,会造成队列异常
如果开启了手动ACK模式,但是由于代码有bug的原因,没有回复RabbitMQ服务端,那么这条消息就会放到Unacked状态的消息堆里,只有等到消费者的连接断开才会转到Ready消息。如果消费者一直没有断开连接,那Unacked的消息就会越来越多,占用内存就越来越大,最后就会出现异常。
这个问题,我没法用我的电脑演示,我的电脑太卡了。
五、总结
通过上面的学习后,RabbitMQ防止数据丢失有三种方式:
-
消息持久化 -
生产者消息确认机制(confirm模式) -
消费者消息确认模式(ack模式)
上面所有例子的代码都上传github了:
https://github.com/yehongzhi/mall
如果你觉得这篇文章对你有用,点个赞吧~
你的点赞是我创作的最大动力~
想第一时间看到我更新的文章,可以微信搜索公众号「java技术爱好者
」,拒绝做一条咸鱼,我是一个努力让大家记住的程序员。我们下期再见!!!
能力有限,如果有什么错误或者不当之处,请大家批评指正,一起学习交流!
本文分享自微信公众号 - java技术爱好者(yehongzhi_java)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
HashCode为什么使用31作为乘数?
持续坚持原创输出,点击蓝字关注我吧 作者:小傅哥博客:https://bugstack.cn ❝ 沉淀、分享、成长,让自己和他人都能有所收获!😜 ❞ 目录 一、前言 二、面试题 三、资源下载 四、源码讲解 1. 固定乘积31在这用到了 2. 来自stackoverflow的回答 3. Hash值碰撞概率统计 4. Hash值散列分布 五、总结 一、前言 在面经手册的前两篇介绍了「《面试官都问我啥》」和「《认知自己的技术栈盲区》」,这两篇内容主要为了说明面试过程的考查范围,包括个人的自我介绍、技术栈积累、项目经验等,以及在技术栈盲区篇章中介绍了一个整套技术栈在系统架构用的应用,以此全方面的扫描自己有哪些盲区还需要补充。而接下来的章节会以各个系列的技术栈中遇到的面试题作为切入点,讲解技术要点,了解技术原理,包括;数据结构、数据算法、技术栈、框架等进行逐步展开学习。 在进入数据结构章节讲解之前可以先了解下,数据结构都有哪些,基本可以包括;数组(Array)、栈(Stack)、队列(Queue)、链表(LinkList)、树(Tree)、散列表(Hash)、堆(Heap)、图(Graph)。...
- 下一篇
前端生僻字显示
异名在一个游戏项目中遇到一个比较有意思的问题,在游戏的玩法设定中,当怪物在消失的时候会爆出一个中文字,这个效果在部分机型上会出现乱码符号 显示乱码的原因 一开始还以为是字符太多了,char的纹理不够用了,还尝试过手动调用游戏引擎的cc.Label.clearCharCache去清除;后来才认识到是生僻字的问题,这得从字符编码说起,Unicode为每种语言中的每个字符设定了统一并且唯一的二进制编码,以满足跨语言、跨平台进行文本转换、处理的要求,其中: ❝ 3400~4dffh:是中日韩认同表意文字扩充a区,总计收容6,582个中日韩汉字4e00~9fffh:是中日韩认同表意文字区,总计收容20,902个中日韩汉字 ❞ 通常情况下,我们日常所用到的中日韩非符号字符都会落在3400-9fff这个编码区间,因此当我们需要判断某一个字符是否是属于汉字的时候,就可以通过查看它的的Unicode编码是否落在这个区间,我们写一个正则去检查一下上面两个字符👇: 可以看到第二个字它并不处于常用汉字的编码区间,它是生僻字,生僻字的使用频率很低,我们日常高频用的的汉字其实也就是几千个而已。字体公司设计汉字字...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS8编译安装MySQL8.0.19
- CentOS8安装Docker,最新的服务器搭配容器使用
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Mario游戏-低调大师作品
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS6,7,8上安装Nginx,支持https2.0的开启