ActiveMQ (一)
今天给大家分享的是ActiveMQ,如有不足,敬请指教。
那么我们必须知道ActiveMQ是什么。
一、ActiveMQ简介
1.1 ActiveMQ是什么
- ActiveMQ是一个消息队列应用服务器。支持JMS规范。
1.1.1 JMS概述
- 全称:Java Message Service ,即为Java消息服务,是一套java消息服务的API标准。(标准即接口)
- 实现了JMS标准的系统,称之为JMS Provider。
1.1.2 消息队列
1.1.2.1 概念
- 消息队列是在消息的传输过程中保存消息的容器,提供一种不同进程或者同一进程不同线程直接通讯的方式
图示 |
---|
- Producer:消息生产者,负责产生和发送消息到 Broker;
- Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个 queue;
- Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理;
1.2 ActiveMQ能做什么
- 实现两个不同应用(程序)之间的消息通讯。
- 实现同一个应用,不同模块之间的消息通讯。
1.3 ActiveMQ下载
- ActiveMQ官网地址: http://activemq.apache.org
- ActiveMQ下载地址:http://activemq.apache.org/download-archives.html
1.4 ActiveMQ主要特点
- 支持多语言、多协议客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
- 对Spring的支持,ActiveMQ可以很容易整合到Spring的系统里面去。
- 支持高可用、高性能的集群模式
二、示例
2.1 需求
- 使用ActiveMQ实现消息队列模型
2.2 配置步骤说明
- 搭建ActiveMQ消息服务器(略)。
- 创建一个java项目。
- 创建消息生产者,发送消息。
- 创建消息消费者,接收消息
2.3 第一部分 创建java项目,导入jar包
图示 |
---|
2.4 第二部分 创建消息生成者,发送消息
2.4.1 创建MyProducer类,定义sengMsg2MQ方法
package com.xkt.producer; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; /** * @author lzx * */ public class MyProducer { // 定义链接工厂 private ConnectionFactory factory; // 定义链接 private Connection connection; // 定义会话 private Session session; // 定义目的地 private Destination destination; // 定义消息 private Message message; // 定义消息生生产者 private MessageProducer producer; public void sengMsg2MQ(String msg) { try { /* * 1、创建链接工厂 * * ActiveMQConnectionFactory(userName, password, brokerURL) * * userName:用户名 默认admin password:密码 默认admin brokerURL:消息服务中心地址 * tcp://0.0.0.0:61616 基于tcp协议 */ factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616"); // 2.创建链接 connection = factory.createConnection(); // 开启链接 connection.start(); /* * 3、创建会话 * * createSession(transacted, acknowledgeMode) * * transacted:是否使用事物 true|false true 表示使用事物,每次对消息进行读写之后,要提交事物。如果使用了事物,则消息确认机制失效 * false 表示不使用事物 * * acknowledgeMode: 消息确认机制 Session.AUTO_ACKNOWLEDGE - * 自动确认消息机制,一旦读取到消息,则消费成功,消息出队列,避免重复消费 Session.CLIENT_ACKNOWLEDGE - * 客户端确认消息机制,手动确认,即消费了消息成功之后,再确认 Session.DUPS_OK_ACKNOWLEDGE - * 有副本的客户端确认消息机制。集群模式下 * */ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 4.创建队列 destination = session.createQueue("test-mq"); // 5.创建消息对象 message = session.createTextMessage(msg); // 6.创建消息生产者 producer = session.createProducer(destination); // 7.发送消息 producer.send(message); // session.commit(); System.out.println("消息发送成功"); } catch (JMSException e) { e.printStackTrace(); System.out.println("消息发送失败"); } finally { // 回收消息发送者资源 if (null != producer) { try { producer.close(); } catch (JMSException e) { e.printStackTrace(); } } // 回收会话资源 if (null != session) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } // 回收链接资源 if (null != connection) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
2.4.2 创建一个测试类
package com.xkt.test; import org.junit.Test; import com.xkt.consumer.Myconsumer; import com.xkt.producer.MyProducer; /** * @author lzx * */ public class MessageTest { @Test public void testSend() { try { MyProducer producer = new MyProducer(); producer.sengMsg2MQ("测试发送数据"); } catch (Exception e) { e.printStackTrace(); } } }
2.4.3 测试
图示 |
---|
- 查看ActiveMQ管理控制界面
图示 |
---|
2.5 第三部分 创建消息消费者,消费消息
2.5.1 创建MyConsumer类
package com.xkt.consumer; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * @author lzx * */ public class Myconsumer { private ConnectionFactory factory; private Connection connection; private Session session; private Destination destination; private MessageConsumer consumer; private Message message; public void receiveFromMq() { try { factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616"); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); // 创建目的地, 目的地命名即队列命名, 消息消费者需要通过此命名访问对应的队列 destination = session.createQueue("queue"); // 创建消息消费者, 创建的消息消费者与某目的地对应, 即方法参数目的地 consumer = session.createConsumer(destination); // 6、读取消息 message = consumer.receive(5000); // 7.提取文本 if (null != message) { if (message instanceof TextMessage) { TextMessage tMsg = (TextMessage) message; String content = tMsg.getText(); System.out.println("从列表中读取的是" + content); } } // 在手动确认机制下,消费完消息之后,必须手动确认,让消费的消息出队列否则,会出现重复消费的问题。 message.acknowledge(); } catch (JMSException e) { e.printStackTrace(); System.out.println("读取失败"); } finally { if (null != consumer) { try { consumer.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (null != session) { try { session.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (null != connection) { try { connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } }
2.5.2 修改测试类MessageTest,新增测试方法
@Test public void testReceive() { try { Myconsumer consumer = new Myconsumer(); consumer.receiveFromMq(); } catch (Exception e) { e.printStackTrace(); } }
2.5.3 测试
图示 |
---|
- 查看ActiveMQ管理控制界面
图示 |
---|
在前面的示例中,我们发现消费者每次只能消费一条消息。当队列中有多条消息的时候,我们需要多次运行消费者,才能消费完这些消息。很麻烦!!!!如何解决这个问题呢?我们将在后面的文章中给出。
版权说明:欢迎以任何方式进行转载,但请在转载后注明出处!
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
细谈 vue - slot 篇
本篇文章是细谈 vue 系列第二篇了,上篇我们已经细谈了 vue 的核心之一 vdom,传送门 今天我们将分析我们经常使用的 vue 功能 slot 是如何设计和实现的,本文将围绕 普通插槽 和 作用域插槽 以及 vue 2.6.x 版本的 v-slot 展开对该话题的讨论。当然还不懂用法的同学建议官网先看看相关 API 先。接下来,我们直接进入正文吧 一、普通插槽 首先我们看一个我们对于 slot 最常用的例子 <template> <div class="slot-demo"> <slot>this is slot default content text.</slot> </div> </template> 然后我们直接使用,页面则正常显示一下内容 然后,这个时候我们使用的时候,对 slot 内容进行覆盖 <slot-demo>this is slot custom content.</slot-demo> 内容则变成下图所示 对于此,大家可能都能清楚的知道会是这种情况。今天我就将带领...
- 下一篇
docker CE on Linux示例浅析(三)镜像与容器管理
概述 github项目地址:https://github.com/superwujc 尊重原创,欢迎转载,注明出处:https://my.oschina.net/superwjc/blog/3045629 历史系列: docker CE on Linux示例浅析(一)安装与基本运行 docker CE on Linux示例浅析(二)数据存储与持久化 镜像是静态存储的磁盘文件,容器是动态运行的内存进程,二者可以视为docker一体的两面。在docker的整个工作流程中,与镜像与容器相关的操作贯穿始终,其他功能大都以此为基础进行展开或封装。本文将以jdk + tomcat为例,简述镜像/容器的基本管理,包括构建与运行,以及迁移与恢复过程。 环境 宿主机2台:dock_host_0(192.168.9.168/24),dock_host_1(192.168.9.169/24),均为全新最小化安装,二者的系统与软件环境一致。操作系统版本CentOS Linux release 7.6.1810 (Core),内核版本3.10.0-957.12.1.el7.x86_64,docker为默认安装...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2全家桶,快速入门学习开发网站教程
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS8编译安装MySQL8.0.19
- CentOS7,8上快速安装Gitea,搭建Git服务器
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- SpringBoot2配置默认Tomcat设置,开启更多高级功能