ActiveMQ(13):ActiveMQ的集群
一、简介
1.1 消费者集群(Queue consumer clusters)
ActiveMQ支持Consumer对消息高可靠性的负载平衡消费,如果一个Consumer死掉,该消息会转发到其它的Consumer消费的Queue上。
如果一个Consumer获得消息比其它Consumer快,那么他将获得更多的消息。
因此推荐ActiveMQ的Broker和Client使用failover://transport的方式来配置链接。
1.2 Broker clusters
大部情况下是使用一系列的Broker和Client链接到一起。如果一个Broker死掉了,Client可以自动链接到其它Broker上。
实现以上行为需要用failover协议作为Client。
如果启动了多个Broker,Client可以使用static discover或者 Dynamic discovery容易的从一个broker到另一个broker直接链接。
这样当一个broker上没有Consumer的话,那么它的消息不会被消费的,然而该broker会通过存储和转发的策略来把该消息发到其它broker上。
特别注意:
ActiveMQ默认的两个broker,static链接后是单方向的,broker-A可以访问消费broker-B的消息,如果要支持双向通信,需要在
netWorkConnector配置的时候,设置duplex=true 就可以了。
操作,服务端搭建静态网络连接与消息回流
消息者1:
public void test1() throws Exception { ConnectionFactory cf = new ActiveMQConnectionFactory("liuy","123456","tcp://192.168.175.13:61676"); Connection connection = cf.createConnection(); connection.start(); final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); for (int i = 0; i < 1; i++) { MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage m = (TextMessage) message; try { System.out.println("===收到11111111:" + m.getText()); session.commit(); } catch (JMSException e) { e.printStackTrace(); } } }); } }
消息者2:
public void test1() throws Exception { ConnectionFactory cf = new ActiveMQConnectionFactory("liuy","123456","tcp://192.168.175.13:61616"); Connection connection = cf.createConnection(); connection.start(); final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); for (int i = 0; i < 1; i++) { MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage m = (TextMessage) message; try { System.out.println("===收到222222222:" + m.getText()); session.commit(); } catch (JMSException e) { e.printStackTrace(); } } }); } }
生产者:
public void test1() throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 30; i++) { TextMessage message = session.createTextMessage("message--" + i); Thread.sleep(1000); producer.send(message); } session.commit(); session.close(); connection.close(); }
效果:
二、主从节点的集群(Master Slave)
在5.9的版本里面,废除了Pure Master Slave的方式,目前支持:
1:Shared File System Master Slave:
基于共享储存的Master-Slave:多个broker实例使用一个存储文件,谁拿到文件锁就是master,其他处于待启动状态,如果master挂掉了,
某个抢到文件锁的slave变成master
2:JDBC Master Slave:基于JDBC的Master-Slave:使用同一个数据库,拿到LOCK表的写锁的broker成为master
3:Replicated LevelDB Store:基于ZooKeeper复制LevelDB存储的Master-Slave机制,这个是5.9新加的
具体的可以到官方察看: http://activemq.apache.org/masterslave.html
注意:这里可以不要静态连接与回流了
2.1 JDBC Master Slave的方式
2.1.1 简介
利用数据库作为数据源,采用Master/Slave模式,其中在启动的时候Master首先获得独有锁,其它Slaves Broker则等待获取独有锁。
推荐客户端使用Failover来链接Brokers。
具体如下图所示:
Master失败
如果Master失败,则它释放独有锁,其他Slaver则获取独有锁,其它Slaver立即获得独有锁后此时它将变成Master,并且启动所有的传输链接。
同时,Client将停止链接之前的Master和将会轮询链接到其他可以利用的Broker即新Master。如上中图所示
Master重启
任何时候去启动新的Broker,即作为新的Slave来加入集群,如上右图所示
2.1.2 JDBC Master Slave的配置
使用<jdbcPersistenceAdapter/>来配置消息的持久化,自动就会使用JDBC Master Slave的方式。
参考:ActiveMQ消息存储持久化 里的jdbc
去掉静态连接:参考ActiveMQ的静态网络链接
去掉回流:参考集群下的消息回流功能
注意:在配置JDBC时,注意配置useDatabaseLock="true",如下
<jdbcPersistenceAdapter dataSource="#mysql-ds" useDatabaseLock="true" />
必需设置,不然在保存数据时会报数据主键重复异常
2.1.3 测试
生产30个消息:
public void test1() throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 30; i++) { TextMessage message = session.createTextMessage("message--" + i); Thread.sleep(1000); producer.send(message); } session.commit(); session.close(); connection.close(); }
消费3个消息:
public void test1() throws Exception { ConnectionFactory cf = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)"); Connection connection = cf.createConnection(); connection.start(); final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); MessageConsumer consumer = session.createConsumer(destination); int i=0; while(i<3) { i++; TextMessage message = (TextMessage) consumer.receive(); session.commit(); System.out.println("收到消 息:" + message.getText()); } session.close(); connection.close(); }
然后关闭61616这台mq,进入到61676界面:
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
OpenNMS与智和网管平台开发性能评估
随着网络系统结构的日益庞大复杂,运维人员有必要使用一套网络管理系统来方便快捷的管理好网络。为了更好的管理各种设备,便于对网络中的故障进行排查,公司需要一款满足需求的网管软件。目前市面上的网管软件众多,这些软件经过适当的配置即可对通用设备进行管理。但这些通用的功能无法满足我们100%的需求,必须进行二次开发。出于公司所使用技术(Java)、市场占用率和功能全面性,选择了两款提供两次开发的平台,OpenNMS与智和网管平台,通过对两款产品进行全方位的对比,选择出一款适合的网管软件,在此基础上进行整合和二次开发,构建部门设备网管平台。 一、选型标准 1.功能性 网管通用的功能可以直接拿来使用,或者简单的改造,可以节约成本; 2.完备的扩展开发接口 被选择的开源软件是否已经提供完备的二次开发接口,满足二次开发的要求; 3.易于扩展的架构设计 网管平台的架构应该是易于进行二次开发的,或者在设计平台时,就考虑到了在平台基础上进行二次开发; 4.开发技术 选择网管平台的另外一个层面考虑是尽量考虑使用部门成员最熟悉的开发技术,尽量避免涉及相对部门来说的新技术、开发语言,这样可以进一步研发降低成本; 5...
- 下一篇
mysql-proxy代理加mysql主从实现读写分离
mysql-proxy代理加mysql主从实现读写分离 一:实验目标 MySQL Proxy(代理) 服务概述 MySQL Proxy 实现读写分离 MySQL Proxy+mysql主从实现读写分离 二:实验环境: 关闭防火墙 mysql-proxy服务端:xuegod62.cn IP:192.168.1.62 mysql服务器(主,负责写)服务端:xuegod63.cn IP:192.168.1.63 mysql服务器(从,负责读)客户端:xuegod64.cn IP:192.168.1.64 端口: mysql-proxy 默认端口:4040 三:实验概述 主从复制(Master-Slave)的方式来同步数据,再通过读写分离(MySQL-Proxy)来提升数据库的并发负载能力这样的方案来进行部署实施的。 工作原理: 1:MySQL Proxy概述 MySQL Proxy是一个处于你的client端和MySQL server端之间的简单程序,它可以监测、分析或改变它们的通信。它使用灵活,没有限制,常见的用途包括:负载平衡,故障、查询分析,查询过滤和修改等等。 MySQL P...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS6,CentOS7官方镜像安装Oracle11G
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS8编译安装MySQL8.0.19
- Red5直播服务器,属于Java语言的直播服务器
- CentOS7安装Docker,走上虚拟化容器引擎之路
- Linux系统CentOS6、CentOS7手动修改IP地址