深入解读RabbitMQ工作原理及简单使用
RabbitMQ系列目录
1、RabbitMQ在Ubuntu上的环境搭建
2、深入解读RabbitMQ工作原理及简单使用
3、Rabbit的几种工作模式介绍与实践
4、Rabbit事务与消息确认
5、Rabbit集群搭建
6、使用HAProxy为RabbitMQ搭建负载均衡
7、REST API控制Rabbit
RabbitMQ简介
在介绍RabbitMQ之前实现要介绍一下MQ,MQ是什么?
MQ全称是Message Queue,可以理解为消息队列的意思,简单来说就是消息以管道的方式进行传递。
RabbitMQ是一个实现了AMQP(Advanced Message Queuing Protocol)高级消息队列协议的消息队列服务,用Erlang语言的。
使用场景
在我们秒杀抢购商品的时候,系统会提醒我们稍等排队中,而不是像几年前一样页面卡死或报错给用户。
像这种排队结算就用到了消息队列机制,放入通道里面一个一个结算处理,而不是某个时间断突然涌入大批量的查询新增把数据库给搞宕机,所以RabbitMQ本质上起到的作用就是削峰填谷,为业务保驾护航。
为什么选择RabbitMQ
现在的市面上有很多MQ可以选择,比如ActiveMQ、ZeroMQ、Appche Qpid,那问题来了为什么要选择RabbitMQ?
1、除了Qpid,RabbitMQ是唯一一个实现了AMQP标准的消息服务器;
2、可靠性,RabbitMQ的持久化支持,保证了消息的稳定性;
3、高并发,RabbitMQ使用了Erlang开发语言,Erlang是为电话交换机开发的语言,天生自带高并发光环,和高可用特性;
4、集群部署简单,正是应为Erlang使得RabbitMQ集群部署变的超级简单;
5、社区活跃度高,根据网上资料来看,RabbitMQ也是首选;
工作机制
生产者、消费者和代理
在了解消息通讯之前首先要了解3个概念:生产者、消费者和代理。
生产者:消息的创建者,负责创建和推送数据到消息服务器;
消费者:消息的接收方,用于处理数据和确认消息;
代理:就是RabbitMQ本身,用于扮演“快递”的角色,本身不生产消息,只是扮演“快递”的角色。
消息发送原理
首先你必须连接到Rabbit才能发布和消费消息,那怎么连接和发送消息的呢?
你的应用程序和Rabbit Server之间会创建一个TCP连接,一旦TCP打开,并通过了认证,认证就是你试图连接Rabbit之前发送的Rabbit服务器连接信息和用户名和密码,有点像程序连接数据库,使用Java有两种连接认证的方式,后面代码会详细介绍,一旦认证通过你的应用程序和Rabbit就创建了一条AMQP信道(Channel)。
信道是创建在“真实”TCP上的虚拟连接,AMQP命令都是通过信道发送出去的,每个信道都会有一个唯一的ID,不论是发布消息,订阅队列或者介绍消息都是通过信道完成的。
为什么不通过TCP直接发送命令?
对于操作系统来说创建和销毁TCP会话是非常昂贵的开销,假设高峰期每秒有成千上万条连接,每个连接都要创建一条TCP会话,这就造成了TCP连接的巨大浪费,而且操作系统每秒能创建的TCP也是有限的,因此很快就会遇到系统瓶颈。
如果我们每个请求都使用一条TCP连接,既满足了性能的需要,又能确保每个连接的私密性,这就是引入信道概念的原因。
你必须知道的Rabbit
想要真正的了解Rabbit有些名词是你必须知道的。
包括:ConnectionFactory(连接管理器)、Channel(信道)、Exchange(交换器)、Queue(队列)、RoutingKey(路由键)、BindingKey(绑定键)。
ConnectionFactory(连接管理器):应用程序与Rabbit之间建立连接的管理器,程序代码中使用;
Channel(信道):消息推送使用的通道;
Exchange(交换器):用于接受、分配消息;
Queue(队列):用于存储生产者的消息;
RoutingKey(路由键):用于把生成者的数据分配到交换器上;
BindingKey(绑定键):用于把交换器的消息绑定到队列上;
看到上面的解释,最难理解的路由键和绑定键了,那么他们具体怎么发挥作用的,请看下图:
消息持久化
Rabbit队列和交换器有一个不可告人的秘密,就是默认情况下重启服务器会导致消息丢失,那么怎么保证Rabbit在重启的时候不丢失呢?答案就是消息持久化。
当你把消息发送到Rabbit服务器的时候,你需要选择你是否要进行持久化,但这并不能保证Rabbit能从崩溃中恢复,想要Rabbit消息能恢复必须满足3个条件:
1、投递消息的时候durable设置为true,消息持久化;
2、消息已经到达持久化交换器上;
3、消息已经到达持久化的队列;
持久化工作原理
Rabbit会将你的持久化消息写入磁盘上的持久化日志文件,等消息被消费之后,Rabbit会把这条消息标识为等待垃圾回收。
持久化的缺点
消息持久化的优点显而易见,但缺点也很明显,那就是性能,因为要写入硬盘要比写入内存性能较低很多,从而降低了服务器的吞吐量,尽管使用SSD硬盘可以使事情得到缓解,但他仍然吸干了Rabbit的性能,当消息成千上万条要写入磁盘的时候,性能是很低的。
所以使用者要根据自己的情况,选择适合自己的方式。
虚拟主机
每个Rabbit都能创建很多vhost,我们称之为虚拟主机,每个虚拟主机其实都是mini版的RabbitMQ,拥有自己的队列,交换器和绑定,拥有自己的权限机制。
环境搭建
前文我们已经介绍了Ubuntu搭建RabbitMQ的步骤:RabbitMQ在Ubuntu上的环境搭建
如果你是在Windows10上去安装那就更简单了,先放下载地址:
Erlang/Rabbit Server百度网盘链接:https://pan.baidu.com/s/1TnKDV-ZuXLiIgyK8c8f9dg 密码:wct9
当然也可去Erlang和Rabbit官网去下,就是速度比较慢。我的百度云Rabbit最新版本:3.7.6,Erlang版本:20.2,注意:不要下载最新的Erlang,在Windows10上打开扩展插件有问题,打不开。
1、安装Erlang;
2、安装Rabbit Server;
3、进入安装目录\sbin下,使用命令“rabbitmq-plugins enable rabbitmq_management”启动网页管理插件;
4、重启Rabbit服务;
使用:http://localhost:15672进行测试,默认的登陆账号为:guest,密码为:guest
重复安装Rabbit Server的坑
如果不是第一次在Windows上安装Rabbit Server一定要把Rabbit和Erlang卸载干净之后,找到注册表:HKEY_LOCAL_MACHINE\SOFTWARE\Ericsson\Erlang\ErlSrv 删除其下的所有项。
不然会出现Rabbit安装之后启动不了的情况,理论上卸载的顺序也是先Rabbit在Erlang。
代码实现
java版实现,使用maven项目,创建可以查看:MyEclipse2017破解设置与maven项目搭建
项目创建成功之后,添加Rabbit Client jar包,只需要在pom.xml里面配置,如下信息:
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.7.0</version> </dependency>
java实现代码分为两个类,第一个是创建Rabbit连接,第二是应用类使用最简单的方式发布和消费消息。
Rabbit的连接,两种方式:
方式一:
public static Connection GetRabbitConnection() { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(Config.UserName); factory.setPassword(Config.Password); factory.setVirtualHost(Config.VHost); factory.setHost(Config.Host); factory.setPort(Config.Port); Connection conn = null; try { conn = factory.newConnection(); } catch (Exception e) { e.printStackTrace(); } return conn; }
方式二:
public static Connection GetRabbitConnection2() { ConnectionFactory factory = new ConnectionFactory(); // 连接格式:amqp://userName:password@hostName:portNumber/virtualHost String uri = String.format("amqp://%s:%s@%s:%d%s", Config.UserName, Config.Password, Config.Host, Config.Port, Config.VHost); Connection conn = null; try { factory.setUri(uri); factory.setVirtualHost(Config.VHost); conn = factory.newConnection(); } catch (Exception e) { e.printStackTrace(); } return conn; }
第二部分:应用类,使用最简单的方式发布和消费消息
public static void main(String[] args) { Publisher(); // 推送消息 Consumer(); // 消费消息 } /** * 推送消息 */ public static void Publisher() { // 创建一个连接 Connection conn = ConnectionFactoryUtil.GetRabbitConnection(); if (conn != null) { try { // 创建通道 Channel channel = conn.createChannel(); // 声明队列【参数说明:参数一:队列名称,参数二:是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数】 channel.queueDeclare(Config.QueueName, false, false, false, null); String content = String.format("当前时间:%s", new Date().getTime()); // 发送内容【参数说明:参数一:交换机名称;参数二:队列名称,参数三:消息的其他属性;参数四:消息主体】 channel.basicPublish("", Config.QueueName, null, content.getBytes("UTF-8")); System.out.println("已发送消息:" + content); // 关闭连接 channel.close(); conn.close(); } catch (Exception e) { e.printStackTrace(); } } } /** * 消费消息 */ public static void Consumer() { // 创建一个连接 Connection conn = ConnectionFactoryUtil.GetRabbitConnection(); if (conn != null) { try { // 创建通道 Channel channel = conn.createChannel(); // 声明队列【参数说明:参数一:队列名称,参数二:是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数】 channel.queueDeclare(Config.QueueName, false, false, false, null); // 创建订阅器,并接受消息 channel.basicConsume(Config.QueueName, false, "", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); // 队列名称 String contentType = properties.getContentType(); // 内容类型 String content = new String(body, "utf-8"); // 消息正文 System.out.println("消息正文:" + content); channel.basicAck(envelope.getDeliveryTag(), false); // 手动确认消息【参数说明:参数一:该消息的index;参数二:是否批量应答,true批量确认小于index的消息】 } }); } catch (Exception e) { e.printStackTrace(); } } }
代码里面已经写了很详细的注释,在这里也不过多的介绍了。
执行效果,如图:
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
zabbix实战--mysql监控详解
视频资源:http://www.roncoo.com/course/view/fb3050a5b34b42f39ccad83ebebc89c1 龙果开源运维项目地址:https://github.com/roncoo/roncoo-cmdb 一、监控采集依据:主要基于show global status对数据进行采集: 二、对用户进行授权,然后使用show global status进行采集分析。 mysql -uroot -p”xxxx” -e "show global status" ###查看所有的值 监控项注释: Aborted_clients ##客户端不能正常连接,失败的连接数量。 Aborted_connects ##客户端中断数量,可能有恶意连接。 ###吞吐量 Bytes_received ##从所有客户端接收到的字节数。 Bytes_sent ##发送给所有客户端的字节数。 ###com admin 语句执行数量 Com_commit ##统计提交语句次数 com_delete ##统计删除语句 com_delete_multi ##最小 com_inse...
- 下一篇
深入理解 Java 动态代理机制
Java 有两种代理方式,一种是静态代理,另一种是动态代理。对于静态代理,其实就是通过依赖注入,对对象进行封装,不让外部知道实现的细节。很多 API 就是通过这种形式来封装的。 代理模式结构图(图片来自《大话设计模式》) 下面看下两者在概念上的解释: 静态代理 静态代理类:由程序员创建或者由第三方工具生成,再进行编译;在程序运行之前,代理类的.class文件已经存在了。 静态代理类通常只代理一个类。 静态代理事先知道要代理的是什么。 动态代理 动态代理类:在程序运行时,通过反射机制动态生成。 动态代理类通常代理接口下的所有类。 动态代理事先不知道要代理的是什么,只有在运行的时候才能确定。 动态代理的调用处理程序必须事先InvocationHandler接口,及使用Proxy类中的newProxyInstance方法动态的创建代理类。 Java动态代理只能代理接口,要代理类需要使用第三方的CLIGB等类库。 动态代理的好处 Java动态代理的优势是实现无侵入式的代码扩展,也就是方法的增强;让你可以在不用修改源码的情况下,增强一些方法;在方法的前后你可以做你任何想做的事情(甚至不去执行这个...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- Windows10,CentOS7,CentOS8安装Nodejs环境
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- Red5直播服务器,属于Java语言的直播服务器
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- CentOS8安装Docker,最新的服务器搭配容器使用
- Docker快速安装Oracle11G,搭建oracle11g学习环境