vivo Pulsar 万亿级消息处理实践(3)-KoP指标异常修复
作者:vivo 互联网大数据团队- Chen Jianbo
本文是《vivo Pulsar万亿级消息处理实践》系列文章第3篇。
Pulsar是Apache基金会的开源分布式流处理平台和消息中间件,它实现了Kafka的协议,可以让使用Kafka API的应用直接迁移至Pulsar,这使得Pulsar在Kafka生态系统中更加容易被接受和使用。KoP提供了从Kafka到Pulsar的无缝转换,用户可以使用Kafka API操作Pulsar集群,保留了Kafka的广泛用户基础和丰富生态系统。它使得Pulsar可以更好地与Kafka进行整合,提供更好的消息传输性能、更强的兼容性及可扩展性。vivo在使用Pulsar KoP的过程中遇到过一些问题,本篇主要分享一个分区消费指标缺失的问题。
系列文章:
文章太长?1分钟看图抓住核心观点👇
一、问题背景
在一次版本灰度升级中,我们发现某个使用KoP的业务topic的消费速率出现了显著下降,具体情况如下图所示:
什么原因导致正常的升级重启服务器会出现这个问题呢?直接查看上报采集的数据报文:
kop_server_MESSAGE_OUT{group="",partition="0",tenant="kop",topic="persistent://kop-tenant/kop-ns/service-raw-stats"} 3 kop_server_BYTES_OUT{group="",partition="0",tenant="kop",topic="persistent://kop-tenant/kop-ns/service-raw-stats"} 188
我们看到,KoP消费指标kop_server_MESSAGE
_OUT、kop_server_BYTES_OUT是有上报的,但指标数据里的group标签变成了空串(缺少消费组名称),分区的消费指标就无法展示了。是什么原因导致了消费组名称缺失?
二、问题分析
1、找到问题代码
我们去找下这个消费组名称是在哪里获取的,是否逻辑存在什么问题。根据druid中的kop_subscription对应的消费指标kop_server_
MESSAGE_OUT、kop_server_BYTES_OUT,找到相关代码如下:
private void handleEntries(final List<Entry> entries, final TopicPartition topicPartition, final FetchRequest.PartitionData partitionData, final KafkaTopicConsumerManager tcm, final ManagedCursor cursor, final AtomicLong cursorOffset, final boolean readCommitted) { .... // 处理消费数据时,获取消费组名称 CompletableFuture<String> groupNameFuture = requestHandler .getCurrentConnectedGroup() .computeIfAbsent(clientHost, clientHost -> { CompletableFuture<String> future = new CompletableFuture<>(); String groupIdPath = GroupIdUtils.groupIdPathFormat(clientHost, header.clientId()); requestHandler.getMetadataStore() .get(requestHandler.getGroupIdStoredPath() + groupIdPath) .thenAccept(getResultOpt -> { if (getResultOpt.isPresent()) { GetResult getResult = getResultOpt.get(); future.complete(new String(getResult.getValue() == null ? new byte[0] : getResult.getValue(), StandardCharsets.UTF_8)); } else { // 从zk节点 /client_group_id/xxx 获取不到消费组,消费组就是空的 future.complete(""); } }).exceptionally(ex -> { future.completeExceptionally(ex); return null; }); returnfuture; }); // this part is heavyweight, and we should not execute in the ManagedLedger Ordered executor thread groupNameFuture.whenCompleteAsync((groupName, ex) -> { if (ex != null) { log.error("Get groupId failed.", ex); groupName = ""; } ..... // 获得消费组名称后,记录消费组对应的消费指标 decodeResult.updateConsumerStats(topicPartition, entries.size(), groupName, statsLogger);
代码的逻辑是,从requestHandler的currentConnectedGroup(map)中通过host获取groupName,不存在则通过MetadataStore(带缓存的zk存储对象)获取,如果zk缓存也没有,再发起zk读请求(路径为/client_group_id/host-clientId)。读取到消费组名称后,用它来更新消费组指标。从复现的集群确定走的是这个分支,即是从metadataStore(带缓存的zk客户端)获取不到对应zk节点/client_group_id/xxx。
2、查找可能导致zk节点/client_group_id/xxx节点获取不到的原因
有两种可能性:一是没写进去,二是写进去但是被删除了。
@Override protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinator, CompletableFuture<AbstractResponse> resultFuture) { ... // Store group name to metadata store for current client, use to collect consumer metrics. storeGroupId(groupId, groupIdPath) .whenComplete((stat, ex) -> { if (ex != null) { // /client_group_id/xxx节点写入失败 log.warn("Store groupId failed, the groupId might already stored.", ex); } findBroker(TopicName.get(pulsarTopicName)) .whenComplete((node, throwable) -> { .... }); }); ...
从代码看到,clientId与groupId的关联关系是通过handleFindCoordinatorRequest(FindCoordinator)写进去的,而且只有这个方法入口。由于没有找到warn日志,排除了第一种没写进去的可能性。看看删除的逻辑:
protected void close(){ if (isActive.getAndSet(false)) { ... currentConnectedClientId.forEach(clientId -> { String path = groupIdStoredPath + GroupIdUtils.groupIdPathFormat(clientHost, clientId); // 删除zk上的 /client_group_id/xxx 节点 metadataStore.delete(path, Optional.empty()) .whenComplete((__, ex) -> { if (ex != null) { if (ex.getCause() instanceof MetadataStoreException.NotFoundException) { if (log.isDebugEnabled()) { log.debug("The groupId store path doesn't exist. Path: [{}]", path); } return; } log.error("Delete groupId failed. Path: [{}]", path, ex); return; } if (log.isDebugEnabled()) { log.debug("Delete groupId success. Path: [{}]", path); } }); }); } }
删除是在requsetHandler.close方法中执行,也就是说连接断开就会触发zk节点删除。
但有几个疑问:
-
/client_group_id/xxx 到底是干嘛用的?消费指标为什么要依赖它
-
为什么要在handleFindCoordinatorRequest写入?
-
节点/client_group_id/xxx为什么要删除,而且是在连接断开时删除,删除时机是否有问题?
首先回答第1个问题,通过阅读代码可以知道,/client_group_id/xxx 这个zk节点是用于在不同broker实例间交换数据用的(相当redis cache),用于临时存放IP+clientId与groupId的映射关系。由于fetch接口(拉取数据)的request没有groupId的,只能依赖加入Group过程中的元数据,在fetch消费时才能知道当前拉数据的consumer是哪个消费组的。
3、复现
若要解决问题,最好能够稳定地复现出问题,这样才能确定问题的根本原因,并且确认修复是否完成。
因为节点是在requsetHandle.close方法中执行删除,broker节点关闭会触发连接关闭,进而触发删除。假设:客户端通过brokerA发起FindCoordinator请求,写入zk节点/client_group
_id/xxx,同时请求返回brokerB作为Coordinator,后续与brokerB进行joinGroup、syncGroup等交互确定消费关系,客户端在brokerA、brokerB、brokerC都有分区消费。这时重启brokerA,分区均衡到BrokerC上,但此时/client_group_id/xxx因关闭broker而断开连接被删除,consumer消费刚转移到topic1-partition-1的分区就无法获取到groupId。
按照假设,有3个broker,开启生产和消费,通过在FindCoordinator返回前获取node.leader()的返回节点BrokerB,关闭brokerA后,brokerC出现断点复现,再关闭brokerC,brokerA也会复现(假设分区在brokerA与brokerC之间转移)。
复现要几个条件:
-
broker数量要足够多(不小于3个)
-
broker内部有zk缓存metadataCache默认为5分钟,可以把时间调小为1毫秒,相当于没有cache
-
findCoordinator返回的必须是其他broker的IP
-
重启的必须是接收到findCoordinator请求那台broker,而不是真正的coordinator,这时会从zk删除节点
-
分区转移到其他broker,这时新的broker会重新读取zk节点数据
到此,我们基本上清楚了问题原因:连接关闭导致zk节点被删除了,别的broker节点需要时就读取不到了。那怎么解决?
三、问题解决
方案一
既然知道把消费者与FindCoordinator的连接进行绑定不合适的,那么是否应该把FindCoordinator写入zk节点换成由JoinGroup写入,断连即删除。
consumer统一由Coordinator管理,由于FindCoordinator接口不一定是Coordinator处理的,如果换成由Coordinator处理的JoinGroup接口是否就可以了,这样consumer断开与Coordinator的连接就应该删除数据。但实现验证时却发现,客户端在断连后也不会再重连,所以没法重新写入zk,不符合预期。
方案二
还是由FindCoordinator写入zk节点,但删除改为GroupCoordinator监听consumer断开触发。
因为consumer统一由Coordinator管理,它能监听到consumer加入或者离开。GroupCoordinator的removeMemberAndUpdateGroup方法是coordinator对consumer成员管理。
private void removeMemberAndUpdateGroup(GroupMetadata group, MemberMetadata member) { group.remove(member.memberId()); switch (group.currentState()) { case Dead: case Empty: return; case Stable: case CompletingRebalance: maybePrepareRebalance(group); break; case PreparingRebalance: joinPurgatory.checkAndComplete(new GroupKey(group.groupId())); break; default: break; } // 删除 /client_group_id/xxx 节点 deleteClientIdGroupMapping(group, member.clientHost(), member.clientId()); }
调用入口有两个,其中handleLeaveGroup是主动离开,onExpireHeartbeat是超时被动离开,客户端正常退出或者宕机都可以调用removeMemberAndUpdateGroup方法触发删除。
public CompletableFuture<Errors> handleLeaveGroup( String groupId, String memberId ) { return validateGroupStatus(groupId, ApiKeys.LEAVE_GROUP).map(error -> CompletableFuture.completedFuture(error) ).orElseGet(() -> { return groupManager.getGroup(groupId).map(group -> { return group.inLock(() -> { if (group.is(Dead) || !group.has(memberId)) { return CompletableFuture.completedFuture(Errors.UNKNOWN_MEMBER_ID); } else { ... // 触发删除消费者consumer removeMemberAndUpdateGroup(group, member); return CompletableFuture.completedFuture(Errors.NONE); } }); }) .... }); } void onExpireHeartbeat(GroupMetadata group, MemberMetadata member, long heartbeatDeadline) { group.inLock(() -> { if (!shouldKeepMemberAlive(member, heartbeatDeadline)) { log.info("Member {} in group {} has failed, removing it from the group", member.memberId(), group.groupId()); // 触发删除消费者consumer removeMemberAndUpdateGroup(group, member); } return null; }); }
但这个方案有个问题是,日志运维关闭broker也会触发一个onExpireHeartbeat事件删除zk节点,与此同时客户端发现Coordinator断开了会马上触发FindCoordinator写入新的zk节点,但如果删除晚于写入的话,会导致误删除新写入的节点。我们干脆在关闭broker时,使用ShutdownHook加上shuttingdown状态防止关闭broker时删除zk节点,只有客户端断开时才删除。
这个方案修改上线半个月后,还是出现了一个客户端的消费指标无法上报的情况。后来定位发现,如果客户端因FullGC出现卡顿情况,客户端可能会先于broker触发超时,也就是先超时的客户端新写入的数据被后监听到超时的broker误删除了。因为写入与删除并不是由同一个节点处理,所以无法在进程级别做并发控制,而且也无法判断哪次删除对应哪次的写入,所以用zk也是很难实现并发控制。
方案三
其实这并不是新的方案,只是在方案二基础上优化:数据一致性检查。
既然我们很难控制好写入与删除的先后顺序,我们可以做数据一致性检查,类似于交易系统里的对账。因为GroupCoordinator是负责管理consumer成员的,维护着consumer的实时状态,就算zk节点被误删除,我们也可以从consumer成员信息中恢复,重新写入zk节点。
private void checkZkGroupMapping(){ for (GroupMetadata group : groupManager.currentGroups()) { for (MemberMetadata memberMetadata : group.allMemberMetadata()) { String clientPath = GroupIdUtils.groupIdPathFormat(memberMetadata.clientHost(), memberMetadata.clientId()); String zkGroupClientPath = kafkaConfig.getGroupIdZooKeeperPath() + clientPath; // 查找zk中是否存在节点 metadataStore.get(zkGroupClientPath).thenAccept(resultOpt -> { if (!resultOpt.isPresent()) { // 不存在则进行补偿修复 metadataStore.put(zkGroupClientPath, memberMetadata.groupId().getBytes(UTF\_8), Optional.empty()) .thenAccept(stat -> { log.info("repaired clientId and group mapping: {}({})", zkGroupClientPath, memberMetadata.groupId()); }) .exceptionally(ex -> { log.warn("repaired clientId and group mapping failed: {}({})", zkGroupClientPath, memberMetadata.groupId()); return null; }); } }).exceptionally(ex -> { log.warn("repaired clientId and group mapping failed: {} ", zkGroupClientPath, ex); return null; }); } } }
经过方案三的优化上线,即使是历史存在问题的消费组,个别分区消费流量指标缺少group字段的问题也得到了修复。具体效果如下图所示:
四、总结
经过多个版本的优化和线上验证,最终通过方案三比较完美的解决了这个消费指标问题。在分布式系统中,并发问题往往难以模拟和复现,我们也在尝试多个版本后才找到有效的解决方案。如果您在这方面有更好的经验或想法,欢迎提出,我们共同探讨和交流。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
MCP 核心架构解析
引言 Model Context Protocol (MCP) 是一种为连接大型语言模型(LLM)应用而设计的通信协议,它建立在灵活、可扩展的架构基础上,旨在实现LLM应用程序与各类集成之间的无缝交互。本文将深入解析MCP的核心架构设计,包括其组件构成、通信机制、生命周期管理以及最佳实践,帮助开发者全面理解这一协议的工作原理和实现方式。 正文 1. 整体架构概述 MCP采用经典的客户端-服务器架构模型,包含三个主要角色: 主机(Host):通常是启动连接的LLM应用程序,如Claude Desktop或各类IDE环境。主机负责初始化和维护整个通信流程。 客户端(Client):在主机应用内部运行,与服务器保持1:1的连接关系,负责发送请求和接收响应。 服务器(Server):向客户端提供上下文信息、工具支持和提示内容,是整个协议的服务提供方。 这种分层架构设计使得MCP能够灵活适应不同的应用场景,同时保持高效的通信性能。 2. 核心组件解析 2.1 协议层设计 协议层是MCP的核心抽象层,主要负责以下功能: 消息帧的封装与解析 请求/响应的关联匹配 高层通信模式的管理 典型的协议层类结...
- 下一篇
从Rust模块化探索到DLB 2.0实践|得物技术
一、前言 在云原生架构高速迭代的背景下,基础设施的性能瓶颈与安全隐患成为技术演进的关键挑战。本文系统记录了团队基于Rust语言改造Nginx组件的完整技术路径:从接触Cloudflare的quiche库,引发对Rust安全特性的探索,到通过FFI实现核心逻辑的跨语言调用;从突破传统C模块开发范式自研ngx_http_rust_module SDK,到全面采用Pingora框架构建新一代DLB 2.0流量调度平台。 实践表明,Rust的内存安全机制与异步高并发能力可显著提升负载均衡组件的性能边界与可靠性,为超大规模流量调度场景提供全新解决方案。本技术演进过程将详述架构设计、核心模块实现及性能优化策略,为同类基础设施升级提供可复用的工程经验。 二、Nginx+Rust 的模块化探索 探索的起点源于和quiche(cloudflare开发的高效quic实现)的初次邂逅,这扇门将项目组成员引入了 Rust 语言的世界。Rust 以其卓越的内存安全、无惧并发的特性以及出色的性能潜力,迅速展示了其作为系统级编程语言的优势。这份吸引力促使我们思考:能否将 Rust 的安全与性能注入我们更广泛的基础设...
相关文章
文章评论
共有0条评论来说两句吧...