智联招聘 × Pulsar|Pulsar 客户端在高吞吐场景下的内存控制实践
> 本文来自 智联招聘 汪苏诚(萧易客)
1. 背景介绍
在现代分布式系统架构中,消息队列作为核心组件承担着系统解耦、流量削峰和异步处理的重要职责。Apache Pulsar 作为云原生时代的统一消息平台,其在高并发、大数据量场景下的表现直接影响着整个系统的稳定性和性能。这其中容易忽视的是在高吞吐场景下的客户端的表现。
在高吞吐的业务场景下,会面临以下技术痛点:
- 内存溢出风险:高频大消息发送导致客户端内存缓冲区快速增长
- 线程阻塞问题:不当的配置可能导致异步发送退化为同步阻塞
- 资源竞争冲突:Producer 和 Consumer 共享内存限制导致相互影响
- 监控和调优困难:缺乏有效的内存使用监控和调优策略
因此,本篇文章结合了我们对 Pulsar 客户端内存控制的一些理解和实践,希望帮助对大家构建高性能、高可用的分布式系统有所帮助。
2. 消息发送模式分析
在实际业务场景里,有阻塞和非阻塞两种消息发送方式。我们可以按照具体需求,灵活做出选择 。
2.1 同步发送的局限性
同步发送(Blocking Send)是 Pulsar 客户端最基础的消息发送方式。在这种模式下,发送线程会一直阻塞,直到收到 Broker 的确认响应。
代码示例:
// 初始化 pulsarClient 和 producer PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://xxx:6650").build(); Producer<byte[]> producer = pulsarClient.newProducer().topic("topic_name").create(); // 同步方式发送消息 try { MessageId messageId = producer.send(/* 消息内容 */); log.debug("..."); } catch (Exception e) { log.error("...", e); }
在我们的业务中,需要在用户发起搜索请求时,记录本次搜索请求的事件,以便后续进行热点分析和搜索优化。这里的搜索请求事件包含了搜索请求的上下文信息,例如搜索词等,而这一过程本身其实不会反馈到用户的搜索结果中。然后使用同步方式发送,必然会导致主线程阻塞,增加整体搜索延迟,这样会影响用户体验,同时也会给搜索执行过程带来更多的不确定性。
在这种情况下,我们需要明确区分主线流程和支线流程。主线流程是用户的核心交互路径,而记录搜索请求等操作属于支线流程。将支线流程嵌套在主线流程中不仅会增加系统的复杂性,还可能影响用户体验。因此,异步发送可能是一个更好的选择。
2.2 异步发送的优势
于是,我们可以优化一下,调整为非阻塞方式,将记录搜索事件放到其它线程中完成:
producer.sendAsync(/* message payload here */).whenComplete((msgId, ex) -> { if (ex != null) { log.error("Failed to record search activity", ex); } else { log.debug("Search activity messageId={}", msgId); } });
异步发送最大的优势在于其非阻塞特性。异步发送允许客户端并行处理多个发送请求,充分利用网络带宽和处理器资源。异步发送也天然支持消息批处理,同时异步模式也提供了更加灵活的背压处理机制。
2.3 高吞吐场景下数据发送的问题
在现实中,若用户搜索的 TPS 较高,例如在单实例上可以超过 1000 QPS(高和低都是相对而言的,这里只是举个例子)。若恰好记录的搜索事件内容较多(例如包含了搜索请求的完整上下文和搜索结果等),序列化之后大小能达到 100KB 甚至 1MB,那么上面代码在运行时你可能会遇到MemoryBufferIsFullError
异常:
org.apache.pulsar.client.api.PulsarClientException$MemoryBufferIsFullError: Client memory buffer is full at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:972) at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:452) at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:343) at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:102) at org.apache.pulsar.client.impl.ProducerBase.sendAsync(ProducerBase.java:68)
另外若服务本身与 Pulsar Broker 之间出现了网络波动,或者 Pulsar 服务内部组件之间出现网络波动,导致整体Producer 写入延迟升高,亦或是短时间出现大量写入,你还可能会遇到ProducerQueueIsFullError
异常:
org.apache.pulsar.client.api.PulsarClientException$ProducerQueueIsFullError: Producer send queue is full at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:965) at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:452) at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:343) at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:102) at org.apache.pulsar.client.impl.ProducerBase.sendAsync(ProducerBase.java:68)
3. Producer 端内存控制机制
3.1 基于消息数量的控制
- maxPendingMessages 配置
- maxPendingMessagesAcrossPartitions 配置
下面我们对上面两种异常产生的原因作一下分析,我们先来看一下构建 Producer 时,ProducerBuilder中与内存使用有关的配置项:
/* * Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. */ ProducerBuilder<t> maxPendingMessages(int maxPendingMessages); /* * Set the number of max pending messages across all partitions. */ ProducerBuilder<t> maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions);
maxPendingMessages
用来控制 producer 内部队列中正在发送还没有接收到 broker 确认的消息数量,若队列大小超出了这个限制,默认的行为就是抛出ProducerQueueIsFullError
异常,你可以通过修改另外一个配置blockIfQueueFull=true
调整为阻塞等待队列中空出新的空间,这里还有另外需要注意的地方在下面会细说。
maxPendingMessages
这个配置实际上是直接传递给底层各个分区的内部 producer 的,对于多分区的 topic,实际处于pending状态的最大消息数量是maxPendingMessages
乘以 topic 分区数量。由于maxPendingMessages
结合可变的 topic 分区数量使得最终的 pending 消息数量变得不可控,因此还有另外一个优先级更高的配置maxPendingMessagesAcrossPartitions
用来控制整个 topic 所有分区的总的一个 pending 消息数量,最终到各个分区内部 producer 取 maxPendingMessages
和 maxPendingMessagesAcrossPartitions / partitions
的较小值。
3.2 基于内存大小的控制
- memoryLimit 配置
- PIP-74 改进
然而,在现实应用场景中,不同业务的消息大小差异很大,单纯基于消息数量控制内存使用,开发者需要预估平均消息大小,这几乎不可能做到,因为消息的实际大小很可能会随着业务演进而发生变化,因此在PIP-74中,在构建 PulsarClient 时,ClientBuilder 提供了一个面向整个 client 实例统一的内存限制配置:
/* * Configure a limit on the amount of memory that will be allocated by this client instance. * * Setting this to 0 will disable the limit. */ ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit);
当客户端所有 producer 中所有 pending 的消息大小总和超过这个限制时,默认则会抛出MemoryBufferIsFullError
异常,若同时配置了blockIfQueueFull=true
,则当前线程会阻塞等待前面pending的消息发送完成。
3.3 阻塞与非阻塞模式
- blockIfQueueFull 配置
- 使用建议
前面提到关于blockIfQueueFull
配置的使用有一个细节需要注意,这个配置是为了限制客户端 producer 内存使用的同时,让开发者简化处理队列或者内存 buffer 满了的情况可以继续发送消息,例如在一个后台定时任务的场景中批量发送消息。然而这里需要强调的是 blockIfQueueFull 一旦配置为 true,不论是应用发送消息调用的是阻塞的 Producer.send
方法还是非阻塞的 Producer.sendAsync
方法都会出现阻塞等待,“卡”住当前线程,那么对于我们上面的业务来说这是不可接受的,若由于支线流程(特殊情况容忍丢失的用户搜索事件)异常抖动,阻塞了主线流程(搜索主线程)就得不偿失了。
// 注意:若producer配置了blockIfQueueFull=true, // 当发送队列满或者内存buffer满,当前线程将卡在sendAsync方法调用 producer.sendAsync(/* message payload here */).whenComplete((msgId, ex) -> { if (ex != null) { log.error("Failed to record search activity", ex); } else { log.debug("Search activity messageId={}", msgId); } });
PIP-120对2.10.0
以及之后版本的客户端中,默认启用了memoryLimit配置,其默认值为64MB
,同时默认禁用了maxPendingMessages
和maxPendingMessagesAcrossPartitions
配置(默认值修改为0),另外将maxPendingMessagesAcrossPartitions
配置标记为了Deprecated
,因为使用这个配置最终目的就是控制客户端producer的内存使用,现在已经有memoryLimit这个更加直接的配置可以替代了。
4. Consumer 端内存控制
4.1 Consumer 端的内存使用特点
上面说的全部都是围绕着 Producer 侧的内存使用来讲的,其实在 PIP-74 中也提到了 Pulsar 客户端 consumer 侧的内存使用,只不过在实现中是分阶段进行的。
我们先来看一下Pulsar客户端的API早期在构造一个Consumer时,ConsumerBuilder 提供的与内存使用有关的选项:
/* * Sets the size of the consumer receive queue. * * (default: 1000) */ ConsumerBuilder<t> receiverQueueSize(int receiverQueueSize); /* * Sets the max total receiver queue size across partitions. * * (default: 50000) */ ConsumerBuilder<t> maxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions);
Pulsar客户端通过预接收队列临时存放broker推送过来的消息,以便应用程序调用Consumer#receive
或者Consumer#receiveAsync
方法时直接从内存中返回消息,这是出于消费吞吐的考虑,本质上是一种以空间换取时间的策略。上面两个选项是给这个“空间”设置一个数量上的上限,注意这里仅是数量上的上限,实际的内存空间使用还要取决于平均消息大小。receiverQueueSize
控制每个分区consumer的接收队列大小,maxTotalReceiverQueueSizeAcrossPartitions
来控制所有分区consumer和parent consumer的接收队列总大小。
前面提到receiverQueueSize
和maxTotalReceiverQueueSizeAcrossPartitions
参数是以数量的形式间接的控制Consumer预接收队列的内存使用,在PIP-74中提出了整个client级别的memoryLimit,同时提出了一个新的控制Consumer内存使用的方案,就是autoScaledReceiverQueueSizeEnabled
:
/* * If this is enabled, the consumer receiver queue size is initialized as a very small value, 1 by default, * and will double itself until it reaches either the value set by {@link #receiverQueueSize(int)} or the client * memory limit set by {@link ClientBuilder#memoryLimit(long, SizeUnit)}. */ ConsumerBuilder<t> autoScaledReceiverQueueSizeEnabled(boolean enabled);
当启用了这个特性之后,receiverQueueSize
会从1开始呈2的指数倍增长,直至达到receiverQueueSize
的限制或达到client的memoryLimit限制,其目标是在有限制的内存使用下,达到最大的吞吐效率。
4.2 Consumer 端的内存注意事项
除了上面说的Producer和Consumer在生产和消费过程中的内存使用之外,还有一个容易被忽视的点是创建Consumer时ackTimeout
和ackTimeoutTickTime
的配置如果不匹配,会消耗较多堆内内存。
/* * Sets the timeout for unacknowledged messages, truncated to the nearest millisecond. The timeout must be greater than 1 second. */ ConsumerBuilder<t> ackTimeout(long ackTimeout, TimeUnit timeUnit); /** * Define the granularity of the ack-timeout redelivery. * * <p>By default, the tick time is set to 1 second. Using a higher tick time * reduces the memory overhead to track messages when the ack-timeout is set to * bigger values (e.g., 1 hour). */ ConsumerBuilder<t> ackTimeoutTickTime(long tickTime, TimeUnit timeUnit);
若Consumer配置了ackTimeout并且配置了较大的时间窗口(例如1小时或者更长)时,应适当的调大ackTimeoutTickTime
,这是因为Consumer内部使用了一个简单时间轮的算法,若ackTimeout时间窗口很大,ackTimeoutTickTime
仍然使用其默认值1s
,时间轮本身将会占用大量堆内存空间。
5. 最佳实践小结
- 使用
sendAsync
非阻塞方法要注意其不能保证消息一定发送成功,特别是开启了blockIfQueueFull
之后,它会在特定情况下演变成阻塞方法。 - 对于同时使用到了Producer和Consumer的应用,推荐创建两个client,分别用来创建Producer和Consumer,避免由于共用memoryLimit导致相互影响。
参考
</t></p></t></t></t></t></t></t></byte[]>

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Databend 亮相 DTCC 2025:存算分离架构引领湖仓一体化
在数字化转型加速推进的今天,实时数据仓库与湖仓一体化架构已成为企业数据平台建设的核心议题。面对海量数据的爆发式增长和日益复杂的业务需求,如何构建既能满足实时性要求,又能兼顾成本效益的现代化数据平台,成为每一位数据架构师面临的重大挑战。8月21-23日,以"智能创新 数赢未来"为主题的第十六届中国数据库技术大会(DTCC 2025)在北京成功举办,Databend作为新一代云原生数据仓库的代表,全方位展示了其在AI时代湖仓一体化建设方面的创新实践与技术突破。 Databend 联合创始人深度思考:AI 时代下的湖仓一体化平台建设 在 DTCC 2025 的「实时数仓与湖仓一体应用实践(上)」专场中,Databend 联合创始人吴炳锡不仅担任专场主持人,更带来了题为《AI 时代下的湖仓一体化平台建设的思考》的深度分享。作为新一代云原生数据仓库的践行者,吴炳锡从技术演进的历史脉络出发,系统回顾了从传统数据库到现代湖仓一体化架构的发展历程。 Databend联合创始人吴炳锡 吴炳锡指出,传统大数据架构面临着架构复杂、组件繁多、运维成本高昂等诸多挑战。在 AI 时代背景下,企业不仅需要处理结构化...
- 下一篇
如何基于 SpringBoot 快速构建 Apache Pulsar 实时应用
Spring Boot Starter Pulsar [1] 是Spring 官方为 Apache Pulsar 开发的构建项目,该项目允许开发者轻松集成 Pulsar 客户端,并通过 Spring Boot 的方式,快速构建和开发 Pulsar 应用程序。 本文将带领大家基于SpringBoot 快速构建一个 Pulsar 实时应用。 快速开始 准备一个Pulsar Standalone 集群 Pulsar 支持 Standalone 模式启动服务,并且功能与集群模式几乎一致,我们可以据此快速搭建测试服务。 参考:Run a standalone Pulsar cluster locally[2] 下载Pulsar 二进制包(本文以 3.0.7 LTS版本示例): Apache 地址:https://archive.apache.org/dist/pulsar/ 腾讯云镜像:https://mirrors.cloud.tencent.com/apache/pulsar/ 阿里云镜像:https://mirrors.aliyun.com/apache/pulsar/ 下载二进制包 wg...
相关文章
文章评论
共有0条评论来说两句吧...