SpringBoot RabbitMQ消息队列的重试、超时、延时、死信队列
今天介绍使用SpringBoot实现RabbitMQ消息队列的高级用法。
- MQ安装
- 自动创建
- 消息重试
- 消息超时
- 死信队列
- 延时队列
一、RabbitMQ的安装
众所周知,RabbitMQ的安装相对复杂,需要先安装Erlang,再按着对应版本的RabbitMQ的服务端,最后为了方便管理还需要安装rabbitmq_management管理端插件,偶尔还会出现一些安装配置问题,故十分复杂。 在开发测试环境下使用docker来安装就方便多了,省去了环境和配置的麻烦。
1. 拉取官方image
docker pull rabbitmq:management
2. 启动RabbitMQ
docker run -dit --name MyRabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management
rabbitmq:management: image:tag --name:指定容器名; -d:后台运行容器; -t:在新容器内指定一个伪终端或终端; -i:允许你对容器内的标准输入 (STDIN) 进行交互; -p:指定服务运行的端口(5672:应用访问端口;15672:控制台Web端口号); -e:指定环境变量;(RABBITMQ_DEFAULT_USER:默认的用户名;RABBITMQ_DEFAULT_PASS:默认用户名的密码);
至此RabbitMQ就安装启动完成了,可以通过http://localhost:15672 登陆管理后台,用户名密码就是上面配置的admin/admin
二、使用SpringBoot自动创建队列
1. 引入amqp包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. MQ配置
bootstrap.yml 配置
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /
username: admin
password: admin
listener:
simple:
concurrency: 5
direct:
prefetch: 10
concurrency:每个listener在初始化的时候设置的并发消费者的个数 prefetch:每次从一次性从broker里面取的待消费的消息的个数
rabbitmq-spring.xml配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--接收消息的队列名-->
<rabbit:queue name="login-user-logined" />
<!--声明exchange的名称与类型-->
<rabbit:topic-exchange name="login_barryhome_fun">
<rabbit:bindings>
<!--queue与exchange的绑定和匹配路由-->
<rabbit:binding queue="login-user-logined" pattern="login.user.logined"/>
</rabbit:bindings>
</rabbit:topic-exchange>
</beans>
rabbit:topic-exchange:声明为topic消息类型 pattern="login.user.logined":此处是一个表达式,可使用“*”表示一个词,“#”表示一个或多个词
3. 消息生产端
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public LoginUser SendLoginSucceedMessage(){
LoginUser loginUser = getLoginUser("succeed");
// 发送消息
rabbitTemplate.convertAndSend(MessageConstant.MESSAGE_EXCHANGE,
MessageConstant.LOGIN_ROUTING_KEY, loginUser);
return loginUser;
}
@NoArgsConstructor
@AllArgsConstructor
public class LoginUser implements Serializable {
String userName;
String realName;
String userToken;
Date loginTime;
String status;
}
这里需要注意的是默认情况下消息的转换器为SimpleMessageConverter只能解析string和byte,故传递的消息对象必须是可序列化的,实现Serializable接口
SimpleMessageConverter only supports String, byte[] and Serializable payloads, received: fun.barryhome.cloud.dto.LoginUser
4. 消息消费端
@Component
public class ReceiverMessage {
@RabbitListener(queues = "login-user-logined")
public void receiveLoginMessage(LoginUser loginUser) {
System.err.println(loginUser);
}
}
@RabbitListener(queues = "login-user-logined"):用于监听名为login-user-logined 队列中的消息
5. 自动创建Queue
@SpringBootApplication
@ImportResource(value = "classpath:rabbitmq-spring.xml")
public class MQApplication {
public static void main(String[] args) {
SpringApplication.run(MQApplication.class, args);
}
}
在没有导入xml且MQ服务器上没有列队的情况下,会导致找不到相关queue的错误
channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'login-user-logined' in vhost '/', class-id=50, method-id=10)
而导入之后将自动创建 exchange和queue
三、消息重试
默认情况下如果有消息消费出错后会一直重试,造成消息堵塞 
消息堵塞之后也影响到后续消息的消费,时间越长越来越多的消息将无法及时消费处理。 如果是单条或极少量的消息有问题可通过多开节点concurrency将正常的消息消息掉,但如果较多则全部节点都将堵塞。
如果想遇到消息消费报错重试几次就舍弃,从而不影响后续消息的消费,如何实现呢?
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /
username: admin
password: admin
listener:
simple:
concurrency: 5
prefetch: 10
retry:
enabled: true # 允许消息消费失败的重试
max-attempts: 3 # 消息最多消费次数3次
initial-interval: 2000 # 消息多次消费的间隔2秒
以上配置允许消息消费失败后重试3次,每次间隔2秒,如果还是失败则直接舍弃掉本条消息。 重试可解决因非消息体本身处理问题产生的临时性的故障,而将处理失败的消息直接舍弃掉只是为其它消息正常处理的权益之计而以,将业务操作降到相对低的影响。
四、消息超时
消息重试可解决因消息处理报错引起的问题。如果是消息处理过慢导致错过时效,除了可在处理逻辑中进行处理外,也可以通过消息的超时机制来处理,设定超时时间后将消息直接舍弃。
修改rabbitmq-spring.xml
<rabbit:queue name="login-user-logined">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="10000" value-type="java.lang.Long" />
</rabbit:queue-arguments>
</rabbit:queue>
x-message-ttl:在消息服务器停留的时间(ms)

