技术文档 | Pulsar 中的消息保留、过期及积压机制解析(下)
在 Pulsar broker 中, 消息的 Retention, Expiry 和 Backlog quota 是比较重要的功能,它们表现的是 Pulsar 对于流经它的数据的管理。 但是受限于复杂度和文档语言等因素,开发者可能无法在第一时间很直观的了解它们。
本系列上篇为大家介绍了 Retention 和 Expiry 的概念、行为、应用、实现和注意事项技术文档 | Pulsar 中的消息保留、过期及积压机制解析(上),本文将带来关于 Backlog quota 的解析。
Backlog quota
1. 概念
Backlog 意为消息积压,指未被消费的消息;quota 意为配额,指对于未消费消息的限制。因此 Backlog quota 是为了限制消息堆积。
当消费者的消费速率跟不上生产者的生产速率时,会出现消息堆积的情况,这在日常开发过程中非常常见。尽管**相比于其他消息队列,Pulsar 提供了几乎可以无限扩容消费者数量的机制 **(Shared, Key_Shared订阅模式) 来提高消费速率, 但是在实际的业务场景中,消息堆积的情况也时有发生。
为了应对这种情况,Pulsar 提供了 Backlog quota 机制来在一定程度治理它。当然,这种治理无法提高消费者的消费速率,只是在生产速率和消费速率之间做出一种平衡,比如说它的一种治理策略是自动清理 Backlog 消息。
2. 行为
Pulsar 在 Topic 级别和 Subscription 级别都有 Backlog 的概念。Topic 级别的 Backlog 是指该 Topic 下所有 Subscription 的 Backlog 总和 (pulsar_msg_backlog 和 pulsar_storage_backlog_size 的含义略有差异,这里使用 pulsar_msg_backlog 的含义),Subscription 级别的 Backlog 是指当前 Subscription 的 Backlog。
Backlog quota 机制实际工作在 Subscription 级别,它和 Expiry 机制略有相似,但更加强大。它对于 Backlog 有两项限制、两种作用域和三种治理策略:
2.1 两项限制
-
limitTime:Backlog 的最大存活时间,单位是秒,超过这个时间的 Backlog 会进入治理流程;
-
limitSize:Backlog 的最大大小,单位是字节,超过这个大小的 Backlog 会进入治理流程;
2.2 两种作用域
-
destination_storage:针对 Topic 的 Backlog 的存储空间,和 limitSize 搭配使用;
-
message_age:针对 Topic 的 Backlog 的消息存活时间,和 limitTime 搭配使用;
2.3 三种治理策略
-
producer_request_hold:当 Backlog 超过限制,Pulsar 会挂起 Producer 的链接请求,直到 Backlog 降到限制以下;
-
consumer_backlog_eviction:当 Backlog 超过了限制,Pulsar 会自动移动所有超限的 Subscription 的游标(相当于自动确认这些消息,使得这些消息对 Consumer 不可见),将 Backlog 降低到限制以下;
-
producer_exception:当 Backlog 超过了限制,客户端创建 Producer 会抛出异常,直到 Backlog 降到限制以下。
3. 应用
3.1 监控
-
Pulsar 在 Prometheus 上提供了 pulsar_msg_backlog 和 pulsar_storage_backlog_size 来分别观测 Topic 级别的未消费的消息数量、未消费消息的总大小。如果这两个指标数值较高,说明该 Topic 消息积压严重。
另外,Pulsar 也提供了 pulsar_subscription_back_log 这一 Subscription 级别的指标,当我们发现某个 Topic 的 backlog 数值较高时,可以通过查看该 Topic 下的 pulsar_subscription_back_log 指标来找到具体的 Subscription;
-
通过 Topic stats 来监控 Backlog quota 的情况:
pulsar-admin topics stats persistent://my-tenant/my-ns/my-topic
3.2 设置
和 Retention 和 Expiry 一样,Backlog quota 的设置也分为两个级别:namespace 和 topic 级别。在 Namespace 级别设置了之后,该 Namespace 的所有 Topic 都会继承该策略;在 Topic 级别设置了之后,该 Topic 会覆盖 Namespace 的设置。
Namespace 级别
-
查看当前 Namespace 的 Backlog quota
pulsar-admin namespaces get-backlog-quotas my-tenant/my-ns
-
设置 Namespace 的 Backlog quota
pulsar-admin namespaces set-backlog-quota my-tenant/my-ns --limitTime 3600 --policy producer_request_hold --type message_age
-
删除 Namespace 的 Backlog quota
pulsar-admin namespaces remove-backlog-quota my-tenant/my-ns
Topic 级别
-
查看当前 Topic 的 Backlog quota
pulsar-admin topics get-backlog-quotas persistent://my-tenant/my-ns/my-topic
-
设置 Topic 的 Backlog quota
pulsar-admin topics set-backlog-quota persistent://my-tenant/my-ns/my-topic --limitTime 3600 --policy producer_request_hold --type message_age
-
删除 Topic 的 Backlog quota
pulsar-admin topics remove-backlog-quota persistent://my-tenant/my-ns/my-topic
4. 实现
Backlogquota 机制的触发有两个入口,分别是 ServerCnx#handleProducer(https://github.com/apache/pulsar/blob/v3.0.4/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1448) 和 BrokerService#startBacklogQuotaChecker()(https://github.com/apache/pulsar/blob/v3.0.4/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java#L657) ,前者和后者略有差距,这里不做详细解释。仅以 BrokerService#startBacklogQuotaChecker() 为例,简单介绍 Backlog quota 的执行流程。
4.1 Backlog Quota Checker 初始化
在 Pulsar 启动时, BrokerService#startBacklogQuotaChecker() 会检查当前 Broker 是否允许 Backlog quota 检查(broker.conf的backlogQuotaCheckEnabled(default=true))。如果允许,向线程池注册一个定时任务,定时任务的执行周期是 broker.conf 的 backlogQuotaCheckIntervalInSeconds(default=60s)。Pulsar 每隔 60s 检查一次所有 Topic 的 Backlog quota,如果 Topic 设置了 Backlog quota,执行后续流程。
4.2 Backlog Quota 执行流程
-
遍历所有 Topic,如果 Topic 设置了 Backlog quota,执行后续流程。否则,跳过该 Topic;
-
优先根据 limitSize 检查该 Topic 消费最慢的 Subscription 的 Backlog 是否超过了限制,如果未超限,再根据 limitTime 检查;
-
如果 Backlog 超过了限制,根据 policy 执行相应的治理策略:
-
producer_request_hold:挂起 Producer 的链接请求,直到 Backlog 降到限制以下;
-
consumer_backlog_eviction:自动移动所有超限的 Subscription 的游标,将 Backlog 降低到限制以下;
-
producer_exception:客户端创建 Producer 会抛出异常,直到 Backlog 降到限制以下。
-
5. 注意事项
-
Pulsar 暴露出的 Prometheus 指标中的 pulsar_storage_backlog_size 并不完全精准,它只是一个近似值。在 Shared 和 Key_Shared 模式下,允许消息单独确认消息,但是这些单独确认的消息不会加入到 Backlog 的计算中,因此这个指标并不会精准反映 Backlog 情况,它通常会比实际数值大;
-
pulsar_msg_backlog 一般也是近似值,不会将 Ack 空洞计算在内,但是如果将 broker.conf 的 exposePreciseBacklogInPrometheus 设置为 true,则会将单独确认的消息计算在内,pulsar_msg_backlog 会更加精准;
-
由于 EntryFilter(https://pulsar.apache.org/docs/3.2.x/develop-plugin/#entry-filter) 机制的存在,在消费消息时可以根据 EntryFilter 过滤掉一些消息,这些被过滤掉的消息严格来说并不算 Backlog,但是我们在计算 Backlog 时,不可能将 Bookkeeper 中的所有消息都拉取出来计算。因此如果 Broker 挂载了 EntryFilter 插件,pulsar_msg_backlog 和 pulsar_storage_backlog_size 以及 pulsar_subscription_back_log 都无法精准反映实际的 Backlog 情况,它们通常会比实际数据大一些;
-
不管是根据 limitSize 还是 limitTime 来限制 Backlog,当 policy=consumer_backlog_eviction 时,都无法完全精准的清理 Backlog。理想情况下,会将 Backlog 降低到原来的 10%;
-
如果在 Broker 端禁用 Backlog quota checker (将 broker.conf 中 backlogQuotaCheckEnabled 设置为 false),并且设置的 Backlog quota 的 policy=consumer_backlog_eviction,Pulsar 将不会自动清理 Backlog;
-
如果使用 Backlog quota,然后 Backlog 达到了阈值,并且 policy=producer_request_hold 或 producer_exception ,在 Broker 重启或自动重平衡时,会导致所有的 Producer 无法链接到 Broker,进而无法生产消息,直到 Backlog 降到限制以下;
-
如果使用 limitTime 限制 Backlog,需要注意 Client 和 Broker 的时间同步,否则可能会导致 Backlog 无法正确的清理。因为此时 Backlog 的判断是以 Broker 的时间为基准的;
-
如果该 Topic 设置了 Retention,Backlog quota 必须要小于 Retention。假设 Retention 设置了 10GB,Backlog quota 必须要小于 10GB。
总结
在文章最后,对 Pulsar 的 Retention, Expiry 和 Backlog quota 做一个总结:
-
Retention 是 Pulsar 对于过期数据的保留和清理策略,它工作在 Topic 级别,通过定时任务清理过期数据,将全部 Subscription 都消费过后的数据从存储介质上删除来清理存储空间;
-
Expiry 即为 Message TTL,它工作在 Subscription 级别,通过定时任务来检查 Subscription 中超时未消费的消息,并自动的将这些消息确认,使其对消费者不可见;
-
Backlog quota 是对未被消费的消息的限制,它实际工作在 Subscription 级别,通过定时任务来检查 Subscription 中的 Backlog,如果 Backlog 超过了限制,会执行相应的治理策略,拒绝新的 Producer 链接或者自动确认消息。
这三个功能并不冲突,它们可以组合使用,我们可以**通过 Retention 删除过期数据,通过 Expiry 处理超时未消费的数据,通过 Backlog quota 治理消息堆积。**但是由于他们三者都涉及到了对数据的操作,大家在使用时应当谨慎,在使用前根据实际业务仔细评估,避免数据丢失或者数据不一致的情况。
社区将持续输出更多 Pulsar 的技术内容;欢迎加入社群讨论或在评论区留言,与我们交流更多关于 Pulsar 的问题。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
深度解析:腾讯 BiFang 如何借助 Apache Pulsar 引领湖流一体新潮流
在大数据领域,腾讯的湖流一体存储引擎 BiFang 基于 Pulsar 构建,融合了消息队列与数据湖的能力,实现了流批数据处理入口统一,还具备全增量查询、实时可见等优势,为大数据处理带来全新体验。 Apache Pulsar 是构建高效大数据解决方案的核心基础之一,能助力企业更好地应对大数据挑战,实现数据价值最大化。 毕方 BiFang x Apache Pulsar- BiFang 联合 Pulsar 为湖流一体带来新体验 - BiFang,中文为毕方,中国古神话中的神鸟,象征着变革和能量,隐喻湖流一体存储引擎的先进和可靠。 在大数据领域,流计算和数据湖是两个核心组件,各自具有独特的功能和优势,流计算聚焦实时数据流处理, 数据湖专注海量数据存储分析。随着数据量的增长和实时分析需求的增加,将流计算与数据湖相结合的需求日益凸显,行业希望通过整合流计算与数据湖技术,逐步形成湖流一体技术范式。 腾讯天穹大数据去年发布了湖流一体的雏形系统------BSS 流批一体存储,该方案基于天穹 Pulsar 实现了消息队列和数据湖使用同一份数据,统一流和湖的客户端,支持数据秒级写入和查询,完成和 In...
- 下一篇
2-5 倍性能提升,30% 成本降低,阿里云 SelectDB 存算分离架构助力波司登集团实现降本增效
波司登集团作为全球领先的羽绒服公司,每年的销售旺季集中在四个月间,需高效把握业务机遇以实现高营收。为满足集团销售旺季的实时数据分析需求,同时降低淡季数据分析成本,波司登决定升级大数据架构,采用阿里云数据库 SelectDB 版升级数仓,基于阿里云 SelectDB 云原生存算分离架构,实现了资源隔离与弹性扩缩容,并取得了查询性能提升 2-5 倍、总体成本降低 30% 以上、效率提升 30% 的可观收益。 业务需求 波司登集团自 1976 年创立以来,专注羽绒服制造领域已有 48 年,产品畅销全球 72 个国家。2021 年,波司登羽绒服销售规模达到全球领先,并且在 2023 年实现全年营收 232.14 亿元,同比增长 38.4%。近年来,波司登集团通过数据驱动的精细化运营,成功从"羽绒服专家"转型为"多品类功能性服饰巨头",其数据分析业务覆盖门店运营、电商平台、用户运营等多个环节。 门店运营:波司登门店规模目前已超过 3500 家,运营注重精细化及高效化**。门店数据分析服务须具备高并发与低延迟能力**,以应对节假日、新品发布、促销及寒潮期间的销售高峰期,实时监控库存与销售数据,快速...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS7,8上快速安装Gitea,搭建Git服务器
- CentOS8安装Docker,最新的服务器搭配容器使用
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- CentOS8编译安装MySQL8.0.19
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2全家桶,快速入门学习开发网站教程
- Docker使用Oracle官方镜像安装(12C,18C,19C)