spring boot 集成RabbitMQ 接收消息及其高级特性
基本用法 连接和资源管理 管理MQ的核心类:ConnectionFactory ConnectionFactory封装了com.rabbitmq.client.Connection,官方提供的实现类为CachingConnectionFactory,它缓存了创建的信道(Channel),默认为25个,可以通过setChannelCacheSize来更改数量。 @Bean(name="cwagConnectionFactory") public ConnectionFactory cwagConnectionFactory(){ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); return connectionFactory; } @RabbitListener的那些配置 从1.4版本开始,@RabbitListener(queues = "myQueue")可以接收来自myQueue队列的消息,但是这个队列必须在路由里面存在。 从1.5版本开始,queue可以自动声明和绑定。 一个配置的列子: @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "myQueue", durable = "true"), exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"), key = "orderRoutingKey") ) public void processOrder(String data) { ... } 其中,@QueueBinding包含有以下参数: value:类型为@Queue,如果只配置为@Queue,没有配置value属性,则会创建一个匿名队列(exclusive, auto-delete)例如: @RabbitListener(bindings = @QueueBinding( value = @Queue, exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true")) ) exchange:类型为@Exchange,Exchange支持的类型为DIRECT, FANOUT, TOPIC and HEADERS,例如: exchange = @Exchange(value = "auto.exch",type = ExchangeTypes.TOPIC,autoDelete = "tue",durable = "true"), 可能遇到的问题: 如何动态设置Qeueu的名称? @Queue支持properties或者SpEL表达式定义: 通过property定义:${...} @RabbitListener(queues = { "${queue1}", "${queue2}" } ) @RabbitListener(queues = "#{'${property.with.comma.delimited.queue.names}'.split(',')}" ) 通过SpEL表达式定义:#{...} @RabbitListener(queues = "#{queueName}" ) .... //在Bean中声明 @Bean public String queueName(){ return "helloQueue"; } 如何通过Exchange,RoutingKey进行连接绑定? @RabbitListener(bindings = @QueueBinding( value = @Queue,//设置一个匿名队列 exchange = @Exchange(value = "auto.exch"),key="routingKey") ) 自己手动配置MQ连接,例如:系统中要连接多个MQ地址;需要配置多个线程处理消息 ConnectionFactory:配置MQ连接 @Bean(name="cwagConnectionFactory") public ConnectionFactory cwagConnectionFactory(){ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); return connectionFactory; } RabbitListenerContainerFactory 构造ListenerContainerFactory @Bean(name="myListenerFactory") public SimpleRabbitListenerContainerFactory firstFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier("myConnectionFactory") ConnectionFactory connectionFactory ) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConcurrentConsumers(3);//默认开启3个线程的线程池 factory.setMaxConcurrentConsumers(10);//最大线程为10个 configurer.configure(factory, connectionFactory); return factory; } 引用构造ListenerContainerFactory: @RabbitHandler @RabbitListener(queues = "MyQueue",containerFactory = "myListenerFactory") public void receiveApply(@Payload byte[] msg) throws IOException { log.info("received from MyQueue" ); } 高级特性 队列长度 如何设置队列长度:设置最大消息数量(x-max-length)或者最大容量(x-max-length-bytes) Map<String,Object> map = new HashMap<>(); //设置最大长度 map.put("x-max-length",5); return new Queue("5-length",false,false,false,map); 投递消息数量超多队列长度 默认情况下(overflow:drop-head),当消息数量超多最大容量后,队列头部(最老的消息)消息会被丢弃或者变为死信(死信路由存在的时候才会发到对应的路由)。可以通过overflow来设置默认规则。 overflow的规则 drop-head reject-publish:最新发布的消息被丢弃。 死信队列 消息变为死信的情况: 消息被拒绝 消息过期 队列长度达到最大长度 如何设置死信队列 创建队列时设置队列的死信路由键即可。订阅此路由的队列就可以收到死信消息。 Map<String,Object> map = new HashMap<>(); //设置最大长度 map.put("x-max-length",5); //设置死信路由 map.put("x-dead-letter-exchange","my-dead-exchange"); map.put("x-dead-letter-routing-key","dead-r-5"); return new Queue("5-length",false,false,false,map); 延迟队列 延迟队列就是某个时间段过后消息被投递到此队列。可以利用消息超时后被进入到死信队列这个特点来实现。 如上图所示,绑定了x-dead-letter-exchage,x-dead-letter-routing-key的队列就是延迟队列。 Tips: 对于使用过程中存在的问题,仔细阅读官方文档是最有效的方法, 参考文档:https://docs.spring.io/spring-amqp/reference/html/_reference.html#broker-configuration 注解配置:https://docs.spring.io/spring-amqp/docs/1.7.11.RELEASE/reference/html/_reference.html#async-annotation-driven