您现在的位置是:首页 > 文章详情

ActiveMQ (一)

日期:2019-05-05点击:428


今天给大家分享的是ActiveMQ,如有不足,敬请指教。

那么我们必须知道ActiveMQ是什么。

一、ActiveMQ简介

1.1 ActiveMQ是什么

  • ActiveMQ是一个消息队列应用服务器。支持JMS规范。

1.1.1 JMS概述

  • 全称:Java Message Service ,即为Java消息服务,是一套java消息服务的API标准。(标准即接口)
  • 实现了JMS标准的系统,称之为JMS Provider。

1.1.2 消息队列

1.1.2.1 概念

  • 消息队列是在消息的传输过程中保存消息的容器,提供一种不同进程或者同一进程不同线程直接通讯的方式
图示
  1. Producer:消息生产者,负责产生和发送消息到 Broker;
  2. Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个 queue;
  3. Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理;

1.2 ActiveMQ能做什么

  • 实现两个不同应用(程序)之间的消息通讯。
  • 实现同一个应用,不同模块之间的消息通讯。

1.3 ActiveMQ下载

1.4 ActiveMQ主要特点

  1. 支持多语言、多协议客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
  2. 对Spring的支持,ActiveMQ可以很容易整合到Spring的系统里面去。
  3. 支持高可用、高性能的集群模式

二、示例

2.1 需求

  • 使用ActiveMQ实现消息队列模型

2.2 配置步骤说明

  1. 搭建ActiveMQ消息服务器(略)。
  2. 创建一个java项目。
  3. 创建消息生产者,发送消息。
  4. 创建消息消费者,接收消息

2.3 第一部分 创建java项目,导入jar包

图示

2.4 第二部分 创建消息生成者,发送消息

2.4.1 创建MyProducer类,定义sengMsg2MQ方法

package com.xkt.producer; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; /** * @author lzx * */ public class MyProducer { // 定义链接工厂 private ConnectionFactory factory; // 定义链接 private Connection connection; // 定义会话 private Session session; // 定义目的地 private Destination destination; // 定义消息 private Message message; // 定义消息生生产者 private MessageProducer producer; public void sengMsg2MQ(String msg) { try { /* * 1、创建链接工厂 * * ActiveMQConnectionFactory(userName, password, brokerURL) * * userName:用户名 默认admin password:密码 默认admin brokerURL:消息服务中心地址 * tcp://0.0.0.0:61616 基于tcp协议 */ factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616"); // 2.创建链接 connection = factory.createConnection(); // 开启链接 connection.start(); /* * 3、创建会话 * * createSession(transacted, acknowledgeMode) * * transacted:是否使用事物 true|false true 表示使用事物,每次对消息进行读写之后,要提交事物。如果使用了事物,则消息确认机制失效 * false 表示不使用事物 * * acknowledgeMode: 消息确认机制 Session.AUTO_ACKNOWLEDGE - * 自动确认消息机制,一旦读取到消息,则消费成功,消息出队列,避免重复消费 Session.CLIENT_ACKNOWLEDGE - * 客户端确认消息机制,手动确认,即消费了消息成功之后,再确认 Session.DUPS_OK_ACKNOWLEDGE - * 有副本的客户端确认消息机制。集群模式下 * */ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 4.创建队列 destination = session.createQueue("test-mq"); // 5.创建消息对象 message = session.createTextMessage(msg); // 6.创建消息生产者 producer = session.createProducer(destination); // 7.发送消息 producer.send(message); // session.commit(); System.out.println("消息发送成功"); } catch (JMSException e) { e.printStackTrace(); System.out.println("消息发送失败"); } finally { // 回收消息发送者资源 if (null != producer) { try { producer.close(); } catch (JMSException e) { e.printStackTrace(); } } // 回收会话资源 if (null != session) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } // 回收链接资源 if (null != connection) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } } 

2.4.2 创建一个测试类

package com.xkt.test; import org.junit.Test; import com.xkt.consumer.Myconsumer; import com.xkt.producer.MyProducer; /** * @author lzx * */ public class MessageTest { @Test public void testSend() { try { MyProducer producer = new MyProducer(); producer.sengMsg2MQ("测试发送数据"); } catch (Exception e) { e.printStackTrace(); } } } 

2.4.3 测试

图示
  • 查看ActiveMQ管理控制界面
图示

2.5 第三部分 创建消息消费者,消费消息

2.5.1 创建MyConsumer类

package com.xkt.consumer; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * @author lzx * */ public class Myconsumer { private ConnectionFactory factory; private Connection connection; private Session session; private Destination destination; private MessageConsumer consumer; private Message message; public void receiveFromMq() { try { factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616"); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); // 创建目的地, 目的地命名即队列命名, 消息消费者需要通过此命名访问对应的队列 destination = session.createQueue("queue"); // 创建消息消费者, 创建的消息消费者与某目的地对应, 即方法参数目的地 consumer = session.createConsumer(destination); // 6、读取消息 message = consumer.receive(5000); // 7.提取文本 if (null != message) { if (message instanceof TextMessage) { TextMessage tMsg = (TextMessage) message; String content = tMsg.getText(); System.out.println("从列表中读取的是" + content); } } // 在手动确认机制下,消费完消息之后,必须手动确认,让消费的消息出队列否则,会出现重复消费的问题。 message.acknowledge(); } catch (JMSException e) { e.printStackTrace(); System.out.println("读取失败"); } finally { if (null != consumer) { try { consumer.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (null != session) { try { session.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (null != connection) { try { connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } } 

2.5.2 修改测试类MessageTest,新增测试方法

 @Test public void testReceive() { try { Myconsumer consumer = new Myconsumer(); consumer.receiveFromMq(); } catch (Exception e) { e.printStackTrace(); } } 

2.5.3 测试

图示
  • 查看ActiveMQ管理控制界面
图示

在前面的示例中,我们发现消费者每次只能消费一条消息。当队列中有多条消息的时候,我们需要多次运行消费者,才能消费完这些消息。很麻烦!!!!如何解决这个问题呢?我们将在后面的文章中给出。

版权说明:欢迎以任何方式进行转载,但请在转载后注明出处!

原文链接:https://my.oschina.net/u/4118479/blog/3045575
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章