深入理解RabbitMQ中的prefetch_count参数
前提
在某一次用户标签服务中大量用到异步流程,使用了RabbitMQ
进行解耦。其中,为了提高消费者的处理效率针对了不同节点任务的消费者线程数和prefetch_count
参数都做了调整和测试,得到一个相对合理的组合。这里深入分析一下prefetch_count
参数在RabbitMQ
中的作用。
prefetch_count参数的含义
先从AMQP
(Advanced Message Queuing Protocol
,即高级消息队列协议,RabbitMQ
实现了此协议的0-9-1
版本的大部分内容)和RabbitMQ
的具体实现去理解prefetch_count
参数的含义,可以查阅对应的文档(见文末参考资料)。AMQP 0-9-1
定义了basic.qos
方法去限制消费者基于某一个Channel
或者Connection
上未进行ack
的最大消息数量上限。basic.qos
方法支持两个参数:
-
global
:布尔值。 -
prefetch_count
:整数。
这两个参数在AMQP 0-9-1
定义中的含义和RabbitMQ
具体实现时有所不同,见下表:
global 参数值 | AMQP 0-9-1 中prefetch_count 参数的含义 | RabbitMQ 中prefetch_count 参数的含义 |
---|---|---|
false | prefetch_count 值在当前Channel 的所有消费者共享 | prefetch_count 对于基于当前Channel 创建的消费者生效 |
true | prefetch_count 值在当前Connection 的所有消费者共享 | prefetch_count 值在当前Channel 的所有消费者共享 |
或者用简洁的英文表格理解:
global | prefetch_count in AMQP 0-9-1 | prefetch_count in RabbitMQ |
---|---|---|
false | Per channel limit | Per customer limit |
true | Per connection limit | Per channel limit |
这里画一个图理解一下:
上图仅仅为了区分协议本身和RabbitMQ
中实现的不同,接着说说prefetch_count
对于消费者(线程)和待消费消息的作用。假定一个前提:RabbitMQ
客户端从RabbitMQ
服务端获取到队列消息的速度比消费者线程消费速度快,目前有两个消费者线程共用一个Channel
实例。当global
参数为false
时候,效果如下:
而当global
参数为true
时候,效果如下:
在消费者线程处理速度远低于RabbitMQ
客户端从RabbitMQ
服务端获取到队列消息的速度的场景下,prefetch_count
条未进行ack
的消息会暂时存放在一个队列(准确来说是阻塞队列,然后阻塞队列中的消息任务会流转到一个列表中遍历回调消费者句柄,见下一节的源码分析)中等待被消费者处理。这部分消息会占据JVM
的堆内存,所以在性能调优或者设定应用程序的初始化和最大堆内存的时候,如果刚好用到RabbitMQ
的消费者,必须要考虑这些"预取消息"的内存占用量。不过值得注意的是:「prefetch_count
是RabbitMQ
服务端的参数,它的设置值或者快照都不会存放在RabbitMQ
客户端」。同时需要注意prefetch_count
生效的条件和特性(从参数设置的一些demo
和源码上感知):
-
prefetch_count
参数仅仅在basic.consume
的autoAck
参数设置为false
的前提下才生效,也就是不能使用自动确认,自动确认的消息没有办法限流。 -
basic.consume
如果在非自动确认模式下忘记了手动调用basic.ack
,那么prefetch_count
正是未ack
消息数量的最大上限。 -
prefetch_count
是由RabbitMQ
服务端控制,一般情况下能保证各个消费者线程中的未ack
消息分发是均衡的,这点笔者猜测是consumerTag
起到了关键作用。
RabbitMQ客户端中prefetch_count源码跟踪
❝
编写本文的时候引入的RabbitMQ客户端版本为:com.rabbitmq:amqp-client:5.9.0
❞
上面说了这么多都只是根据官方的文档或者博客中的理论依据进行分析,其实更加根本的分析方法是直接阅读RabbitMQ
的Java
客户端源码,主要是针对basic.qos
和basic.consume
两个方法,对应的是com.rabbitmq.client.impl.ChannelN#basicQos()
和com.rabbitmq.client.impl.ChannelN#basicConsume()
两个方法。先看ChannelN#basicQos()
:
这里的basicQos()
方法多了一个prefetchSize
参数,用于限制分发内容的大小上限,默认值0
代表无限制,而prefetchCount
的取值范围是[0,65535]
,取值为0
也是代表无限制。这里的ChannelN#basicQos()
实现中直接封装basic.qos
方法参数进行一次RPC
调用,意味着直接更变RabbitMQ
服务端的配置,即时生效,同时参数值完全没有保存在客户端代码中,印证了前面一节的结论。接着看ChannelN#basicConsume()
方法:
上图已经把关键部分用红圈圈出,因为整个消息消费过程是异步的,涉及太多的类和方法,这里不全量贴出,整理了一个流程图:
整个消息消费过程,prefetch_count
参数并未出现在客户端代码中,又再次印证了前面一节的结论,即prefetch_count
参数的行为和作用完全由RabbitMQ
服务端控制。而最终Customer
或者常用的DefaultCustomer
句柄是在WorkPoolRunnable
中回调的,这类任务的执行线程来自于ConsumerWorkService
内部的线程池,而这个线程池又使用了Executors.newFixedThreadPool()
去构建,使用了默认的线程工厂类,因此在Customer#handleDelivery()
方法内部打印的线程名称的样子是pool-1-thread-*
。
❝
这里VariableLinkedBlockingQueue就是前一节中的message queue的原型
❞
prefetch_count参数使用
设置prefetch_count
参数比较简单,就是调用Channel#basicQos()
方法:
`public class RabbitQos {
static String QUEUE = "qos.test";
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE, true, false, false, null);
channel.basicQos(2);
channel.basicConsume("qos.test", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("1------" + Thread.currentThread().getName());
sleep();
}
});
channel.basicConsume("qos.test", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("2------" + Thread.currentThread().getName());
sleep();
}
});
for (int i = 0; i < 20; i++) {
channel.basicPublish("", QUEUE, MessageProperties.TEXT_PLAIN, String.valueOf(i).getBytes());
}
sleep();
}
private static void sleep() {
try {
Thread.sleep(Long.MAX_VALUE);
} catch (Exception ignore) {
}
}
}
`
上面是原生的amqp-client
的写法,如果使用了spring-amqp
(spring-boot-starter-amqp
),可以通过配置文件中的spring.rabbitmq.listener.direct.prefetch
属性指定所有消费者线程的prefetch_count
,如果要针对部分消费者线程进行该属性的设置,则需要针对RabbitListenerContainerFactory
进行改造。
prefetch_count参数最佳实践
关于prefetch_count
参数的设置,RabbitMQ
官方有一篇文章进行了分析:《Finding bottlenecks with RabbitMQ 3.3》。该文章分析了消息流控的整个流程,其中提到了prefetch_count
参数的一些指标:
这里指出了,如果prefetch_count
的值超过了30
,那么网络带宽限制开始占主导地位,此时进一步增加prefetch_count
的值就会变得收效甚微。也就是说,「官方是建议把prefetch_count
设置为30
」。这里再参看一下spring-boot-starter-amqp
中对此参数定义的默认值,具体是AbstractMessageListenerContainer
中的DEFAULT_PREFETCH_COUNT
:
如果没有通过spring.rabbitmq.listener.direct.prefetch
进行覆盖,那么使用spring-boot-starter-amqp
中的注解定义的消费者线程中设置的prefetch_count
就是250
。
笔者认为,应该综合带宽、每条消息的数据报大小、消费者线程处理的速率等等角度去考虑prefetch_count
的设置。总结如下(个人经验仅供参考):
-
当消费者线程的处理速度十分慢,而队列的消息量十分少的场景下,可以考虑把
prefetch_count
设置为1
。 -
当队列中的每条消息的数据报十分大的时候,要计算好客户端可以容纳的未
ack
总消息量的内存极限,从而设计一个合理的prefetch_count
值。 -
当消费者线程的处理速度十分快,远远大于
RabbitMQ
服务端的消息分发,在网络带宽充足的前提下,设置可以把prefetch_count
值设置为0
,不做任何的消息流控。 -
一般场景下,建议使用
RabbitMQ
官方的建议值30
或者spring-boot-starter-amqp
中的默认值250
。
小结
小结一下:
-
prefetch_count
是RabbitMQ
服务端的参数,设置后即时生效。 -
prefetch_count
对于AMQP-0-9-1
中的定义与RabbitMQ
中的实现不完全相同。 -
prefetch_count
值设置建议使用框架提供的默认值或者通过分组实验结合数据报大小进行计算和评估出一个合理值。
(本文完 c-4-d e-a-20201017)
个人博客
本文分享自微信公众号 - Throwable文摘(throwable-doge)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
10年经验17张图带你进入gitflow企业项目代码版本管理的最佳实践
前言 对于项目版本管理,你是否存在这样的痛点:项目分支多而杂不好管理,git log界面commit信息错乱复杂无规范,版本回退不知道选择什么版本合适……。 项目版本管理的最佳实践系列,笔者将以两篇文章的形式展开介绍(即基础篇与进阶篇)。本文为gitflow版本管理的最佳实践-基础篇。基础篇主要介绍git应用于生产的基本流程与怎么使用gitflow管理你的项目版本线(适用于敏捷迭代的项目管理场景下)。进阶篇 将着重介绍gitflow+jenkins+docker+DevOps+敏捷Scrum 完成项目持续构建与持续交付(CI/CD)。阅读本文需要有一定git基础,基础知识则不在本文展开,善用网上冲浪工具便可学习到许多Git的基础知识。实际上,本文介绍的并不是纯粹的gitflow,而是结合实际生产对gitflow的改造与最佳实践。 Git的基本术语与简写 术语 解释 PR 即pull request,拉取请求。请求git代码管理员将你的代码合并到仓库的分支中。一般的PR由标题部分,描述部分与代码部分组成。 code review 在PR过程中代码管理员对你提交的代码进行代码审查,即你的代...
- 下一篇
面试官:如何写出让 CPU 跑得更快的代码?
前言 代码都是由 CPU 跑起来的,我们代码写的好与坏就决定了 CPU 的执行效率,特别是在编写计算密集型的程序,更要注重 CPU 的执行效率,否则将会大大影响系统性能。 CPU 内部嵌入了 CPU Cache(高速缓存),它的存储容量很小,但是离 CPU 核心很近,所以缓存的读写速度是极快的,那么如果 CPU 运算时,直接从 CPU Cache 读取数据,而不是从内存的话,运算速度就会很快。 但是,大多数人不知道 CPU Cache 的运行机制,以至于不知道如何才能够写出能够配合 CPU Cache 工作机制的代码,一旦你掌握了它,你写代码的时候,就有新的优化思路了。 那么,接下来我们就来看看,CPU Cache 到底是什么样的,是如何工作的呢,又该写出让 CPU 执行更快的代码呢? 正文 CPU Cache 有多快? 你可能会好奇为什么有了内存,还需要 CPU Cache?根据摩尔定律,CPU 的访问速度每 18 个月就会翻倍,相当于每年增长 60% 左右,内存的速度当然也会不断增长,但是增长的速度远小于 CPU,平均每年只增长 7% 左右。于是,CPU 与内存的访问性能的差距不断...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- CentOS关闭SELinux安全模块
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- CentOS6,CentOS7官方镜像安装Oracle11G
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Hadoop3单机部署,实现最简伪集群
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Windows10,CentOS7,CentOS8安装Nodejs环境