RabbitMQ入门与使用篇
介绍
RabbitMQ是一个由erlang开发的基于AMQP(Advanced Message Queue)协议的开源实现。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面都非常的优秀。是当前最主流的消息中间件之一。
- 概念:
- Brocker:消息队列服务器实体。
- Exchange:消息交换机,指定消息按什么规则,路由到哪个队列。
- Queue:消息队列,每个消息都会被投入到一个或者多个队列里。
- Binding:绑定,它的作用是把exchange和queue按照路由规则binding起来。
- Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
- Vhost:虚拟主机,一个broker里可以开设多个vhost,用作不用用户的权限分离。
- Producer:消息生产者,就是投递消息的程序。
- Consumer:消息消费者,就是接受消息的程序。
- Channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
- 消息队列的使用过程大概如下:
- 消息接收
- 客户端连接到消息队列服务器,打开一个channel。
- 客户端声明一个exchange,并设置相关属性。
- 客户端声明一个queue,并设置相关属性。
- 客户端使用routing key,在exchange和queue之间建立好绑定关系。
- 消息发布
- 客户端投递消息到exchange。
- exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
- 消息接收
- AMQP 里主要要说两个组件:
- Exchange 和 Queue
- 绿色的 X 就是 Exchange ,红色的是 Queue ,这两者都在 Server 端,又称作 Broker
- 这部分是 RabbitMQ 实现的,而蓝色的则是客户端,通常有 Producer 和 Consumer 两种类型。
- Exchange通常分为四种:
- fanout:该类型路由规则非常简单,会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,相当于广播功能
- direct:该类型路由规则会将消息路由到binding key与routing key完全匹配的Queue中
- topic:与direct类型相似,只是规则没有那么严格,可以模糊匹配和多条件匹配
- headers:该类型不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配
- 使用场景
下载与安装
管理工具
- 参考官方文档
操作起来很简单,只需要在DOS下面,进入安装目录(安装路径RabbitMQ Server abbitmq_server-3.2.2sbin
)执行如下命令就可以成功安装。
rabbitmq-plugins enable rabbitmq_management
可以通过访问:http://localhost:15672
进行测试,默认的登陆账号为:guest,密码为:guest。
" class="reference-link">
其他配置
1. 安装完以后erlang需要手动设置ERLANG_HOME 的系统变量。
set ERLANG_HOME=F:Program Fileserl9.0 #环境变量`path`里加入:%ERLANG_HOME%in #环境变量`path`里加入: 安装路径RabbitMQ Server abbitmq_server-3.6.10sbin
2.激活Rabbit MQ’s Management Plugin
使用Rabbit MQ 管理插件,可以更好的可视化方式查看Rabbit MQ 服务器实例的状态,你可以在命令行中使用下面的命令激活。
rabbitmq-plugins.bat enable rabbitmq_management
3.创建管理用户
rabbitmqctl.bat add_user sa 123456
4. 设置管理员
rabbitmqctl.bat set_user_tags sa administrator
5.设置权限
rabbitmqctl.bat set_permissions -p / sa ".*" ".*" ".*"
6. 其他命令
#查询用户: rabbitmqctl.bat list_users #查询vhosts: rabbitmqctl.bat list_vhosts #启动RabbitMQ服务: net stop RabbitMQ && net start RabbitMQ
以上这些,账号、vhost、权限、作用域等基本就设置完了。
基于.net使用
RabbitMQ.Client 是RabbiMQ 官方提供的的客户端
EasyNetQ 是基于RabbitMQ.Client 基础上封装的开源客户端,使用非常方便
以下操作RabbitMQ的代码例子,都是基于EasyNetQ的使用和再封装,在文章底部有demo例子的×××地址
创建 IBus
/// <summary> /// 消息服务器连接器 /// </summary> public class BusBuilder { public static IBus CreateMessageBus() { // 消息服务器连接字符串 // var connectionString = ConfigurationManager.ConnectionStrings["RabbitMQ"]; string connString = "host=127.0.0.1:5672;virtualHost=TestQueue;username=sa;password=123456"; if (connString == null || connString == string.Empty) throw new Exception("messageserver connection string is missing or empty"); return RabbitHutch.CreateBus(connString); } }
Fanout Exchange
所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。
Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。
所以,Fanout Exchange 转发消息是最快的。
/// <summary> /// 消息消耗(fanout) /// </summary> /// <typeparam name="T">消息类型</typeparam> /// <param name="handler">回调</param> /// <param name="exChangeName">交换器名</param> /// <param name="queueName">队列名</param> /// <param name="routingKey">路由名</param> public static void FanoutConsume<T>(Action<T> handler, string exChangeName = "fanout_mq", string queueName = "fanout_queue_default", string routingKey = "") where T : class { var bus = BusBuilder.CreateMessageBus(); var adbus = bus.Advanced; var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Fanout); var queue = CreateQueue(adbus, queueName); adbus.Bind(exchange, queue, routingKey); adbus.Consume(queue, registration => { registration.Add<T>((message, info) => { handler(message.Body); }); }); } /// <summary> /// 消息上报(fanout) /// </summary> /// <typeparam name="T">消息类型</typeparam> /// <param name="topic">主题名</param> /// <param name="t">消息命名</param> /// <param name="msg">错误信息</param> /// <returns></returns> public static bool FanoutPush<T>(T t, out string msg, string exChangeName = "fanout_mq", string routingKey = "") where T : class { msg = string.Empty; try { using (var bus = BusBuilder.CreateMessageBus()) { var adbus = bus.Advanced; var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Fanout); adbus.Publish(exchange, routingKey, false, new Message<T>(t)); return true; } } catch (Exception ex) { msg = ex.ToString(); return false; } }
所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue。
Direct模式,可以使用RabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。
/// <summary> /// 消息发送(direct) /// </summary> /// <typeparam name="T">消息类型</typeparam> /// <param name="queue">发送到的队列</param> /// <param name="message">发送内容</param> public static void DirectSend<T>(string queue, T message) where T : class { using (var bus = BusBuilder.CreateMessageBus()) { bus.Send(queue, message); } } /// <summary> /// 消息接收(direct) /// </summary> /// <typeparam name="T">消息类型</typeparam> /// <param name="queue">接收的队列</param> /// <param name="callback">回调操作</param> /// <param name="msg">错误信息</param> /// <returns></returns> public static bool DirectReceive<T>(string queue, Action<T> callback, out string msg) where T : class { msg = string.Empty; try { var bus = BusBuilder.CreateMessageBus(); bus.Receive<T>(queue, callback); } catch (Exception ex) { msg = ex.ToString(); return false; } return true; } /// <summary> /// 消息发送 /// <![CDATA[(direct EasyNetQ高级API)]]> /// </summary> /// <typeparam name="T"></typeparam> /// <param name="t"></param> /// <param name="msg"></param> /// <param name="exChangeName"></param> /// <param name="routingKey"></param> /// <returns></returns> public static bool DirectPush<T>(T t, out string msg, string exChangeName = "direct_mq", string routingKey = "direct_rout_default") where T : class { msg = string.Empty; try { using (var bus = BusBuilder.CreateMessageBus()) { var adbus = bus.Advanced; var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Direct); adbus.Publish(exchange, routingKey, false, new Message<T>(t)); return true; } } catch (Exception ex) { msg = ex.ToString(); return false; } } /// <summary> /// 消息接收 /// <![CDATA[(direct EasyNetQ高级API)]]> /// </summary> /// <typeparam name="T">消息类型</typeparam> /// <param name="handler">回调</param> /// <param name="exChangeName">交换器名</param> /// <param name="queueName">队列名</param> /// <param name="routingKey">路由名</param> public static bool DirectConsume<T>(Action<T> handler, out string msg, string exChangeName = "direct_mq", string queueName = "direct_queue_default", string routingKey = "direct_rout_default") where T : class { msg = string.Empty; try { var bus = BusBuilder.CreateMessageBus(); var adbus = bus.Advanced; var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Direct); var queue = CreateQueue(adbus, queueName); adbus.Bind(exchange, queue, routingKey); adbus.Consume(queue, registration => { registration.Add<T>((message, info) => { handler(message.Body); }); }); } catch (Exception ex) { msg = ex.ToString(); return false; } return true; }
Topic Exchange
- 消息发布(Publish)
要使用主题发布,只需使用带有主题的重载的Publish方法:
var bus = RabbitHutch.CreateBus(...); bus.Publish(message, "X.A");
订阅者可以通过指定要匹配的主题来过滤邮件。
- 这些可以包括通配符:
- *=>匹配一个字。
- #=>匹配到零个或多个单词。
所以发布的主题为“X.A.2”的消息将匹配“#”,“X.#”,“ .A.”,而不是“X.B. *”或“A”。
警告,Publish只顾发送消息到队列,但是不管有没有消费端订阅,所以,发布之后,如果没有消费者,该消息将不会被消费甚至丢失。
- 消息订阅(Subscribe)
EasyNetQ提供了消息订阅,当调用Subscribe方法时候,EasyNetQ会创建一个用于接收消息的队列,不过与消息发布不同的是,消息订阅增加了一个参数,subscribe_id.代码如下:
bus.Subscribe("my_id", handler, x => x.WithTopic("X.*"));
警告: 具有相同订阅者但不同主题字符串的两个单独订阅可能不会产生您期望的效果。 subscriberId有效地标识个体AMQP队列。 具有相同subscriptionId的两个订阅者将连接到相同的队列,并且两者都将添加自己的主题绑定。 所以,例如,如果你这样做:
bus.Subscribe("my_id", handlerOfXDotStar, x => x.WithTopic("X.*")); bus.Subscribe("my_id", handlerOfStarDotB, x => x.WithTopic("*.B"));
匹配“x.”或“ .B”的所有消息将被传递到“XXX_my_id”队列。 然后,RabbitMQ将向两个消费者传递消息,其中handlerOfXDotStar和handlerOfStarDotB轮流获取每条消息。
现在,如果你想要匹配多个主题(“X. ”OR“ .B”),你可以使用另一个重载的订阅方法,它采用多个主题,如下所示:
bus.Subscribe("my_id", handler, x => x.WithTopic("X.*").WithTopic("*.B"));
/// <summary> /// 获取主题 /// </summary> /// <typeparam name="T">主题内容类型</typeparam> /// <param name="subscriptionId">订阅者ID</param> /// <param name="callback">消息接收响应回调</param> /// <param name="topics">订阅主题集合</param> public static void TopicSubscribe<T>(string subscriptionId, Action<T> callback, params string[] topics) where T : class { var bus = BusBuilder.CreateMessageBus(); bus.Subscribe(subscriptionId, callback, (config) => { foreach (var item in topics) config.WithTopic(item); }); } /// <summary> /// 发布主题 /// </summary> /// <typeparam name="T">主题内容类型</typeparam> /// <param name="topic">主题名称</param> /// <param name="message">主题内容</param> /// <param name="msg">错误信息</param> /// <returns></returns> public static bool TopicPublish<T>(string topic, T message, out string msg) where T : class { msg = string.Empty; try { using (var bus = BusBuilder.CreateMessageBus()) { bus.Publish(message, topic); return true; } } catch (Exception ex) { msg = ex.ToString(); return false; } } /// <summary> /// 发布主题 /// </summary> /// <![CDATA[(topic EasyNetQ高级API)]]> /// <typeparam name="T">消息类型</typeparam> /// <param name="t">消息内容</param> /// <param name="topic">主题名</param> /// <param name="msg">错误信息</param> /// <param name="exChangeName">交换器名</param> /// <returns></returns> public static bool TopicSub<T>(T t, string topic, out string msg, string exChangeName = "topic_mq") where T : class { msg = string.Empty; try { if (string.IsNullOrWhiteSpace(topic)) throw new Exception("推送主题不能为空"); using (var bus = BusBuilder.CreateMessageBus()) { var adbus = bus.Advanced; //var queue = adbus.QueueDeclare("user.notice.zhangsan"); var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Topic); adbus.Publish(exchange, topic, false, new Message<T>(t)); return true; } } catch (Exception ex) { msg = ex.ToString(); return false; } } /// <summary> /// 获取主题 /// </summary> /// <![CDATA[(topic EasyNetQ高级API)]]> /// <typeparam name="T">消息类型</typeparam> /// <param name="subscriptionId">订阅者ID</param> /// <param name="callback">回调</param> /// <param name="exChangeName">交换器名</param> /// <param name="topics">主题名</param> public static void TopicConsume<T>(Action<T> callback, string exChangeName = "topic_mq",string subscriptionId = "topic_subid", params string[] topics) where T : class { var bus = BusBuilder.CreateMessageBus(); var adbus = bus.Advanced; var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Topic); var queue = adbus.QueueDeclare(subscriptionId); foreach (var item in topics) adbus.Bind(exchange, queue, item); adbus.Consume(queue, registration => { registration.Add<T>((message, info) => { callback(message.Body); }); }); }
具体发布/订阅消息的Demo和相关测试看源码Demo
" class="reference-link">
注意
当在创建订阅者去消费队列的时候
/// <summary> /// 获取主题 /// </summary> /// <param name="topic"></param> public static void GetSub<T>(T topic, Action<T> callback) where T : class { using (var bus = BusBuilder.CreateMessageBus()) { bus.Subscribe<T>(topic.ToString(), callback, x => x.WithTopic(topic.ToString())); } }
using里的对象在执行完成后被回收了,导致刚连接上去就又断开了(刚开始写的时候,习惯性加using,排查了好久才发现,欲哭无泪)
源码项目运行前的准备与确认:
到RabbitMQ管理后台添加TestQueue
VHost,并且分配用户权限,然后到RabbitMQHelper.BusBuilder
类里配置RabbitMQ连接服务的相关信息host=127.0.0.1:5672;virtualHost=TestQueue;username=sa;password=123456
,(根据配置的内容和用户修改)
参考资料(鸣谢):
附:Demo源码GitHub地址
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
总有一种SQL执行计划绑定方式合适你
需要绑定SQL执行计划常见的几种情况: SQL执行计划突变,导致数据库性能下降,从历史执行计划找一个合理的,进行绑定。 SQL无法使用更优的执行计划,且无历史执行计划,可通过hint手工构造的方式,进行绑定。 某些Bug引起优化器生成较差的执行计划。在bug修复前,进行绑定。 ORACLE固定执行计划的3种方式: Oracle 9i使用outline (可跨版本10,11g均可使用) Oracle 10g使用sql profile (11g也可使用) Oracle 11g使用sql plan manage 接下来简述如何使用这3种方式进行执行计划的固定,并举例说明3种固定执行计划的优缺点,通过对比选择合适的固定执行计划来应对不同的业务场景。也就是什么场景下使用何种执行计划固定比较合适。 一、大纲(Stored Outline) 1、当SQL执行计划因新版本变更,统计信息不准确,新建索引,参数改变等发生改变时,存储大纲可以使SQL语句的执行计划保持不变。在创建某条语句的大纲时,ORACLE会将SQL语句的文本,执行计划和语句使用的hints存储在一个系统默认用户OUTLN的3个表O...
- 下一篇
PowerShell 发送美观的Vsphere DataStore警报
豆子今天登陆Vsphere VCenter的时候,无意中发现有DataStore的警报信息,个别DataStore的使用空间超过90%了,需要清空一下SAN Volume的Snapshot。这个是运维常见的问题,那么顺便就用PowerShell写个脚本,定期检查发送邮件好了。 脚本本身很容易,但是我想让他尽量的美观一些。 之前我写过一个博文可以自定义sytle的样式http://beanxyz.blog.51cto.com/5570417/1786712, 不过现在看起来有些麻烦,还是觉得找找如果有比较好看的现成的css文件可以直接调用就好了。 上网搜了搜 table有哪些现成的css模板,随便找了一个https://codepen.io/anon/pen/vJmLWL,看着还成 下载他的css下来 下载的css文件,保存在C:\tmp 目录 @importurl(https://fonts.googleapis.com/css?family=Roboto:400,500,700,300,100); body{ background-color:#3e94ec; font-family...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS关闭SELinux安全模块
- CentOS7设置SWAP分区,小内存服务器的救世主
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题