五、死信队列
死信队列就是当业务队列处理失败后,将消息根据routingKey转投到另一队列,这样的情况有:
- 消息被拒绝 (basic.reject or basic.nack) 且带 requeue=false不重新入队参数或达到的retry重新入队的上限次数
- 消息的TTL(Time To Live)-存活时间已经过期
- 队列长度限制被超越(队列满,queue的"x-max-length"参数)
1. 修改rabbitmq-spring.xml
<!--接收消息的队列名-->
<rabbit:queue name="login-user-logined">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="10000" value-type="java.lang.Long"/>
<!--死信的交换机-->
<entry key="x-dead-letter-exchange" value="login_barryhome_fun"/>
<!--死信发送的路由-->
<entry key="x-dead-letter-routing-key" value="login.user.login.dlq"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:queue name="login-user-logined-dlq"/>
<!--申明exchange的名称与类型-->
<rabbit:topic-exchange name="login_barryhome_fun">
<rabbit:bindings>
<!--queue与exchange的绑定和匹配路由-->
<rabbit:binding queue="login-user-logined" pattern="login.user.logined"/>
<rabbit:binding queue="login-user-logined-dlq" pattern="login.user.login.dlq"/>
</rabbit:bindings>
</rabbit:topic-exchange>
通过对死信发送的交换机和路由的的设置,可将消息转向具体的queue中。这里交换机可以和原业务队列不是一个。 当login-user-logined中的消息处理失败后将直接转投向login-user-logined-dlq队列中。 当程序逻辑修复后可再将消息再移回业务队列中move messages
2. 安装插件

3. 移动消息

