AMQP-RabbitMQ/1/概念/一对一简单模型
JMS,AMQP,MQTT的区别与联系
- JMS
Java消息传递服务(Java Messaging Service ) - AMQP
高级消息队列协议(Advanced Message Queueing Protocol ) - MQTT
消息队列遥测传输(Message Queueing Telemetry Transport )
简单理解:
*JMS是专门为Java设计的一套消息服务API,像ActiveMQ就是对它的实现
*AMQP为了解决不同平台之间的通信问题,定义了一种名为amqp的通信协议,从而实现平台和语言无关性。
*MQTT也是一种通信协议。相比于AMQP的复杂性,它简单的多。所以amqp用于处理相对较重的任务,如两个系统平台之间的消息传输。而mqtt因为非常轻量,所以大量应用于物联网。
RabbitMQ常用命令
- 修改密码
rabbitmqctl change_password Username Newpassword - 显示所有用户
rabbitmqctl list_users - 启动
rabbitmq-server start - 关闭
rabbitmqctl stop - homebrew安装配置文件地址
/usr/local/etc/rabbitmq/rabbitmq-env.conf - 管理平台地址
http://localhost:15672/
1. 简单模型
The simplest thing that does something
- 模型图示
- 生产者
package com.futao.springmvcdemo.mq.rabbit.simple;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqConnectionTools;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqQueueEnum;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
/**
* 简单发送者
*
* @author futao
* Created on 2019-04-22.
*/
@Slf4j
public class Send {
@SneakyThrows
public static void main(String[] args) {
@Cleanup
Connection connection = RabbitMqConnectionTools.getConnection();
@Cleanup
Channel channel = connection.createChannel();
//定义一个队列
channel.queueDeclare(RabbitMqQueueEnum.SIMPLE.getQueueName(), false, false, false, null);
String msg = "Hello RabbitMq!";
channel.basicPublish("", RabbitMqQueueEnum.SIMPLE.getQueueName(), null, msg.getBytes());
log.info("Send msg:[{}] success", msg);
}
}
- 消费者
package com.futao.springmvcdemo.mq.rabbit.simple;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqConnectionTools;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqQueueEnum;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
/**
* 简单消费者
*
* @author futao
* Created on 2019-04-22.
*/
@Slf4j
public class Recv {
@SneakyThrows
public static void main(String[] args) {
Channel channel = RabbitMqConnectionTools.getChannel();
channel.queueDeclare(RabbitMqQueueEnum.SIMPLE.getQueueName(), false, false, false, null);
log.info("Waiting for message...");
DeliverCallback deliverCallback = ((consumerTag, message) -> {
log.info("收到消息:[{}],tag:[{}]", new java.lang.String(message.getBody()), consumerTag);
});
channel.basicConsume(RabbitMqQueueEnum.SIMPLE.getQueueName(), true, deliverCallback, consumerTag -> {
});
}
}
- 特点:一对一。一个生产者,一个消费者。
通用代码
- 工具类 - 链接工厂
package com.futao.springmvcdemo.mq.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* rabbitMq配置类
*
* @author futao
* Created on 2019-04-19.
*/
public class RabbitMqConnectionTools {
/**
* 获取链接
*
* @return
* @throws IOException
* @throws TimeoutException
*/
public static Connection getConnection() {
try {
return getConnectionFactory().newConnection();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage(), e);
}
}
/**
* 连接工厂
*
* @return
*/
private static ConnectionFactory getConnectionFactory() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("futao");
factory.setPassword("123456");
factory.setVirtualHost("/springmvc");
return factory;
}
/**
* 创建并获取通道
*
* @return
*/
@SneakyThrows
public static Channel getChannel() {
Connection connection = RabbitMqConnectionTools.getConnection();
return connection.createChannel();
}
}
- 枚举类 - RabbitMqQueueEnum队列名称
package com.futao.springmvcdemo.mq.rabbit;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* queue名称枚举
*
* @author futao
* Created on 2019-04-19.
*/
@Getter
@AllArgsConstructor
public enum RabbitMqQueueEnum {
/**
* 简单queue
*/
SIMPLE("simple-queue"),
/**
* WorkQueue 工作队列
*/
WORK_QUEUE("work-queue"),
/**
* 发布订阅-fanout
*/
EXCHANGE_QUEUE_FANOUT_ONE("exchange-queue-fanout-1"),
EXCHANGE_QUEUE_FANOUT_TWO("exchange-queue-fanout-2"),
EXCHANGE_QUEUE_DIRECT_ONE("exchange-queue-direct-1"),
EXCHANGE_QUEUE_DIRECT_TWO("exchange-queue-direct-2"),
EXCHANGE_QUEUE_TOPIC_ONE("exchange-queue-topic-1"),
EXCHANGE_QUEUE_TOPIC_TWO("exchange-queue-topic-2"),
EXCHANGE_QUEUE_TOPIC_THREE("exchange-queue-topic-3"),
EXCHANGE_QUEUE_TOPIC_FOUR("exchange-queue-topic-4"),
/**
* RPC队列
*/
RPC_QUEUE("rpc-queue");
/**
* queue名称
*/
private String queueName;
}
关注公众号
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
【从入门到放弃-SpringBoot】SpringBoot源码分析-WebServer
前言 前文【从入门到放弃-SpringBoot】SpringBoot源码分析-启动中,我们分析了springboot的启动过程,在refreshContext中调用了onRefresh。在SERVLET类型应用中,实际实例化的应用上下文为ServletWebServerApplicationContext。其onRefresh中调用了createWebServer。我们在本文中一起分析下是一个web应用是如何启动的。 ServletWebServerApplicationContext::createWebServer private void createWebServer() { WebServer webServer = this.webServer; ServletContext servletContext
-
下一篇
AMQP-RabbitMQ/2/工作队列
2. 工作队列 Work queues Distributing tasks among workers 消息将发送给c1或者c2 个人理解 生产者定义Queue,并向该队列发送消息 多个消费者可以从指定的同一个Queue中读取消息。每条消息只会发送给其中某一个消费者。 生产者 package com.futao.springmvcdemo.mq.rabbit.workqueue; import com.futao.springmvcdemo.mq.rabbit.RabbitMqConnectionTools; import com.futao.springmvcdemo.mq.rabbit.RabbitMqQueueEnum; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import lombok.Cleanup; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; /** * 简单发送者 * * @auth...
相关文章
文章评论
共有0条评论来说两句吧...



微信收款码
支付宝收款码