您现在的位置是:首页 > 文章详情

RabbitMQ进阶使用-延时队列的配置(Spring Boot)

日期:2018-10-10点击:560

依赖

  • MAVEN配置pom.xml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 
  • Gradle配置build.gradle
compile('org.springframework.boot:spring-boot-starter-amqp') 

连接配置

得益于spring boot的约定大于配置,只需要在application.yml加入下面配置即可。

spring: rabbitmq: host: host port: port username: admin password: passwd 

简单自定义RabbitTemplate和Queue配置

默认的配置还是略显不足,增加序列化配置如下:

@Configuration public class QueueConfig { /** * 自动注入为SimpleRabbitListenerContainerFactory的消息序列化转换器 */ @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } /** * 持久化交换机 */ @Bean(name = "exchange") public FanoutExchange exchange() { return new FanoutExchange("exchange1", true, false); } /** * 持久化队列 */ @Bean public Queue queue() { return new Queue("queue", true); } /** * 将队列和exchange绑定 * * @return binding */ @Bean Binding bindingSmsExchangeSmsQueue() { return BindingBuilder.bind(queue()).to(exchange()); } } 

特殊延时队列的配置

延时队列的用法这里就不详细说了,参考Spring Boot与RabbitMQ结合实现延迟队列的示例,有些场景如未支付订单30分钟过期等,可通过延时队列实现

 @Bean public Queue delayQueue(){ return QueueBuilder.durable("delayQueue") //队列名称 .withArgument("x-message-ttl",10000) //死信时间 .withArgument("x-dead-letter-exchange", "") //死信重新投递的交换机 .withArgument("x-dead-letter-routing-key", "queue")//路由到队列的routingKey .build(); } 

启动应用测试一下

启动应用,在rabbit管理web查看所有队列

  • 所有队列
  • 查看delayQueue详情,框框中为延时配置 将"x-message-ttl"参数改成20000重启发现问题,控制队列里面的参数也没有修改成功

修改带参数队列失败的问题

问题分析

根据日志提示,队列已经存在而且参数不一致导致,然后查看源码在RabbitAdmin发现下面代码,在创建队列失败的时候会调用logOrRethrowDeclarationException方法,logOrRethrowDeclarationException方法中发布了一个DeclarationExceptionEvent事件,到这里解决思路有,监听这个事件,然后删除相应的队列

 private DeclareOk[] declareQueues(final Channel channel, final Queue... queues) throws IOException { List<DeclareOk> declareOks = new ArrayList<DeclareOk>(queues.length); for (int i = 0; i < queues.length; i++) { Queue queue = queues[i]; if (!queue.getName().startsWith("amq.")) { if (this.logger.isDebugEnabled()) { this.logger.debug("declaring Queue '" + queue.getName() + "'"); } try { try { DeclareOk declareOk = channel.queueDeclare(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), queue.getArguments()); declareOks.add(declareOk); } catch (IllegalArgumentException e) { if (this.logger.isDebugEnabled()) { this.logger.error("Exception while declaring queue: '" + queue.getName() + "'"); } try { if (channel instanceof ChannelProxy) { ((ChannelProxy) channel).getTargetChannel().close(); } } catch (TimeoutException e1) { } throw new IOException(e); } } catch (IOException e) { logOrRethrowDeclarationException(queue, "queue", e); } } else if (this.logger.isDebugEnabled()) { this.logger.debug(queue.getName() + ": Queue with name that starts with 'amq.' cannot be declared."); } } return declareOks.toArray(new DeclareOk[declareOks.size()]); } private <T extends Throwable> void logOrRethrowDeclarationException(Declarable element, String elementType, T t) throws T { DeclarationExceptionEvent event = new DeclarationExceptionEvent(this, element, t); this.lastDeclarationExceptionEvent = event; if (this.applicationEventPublisher != null) { this.applicationEventPublisher.publishEvent(event); } if (this.ignoreDeclarationExceptions || (element != null && element.isIgnoreDeclarationExceptions())) { if (this.logger.isDebugEnabled()) { this.logger.debug("Failed to declare " + elementType + ": " + (element == null ? "broker-generated" : element) + ", continuing...", t); } else if (this.logger.isWarnEnabled()) { Throwable cause = t; if (t instanceof IOException && t.getCause() != null) { cause = t.getCause(); } this.logger.warn("Failed to declare " + elementType + ": " + (element == null ? "broker-generated" : element) + ", continuing... " + cause); } } else { throw t; } } 

解决方法

写一个DeclarationExceptionEvent事件监听,处理创建失败的队列,既删除掉

@Component public class DeclarationExceptionEventListener { @Autowired private AmqpAdmin rabbitAdmin; @EventListener(classes = DeclarationExceptionEvent.class) public void listen(DeclarationExceptionEvent event) { final Declarable declarable = event.getDeclarable(); if (declarable instanceof Queue) { final Queue queue = (Queue) declarable; rabbitAdmin.deleteQueue(queue.getName()); } } } 

改完重启应用,只有一条异常日志(原来4条),还有一条的原因是第一次创建失败发布事件,我们监听了事件进行处理。查看rabbit控制台,参数修改成功。

原文链接:https://my.oschina.net/junjunyuanyuankeke/blog/2240786
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章