Spring Boot 中使用 RabbitMQ
RabbitMQ是一个开源的
AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:
Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
AMQP,即Advanced message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
常用概念
通常我们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。
准备
环境安装
任选其一
CentOs7.3 搭建 RabbitMQ 3.6 单机服务与使用
http://www.ymq.io/2017/08/16/rabbit-install
CentOs7.3 搭建 RabbitMQ 3.6 Cluster 集群服务与使用
http://www.ymq.io/2017/08/17/rabbit-install-cluster
Github 代码
代码我已放到 Github ,导入spring-boot-rabbitmq 项目
github https://github.com/souyunku/spring-boot-examples/tree/master/spring-boot-rabbitmq
添加依赖
在项目中添加 spring-boot-starter-amqp 依赖
org.springframework.bootspring-boot-starter-amqp
参数配置
spring.application.name=ymq-rabbitmq-spring-boot spring.rabbitmq.host=10.4.98.15 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin
交换机(Exchange)
1.Direct Exchange 根据route key 直接找到队列
2.Topic Exchange 根据route key 匹配队列
3.Topic Exchange 不处理route key 全网发送,所有绑定的队列都发送
Direct Exchange
Direct Exchange 是RabbitMQ默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。
任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue。
1.一般情况可以使用rabbitMQ自带的Exchange:""(该Exchange的名字为空字符串,下文称其为default Exchange)。
2.这种模式下不需要将Exchange进行任何绑定(binding)操作
3.消息传递时需要一个RouteKey,可以简单的理解为要发送到的队列名字。
4.如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃。
配置队列
@Configurationpublic class RabbitDirectConfig { @Bean public Queue helloQueue() { return new Queue("hello"); } @Bean public Queue directQueue() { return new Queue("direct"); } //-------------------配置默认的交换机模式,可以不需要配置以下----------------------------------- @Bean DirectExchange directExchange() { return new DirectExchange("directExchange"); } //绑定一个key "direct",当消息匹配到就会放到这个队列中 @Bean Binding bindingExchangeDirectQueue(Queue directQueue, DirectExchange directExchange) { return BindingBuilder.bind(directQueue).to(directExchange).with("direct"); } // 推荐使用 helloQueue() 方法写法,这种方式在 Direct Exchange 模式 多此一举,没必要这样写 //---------------------------------------------------------------------------------------------}
监听队列
@Component@RabbitListener(queues = "hello")public class helloReceiver { @RabbitHandler public void process(String message) { System.out.println("接收者 helloReceiver," + message); } }
@Component@RabbitListener(queues = "direct")public class DirectReceiver { @RabbitHandler public void process(String message) { System.out.println("接收者 DirectReceiver," + message); } }
发送消息
package io.ymq.rabbitmq.test;import io.ymq.rabbitmq.run.Startup;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;/** * 描述: 默认的交换机模式 * * @author: yanpenglei * @create: 2017/10/25 1:03 */@RunWith(SpringRunner.class)@SpringBootTest(classes = Startup.class)public class RabbitDirectTest { @Autowired private AmqpTemplate rabbitTemplate; @Test public void sendHelloTest() { String context = "此消息在,默认的交换机模式队列下,有 helloReceiver 可以收到"; String routeKey = "hello"; context = "routeKey:" + routeKey + ",context:" + context; System.out.println("sendHelloTest : " + context); this.rabbitTemplate.convertAndSend(routeKey, context); } @Test public void sendDirectTest() { String context = "此消息在,默认的交换机模式队列下,有 DirectReceiver 可以收到"; String routeKey = "direct"; String exchange = "directExchange"; context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context; System.out.println("sendDirectTest : " + context); // 推荐使用 sendHello() 方法写法,这种方式在 Direct Exchange 多此一举,没必要这样写 this.rabbitTemplate.convertAndSend(exchange, routeKey, context); } }
按顺序执行:响应
接收者 helloReceiver,routeKey:hello,context:此消息在,默认的交换机模式队列下,有 helloReceiver 可以收到 接收者 DirectReceiver,context:directExchange,routeKey:direct,context:此消息在,默认的交换机模式队列下,有 DirectReceiver 可以收到
Fanout Exchange
任何发送到Fanout Exchange 的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。
1.可以理解为路由表的模式
2.这种模式不需要 RouteKey
3.这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。
4.如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。
配置队列
@Configurationpublic class RabbitFanoutConfig { final static String PENGLEI = "fanout.penglei.net"; final static String SOUYUNKU = "fanout.souyunku.com"; @Bean public Queue queuePenglei() { return new Queue(RabbitFanoutConfig.PENGLEI); } @Bean public Queue queueSouyunku() { return new Queue(RabbitFanoutConfig.SOUYUNKU); } /** * 任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有队列上。 */ @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingExchangeQueuePenglei(Queue queuePenglei, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queuePenglei).to(fanoutExchange); } @Bean Binding bindingExchangeQueueSouyunku(Queue queueSouyunku, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queueSouyunku).to(fanoutExchange); } }
监听队列
@Component@RabbitListener(queues = "fanout.penglei.net")public class FanoutReceiver1 { @RabbitHandler public void process(String message) { System.out.println("接收者 FanoutReceiver1," + message); } }
@Component@RabbitListener(queues = "fanout.souyunku.com")public class FanoutReceiver2 { @RabbitHandler public void process(String message) { System.out.println("接收者 FanoutReceiver2," + message); } }
发送消息
package io.ymq.rabbitmq.test;import io.ymq.rabbitmq.run.Startup;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;/** * 描述: 广播模式或者订阅模式队列 * * @author: yanpenglei * @create: 2017/10/25 1:08 */@RunWith(SpringRunner.class)@SpringBootTest(classes = Startup.class)public class RabbitFanoutTest { @Autowired private AmqpTemplate rabbitTemplate; @Test public void sendPengleiTest() { String context = "此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到"; String routeKey = "topic.penglei.net"; String exchange = "fanoutExchange"; System.out.println("sendPengleiTest : " + context); context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context; this.rabbitTemplate.convertAndSend(exchange, routeKey, context); } @Test public void sendSouyunkuTest() { String context = "此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到"; String routeKey = "topic.souyunku.com"; String exchange = "fanoutExchange"; context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context; System.out.println("sendSouyunkuTest : " + context); this.rabbitTemplate.convertAndSend(exchange, routeKey, context); } }
按顺序执行:响应
接收者 FanoutReceiver1,context:fanoutExchange,routeKey:topic.penglei.net,context:此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到 接收者 FanoutReceiver2,context:fanoutExchange,routeKey:topic.penglei.net,context:此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到 接收者 FanoutReceiver2,context:fanoutExchange,routeKey:topic.souyunku.com,context:此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到 接收者 FanoutReceiver1,context:fanoutExchange,routeKey:topic.souyunku.com,context:此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到
Topic Exchange
任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上
1.这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个标题``(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列。
2.这种模式需要RouteKey,也许要提前绑定Exchange与Queue。
3.在进行绑定时,要提供一个该队列关心的主题,如#.log.#表示该队列关心所有涉及log的消息(一个RouteKey为MQ.log.error的消息会被转发到该队列)。
4.#表示0个或若干个关键字,*表示一个关键字。如topic.*能与topic.warn匹配,无法与topic.warn.timeout匹配;但是topic.#能与上述两者匹配。
5.同样,如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。
配置队列
@Configurationpublic class RabbitTopicConfig { final static String MESSAGE = "topic.message"; final static String MESSAGES = "topic.message.s"; final static String YMQ = "topic.ymq"; @Bean public Queue queueMessage() { return new Queue(RabbitTopicConfig.MESSAGE); } @Bean public Queue queueMessages() { return new Queue(RabbitTopicConfig.MESSAGES); } @Bean public Queue queueYmq() { return new Queue(RabbitTopicConfig.YMQ); } /** * 交换机(Exchange) 描述:接收消息并且转发到绑定的队列,交换机不存储消息 */ @Bean TopicExchange topicExchange() { return new TopicExchange("topicExchange"); } //綁定队列 queueMessages() 到 topicExchange 交换机,路由键只接受完全匹配 topic.message 的队列接受者可以收到消息 @Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange topicExchange) { return BindingBuilder.bind(queueMessage).to(topicExchange).with("topic.message"); } //綁定队列 queueMessages() 到 topicExchange 交换机,路由键只要是以 topic.message 开头的队列接受者可以收到消息 @Bean Binding bindingExchangeMessages(Queue queueMessages, TopicExchange topicExchange) { return BindingBuilder.bind(queueMessages).to(topicExchange).with("topic.message.#"); } //綁定队列 queueYmq() 到 topicExchange 交换机,路由键只要是以 topic 开头的队列接受者可以收到消息 @Bean Binding bindingExchangeYmq(Queue queueYmq, TopicExchange topicExchange) { return BindingBuilder.bind(queueYmq).to(topicExchange).with("topic.#"); } }
监听队列
@Component@RabbitListener(queues = "topic.message")public class TopicReceiver1 { @RabbitHandler public void process(String message) { System.out.println("接收者 TopicReceiver1," + message); } }
@Component@RabbitListener(queues = "topic.message.s")public class TopicReceiver2 { @RabbitHandler public void process(String message) { System.out.println("接收者 TopicReceiver2," + message); } }
@Component@RabbitListener(queues = "topic.ymq")public class TopicReceiver3 { @RabbitHandler public void process(String message) { System.out.println("接收者 TopicReceiver3," + message); } }
发送消息
package io.ymq.rabbitmq.test;import io.ymq.rabbitmq.run.Startup;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;/** * 描述: 配置转发消息模式队列 * * @author: yanpenglei * @create: 2017/10/25 1:20 */@RunWith(SpringRunner.class)@SpringBootTest(classes = Startup.class)public class RabbitTopicTest { @Autowired private AmqpTemplate rabbitTemplate; @Test public void sendMessageTest() { String context = "此消息在,配置转发消息模式队列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 可以收到"; String routeKey = "topic.message"; String exchange = "topicExchange"; context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context; System.out.println("sendMessageTest : " + context); this.rabbitTemplate.convertAndSend(exchange, routeKey, context); } @Test public void sendMessagesTest() { String context = "此消息在,配置转发消息模式队列下,有 TopicReceiver2 TopicReceiver3 可以收到"; String routeKey = "topic.message.s"; String exchange = "topicExchange"; context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context; System.out.println("sendMessagesTest : " + context); this.rabbitTemplate.convertAndSend(exchange, routeKey, context); } @Test public void sendYmqTest() { String context = "此消息在,配置转发消息模式队列下,有 TopicReceiver3 可以收到"; String routeKey = "topic.ymq"; String exchange = "topicExchange"; context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context; System.out.println("sendYmqTest : " + context); this.rabbitTemplate.convertAndSend(exchange, routeKey, context); } }
按顺序执行:响应
接收者 TopicReceiver2,context:topicExchange,routeKey:topic.message,context:此消息在,配置转发消息模式队列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 可以收到 接收者 TopicReceiver1,context:topicExchange,routeKey:topic.message,context:此消息在,配置转发消息模式队列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 可以收到 接收者 TopicReceiver3,context:topicExchange,routeKey:topic.message,context:此消息在,配置转发消息模式队列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 可以收到 接收者 TopicReceiver3,context:topicExchange,routeKey:topic.message.s,context:此消息在,配置转发消息模式队列下,有 TopicReceiver2 TopicReceiver3 可以收到 接收者 TopicReceiver2,context:topicExchange,routeKey:topic.message.s,context:此消息在,配置转发消息模式队列下,有 TopicReceiver2 TopicReceiver3 可以收到 接收者 TopicReceiver3,context:topicExchange,routeKey:topic.ymq,context:此消息在,配置转发消息模式队列下,有 TopicReceiver3 可以收到
代码我已放到 Github ,导入spring-boot-rabbitmq 项目
github https://github.com/souyunku/spring-boot-examples/tree/master/spring-boot-rabbitmq
Contact
-
作者:鹏磊
-
出处:http://www.ymq.io/2017/10/26/rabbitmq-spring-boot-example
-
Email:admin@souyunku.com
-
版权归作者所有,转载请注明出处
-
Wechat:关注公众号,搜云库,专注于开发技术的研究与知识分享
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
MongoDB(一)简介与安装
1.mongodb简介 MongoDB是一个基于分布式文件存储的数据库。由C++语言编写。旨在为WEB应用提供可扩展的高性能数据存储解决方案。 MongoDB是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的。他支持的数据结构非常松散,是类似json的bson格式,因此可以存储比较复杂的数据类型。Mongo最大的特点是他支持的查询语言非常强大,其语法有点类似于面向对象的查询语言,几乎可以实现类似关系数据库单表查询的绝大部分功能,而且还支持对数据建立索引。 它的特点是高性能、易部署、易使用,存储数据非常方便。 主要功能特性有: *面向集合存储,易存储对象类型的数据。 *模式自由。 *支持动态查询。 *支持完全索引,包含内部对象。 *支持查询。 *支持复制和故障恢复。 *使用高效的二进制数据存储,包括大型对象(如视频等)。 *自动处理碎片,以支持云计算层次的扩展性。 *支持RUBY,PYTHON,JAVA,C++,PHP,C#等多种语言。 *文件存储格式为BSON(一种JSON的扩展)。 *可通过网络访问。 2.mongodb安装 (1)解压mon...
- 下一篇
Docker-Swarm-下-NATS-集群的构建与使用
NATS 是一个开源的、轻量级的、高性能的分布式消息通信系统,实现了高可伸缩性和优雅的发布/订阅模型。 NATS 适合云基础设施的消息通信系统、物联网设备消息通信和微服务架构。 环境准备 三台服务器,建立 Docker Swarm 集群,一个 Manager,两个 Worker。 docker 版本:17-09 nats 版本:1.0.4 Nats 集群架构设计 搭建集群 1、【Manager】创建集群网络 dockernetworkcreate-doverlay--attachablemongo --attachable 允许其他容器加入此网络 2、创建 Nats 集群 2.1、【Manager】创建 stack.yml version:'3.3' services: nats1: image:registry.docker-cn.com/library/nats command:--clusternats://0.0.0.0:5222-routesnats://nats2:5222,nats://nats3:5222 ports: -4222 networks: -nats d...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Red5直播服务器,属于Java语言的直播服务器
- CentOS7,CentOS8安装Elasticsearch6.8.6
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS8编译安装MySQL8.0.19
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS6,CentOS7官方镜像安装Oracle11G
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- CentOS8安装Docker,最新的服务器搭配容器使用