vivo Pulsar 万亿级消息处理实践 (2) - 从 0 到 1 建设 Pulsar 指标监控链路
作者:vivo 互联网大数据团队- You Shuo
本文是《vivo Pulsar万亿级消息处理实践》系列文章第2篇,Pulsar支持上报分区粒度指标,Kafka则没有分区粒度的指标,所以Pulsar的指标量级要远大于Kafka。在Pulsar平台建设初期,提供一个稳定、低时延的监控链路尤为重要。
系列文章:
本文是基于Pulsar 2.9.2/kop-2.9.2展开的。
一、背景
作为一种新型消息中间件,Pulsar在架构设计及功能特性等方面要优于Kafka,所以我们引入Pulsar作为我们新一代的消息中间件。在对Pulsar进行调研的时候(比如:性能测试、故障测试等),针对Pulsar提供一套可观测系统是必不可少的。Pulsar的指标是面向云原生的,并且官方提供了Prometheus作为Pulsar指标的采集、存储和查询的方案,但是使用Prometheus采集指标面临以下几个问题:
-
Prometheus自带的时序数据库不是分布式的,它受单机资源的限制;
-
Prometheus 在存储时序数据时消耗大量的内存,并且Prometheus在实现高效查询和聚合计算的时候会消耗大量的CPU。
除了以上列出的可观测系统问题,Pulsar还有一些指标本身的问题,这些问题包括:
-
Pulsar的订阅积压指标单位是entry而不是条数,这会严重影响从Kafka迁移过来的用户的使用体验及日常运维工作;
-
Pulsar没有bundle指标,因为Pulsar自动均衡的最小单位是bundle,所以bundle指标是调试Pulsar自动均衡参数时重要的观测依据;
-
kop指标上报异常等问题。
针对以上列出的几个问题,我们在下面分别展开叙述。
二、Pulsar监控告警系统架构
在上一章节我们列出了使用Prometheus作为观测系统的局限,由于Pulsar的指标是面向云原生的,采用Prometheus采集Pulsar指标是最好的选择,但对于指标的存储和查询我们使用第三方存储来减轻Prometheus的压力,整个监控告警系统架构如下图所示:
在整个可观测系统中,各组件的职能如下:
-
Pulsar、bookkeeper等组件提供暴露指标的接口
-
Prometheus访问Pulsar指标接口采集指标
-
adaptor提供了服务发现、Prometheus格式指标的反序列化和序列化以及指标转发远端存储的能力,这里的远端存储可以是Pulsar或Kafka
-
Druid消费指标topic并提供数据分析的能力
-
vivo内部的检测告警平台提供了动态配置检测告警的能力
基于以上监控系统的设计逻辑,我们在具体实现的过程中遇到了几个比较关键的问题:
**一、**adaptor需要接收Pulsar所有线上服务的指标并兼容Prometheus格式数据,所以在调研Prometheus采集Pulsar指标时,我们基于Prometheus的官方文档开发了adaptor,在adaptor里实现了服务加入集群的发现机制以及动态配置prometheus采集新新加入服务的指标:
-
Prometheus动态加载配置:Prometheus配置-官方文档
-
Prometheus自定义服务发现机制:Prometheus自定义服务发现-官方文档
在可以动态配置Prometheus采集所有线上正在运行的服务指标之后,由于Prometheus的指标是基于protobuf协议进行传输的,并且Prometheus是基于go编写的,所以为了适配Java版本的adaptor,我们基于Prometheus和go提供的指标格式定义文件(remote.proto、types.proto和gogo.proto)生成了Java版本的指标接收代码,并将protobuf格式的指标反序列化后写入消息中间件。
**二、**Grafana社区提供的Druid插件不能很好的展示Counter类型的指标,但是bookkeeper上报的指标中又有很多是Counter类型的指标,vivo的Druid团队对该插件做了一些改造,新增了计算速率的聚合函数。
druid插件的安装可以参考官方文档(详情)
**三、**由于Prometheus比较依赖内存和CPU,而我们的机器资源组又是有限的,在使用远端存储的基础上,我们针对该问题优化了一些Prometheus参数,这些参数包括:
--storage.tsdb.retention=30m:该参数配置了数据的保留时间为30分钟,在这个时间之后,旧的数据将会被删除。
--storage.tsdb.min-block-duration=5m:该参数配置了生成块(block)的最小时间间隔为5分钟。块是一组时序数据的集合,它们通常被一起压缩和存储在磁盘上,该参数间接控制Prometheus对内存的占用。
--storage.tsdb.max-block-duration=5m:该参数配置了生成块(block)的最大时间间隔为5分钟。如果一个块的时间跨度超过这个参数所设的时间跨度,则这个块将被分成多个子块。
--enable-feature=memory-snapshot-on-shutdown:该参数配置了在Prometheus关闭时,自动将当前内存中的数据快照写入到磁盘中,Prometheus在下次启动时读取该快照从而可以更快的完成启动。
三、Pulsar 指标优化
Pulsar的指标可以成功观测之后,我们在日常的调优和运维过程中发现了一些Pulsar指标本身存在的问题,这些问题包括准确性、用户体验、以及性能调优等方面,我们针对这些问题做了一些优化和改造,使得Pulsar更加通用、易维护。
3.1 Pulsar消费积压指标
原生的Pulsar订阅积压指标单位是entry,从Kafka迁移到Pulsar的用户希望Pulsar能够和Kafka一样,提供以消息条数为单位的积压指标,这样可以方便用户判断具体的延迟大小并尽量不改变用户使用消息中间件的习惯。
在确保配置brokerEntryMetadataInterceptors=
org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor情况下,Pulsar broker端在往bookkeeper端写入entry前,通过拦截器往entry的头部添加索引元数据,该索引在同一分区内单调递增,entry头部元数据示例如下:
biz-log-partition-1 -l 24622961 -e 6 Batch Message ID: 24622961:6:0 Publish time: 1676917007607 Event time: 0 Broker entry metadata index: 157398560244 Properties: "X-Pulsar-batch-size 2431" "X-Pulsar-num-batch-message 50"
以分区为指标统计的最小单位,基于last add confirmed entry和last consumed entry计算两个entry中的索引差值,即是订阅在每个分区的数据积压。下面是cursor基于订阅位置计算订阅积压的示意图,其中last add confirmed entry在拦截器中有记录最新索引,对于last consumed entry,cursor需要从bookkeeper中读取,这个操作可能会涉及到bookkeeper读盘,所以在收集延迟指标的时候可能会增加采集的耗时。
效果
上图是新订阅积压指标和原生积压指标的对比,新增的订阅积压指标单位是条,原生订阅积压指标单位是entry。在客户端指定单条发送100w条消息时,订阅积压都有明显的升高,当客户端指定批次发送100w条消息的时候,新的订阅积压指标会有明显的升高,而原生订阅积压指标相对升高幅度不大,所以新的订阅积压指标更具体的体现了订阅积压的情况。
3.2 Pulsar bundle指标
Pulsar相比于Kafka增加了自动负载均衡的能力,在Pulsar里topic分区是绑定在bundle上的,而负载均衡的最小单位是bundle,所以我们在调优负载均衡策略和参数的时候比较依赖bunlde的流量分布指标,并且该指标也可以作为我们切分bundle的参考依据。我们在开发bundle指标的时候做了下面两件事情:
统计当前Pulsar集群非游离状态bundle的负载情况对于处于游离状态的bundle(即没有被分配到任何broker上的bundle),我们指定Pulsar leader在上报自身bundle指标的同时,上报这些处于游离状态的bundle指标,并打上是否游离的标签。
效果
上图就是bundle的负载指标,除了出入流量分布的情况,我们还提供了生产者/消费者到bundle的连接数量,以便运维同学从更多角度来调优负载均衡策略和参数。
3.3 kop消费延迟指标无法上报
在我们实际运维过程中,重启kop的Coordinator节点后会偶发消费延迟指标下降或者掉0的问题,从druid查看上报的数据,我们发现在重启broker之后消费组就没有继续上报kop消费延迟指标。
(1)原因分析
由于kop的消费延迟指标是由Kafka lag exporter采集的,所以我们重点分析了Kafka lag exporter采集消费延迟指标的逻辑,下图是Kafka-lag-exporter采集消费延迟指标的示意图:
其中,kafka-lag-exporter计算消费延迟指标的逻辑会依赖kop的describeConsumerGroups接口,但是当GroupCoordinator节点重启后,该接口返回的member信息中assignment数据缺失,kafka-lag-exporter会将assignment为空的member给过滤掉,所以最终不会上报对应member下的分区指标,代码调试如下图所示:
为什么kop/Kafka describeConsumerGroups接口返回member的assignment是空的?因为consumer在启动消费时会通过groupManager.storeGroup写入__consumer_
offset,在coordinator关闭时会转移到另一个broker,但另一个broker并没有把assignment字段反序列化出来(序列化为groupMetadataValue,反序列化为readGroupMessageValue),如下图:
(2)解决方案
在GroupMetadataConstants#readGroup-
MessageValue()方法对coordinator反序列化消费组元数据信息时,将assignment字段读出来并设置(序列化为groupMetadataValue,反序列化为readGroupMessageValue),如下图:
四、总结
在Pulsar监控系统构建的过程中,我们解决了与用户体验、运维效率、Pulsar可用性等方面相关的问题,加快了Pulsar在vivo的落地进度。虽然我们在构建Pulsar可观测系统过程中解决了一部分问题,但是监控链路仍然存在单点瓶颈等问题,所以Pulsar在vivo的发展未来还会有很多挑战。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Vite v7.0 正式发布,超快的前端构建工具
Vite v7.0 已正式发布。 Vite(法语意为 "快速的",发音/vit/,发音同 "veet")是一种新型前端构建工具,能够显著提升前端开发体验。它主要由两部分组成: 一个开发服务器,它基于原生 ES 模块提供了丰富的内建功能,如速度快到惊人的模块热替换(HMR)。 一套构建指令,它使用Rollup打包你的代码,并且它是预配置的,可输出用于生产环境的高度优化过的静态资源。 Node.js 支持 Vite 现在要求使用 Node.js 20.19+ 或 22.12+。由于 Node.js 18 已于 2025 年 4 月底达到生命周期终点(EOL),我们已不再支持该版本。 我们要求使用这些新的 Node.js 版本范围,以确保 Node.js 可以无需启用标志即可支持require(esm)。这使我们能够仅以 ESM 格式发布 Vite 7.0,同时不会阻止 CJS 模块通过require调用 Vite 的 JavaScript API。有关当前 ESM 在生态系统中的发展状况的详细分析,请查看 Anthony Fu 的文章《迈向纯 ESM》。 默认浏览器兼容性目标已更改为 B...
- 下一篇
空间理解模型 SpatialLM 正式发布首份技术报告
近日,空间理解模型SpatialLM正式发布首份技术报告。这一模型此前曾与DeepSeek-V3、通义千问Qwen2.5-Omni一起登上全球最大的开源社区HuggingFace全球趋势榜前三。 作为一款将大语言模型扩展到3D空间理解任务中的模型,SpatialLM能从3D点云输入生成结构化的空间场景描述,这一过程突破了大语言模型对物理世界几何与空间关系的理解局限,让机器具备空间认知与推理能力,为具身智能等相关领域提供空间理解基础训练框架。 在开源后经过广泛的实际验证,本次技术报告聚焦SpatialLM 1.1升级版本,其不仅包含了详细的消融实验与训练配方,还在点云编码方式、分辨率、用户指定识别类目等维度上实现优化。 多项基准测试数据显示:该模型在任务数据集微调后,在空间布局识别、3D物体检测任务中,均达到了相比与最新专业模型持平或更优的效果。 本次报告重点围绕算法框架和训练数据两方面展开。 在算法架构方面,SpatialLM将大语言模型(LLMs)扩展到3D空间理解任务中,特别在结构化室内建模领域实现了重要突破。 这一技术路线打破了传统任务专属架构(task-specific arc...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS7安装Docker,走上虚拟化容器引擎之路
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS6,CentOS7官方镜像安装Oracle11G
- Mario游戏-低调大师作品
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7