您现在的位置是:首页 > 文章详情

ActiveMQ (二)

日期:2019-05-06点击:435


今天继续给大家分享的是ActiveMQ,如有不足,敬请指教。

上次我们说到,我们发现消费者每次只能消费一条消息。当队列中有多条消息的时候,我们需要多次运行消费者,才能消费完这些消息。很麻烦!!!!如何解决这个问题呢?

这就需要使用ActiveMQ监听器来监听队列,持续消费消息。

一、ActiveMQ监听器

1.1 配置步骤说明

  • 创建一个监听器对象。
  • 修改消费者代码,加载监听器

1.2 配置步骤

1.2.1 创建监听器MyListener类

package com.xkt.listener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * @author lzx * */ public class MyListener implements MessageListener { @Override public void onMessage(Message message) { if (null != message) { if (message instanceof TextMessage) { try { TextMessage tMsg = (TextMessage) message; String content = tMsg.getText(); System.out.println("监听到的消息是 " + content); } catch (JMSException e) { e.printStackTrace(); } } } } } 

1.2.2 修改MyConsumer代码,加载监听器

  • 监听器需要持续加载,因此消费程序不能结束。这里我们使用输入流阻塞消费线程结束
package com.xkt.consumer; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import com.xkt.listener.MyListener; /** * @author lzx * */ public class Myconsumer { private ConnectionFactory factory; private Connection connection; private Session session; private Destination destination; private MessageConsumer consumer; public void receiveFromMq() { try { factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616"); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建目的地, 目的地命名即队列命名, 消息消费者需要通过此命名访问对应的队列 destination = session.createQueue("queue"); // 5.创建消息消费者, 创建的消息消费者与某目的地对应, 即方法参数目的地 consumer = session.createConsumer(destination); // 7.加载监听器 consumer.setMessageListener(new MyListener()); // 监听器需要持续加载,这里我们使用输入流阻塞当前线程结束。监听指定队列,只要有消息进来,就消费这条消息 System.in.read(); // 在java项目中,可以通过IO阻塞程序,持续加载监听器 // 在web项目中,可以通过配置文件,直接加载监听器。 } catch (Exception e) { e.printStackTrace(); System.out.println("读取失败"); } finally { if (null != consumer) { try { consumer.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (null != session) { try { session.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (null != connection) { try { connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } } 

1.2.3 测试

  • 多次运行生产者,发送多条消息到队列中
图示
  • 运行消费者。观察结果
图示
  • 查看ActiveMQ管理控制界面,所有消息都被消费了!
图示

在以上示例中,只能向一个消费者发送消息。但是有一些场景,需求有多个消费者都能接收到消息,比如:美团APP每天的消息推送。该如何实现呢?

二、Topic模式实现

2.1 配置步骤说明

  1. 搭建ActiveMQ消息服务器。(略)
  2. 创建主题订阅者。
  3. 创建主题发布者。

2.2 配置步骤

2.2.1 创建主题订阅者MySubscriber

  • 说明:主题订阅模式下,可以有多个订阅者。我们这里用多线程来模拟
package com.xkt.subscriber; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnectionFactory; /** * @author lzx * */ public class MySubscriber implements Runnable { /** * 多线程的线程安全问题 解决方案: * * (1)加锁 --极不推荐 (2)不使用全局变量 ---> SpringMVC是线程安全的吗? 答:默认不是 解决办法:(1)使用原型模式--不推荐 * (2)不使用全局变量 (3)ThreadLocal (3)其它框架来代替,比如redis */ private TopicConnectionFactory factory; private TopicConnection connection; private TopicSession session; private Topic topic; private TopicSubscriber subscriber; private Message message; @Override public void run() { try { // 1、创建连接工厂 factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616"); // 2、创建连接 connection = factory.createTopicConnection(); connection.start(); // 3、创建会话 session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // 4、创建topic主题 topic = session.createTopic("topic-gzsxt"); // 5、创建订阅者 subscriber = session.createSubscriber(topic); // 6、订阅 while (true) { message = subscriber.receive(); if (null != message) { if (message instanceof TextMessage) { TextMessage tMsg = (TextMessage) message; String content = tMsg.getText(); System.out.println("订阅者: " + Thread.currentThread().getName() + " 接收的消息是:" + content); } } } } catch (JMSException e) { e.printStackTrace(); } } } 

2.2.2 修改测试类

package com.xkt.test; import com.xkt.subscriber.MySubscriber; /** * @author lzx * */ public class TestMQ { public static void main(String[] args) { MySubscriber sub = new MySubscriber(); Thread t1 = new Thread(sub); Thread t2 = new Thread(sub); t1.start(); t2.start(); } } 

2.2.3 查看测试结果

  • 查看AcitveMQ管理界面 |图示 | | :------------: | | |

2.2.4 创建主题发布者MyPublisher

package com.xkt.publish; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import org.apache.activemq.ActiveMQConnectionFactory; /** * @author lzx * */ public class MyPublisher { private TopicConnectionFactory factory; private TopicConnection connection; private TopicSession session; private Topic topic; private TopicPublisher publisher; private Message message; public void publish(String msg) { try { // 1、创建连接工厂 factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616"); // 2、创建连接 connection = factory.createTopicConnection(); connection.start(); // 3、创建会话 session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // 4、创建topic主题 topic = session.createTopic("topic-gzsxt"); // 5、创建发布者 publisher = session.createPublisher(topic); // 6、创建消息对象 message = session.createTextMessage(msg); // 7、发布消息 publisher.publish(message); } catch (Exception e) { e.printStackTrace(); } finally { if (null != publisher) { try { publisher.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (null != session) { try { session.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (null != connection) { try { connection.stop(); connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (null != session) { try { session.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } } 

2.2.5 修改测试类

package com.xkt.test; import org.junit.Test; import com.xkt.publish.MyPublisher; import com.xkt.subscriber.MySubscriber; /** * @author lzx * */ public class TestMQ { public static void main(String[] args) { MySubscriber sub = new MySubscriber(); Thread t1 = new Thread(sub); Thread t2 = new Thread(sub); t1.start(); t2.start(); } @Test public void publish() { MyPublisher publisher = new MyPublisher(); publisher.publish("hello,欢迎收听FM 89.9频道-交通频道"); } } 

2.2.6 查看测试结果

2.3 Topic小结

  1. Topic模式能够实现多个订阅者同时消费消息。
  2. Topic主题模式下,消息不会保存,只有在线的订阅者才会接收到消息。
  3. 通常可以用来解决公共消息推送的相关业务。

版权说明:欢迎以任何方式进行转载,但请在转载后注明出处!

原文链接:https://my.oschina.net/u/4118479/blog/3045926
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章