ActiveMQ (二)
今天继续给大家分享的是ActiveMQ,如有不足,敬请指教。
上次我们说到,我们发现消费者每次只能消费一条消息。当队列中有多条消息的时候,我们需要多次运行消费者,才能消费完这些消息。很麻烦!!!!如何解决这个问题呢?
这就需要使用ActiveMQ监听器来监听队列,持续消费消息。
一、ActiveMQ监听器
1.1 配置步骤说明
- 创建一个监听器对象。
- 修改消费者代码,加载监听器
1.2 配置步骤
1.2.1 创建监听器MyListener类
package com.xkt.listener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* @author lzx
*
*/
public class MyListener implements MessageListener {
@Override
public void onMessage(Message message) {
if (null != message) {
if (message instanceof TextMessage) {
try {
TextMessage tMsg = (TextMessage) message;
String content = tMsg.getText();
System.out.println("监听到的消息是 " + content);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
1.2.2 修改MyConsumer代码,加载监听器
- 监听器需要持续加载,因此消费程序不能结束。这里我们使用输入流阻塞消费线程结束
package com.xkt.consumer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.xkt.listener.MyListener;
/**
* @author lzx
*
*/
public class Myconsumer {
private ConnectionFactory factory;
private Connection connection;
private Session session;
private Destination destination;
private MessageConsumer consumer;
public void receiveFromMq() {
try {
factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616");
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地, 目的地命名即队列命名, 消息消费者需要通过此命名访问对应的队列
destination = session.createQueue("queue");
// 5.创建消息消费者, 创建的消息消费者与某目的地对应, 即方法参数目的地
consumer = session.createConsumer(destination);
// 7.加载监听器
consumer.setMessageListener(new MyListener());
// 监听器需要持续加载,这里我们使用输入流阻塞当前线程结束。监听指定队列,只要有消息进来,就消费这条消息
System.in.read();
// 在java项目中,可以通过IO阻塞程序,持续加载监听器
// 在web项目中,可以通过配置文件,直接加载监听器。
} catch (Exception e) {
e.printStackTrace();
System.out.println("读取失败");
} finally {
if (null != consumer) {
try {
consumer.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (null != session) {
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
1.2.3 测试
- 多次运行生产者,发送多条消息到队列中
图示 |
---|
- 运行消费者。观察结果
图示 |
---|
- 查看ActiveMQ管理控制界面,所有消息都被消费了!
图示 |
---|
在以上示例中,只能向一个消费者发送消息。但是有一些场景,需求有多个消费者都能接收到消息,比如:美团APP每天的消息推送。该如何实现呢?
二、Topic模式实现
2.1 配置步骤说明
- 搭建ActiveMQ消息服务器。(略)
- 创建主题订阅者。
- 创建主题发布者。
2.2 配置步骤
2.2.1 创建主题订阅者MySubscriber
- 说明:主题订阅模式下,可以有多个订阅者。我们这里用多线程来模拟
package com.xkt.subscriber;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* @author lzx
*
*/
public class MySubscriber implements Runnable {
/**
* 多线程的线程安全问题 解决方案:
*
* (1)加锁 --极不推荐 (2)不使用全局变量 ---> SpringMVC是线程安全的吗? 答:默认不是 解决办法:(1)使用原型模式--不推荐
* (2)不使用全局变量 (3)ThreadLocal (3)其它框架来代替,比如redis
*/
private TopicConnectionFactory factory;
private TopicConnection connection;
private TopicSession session;
private Topic topic;
private TopicSubscriber subscriber;
private Message message;
@Override
public void run() {
try {
// 1、创建连接工厂
factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616");
// 2、创建连接
connection = factory.createTopicConnection();
connection.start();
// 3、创建会话
session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、创建topic主题
topic = session.createTopic("topic-gzsxt");
// 5、创建订阅者
subscriber = session.createSubscriber(topic);
// 6、订阅
while (true) {
message = subscriber.receive();
if (null != message) {
if (message instanceof TextMessage) {
TextMessage tMsg = (TextMessage) message;
String content = tMsg.getText();
System.out.println("订阅者: " + Thread.currentThread().getName() + " 接收的消息是:" + content);
}
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
2.2.2 修改测试类
package com.xkt.test;
import com.xkt.subscriber.MySubscriber;
/**
* @author lzx
*
*/
public class TestMQ {
public static void main(String[] args) {
MySubscriber sub = new MySubscriber();
Thread t1 = new Thread(sub);
Thread t2 = new Thread(sub);
t1.start();
t2.start();
}
}
2.2.3 查看测试结果
- 查看AcitveMQ管理界面 |图示 | | :------------: | |
|
2.2.4 创建主题发布者MyPublisher
package com.xkt.publish;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* @author lzx
*
*/
public class MyPublisher {
private TopicConnectionFactory factory;
private TopicConnection connection;
private TopicSession session;
private Topic topic;
private TopicPublisher publisher;
private Message message;
public void publish(String msg) {
try {
// 1、创建连接工厂
factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616");
// 2、创建连接
connection = factory.createTopicConnection();
connection.start();
// 3、创建会话
session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、创建topic主题
topic = session.createTopic("topic-gzsxt");
// 5、创建发布者
publisher = session.createPublisher(topic);
// 6、创建消息对象
message = session.createTextMessage(msg);
// 7、发布消息
publisher.publish(message);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != publisher) {
try {
publisher.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (null != session) {
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.stop();
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (null != session) {
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
2.2.5 修改测试类
package com.xkt.test;
import org.junit.Test;
import com.xkt.publish.MyPublisher;
import com.xkt.subscriber.MySubscriber;
/**
* @author lzx
*
*/
public class TestMQ {
public static void main(String[] args) {
MySubscriber sub = new MySubscriber();
Thread t1 = new Thread(sub);
Thread t2 = new Thread(sub);
t1.start();
t2.start();
}
@Test
public void publish() {
MyPublisher publisher = new MyPublisher();
publisher.publish("hello,欢迎收听FM 89.9频道-交通频道");
}
}
2.2.6 查看测试结果
2.3 Topic小结
- Topic模式能够实现多个订阅者同时消费消息。
- Topic主题模式下,消息不会保存,只有在线的订阅者才会接收到消息。
- 通常可以用来解决公共消息推送的相关业务。
版权说明:欢迎以任何方式进行转载,但请在转载后注明出处!

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
Go Ticker资源泄露案例
前言 前面我们研究了Ticker的实现原理,已经知道Ticker如果不主动停止会有资源泄露的问题。 本节介绍一个真实的案例,重点分析产生资源泄露的现象以及排查思路。 应用背景 曾经做过一个产品,不经意间出现了CPU使用率缓慢升高,最后CPU使用率竟然达到了100%,严重影响了业务。经过排查,问题出在Ticker的使用方式上,创建了Ticker,在使用结束后没有释放导致的。 该产品需要监控其他服务器的健康状态,其中很常见的一种做法是心跳检测。简单的说,周期性的ping这些服务器,能在指定时间内收到ack说明与该服务器之间的网络没问题。 当时使用了一个小众的开源组件tatsushid/go-fastping来做ping。 该组件介绍如下图所示: 问题现象 在做性能测试时,管理了1000台服务器,差不多4天后发现系统越来越慢,查看CPU使用情况,结果发现CPU使用率已经达到100%。 排查性能问题主要使用pprof,关于pprof的使用方法及原理介绍在请参照相关章节。 使用pprof查看CPU使用情况,主要是查看CPU都在忙什么: 从上图可以看出,CPU主要是被runtime包占用了,其中第...
-
下一篇
Jenkins 插件开发之旅:两天内从 idea 到发布(上篇)
本文首发于:Jenkins 中文社区 本文介绍了笔者首个 Jenkins 插件开发的旅程, 包括从产生 idea 开始,然后经过插件定制开发, 接着申请将代码托管到 jenkinsci GitHub 组织, 最后将插件发布到 Jenkins 插件更新中心的过程。 鉴于文章篇幅过长,将分为上下两篇进行介绍。 从一个 idea 说起 前几天和朋友聊天时,聊到了 Maven 版本管理领域的 SNAPSHOT 版本依赖问题, 这给他带来了一些困扰,消灭掉历史遗留应用的 SNAPSHOT 版本依赖并非易事。 类似问题也曾经给笔者带来过困扰,在最初没能去规避问题, 等到再想去解决问题时却发现困难重重,牵一发而动全身, 导致这个问题一直被搁置,而这也给笔者留下深刻的印象。 等到再次制定 Maven 规范时,从一开始就考虑 强制禁止 SNAPSHOT 版本依赖发到生产环境。 这里是通过在 Jenkins 构建时做校验实现的。 因为没有找到提供类似功能的 Jenkins 插件, 目前这个校验通过 shell 脚本来实现的, 具体的做法是在 Jenkins 任务中 Maven 构建之前增加一个 Execu...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- 2048小游戏-低调大师作品
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- MySQL数据库在高并发下的优化方案
- Dcoker安装(在线仓库),最新的服务器搭配容器使用
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Docker快速安装Oracle11G,搭建oracle11g学习环境