消息中间件系列五、rabbit消息的确认机制
前言:这是中间件一个系列的文章之一,有需要的朋友可以看看这个系列的其他文章:
消息中间件系列一、消息中间件的基本了解
消息中间件系列二、Windows下的activeMQ和rabbitMQ的安装
消息中间件系列三、JMS和activeMQ的简单使用
消息中间件系列四、认识AMQP和RabbiyMq的简单使用
消息中间件系列五、rabbit消息的确认机制
消息中间件系列六,rabbit与spring集成实战
一、消息的确认机制
1、消费者收到的每一条消息都必须进行确认。(分为自动确认和消费者自行确认)
消费者在声明队列时,指定autoAck参数,true自动确认,false时rabbitmq会等到消费者显示的发回一个ack信号才会删除消息。autoAck=false,有足够时间让消费者处理消息,直到消费者显示调用basicAck为止。Rabbitmq中消息分为了两部分:
1、等待投递的消息;
2、已经投递,但是还没有收到ack信号的。如果消费者断连了,服务器会把消息重新入队,投递给下一个消费者。
补充:未ack的消息是没有超时时间的,没有处理会一直在队列中,知道内存溢出。
2、如何明确拒绝消息
a、消费者断连,
b、消费者使用reject命令(requeue=true,重新分发消息,false移除消息),
c、nack命令(批量的拒绝)(rabbitMQ的特有命令)
二、为什么要有个发送方(生产者)确认模式?
生产者不知道消息是否真正到达RabbitMq,也就是说发布操作不返回任何消息给生产者。
AMQP协议层面为我们提供的事务机制解决了这个问题。
AMQP事务:讲几个消息打包一起发给队列,如果队列有一个或部分消息没有成功接收处理,那么这几个消息就会被回退。
但是事务机制本身也会带来问题:
1、严重的性能问题
2、使生产者应用程序产生同步
RabbitMQ团队为我们拿出了更好的方案,即采用发送方确认模式,该模式(异步模式)比事务更轻量,性能影响几乎可以忽略不计。
发送方确认模式的机制。
三、消费者确认
首先当然要添加依赖,下面的所有代码都是在同一个项目中,生产者确认也是,下文不再重复说依赖的问题。
添加依赖
客户端Jar包和源码包下载地址:
http://repo1.maven.org/maven2/com/rabbitmq/amqp-client/5.0.0/amqp-client-5.0.0.jar
http://repo1.maven.org/maven2/com/rabbitmq/amqp-client/5.0.0/amqp-client-5.0.0-sources.jar
如果是引入jar包的形式还需要引入slf4j-api-1.6.1.jar。
如果是Maven工程加入:
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.0.0</version> </dependency>
注意:5系列的版本最好使用JDK8及以上, 低于JDK8可以使用4.x(具体的版本号到Maven的中央仓库查)的版本。
1、确认回复
消费者在通道channel调用basicConsume方法声明队列的时候,可以设置为自动确认和消费者自行确认。
a、自动确认
basicConsume第二个参数autoAck(是否自动确认)设置为true,那么队列每次接收到消息之后都会向队列发送确认消息,确认之后队列会删除相应的消息。上一篇博客消息中间件系列四、认识AMQP和RabbiyMq的简单使用的用例就全部是用这种确认方式(需要的朋友可以去这篇文章看用例,其实代码差别不多)。
b、自行确认
basicConsume当第二个参数设置为false时,就要消费者自己确认了,否则消息会一直留在队列中直到内存溢出。那么怎么确认呢?
在消息监听回调方法里面,获取通道,然后调用basicAck即可进行手动确认了,方法参数为:deliveryTag投递的标记符,multiple是否进行批量回复
this.getChannel().basicAck(envelope.getDeliveryTag(),false);//参数:deliveryTag投递的标记符,multiple是否进行批量回复
下面看代码实例
生产者实例代码:
package dongnaoedu.consumerconfirm; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConsumerConfirmProducer { private final static String EXCHANGE_NAME = "direct_cc_confirm_1"; private final static String ROUTE_KEY = "error"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); /* factory.setUsername(..); factory.setPort(); factory.setVirtualHost();*/ Connection connection = factory.newConnection();//连接 Channel channel = connection.createChannel();//信道 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器 for(int i=0;i<10;i++){ String message = "Hello world_"+(i+1); channel.basicPublish(EXCHANGE_NAME,ROUTE_KEY,null,message.getBytes()); System.out.println("Sent "+ROUTE_KEY+":"+message); } channel.close(); connection.close(); } }
消费者实例代码:
package dongnaoedu.consumerconfirm; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ClientConsumerAck { private static final String EXCHANGE_NAME = "direct_cc_confirm_1"; public static void main(String[] argv) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection connection = factory.newConnection();//连接 Channel channel = connection.createChannel();//信道 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器 //声明队列 String queueName = "consumer_confirm"; //创建队列,参数分别为:队列名,durable是否进行持久化,exlusive是否私有,auto-delete没有消费者是自动删除,arguments相关参数 channel.queueDeclare(queueName,false,false,false,null); String server = "error"; channel.queueBind(queueName,EXCHANGE_NAME,server); System.out.println("Waiting message......."); Consumer consumerB = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"UTF-8"); System.out.println("Accept:"+envelope.getRoutingKey()+":"+message); this.getChannel().basicAck(envelope.getDeliveryTag(),false);//参数:投递的标记符,b:是否进行批量回复 } }; //三个参数:queueName队列名,autoAck是否自动确认,callback消息监听回调 channel.basicConsume(queueName,false,consumerB); } }
分别启动消费者和生产者,可以看到队列里存留的信息是0,说明生产者产生的消息都被消费者确认消费了
2、拒绝回复
在某些情况,需要消费者收到消息后不清除队列中的消息,那么消费者就要拒绝回复,。那么怎么拒绝呢?
在消息监听回调方法里面,获取通道,然后调用basicReject拒绝,方法参数为:deliveryTag投递的标记符,requeue拒绝后是否让其他消费者处理消息。
新建一个消费者,只需在上面的消费中改动21行以下的代码,其他不变:
Consumer consumerB = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { this.getChannel().basicReject(envelope.getDeliveryTag(),true);//b:拒绝后是否让其他消费者处理消息 System.out.println("Reject:"+envelope.getRoutingKey() +":"+new String(body,"UTF-8")); } }; channel.basicConsume(queueName,false,consumerB);
分别启动消费者和生产者,消费者发完10条消息就停了,由于消费者拒绝确认,又设置转让其他消费者处理,并且只有一个消费者,就会一直循环接收消息,拒绝消息。而队列中的消息一直都是10条。
清空队列消息,再同时把上面确认回复的消费者启动,启动生产者,两个消费者的打印信息如下:
说明队列先是轮询给消费者发消息,ClientConsumerReject拒绝并转给其他队列处理,这时消息有可能再次发给自己,消息处理完之后队列消息会清空:
四、生产者确认
为什么队列要给生产者回复消息确认在目录二已经说过了,这里不再重复,发送方确认可以分为同步确认和异步确认。
先上消费者,设置简单的自动确认模式:
package dongnaoedu.producerconfirm; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ProducerConfirmConsumer { private static final String EXCHANGE_NAME = "producer_confirm"; public static void main(String[] argv) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 打开连接和创建频道,与发送端一样 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器 String queueName = "producer_confirm"; //参数分别为:队列名,durable是否进行持久化,exlusive是否私有,auto-delete没有消费者是自动删除,arguments相关参数 channel.queueDeclare(queueName,false,false,false,null); String severity="error";//只关注error级别的日志 channel.queueBind(queueName, EXCHANGE_NAME, severity);//把队列按路由键绑定到交换器上 System.out.println(" [*] Waiting for messages......"); // 创建队列消费者 final Consumer consumerB = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println( "Received ["+ envelope.getRoutingKey() + "] "+message); } }; //三个参数:queueName队列名,autoAck是否自动确认,callback消息监听回调 channel.basicConsume(queueName, true, consumerB); } }
1、生产者同步确认
要实现发送方确认需要调用通道的confirmSelect将信道设置为发送方确认。
channel.confirmSelect();
生产者发送消息之后,可以调用通道的waitForConfirms方法,等待队列的回复,改方法会阻塞进程。
package dongnaoedu.producerconfirm; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 发送方确认同步模式 */ public class ProducerConfirm { private final static String EXCHANGE_NAME = "producer_confirm"; private final static String ROUTE_KEY = "error"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { /** * 创建连接连接到MabbitMQ */ ConnectionFactory factory = new ConnectionFactory(); // 设置MabbitMQ所在主机ip或者主机名 factory.setHost("127.0.0.1"); // 创建一个连接 Connection connection = factory.newConnection(); // 创建一个信道 Channel channel = connection.createChannel(); //将信道设置为发送方确认 channel.confirmSelect(); for(int i=0;i<2;i++){ String msg = "Hello "+(i+1); channel.basicPublish(EXCHANGE_NAME,ROUTE_KEY,null,msg.getBytes()); if (channel.waitForConfirms()){//同步阻塞到消费者发送确认消息,效率太低,还不如用事务 System.out.println(ROUTE_KEY+":"+msg); } } // 关闭频道和连接 channel.close(); connection.close(); } }
由于这种同步模式效率实在太低,会阻塞到消费者发送确认消息,还不如用事务。现在看看异步模式的:
2、生产者异步确认
要实现发送方确认需要调用通道的confirmSelect将信道设置为发送方确认。
channel.confirmSelect();
生产者异步确认需要在通道通过addConfirmListener方法添加确认监听接口,接口有两个方法要实现,分别是
handleAck:当rabbitMQ队列返回确认的时候调用的方法;
handleNack:如果rabbitMQ队列出现内部错误,发生数据丢失调用的方法;
channel.addConfirmListener(new ConfirmListener() { //当rabbitMQ队列返回确认的时候调用的方法 public void handleAck(long deliveryTag, boolean multiple) throws IOException { //确认回调执行的代码 } //如果rabbitMQ队列出现内部错误,发生数据丢失调用的方法 public void handleNack(long deliveryTag, boolean multiple) throws IOException { //队列发送错误回调的代码 } });
最好还在通道通过addReturnListener方法添加返回监听接口,接口需要实现一个方法
handleReturn:当投递消息时无法找到一个合适的队列时回调的方法。
channel.addReturnListener(new ReturnListener() { public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { //回调代码。。。 } });
补充:在通道调用basicPublish方法声明队列的时候,把mandatory(默认是false)设置为true,且投递消息时无法找到一个合适的队列才会回调addReturnListener接口的handleReturn方法;如果把mandatory(默认是false)设置为false,且投递消息时无法找到一个合适的队列,那么就会丢弃消息(缺省)。
调用basicPublish方法如下:
//参数分别为:交换器名称,路由键,mandatory(默认是false),参数,消息 channel.basicPublish(EXCHANGE_NAME, severity, false,null, message.getBytes());
发送方确认异步模式整体代码:
package dongnaoedu.producerconfirm; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 发送方确认异步模式 */ public class ProducerConfirmAsync { private final static String EXCHANGE_NAME = "producer_confirm"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { /** * 创建连接连接到MabbitMQ */ ConnectionFactory factory = new ConnectionFactory(); // 设置MabbitMQ所在主机ip或者主机名 factory.setHost("127.0.0.1"); // 创建一个连接 Connection connection = factory.newConnection(); //连接被关闭 //connection.addShutdownListener(); // 创建一个信道 Channel channel = connection.createChannel(); // 指定转发 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //将信道设置为发送方确认 channel.confirmSelect(); //信道被关闭 //channel.addShutdownListener(); //deliveryTag代表了(channel)唯一的投递,是个单调递增的正整数 //multiple:false,是否把本channel内容小于deliveryTag的消息一次性确认 //添加异步监听器, channel.addConfirmListener(new ConfirmListener() { //当rabbitMQ队列返回确认的时候调用的方法 public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("Ack deliveryTag="+deliveryTag +"multiple:"+multiple); } //如果rabbitMQ队列出现内部错误,发生数据丢失调用的方法 public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("Ack deliveryTag="+deliveryTag +"multiple:"+multiple); } }); //服务端的返回方法 //投递消息时无法找到一个合适的队列 //1、mandatory参数为true //消息返回给生产者 //false 丢弃消息(缺省) channel.addReturnListener(new ReturnListener() { public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); System.out.println("replyText:"+replyText); System.out.println("exchange:"+exchange); System.out.println("routingKey:"+routingKey); System.out.println("msg:"+msg); } }); String[] severities={"error","info","warning"}; for(int i=0;i<3;i++){ String severity = severities[i%3]; // 发送的消息 String message = "Hello World_"+(i+1)+("_"+System.currentTimeMillis()); //参数分别为:交换器名称,路由键,mandatory(默认是false),参数,消息 channel.basicPublish(EXCHANGE_NAME, severity, false,null, message.getBytes()); //channel.basicPublish(EXCHANGE_NAME,ROUTE_KEY,null,msg.getBytes()); System.out.println("----------------------------------------------------"); System.out.println(" Sent Message: [" + severity +"]:'"+ message + "'"); Thread.sleep(200);//延长程序生命时长,并且隔离每条消息 } // 关闭频道和连接 channel.close(); connection.close(); } }
看代码可知生产者发了三条消息,分别是"error","info","warning"这三个类型的。
分别启动消费者和生产者:
由于队列只绑定的路由键只有error,所以消费者只能接收到error类的信息。
生产者打印信息解释:生产者没发送一条消息的时候都打印出一条横线并把消息信息也打印出来;每次发送消息给队列,队列都能正常接收并会返回确认,所以发送每条消息后都会回调ConfirmListener的handleAck方法;由于"info","warning"这两个类型的路由键没有队列绑定,所以没有队列能接收者两条消息,而且mandatory参数为true,所以发现没有合适的队列会回调ReturnListener接口的handleReturn方法,打印出相应的信息。
生产者发的消息中,error类型的被确认消费,其他两个类型没有找到合适的队列(没有队列绑定相应的路由键),所以消息会忽略,会把消息丢弃(这个有在上一篇博客消息中间件系列四、认识AMQP和RabbiyMq的简单使用 说明并证实过)。
注意:生产者确认模式和消费者对消息的确认是不同的,发送方确认是消息发给rabbit队列之后,rabbit给生产者回复消息说明队列接收到消息了;消费者确认是消费者收到消息后给rabbit队列回复消息说明消费者正常收到消息并处理了,然后消息从队列中删除。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
高可用Redis(三):Hash类型
1.哈希类型键值结构 哈希类型也是key-value结构,key是字符串类型,其value分为两个部分:field和value 其中field部分代表属性,value代表属性对应的值 上面的图里,user:1:info为key,name,age,Date为user这个key的一些属性,value是属性对应的值 在hash中,可以为key添加一个新的属性和新的值 比如使用下面的命令向user:1:info这个key添加一个新属性viewCounter,属性对应的值为100 hset user:1:info viewCounter 100 2.特点 key-value结构 key(field)不能相同,value可以相同 3.Redis哈希类型对应的命令 3.1 hget命令,hset命令和hdel命令 hget key field 获取hash key对应的field的value hset key field value 设置hash key对应的field的value hdel key field 删除hash key对应的field的value 例子: 127.0.0.1:6379&...
- 下一篇
@ConfigurationProperties注解的使用与@Value的使用
读取的配置文件application.yml: jianshu: id: futaosmile 1. 使用@ConfigurationProperties注解注入 @RunWith(SpringRunner.class) @SpringBootTest @ConfigurationProperties(prefix = "jianshu") public class SpringmvcdemoApplicationTests { private String id; @Test public void test10() { System.out.println(id); } } 输出null,注入失败 原因:使用@ConfigurationProperties注解的方式注入为每个注入的字段添加setter方法 package com.futao.springmvcdemo; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Hadoop3单机部署,实现最简伪集群
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS7设置SWAP分区,小内存服务器的救世主
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- Windows10,CentOS7,CentOS8安装Nodejs环境
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- 设置Eclipse缩进为4个空格,增强代码规范
- SpringBoot2全家桶,快速入门学习开发网站教程