您现在的位置是:首页 > 文章详情

精通SpringBoot——第八篇:整合RabbitMQ消息队列

日期:2018-08-01点击:333

最近由于个人原因,好几天没有更新博客文章了。今天来和朋友们一起学习下,SpringBoot怎么整合RabbitMQ。目前消息组件大致有三种:.activemq, rabbitmq, kafka。这三者各有优缺点,RabbitMQ相比之下是处于其他二者之间的一个消息组件。RabbitMQ依赖于erlang,在linux下安装的话,要先安装erlang环境。下面来看看怎么SpringBoot 怎么整合RabbitMQ吧。

  1. 想要使用RabbitMQ ,pom依赖是少不了的~
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

2.再来看看application.yml文件的内容

spring: rabbitmq: username: rabbit password: 123456 host: localhost port: 5672 virtual-host: / #手动ACK 不开启自动ACK模式,目的是防止报错后未正确处理消息丢失 默认 为 none listener: simple: acknowledge-mode: manual

RabbitMQConfig的内容

import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { public static final String DEFAULT_MAIL_QUEUE = "dev.mail.register.default.queue"; public static final String MANUAL_MAIL_QUEUE = "dev.mail.register.manual.queue"; @Bean public Queue defaultMailQueue (){ // Queue queue = new Queue(Queue名称,消息是否需要持久化处理) return new Queue(DEFAULT_MAIL_QUEUE, true); } @Bean public Queue manualMailQueue(){ return new Queue(MANUAL_MAIL_QUEUE, true); } }

搞两个监听器(使用@RabbitListener注解)来监听下这两种消息 (怎么感觉自己现在说话一股土味儿,最近吃土吃多了么~ 好吧,写的代码估计也是土味的吧)

 import com.developlee.rabbitmq.config.RabbitMQConfig; import com.developlee.rabbitmq.entity.MailEntity; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class MailHandler { private static final Logger logger = LoggerFactory.getLogger(MailHandler.class); /** * <p>TODO 该方案是 spring-boot-data-amqp 默认的方式,不太推荐。具体推荐使用 listenerManualAck()</p> * 默认情况下,如果没有配置手动ACK, 那么Spring Data AMQP 会在消息消费完毕后自动帮我们去ACK * 存在问题:如果报错了,消息不会丢失,但是会无限循环消费,一直报错,如果开启了错误日志很容易将磁盘空间耗完 * 解决方案:手动ACK,或者try-catch 然后在 catch 里面讲错误的消息转移到其它的系列中去 * spring.rabbitmq.listener.simple.acknowledge-mode=manual * <p> * * @param mail 监听的内容 */ @RabbitListener(queues = {RabbitMQConfig.DEFAULT_MAIL_QUEUE}) public void listenerAutoAck(MailEntity mail, Message message, Channel channel) { //TODO 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉 final long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { logger.info("listenerAutoAck 监听的消息-{}", mail.toString()); //TODO 通知MQ 消息已被成功消费,可以ACK了 channel.basicAck(deliveryTag, false); } catch (IOException e) { //处理失败, 重新压入MQ. try { channel.basicRecover(); } catch (IOException e1) { e1.printStackTrace(); } } } @RabbitListener(queues = {RabbitMQConfig.MANUAL_MAIL_QUEUE}) public void listenerManualAck(MailEntity mail, Message message, Channel channel) { logger.info("listenerManualAck 监听的消息-{}", mail.toString()); try { //TODO 通知MQ 消息已被成功消费,可以ACK了 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { //如果报错,容错处理, } } }

再来一波测试代码,测试下......

import com.developlee.rabbitmq.config.RabbitMQConfig; import com.developlee.rabbitmq.entity.MailEntity; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author Lee * @// TODO 2018/6/22-11:20 * @description */ @RestController @RequestMapping(value = "/mail") public class MailController { private final RabbitTemplate rabbitTemplate; @Autowired public MailController(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } /** * this.rabbitTemplate.convertAndSend(RabbitConfig.DEFAULT_MAIL_QUEUE, mailEntity); * 对应 {@link MailHandler#listenerAutoAck}; * this.rabbitTemplate.convertAndSend(RabbitConfig.MANUAL_MAIL_QUEUE, mailEntity); * 对应 {@link MailHandler#listenerManualAck}; */ @GetMapping("/default") public void defaultMailMsg() { MailEntity mailEntity = new MailEntity(); mailEntity.setId("1"); mailEntity.setName("First Mail Message"); mailEntity.setTitle("RabbitMQ with Spring boot!"); mailEntity.setContent("Come on! Let's study Micro-Service together!"); this.rabbitTemplate.convertAndSend(RabbitMQConfig.DEFAULT_MAIL_QUEUE, mailEntity); this.rabbitTemplate.convertAndSend(RabbitMQConfig.MANUAL_MAIL_QUEUE, mailEntity); } }

MailEntity.java

import java.io.Serializable; public class MailEntity implements Serializable { private static final long serialVersionUID = -2164058270260403154L; private String id; private String name; private String title; private String content; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getTitle() { return title; } public void setTitle(String title) { this.title = title; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } }

启动项目 ,浏览器地址栏输入http://localhost:8080/mail。 something you will find in your heart。

今天台风暴雨,明天一样。但后天可能会天晴,也许会有彩虹。—— By 一个挣扎的程序猿。

最后,以上示例代码可在我的github.com中找到。
我的个人公众号:developlee的潇洒人生。
关注了也不一定更新,更新就不得了了。
qrcode_for_gh_2bd3f44efa21_258

原文链接:https://yq.aliyun.com/articles/622175
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章