ActiveMQ高可用+负载均衡集群之功能测试
1.基础功能测试
ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。JMS的全称是Java Message Service,即Java消息服务。它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。而消息的传递有两种类型,主要如下:
- 一种是点对点的,即一个生产者和一个消费者一一对应。
- 另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
ActiveMQ和JMS的消息类型对应如下
JMS消息模型 | P2P消息模型 | Pub/Sub消息模型 |
---|---|---|
ActiveMQ | Queue队列 | Topic队列 |
特点 | 一对一,生产者生产了一个消息,只能由一个消费者进行消费 | 一对多,生产者生产了一个消息,可以由多个消费者进行消费 |
接下来将对两种类型的场景进行分别验证。
1.1点对点模式(Queue)
点对点的模式主要建立在某个queue上,消息可以被同步或异步的发送和接收。点对点的消息模式可以有多个发送端,多个接收端,但是每个消息只会给一个Consumer传送一次。
1.1.1引入依赖
1.ActiveMQ依赖
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.12</version> </dependency>
2.Springboot-ActiveMQ连接池依赖
如果要启用连接池,且使用springboot2.0+及以下版本的时候,maven配置依赖是:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> </dependency>
如果要启用连接池,且使用springboot2.1+的时候,maven配置依赖是:
<dependency> <groupId>org.messaginghub</groupId> <artifactId>pooled-jms</artifactId> </dependency>
1.1.2生产者发布Queue消息
由于ActiveMQ的客户端只能访问Master的Broker,其他处于Slave的Broker不能被访问,所以客户端连接Broker应该使用failover协议,具体代码如下。
String url="failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";//服务地址 String queueName = "queue-testqq";//要创建的消息名称 //1.创建ConnectiongFactory,绑定地址 ConnectionFactory factory = new ActiveMQConnectionFactory(url); //2.创建Connection Connection connection = factory.createConnection(); //3.启动连接 connection.start(); //4.创建会话 //第一个参数:是否开启事务 //第二个参数:消息是否自动确认 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建一个队列 Destination destination = session.createQueue(queueName); //6.创建一个生产者 MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 20; i++) { //7.创建消息 TextMessage textMessage = session.createTextMessage("我是Queue消息生产者:" + i); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //8.发送消息 producer.send(textMessage); System.out.println("发送第一组消息:" + i); } connection.close();
运行代码后,可在ActiveMQ控制台查看对应的Queue信息,此时有消息待接收。
1.1.3消费者接收Queue消息
Consumer 可以使用 MessageConsumer.receive() 同步地接收消息,也可以通过使用MessageConsumer.setMessageListener() 注册一个 MessageListener 实现异步接收。
这里使用异步接收的方式消费消息,具体代码如下:
String url="failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";//服务地址 String queueName="queue-testqq";//要消费的消息名称 //1.创建ConnectiongFactory,绑定地址 ConnectionFactory factory=new ActiveMQConnectionFactory(url); //2.创建Connection Connection connection= factory.createConnection(); //3.启动连接 connection.start(); //4.创建会话 /** 第一个参数,是否使用事务 如果设置true,操作消息队列后,必须使用 session.commit(); 如果设置false,操作消息队列后,不使用session.commit(); */ Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建一个目标 Destination destination=session.createQueue(queueName); //6.创建一个消费者 MessageConsumer consumer=session.createConsumer(destination); //7.创建一个监听器 consumer.setMessageListener(new MessageListener() { public void onMessage(Message arg0) { TextMessage textMessage=(TextMessage)arg0; try { System.out.println("接收消息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } });
执行代码后,可以在控制台如下输出:
在ActiveMQ控制台查看对应的Queue信息,此时消息已被消费。
1.1.4多消费者模式
同时开启2个以上的消费者,再次运行生产者,观察每个消费者控制台的输出。
观察后,得出结论:一条消息只会被一个消费者会接收消费,不可重复消费。同时还发现,多个消费者的情况下消息会被均分,即负载均衡策略。
生产者发送消息情况:
消费者1接收消息情况:
消费者2接收消息情况:
1.2发布/订阅模式(Topic)
Pub/Sub(发布/订阅,Publish/Subscribe)消息域使用topic作为Destination,发布者向topic发送消息,订阅者注册接收来自topic的消息。发送到topic的任何消息都将自动传递给所有订阅者。接收方式(同步和异步)与P2P域相同。
1.2.1生产者发布Topic消息
Pub/Sub模式与P2P模式在代码实现层面基本一样,变化的只有Queue与Topic。生产者发布Topic消息的具体代码如下:
String url="failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";//服务地址,端口默认61616 String topicName="topic-testqq";//要创建的消息名称 //1.创建ConnectiongFactory,绑定地址 ConnectionFactory factory=new ActiveMQConnectionFactory(url); //2.创建Connection Connection connection= factory.createConnection(); //3.启动连接 connection.start(); //4.创建会话 (参数1:是否启动事务,参数2:消息确认模式) Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建一个目标 Destination destination=session.createTopic(topicName); // Topic topic = session.createTopic(topicName); //6.创建一个生产者 MessageProducer producer=session.createProducer(destination); for (int i = 0; i < 15; i++) { //7.创建消息 TextMessage textMessage=session.createTextMessage("我是topic类型消息生产者:"+i); //8.发送消息 producer.send(textMessage); System.out.println("发送消息:"+i); } connection.close();
此时,消息生产者先不启动。Pub/Sub模式下必须先启动sub,否则在启动sub之前发布的消息是不能消费的,就像你今天开始订报纸,那今天之前的报纸你肯定是收不到了,发布/订阅模式与此同理。
1.2.2消费者接收Topic消息
与P2P模式相同,Pub/Sub模式的消息接收方式也有两种:同步接收和异步接收。这里采用异步接收的方式消费Topic消息,具体代码如下:
String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";//服务地址,端口默认61616 String topicName = "topic-testqq";//要创建的消息名称 //1.创建ConnectiongFactory,绑定地址 ConnectionFactory factory = new ActiveMQConnectionFactory(url); //2.创建Connection Connection connection = factory.createConnection(); //3.启动连接 connection.start(); //4.创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建一个目标 Destination destination = session.createTopic(topicName); //6.创建一个消费者 MessageConsumer consumer = session.createConsumer(destination); //7.接收消息,可选择同步接收或者异步接收 /*//消费者同步接收,receive(long timeout)主线程阻塞式等待下一个消息到来,可设置超时时间,超时则返回null。 TextMessage message = (TextMessage) consumer.receive(1000); System.out.println("同步接收Topic消息: " + message);*/ //消费者异步接收,创建一个监听器 consumer.setMessageListener(new MessageListener() { public void onMessage(Message arg0) { TextMessage textMessage = (TextMessage) arg0; try { System.out.println("异步接收Topic消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } });
运行代码,可以看到控制台输出已消费Topic消息。
ActiveMQ控制台也可以看到对应的Topic发布、订阅信息。如下
1.2.3多消费者模式
同时开启2个以上的消费者,再次运行生产者,观察每个消费者控制台的输出.
两个消费者运行情况如下:
可以发现:
- topic发布/订阅模式,一个消息可以被多个消费者消费
- topic发布/订阅模式要求消费者必须即时消费,即生产者发布消息时,消费者必须同时在线才可接收消费消息。
2.高可用测试
2.1测试方案一
2.1.1测试用例
1.生产者连接集群发送50条消息并设置每发送一条消息,sleep1秒
2.观察生产者发送消息所连接的节点,并将所在的节点停掉
3.观察生产者发送消息日志,查看所有消息是不是正常发送
2.1.2测试代码
1.生产者发布消息
String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)"; String queueName = "queue-testqq";//要创建的消息名称 //1.创建ConnectiongFactory,绑定地址 ConnectionFactory factory = new ActiveMQConnectionFactory(url); //2.创建Connection Connection connection = factory.createConnection(); //3.启动连接 connection.start(); //4.创建会话 //第一个参数:是否开启事务 //第二个参数:消息是否自动确认 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建一个目标 Destination destination = session.createQueue(queueName); // Queue queue = session.createQueue("test-Queue"); //6.创建一个生产者 MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 50; i++) { //7.创建消息 TextMessage textMessage = session.createTextMessage("我是第一组消息生产者:" + i); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //8.发送消息 producer.send(textMessage); System.out.println("发送第一组消息:" + i); } connection.close();
2.1.3测试过程
1.运行生产者发布消息,观察生产者控制台
2.停止mq1(61616)节点
3.继续观察生产者控制台
(1)此时61616节点已无法连接
(2)生产者已成功连接61619节点,并继续发送消息
2.2测试方案二
2.2.1测试用例
1.生产者连接集群发送20条消息
2.观察生产者发送消息所连接的节点,并将所在的节点停掉
3.消费者连接集群消费消息,观察消息消费情况
2.2.2测试代码
1.生产者发布消息
String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)"; String queueName = "queue-testqq";//要创建的消息名称 //1.创建ConnectiongFactory,绑定地址 ConnectionFactory factory = new ActiveMQConnectionFactory(url); //2.创建Connection Connection connection = factory.createConnection(); //3.启动连接 connection.start(); //4.创建会话 //第一个参数:是否开启事务 //第二个参数:消息是否自动确认 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建一个目标 Destination destination = session.createQueue(queueName); // Queue queue = session.createQueue("test-Queue"); //6.创建一个生产者 MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 20; i++) { //7.创建消息 TextMessage textMessage = session.createTextMessage("我是第一组消息生产者:" + i); //8.发送消息 producer.send(textMessage); System.out.println("发送第一组消息:" + i); } connection.close();
2.消费者接收消息
String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)"; String queueName = "queue-testqq";//要创建的消息名称 //1.创建ConnectiongFactory,绑定地址 ConnectionFactory factory = new ActiveMQConnectionFactory(url); //2.创建Connection Connection connection = factory.createConnection(); //3.启动连接 connection.start(); //4.创建会话 /** 第一个参数,是否使用事务 如果设置true,操作消息队列后,必须使用 session.commit(); 如果设置false,操作消息队列后,不使用session.commit(); */ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建一个目标 Destination destination = session.createQueue(queueName); //6.创建一个消费者 MessageConsumer consumer = session.createConsumer(destination); //7.创建一个监听器 consumer.setMessageListener(new MessageListener() { public void onMessage(Message arg0) { TextMessage textMessage = (TextMessage) arg0; try { System.out.println("接收消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } });
2.2.3测试过程
1.运行生产者发布消息,观察生产者控制台
2.停止mq4(61619)节点
3.启动消费者,观察消费情况
观察发现,消费者连接61618节点,并成功接收消费消息。
2.3测试结论
经测试,当前集群在启动消息生产者发送消息时,使生产者所在节点宕机的情况下,得出如下结论:
1.高可用架构的ActiveMQ集群,在生产消息的过程中生产者所在节点挂掉,客户端会暂时阻塞无法发送消息,但整体可用性不受影响。
2.高可用架构的ActiveMQ集群,在消息生产者所在节点挂掉后,消费者仍可正常消费消息
3.当前ActiveMQ集群若其中一个节点挂掉,ActiveMQ正常提供服务,不影响服务可用性
3.负载均衡测试
最终的架构就是两个master-slave集群相互连通,两个集群可以相互消费对方的消息,但是如果客户端所连接的集群挂掉客户端依然是不能发送消息的,也就是说activemq的负载均衡只是做到消费的负载均衡,高可用是靠master-slave来保证的。
3.1测试用例
1.启动两个消费者监听相同的queue,且服务地址均配置集群所有节点
2.生产者连接集群向指定的queue连续发送20条消息
3.观察两个生产者消费消息的日志
3.2测试代码
1.生产者发布消息
String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)"; String queueName = "queue-testqq";//要创建的消息名 //1.创建ConnectiongFactory,绑定地址 ConnectionFactory factory = new ActiveMQConnectionFactory(url); //2.创建Connection Connection connection = factory.createConnection(); //3.启动连接 connection.start(); //4.创建会话 //第一个参数:是否开启事务 //第二个参数:消息是否自动确认 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建一个目标 Destination destination = session.createQueue(queueName); //Queue queue = session.createQueue("test-Queue"); //6.创建一个生产者 MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 20; i++) { //7.创建消息 TextMessage textMessage = session.createTextMessage("我是第一组消息生产者:" + i); //8.发送消息 producer.send(textMessage); System.out.println("发送第一组消息:" + i); } connection.close();
2.消费者接收消费消息
String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618," + "tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)"; String queueName = "queue-testqq";//要创建的消息名称 //1.创建ConnectiongFactory,绑定地址 ConnectionFactory factory = new ActiveMQConnectionFactory(url); //2.创建Connection Connection connection = factory.createConnection(); //3.启动连接 connection.start(); //4.创建会话 /** 第一个参数,是否使用事务 如果设置true,操作消息队列后,必须使用 session.commit(); 如果设置false,操作消息队列后,不使用session.commit(); */ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建一个目标 Destination destination = session.createQueue(queueName); //6.创建一个消费者 MessageConsumer consumer = session.createConsumer(destination); //7.创建一个监听器 consumer.setMessageListener(new MessageListener() { public void onMessage(Message arg0) { TextMessage textMessage = (TextMessage) arg0; try { /* if (textMessage.getText().contains("10")) { System.out.println("======消息异常======="); throw new Exception(); }*/ System.out.println("接收消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } });
3.3测试过程
- 同时开启2个消费者,再运行生产者,观察每个消费者控制台的输出。
- 再运行生产者,观察每个消费者控制台的输出。
生产者发布消息情况:
消费者1接收消息情况:
消费者2接收消息情况:
观察发现,多个消费者的情况下消息会被均分,即负载均衡策略。且同一条消息只会被一个消费者会接收消费。
3.4测试结论
经测试,当前集群在多个消费者消费相同队列的情况下,可以实现消息消费的负载均衡,从而实现ActiveMQ集群的分流,提高集群吞吐率。
到这里,ActiveMQ集群的功能测试、高可用测试及负载均衡测试已完成,当前ActiveMQ集群高可用+负载均衡功能正常。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
基于X-Engine引擎的实时历史数据库解决方案揭秘
实时历史库需求背景 在当今的数字化时代,随着业务的迅速发展,每天产生的数据量会是一个惊人的数量,数据库存储的成本将会越来越大,通常的做法是对历史数据做归档,即将长期不使用的数据迁移至以文件形式存储的廉价存储设备上,比如阿里云OSS或者阿里云数据库DBS服务。 然而在部分核心业务的应用场景下,针对几个月甚至几年前的“旧”数据依旧存在实时的,低频的查询甚至更新需求,比如淘宝/天猫的历史订单查询,企业级办公软件钉钉几年前的聊天信息查询,菜鸟海量物流的历史物流订单详情等。 • 如果这时从历史备份中还原后查询,那么查询时间将会是以天为单位,可接受度为0 • 如果将这些低频但实时的查询需求的历史数据与近期活跃存储在同一套分布式数据库集群下,那么又会带来以下两大挑战 存储成本巨大,进而导致成本远大于收益,比如钉钉聊天信息数据量在高度压缩后接近50PB,很
- 下一篇
用了这么多年的 Java 泛型,你对它到底有多了解?
云栖号资讯:【点击查看更多行业资讯】在这里您可以找到不同行业的第一手的上云资讯,还在等什么,快来! 作为一个 Java 程序员,日常编程早就离不开泛型。泛型自从 JDK1.5 引进之后,真的非常提高生产力。一个简单的泛型 T,寥寥几行代码, 就可以让我们在使用过程中动态替换成任何想要的类型,再也不用实现繁琐的类型转换方法。 虽然我们每天都在用,但是还有很多同学可能并不了解其中的实现原理。今天这篇我们从以下几点聊聊 Java 泛型: Java 泛型实现方式 类型擦除带来的缺陷 Java 泛型发展史 Java 泛型实现方式 Java 采用类型擦除(Type erasure generics)的方式实现泛型。用大白话讲就是这个泛型只存在源码中,编译器将源码编译成字节码之时,就会把泛型『擦除』,所以字节码中并不存在泛型。 对于下面这段代码,编译之后,我们使用 javap -s class 查看字节码。 观察setParam 部分的字节码,从 descriptor 可以看到,泛型 T 已被擦除,最终替换成了 Object。 ps:并不是每一个泛型参数被擦除类型后都会变成 Object 类,如果泛...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Linux系统CentOS6、CentOS7手动修改IP地址
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7安装Docker,走上虚拟化容器引擎之路