ActiveMQ (二)
今天继续给大家分享的是ActiveMQ,如有不足,敬请指教。
上次我们说到,我们发现消费者每次只能消费一条消息。当队列中有多条消息的时候,我们需要多次运行消费者,才能消费完这些消息。很麻烦!!!!如何解决这个问题呢?
这就需要使用ActiveMQ监听器来监听队列,持续消费消息。
一、ActiveMQ监听器
1.1 配置步骤说明
- 创建一个监听器对象。
- 修改消费者代码,加载监听器
1.2 配置步骤
1.2.1 创建监听器MyListener类
package com.xkt.listener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * @author lzx * */ public class MyListener implements MessageListener { @Override public void onMessage(Message message) { if (null != message) { if (message instanceof TextMessage) { try { TextMessage tMsg = (TextMessage) message; String content = tMsg.getText(); System.out.println("监听到的消息是 " + content); } catch (JMSException e) { e.printStackTrace(); } } } } }
1.2.2 修改MyConsumer代码,加载监听器
- 监听器需要持续加载,因此消费程序不能结束。这里我们使用输入流阻塞消费线程结束
package com.xkt.consumer; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import com.xkt.listener.MyListener; /** * @author lzx * */ public class Myconsumer { private ConnectionFactory factory; private Connection connection; private Session session; private Destination destination; private MessageConsumer consumer; public void receiveFromMq() { try { factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616"); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建目的地, 目的地命名即队列命名, 消息消费者需要通过此命名访问对应的队列 destination = session.createQueue("queue"); // 5.创建消息消费者, 创建的消息消费者与某目的地对应, 即方法参数目的地 consumer = session.createConsumer(destination); // 7.加载监听器 consumer.setMessageListener(new MyListener()); // 监听器需要持续加载,这里我们使用输入流阻塞当前线程结束。监听指定队列,只要有消息进来,就消费这条消息 System.in.read(); // 在java项目中,可以通过IO阻塞程序,持续加载监听器 // 在web项目中,可以通过配置文件,直接加载监听器。 } catch (Exception e) { e.printStackTrace(); System.out.println("读取失败"); } finally { if (null != consumer) { try { consumer.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (null != session) { try { session.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (null != connection) { try { connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } }
1.2.3 测试
- 多次运行生产者,发送多条消息到队列中
图示 |
---|
- 运行消费者。观察结果
图示 |
---|
- 查看ActiveMQ管理控制界面,所有消息都被消费了!
图示 |
---|
在以上示例中,只能向一个消费者发送消息。但是有一些场景,需求有多个消费者都能接收到消息,比如:美团APP每天的消息推送。该如何实现呢?
二、Topic模式实现
2.1 配置步骤说明
- 搭建ActiveMQ消息服务器。(略)
- 创建主题订阅者。
- 创建主题发布者。
2.2 配置步骤
2.2.1 创建主题订阅者MySubscriber
- 说明:主题订阅模式下,可以有多个订阅者。我们这里用多线程来模拟
package com.xkt.subscriber; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnectionFactory; /** * @author lzx * */ public class MySubscriber implements Runnable { /** * 多线程的线程安全问题 解决方案: * * (1)加锁 --极不推荐 (2)不使用全局变量 ---> SpringMVC是线程安全的吗? 答:默认不是 解决办法:(1)使用原型模式--不推荐 * (2)不使用全局变量 (3)ThreadLocal (3)其它框架来代替,比如redis */ private TopicConnectionFactory factory; private TopicConnection connection; private TopicSession session; private Topic topic; private TopicSubscriber subscriber; private Message message; @Override public void run() { try { // 1、创建连接工厂 factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616"); // 2、创建连接 connection = factory.createTopicConnection(); connection.start(); // 3、创建会话 session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // 4、创建topic主题 topic = session.createTopic("topic-gzsxt"); // 5、创建订阅者 subscriber = session.createSubscriber(topic); // 6、订阅 while (true) { message = subscriber.receive(); if (null != message) { if (message instanceof TextMessage) { TextMessage tMsg = (TextMessage) message; String content = tMsg.getText(); System.out.println("订阅者: " + Thread.currentThread().getName() + " 接收的消息是:" + content); } } } } catch (JMSException e) { e.printStackTrace(); } } }
2.2.2 修改测试类
package com.xkt.test; import com.xkt.subscriber.MySubscriber; /** * @author lzx * */ public class TestMQ { public static void main(String[] args) { MySubscriber sub = new MySubscriber(); Thread t1 = new Thread(sub); Thread t2 = new Thread(sub); t1.start(); t2.start(); } }
2.2.3 查看测试结果
- 查看AcitveMQ管理界面 |图示 | | :------------: | |
|
2.2.4 创建主题发布者MyPublisher
package com.xkt.publish; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import org.apache.activemq.ActiveMQConnectionFactory; /** * @author lzx * */ public class MyPublisher { private TopicConnectionFactory factory; private TopicConnection connection; private TopicSession session; private Topic topic; private TopicPublisher publisher; private Message message; public void publish(String msg) { try { // 1、创建连接工厂 factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616"); // 2、创建连接 connection = factory.createTopicConnection(); connection.start(); // 3、创建会话 session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // 4、创建topic主题 topic = session.createTopic("topic-gzsxt"); // 5、创建发布者 publisher = session.createPublisher(topic); // 6、创建消息对象 message = session.createTextMessage(msg); // 7、发布消息 publisher.publish(message); } catch (Exception e) { e.printStackTrace(); } finally { if (null != publisher) { try { publisher.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (null != session) { try { session.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (null != connection) { try { connection.stop(); connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (null != session) { try { session.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } }
2.2.5 修改测试类
package com.xkt.test; import org.junit.Test; import com.xkt.publish.MyPublisher; import com.xkt.subscriber.MySubscriber; /** * @author lzx * */ public class TestMQ { public static void main(String[] args) { MySubscriber sub = new MySubscriber(); Thread t1 = new Thread(sub); Thread t2 = new Thread(sub); t1.start(); t2.start(); } @Test public void publish() { MyPublisher publisher = new MyPublisher(); publisher.publish("hello,欢迎收听FM 89.9频道-交通频道"); } }
2.2.6 查看测试结果
2.3 Topic小结
- Topic模式能够实现多个订阅者同时消费消息。
- Topic主题模式下,消息不会保存,只有在线的订阅者才会接收到消息。
- 通常可以用来解决公共消息推送的相关业务。
版权说明:欢迎以任何方式进行转载,但请在转载后注明出处!

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Go Ticker资源泄露案例
前言 前面我们研究了Ticker的实现原理,已经知道Ticker如果不主动停止会有资源泄露的问题。 本节介绍一个真实的案例,重点分析产生资源泄露的现象以及排查思路。 应用背景 曾经做过一个产品,不经意间出现了CPU使用率缓慢升高,最后CPU使用率竟然达到了100%,严重影响了业务。经过排查,问题出在Ticker的使用方式上,创建了Ticker,在使用结束后没有释放导致的。 该产品需要监控其他服务器的健康状态,其中很常见的一种做法是心跳检测。简单的说,周期性的ping这些服务器,能在指定时间内收到ack说明与该服务器之间的网络没问题。 当时使用了一个小众的开源组件tatsushid/go-fastping来做ping。 该组件介绍如下图所示: 问题现象 在做性能测试时,管理了1000台服务器,差不多4天后发现系统越来越慢,查看CPU使用情况,结果发现CPU使用率已经达到100%。 排查性能问题主要使用pprof,关于pprof的使用方法及原理介绍在请参照相关章节。 使用pprof查看CPU使用情况,主要是查看CPU都在忙什么: 从上图可以看出,CPU主要是被runtime包占用了,其中第...
- 下一篇
Java的并发编程中的多线程问题到底是怎么回事儿?
在我之前的一篇《再有人问你Java内存模型是什么,就把这篇文章发给他。》文章中,介绍了Java内存模型,通过这篇文章,大家应该都知道了Java内存模型的概念以及作用,这篇文章中谈到,在Java并发编程中,通常会遇到三个问题,即原子性问题、一致性问题和有序性问题。 上面一篇文章简单介绍了一下,由于各种原因会导致多线程场景下可能存在原子性、一致性和有序性问题。但是并没有深入,这篇文章就来在之前的基础上,再来看一下,并发编程中,这些问题都是哪来的? 首先,我们还是从操作系统开始,先来了解一些基础知识。 CPU时间片 很多人都知道,现在我们用到操作系统,无论是Windows、Linux还是MacOS等其实都是多用户多任务分时操作系统。使用这些操作系统的“用户”是可以“同时”干多件事的,这已经是日常习惯了,并没觉得有什么特别。 但是实际上,对于单CPU的计算机来说,在CPU中,同一时间是只能干一件事儿的。 为了看起来像是“同时干多件事”,分时操作系统是把CPU的时间划分成长短基本相同的时间区间,即”时间片”,通过操作系统的管理,把这些时间片依次轮流地分配给各个“用户”使用。 如果某个“用户”在时...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2全家桶,快速入门学习开发网站教程
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,CentOS8安装Elasticsearch6.8.6
- 2048小游戏-低调大师作品
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- Windows10,CentOS7,CentOS8安装Nodejs环境
- CentOS7安装Docker,走上虚拟化容器引擎之路