RabbitMQ进阶使用-延时队列的配置(Spring Boot)
依赖
- MAVEN配置pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- Gradle配置build.gradle
compile('org.springframework.boot:spring-boot-starter-amqp')
连接配置
得益于spring boot的约定大于配置,只需要在application.yml加入下面配置即可。
spring:
rabbitmq:
host: host
port: port
username: admin
password: passwd
简单自定义RabbitTemplate和Queue配置
默认的配置还是略显不足,增加序列化配置如下:
@Configuration
public class QueueConfig {
/**
* 自动注入为SimpleRabbitListenerContainerFactory的消息序列化转换器
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* 持久化交换机
*/
@Bean(name = "exchange")
public FanoutExchange exchange() {
return new FanoutExchange("exchange1", true, false);
}
/**
* 持久化队列
*/
@Bean
public Queue queue() {
return new Queue("queue", true);
}
/**
* 将队列和exchange绑定
*
* @return binding
*/
@Bean
Binding bindingSmsExchangeSmsQueue() {
return BindingBuilder.bind(queue()).to(exchange());
}
}
特殊延时队列的配置
延时队列的用法这里就不详细说了,参考Spring Boot与RabbitMQ结合实现延迟队列的示例,有些场景如未支付订单30分钟过期等,可通过延时队列实现
@Bean
public Queue delayQueue(){
return QueueBuilder.durable("delayQueue") //队列名称
.withArgument("x-message-ttl",10000) //死信时间
.withArgument("x-dead-letter-exchange", "") //死信重新投递的交换机
.withArgument("x-dead-letter-routing-key", "queue")//路由到队列的routingKey
.build();
}
启动应用测试一下
启动应用,在rabbit管理web查看所有队列
- 所有队列
- 查看delayQueue详情,框框中为延时配置
将"x-message-ttl"参数改成20000重启发现问题,控制队列里面的参数也没有修改成功
修改带参数队列失败的问题
问题分析
根据日志提示,队列已经存在而且参数不一致导致,然后查看源码在RabbitAdmin发现下面代码,在创建队列失败的时候会调用logOrRethrowDeclarationException方法,logOrRethrowDeclarationException方法中发布了一个DeclarationExceptionEvent事件,到这里解决思路有,监听这个事件,然后删除相应的队列
private DeclareOk[] declareQueues(final Channel channel, final Queue... queues) throws IOException {
List<DeclareOk> declareOks = new ArrayList<DeclareOk>(queues.length);
for (int i = 0; i < queues.length; i++) {
Queue queue = queues[i];
if (!queue.getName().startsWith("amq.")) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("declaring Queue '" + queue.getName() + "'");
}
try {
try {
DeclareOk declareOk = channel.queueDeclare(queue.getName(), queue.isDurable(),
queue.isExclusive(), queue.isAutoDelete(), queue.getArguments());
declareOks.add(declareOk);
}
catch (IllegalArgumentException e) {
if (this.logger.isDebugEnabled()) {
this.logger.error("Exception while declaring queue: '" + queue.getName() + "'");
}
try {
if (channel instanceof ChannelProxy) {
((ChannelProxy) channel).getTargetChannel().close();
}
}
catch (TimeoutException e1) {
}
throw new IOException(e);
}
}
catch (IOException e) {
logOrRethrowDeclarationException(queue, "queue", e);
}
}
else if (this.logger.isDebugEnabled()) {
this.logger.debug(queue.getName() + ": Queue with name that starts with 'amq.' cannot be declared.");
}
}
return declareOks.toArray(new DeclareOk[declareOks.size()]);
}
private <T extends Throwable> void logOrRethrowDeclarationException(Declarable element, String elementType, T t)
throws T {
DeclarationExceptionEvent event = new DeclarationExceptionEvent(this, element, t);
this.lastDeclarationExceptionEvent = event;
if (this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(event);
}
if (this.ignoreDeclarationExceptions || (element != null && element.isIgnoreDeclarationExceptions())) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Failed to declare " + elementType
+ ": " + (element == null ? "broker-generated" : element)
+ ", continuing...", t);
}
else if (this.logger.isWarnEnabled()) {
Throwable cause = t;
if (t instanceof IOException && t.getCause() != null) {
cause = t.getCause();
}
this.logger.warn("Failed to declare " + elementType
+ ": " + (element == null ? "broker-generated" : element)
+ ", continuing... " + cause);
}
}
else {
throw t;
}
}
解决方法
写一个DeclarationExceptionEvent事件监听,处理创建失败的队列,既删除掉
@Component
public class DeclarationExceptionEventListener {
@Autowired
private AmqpAdmin rabbitAdmin;
@EventListener(classes = DeclarationExceptionEvent.class)
public void listen(DeclarationExceptionEvent event) {
final Declarable declarable = event.getDeclarable();
if (declarable instanceof Queue) {
final Queue queue = (Queue) declarable;
rabbitAdmin.deleteQueue(queue.getName());
}
}
}
改完重启应用,只有一条异常日志(原来4条),还有一条的原因是第一次创建失败发布事件,我们监听了事件进行处理。查看rabbit控制台,参数修改成功。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
fabio负载均衡入门
当开发完一个 HTTP Restful服务后,准备配置一个负载均衡。我想弄一个比较简单的方案, [1] 不要依赖于Docker/K8S。 [2] 在Windows 和 Linux 均可。 [3] Go语言实现优先。并且不要有负载的配置。 [4] 能支持服务器动态发现,健康检查。 通过Github搜索,找到fabioGithub。 有5K以上的Star 。eBay团队出品。依赖于Consul做服务发现。入门操作其实相当简单: 1. 启动consul。我是在Windows先直接运行: consul agent -dev 2. 编译并运行fabio-example fabio-example.exe --prefix /echo fabio-example.exe内部启动5000端口监听HTTP服务,并提供 http://localhost:5000/echo服务。并向consul注册这个服务。 3. 启动fabio fabio.exe 4. 测试 curl http://localhost:9999/echo
-
下一篇
开源分布式数据库SequoiaDB在去哪儿网的实践
编者注: 中国的数据库行业也迎来了一波新的热点事件。分布式数据库这块新消息不断,也让大家开始关注中国的分布式数据库。首先是短短一周内,Pingcap和SequoiaDB巨杉数据库陆续宣布了C轮的数千万美元融资,融资的消息在数据库和IT圈成功“刷屏”。此后,在杭州的云栖大会上,蚂蚁金服的Oceanbase也发布了 2.0。对于这些新消息,也侧面反映了国产的开源分布式数据库发展的迅速。那么这些国产分布式数据库,在互联网行业中的实践与使用上是如何呢?与传统开源数据库的对比如何?就由这篇文章作为去哪儿网这边的实践介绍。 引言:开源数据库百花齐放新时代 MySQL目前是全球最流行,用户最多的开源数据库这是无可非议的事实。而同时,开源数据库PostgreSQL也一直在不断发展壮大,当然还包括众多的新一代NoSQL、NewSQL数据库不断涌现。 此前,本人有幸参与“MariaDB/MySQL vs PostgreSQL世纪大决战”,现场火药味十足。作为为MySQL战队的一员,我个人认为,“大决战”可能并不准确,更多的应该是碰撞,因为有史以来,在数据库界,两家不同数据库被摆到台上公开对标,他们应该...
相关文章
文章评论
共有0条评论来说两句吧...