六、延时队列
延时队列除了可以做一般的延时处理外,还可以当作单个job的定时任务处理,比起一般通过定时器去轮询的方式更优雅。
1. 修改rabbitmq-spring.xml
<rabbit:topic-exchange name="login_barryhome_fun" delayed="true">
初次配置时,如果报以下错误,则是服务器不支持此命令,需要安装插件
Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message', class-id=40, method-id=10)
2. 安装插件
-
下载插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0
-
上传插件到docker容器中/plugins
docker ps查询rabbitmq的 CONTAINER ID
docker cp rabbitmq_delayed_message_exchange-3.8.0.ez 2c248563a2b0:/plugins
- 进入docker容器内部
docker exec -it 2c248563a2b0 /bin/bash
- 安装插件
cd /plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
具体安装教程可参考:https://blog.csdn.net/magic_1024/article/details/103840681
安装成功后重启程序,观察mq管理端的exchange可发现
3. 发送延时消息
@GetMapping("/sendDelay")
public LoginUser SendDelayLoginSucceedMessage() {
LoginUser loginUser = getLoginUser("succeed");
MessagePostProcessor messagePostProcessor = message -> {
// 延时10s
message.getMessageProperties().setHeader("x-delay", 10000);
return message;
};
// 发送消息
rabbitTemplate.convertAndSend(MessageConstant.MESSAGE_EXCHANGE,
MessageConstant.LOGIN_ROUTING_KEY, loginUser, messagePostProcessor);
return loginUser;
}
需要注意的是消息的发送是
实时的,消息服务器接收到消息待延时时间后再投到对应的queue中
七、完整代码
https://gitee.com/hypier/barry-cloud/tree/master/cloud-mq
八、请关注我的公众号
关注公众号
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
MySQL5.6升级5.7时,出现主从延迟问题排查过程
最近在做zabbix的数据库MySQL5.6升级5.7时,出现主从延迟问题,这个问题困扰了很久没有解决,昨天终于解决了,整理了一下整个排查过程,分享给大家。 环境说明:mysql主库为5.6的版本,有四个从库,三个为5.6的版本,一个为5.7的版本,所有主从的库表结构均一致,5.7的从库出现大量延迟,5.6的没问题,业务为zabbix监控,基本全部为insert批量插入操作,每条insert SQL插入数据为400-1000行左右。 问题:MySQL5.7的从库大量延迟,relaylog落盘正常,应用到数据库比较慢,磁盘IO和CPU没有压力,sync_binlog为20000或是0没有区别,max_allowed_packet=128M,innodb_flush_log_at_trx_commit=0,bulk_insert_buffer_size = 128M,binlog_format=row,sync_relay_log=10000,没有使用并行复制,没有开启SSL,没有开启GDID,没有开启半同步。 排查过程: 1:检查各个核对各个和性能相关的参数,没有发现异常。 2:检查网卡...
-
下一篇
MySQL的锁到底有多少内容 ?再和腾讯大佬的技术面谈,我还是小看锁了!
对酒当歌,人生几何! 朝朝暮暮,唯有己脱。 苦苦寻觅找工作之间,殊不知今日之时乃我心之痛,难到是我不配拥有工作嘛。自面试后他所谓的等待都过去一段时日,可惜在下京东上的小金库都要见低啦。每每想到不由心中一紧。正处为难之间,手机忽然来了个短信预约后续面试。 我即刻三下五除二拎包踢门而出。飞奔而去。 此刻面试门外首先映入眼帘的是一个白色似皮球的东西,似圆非圆。好奇冬瓜落地一般。上半段还有一段湿湿的部分,显得尤为入目。这是什么情况? 紧接着现身一名中年男子。他身着纯白色T桖衫的,一灰色宽松的休闲西裤,腰围至少得三十好几。外加一双夏日必备皮制凉鞋。只见,他正低头看着手上的一张A4纸。透过一头黑色短发。满脸的赘肉横生。外加上那大腹便便快要把那T桖衫给撑爆的肚子。 看得我好生害怕,不由得咽了咽口水,生怕自己说错话。这宛如一颗肉粽呀。不在职场摸滚打拼8、9年,也不会有当前这景象。 什么是锁 面试官:: 你是来参加面试的吧? 吒吒辉: 不 不 不,我是来参加复试呢。 面试官:: 看到上次别人点评,MySQL优化还阔以。那你先谈谈对锁的理解? 吒吒辉: 嘿嘿,还好! 锁是计算机在进行多 进程、线程执行调度...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- Crontab安装和使用
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- MySQL数据库在高并发下的优化方案
- Dcoker安装(在线仓库),最新的服务器搭配容器使用




微信收款码
支付宝收款码