面试官问我:线程锁导致的kafka客户端超时,如何解决?
本文分享自华为云社区《线程锁导致的kafka客户端超时问题》,作者: 张俭 。
问题背景
有一个环境的kafka client发送数据有部分超时,拓扑图也非常简单
定位历程
我们先对客户端的环境及JVM情况进行了排查,从JVM所在的虚拟机到kafka server的网络正常,垃圾回收(GC)时间也在预期范围内,没有出现异常。
紧接着,我们把目光转向了kafka 服务器,进行了一些基础的检查,同时也查看了kafka处理请求的超时日志,其中我们关心的metadata和produce请求都没有超时。
问题就此陷入了僵局,虽然也搜到了一些kafka server会对连上来的client反解导致超时的问题( https://github.com/apache/kafka/pull/10059),但通过一些简单的分析,我们确定这并非是问题所在。
同时,我们在环境上也发现一些异常情况,当时觉得不是核心问题/解释不通,没有深入去看
- 问题JVM线程数较高,已经超过10000,这个线程数量虽然确实较高,但并不会对1个4U的容器产生什么实质性的影响。
- 负责指标上报的线程CPU较高,大约占用了1/4 ~ 1/2 的CPU核,这个对于4U的容器来看问题也不大
当排查陷入僵局,我们开始考虑其他可能的调查手段。我们尝试抓包来找线索,这里的抓包是SASL鉴权+SSL加密的,非常难读,只能靠长度和响应时间勉强来推断报文的内容。
在这个过程中,我们发现了一个非常重要的线索,客户端竟然发起了超时断链,并且超时的那条消息,实际服务端是有响应回复的。
随后我们将kafka client的trace级别日志打开,这里不禁感叹kafka client日志打的相对较少,发现的确有log.debug(“Disconnecting from node {} due to request timeout.”, nodeId);的日志打印。
与网络相关的流程:
try { // 这里发出了请求 client.send(request, time.milliseconds()); while (client.active()) { List<ClientResponse> responses = client.poll(Long.MAX_VALUE, time.milliseconds()); for (ClientResponse response : responses) { if (response.requestHeader().correlationId() == request.correlationId()) { if (response.wasDisconnected()) { throw new IOException("Connection to " + response.destination() + " was disconnected before the response was read"); } if (response.versionMismatch() != null) { throw response.versionMismatch(); } return response; } } } throw new IOException("Client was shutdown before response was read"); } catch (DisconnectException e) { if (client.active()) throw e; else throw new IOException("Client was shutdown before response was read"); }
这个poll方法,不是简单的poll方法,而在poll方法中会进行超时判断,查看poll方法中调用的handleTimedOutRequests方法
@Override public List<ClientResponse> poll(long timeout, long now) { ensureActive(); if (!abortedSends.isEmpty()) { // If there are aborted sends because of unsupported version exceptions or disconnects, // handle them immediately without waiting for Selector#poll. List<ClientResponse> responses = new ArrayList<>(); handleAbortedSends(responses); completeResponses(responses); return responses; } long metadataTimeout = metadataUpdater.maybeUpdate(now); try { this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs)); } catch (IOException e) { log.error("Unexpected error during I/O", e); } // process completed actions long updatedNow = this.time.milliseconds(); List<ClientResponse> responses = new ArrayList<>(); handleCompletedSends(responses, updatedNow); handleCompletedReceives(responses, updatedNow); handleDisconnections(responses, updatedNow); handleConnections(); handleInitiateApiVersionRequests(updatedNow); // 关键的超时判断 handleTimedOutRequests(responses, updatedNow); completeResponses(responses); return responses; }
由此我们推断,问题可能在于客户端hang住了一段时间,从而导致超时断链。我们通过工具Arthas深入跟踪了Kafka的相关代码,甚至发现一些简单的操作(如A.field)也需要数秒的时间。这进一步确认了我们的猜想:问题可能出在JVM。JVM可能在某个时刻出现问题,导致系统hang住,但这并非由GC引起。
为了解决这个问题,我们又检查了监控线程CPU较高的问题。我们发现线程的执行热点是从"sun.management.ThreadImpl"中的"getThreadInfo"方法。
"metrics-1@746" prio=5 tid=0xf nid=NA runnable java.lang.Thread.State: RUNNABLE at sun.management.ThreadImpl.getThreadInfo(Native Method) at sun.management.ThreadImpl.getThreadInfo(ThreadImpl.java:185) at sun.management.ThreadImpl.getThreadInfo(ThreadImpl.java:149)
进一步发现,在某些版本的JDK8中,读取线程信息是需要加锁的。
至此,问题的根源已经清晰明了:过高的线程数以及线程监控时JVM全局锁的存在导致了这个问题。您可以使用如下的demo来复现这个问题
import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class ThreadLockSimple { public static void main(String[] args) { for (int i = 0; i < 15_000; i++) { new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(200_000); } catch (InterruptedException e) { throw new RuntimeException(e); } } }).start(); } ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); executorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { System.out.println("take " + " " + System.currentTimeMillis()); } }, 1, 1, TimeUnit.SECONDS); ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); ScheduledExecutorService metricsService = Executors.newSingleThreadScheduledExecutor(); metricsService.scheduleAtFixedRate(new Runnable() { @Override public void run() { long start = System.currentTimeMillis(); ThreadInfo[] threadInfoList = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds()); System.out.println("threads count " + threadInfoList.length + " cost :" + (System.currentTimeMillis() - start)); } }, 1, 1, TimeUnit.SECONDS); } }
为了解决这个问题,我们有以下几个可能的方案:
- 将不合理的线程数往下降,可能存在线程泄露的场景
- 升级jdk到jdk11或者jdk17(推荐)
- 将Thread相关的监控临时关闭
这个问题的解决方案应根据实际情况进行选择,希望对你有所帮助。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
聊聊kube-scheduler如何完成调度和调整调度权重
本文分享自华为云社区《kube-scheduler如何完成调度和调整调度权重》,作者: 可以交个朋友。 一、概述 Kube-scheduler作为k8s集群的默认调度器,它监听(watch机制)kube-apiserver,查询还未调度的pod,根据调度策略将pod调度至集群内最适合的Node 二、调度流程 首先我们通过API或者kubectl工具创建pod,kube-apiserver收到请求信息存储到etcd中,调度器通过watch机制监听apiserver查看到还未被调度的pod列表,循环遍历的为每个pod尝试分配node,这个分配过程如下: kube-scheduler内Informer组件list-watch apiserver,使用spec.nodeName=""筛选出还未调度的Pod 预选(predicate):调度器通过Predicate算法过滤掉不满足条件的节点 优选(priorlty):对于通过预选的节点,通过打分机制,筛选出得分最高的node 当调度器为Pod选择了一个合适的节点后,将Pod和节点进行绑定(将节点名称赋值给pod的spec.nodeName字段) ...
- 下一篇
Apache RocketMQ 5.0 腾讯云落地实践
ApacheRocketMQ发展历程回顾 RocketMQ最早诞生于淘宝的在线电商交易场景,经过了历年双十一大促流量洪峰的打磨,2016年捐献给Apache社区,成为Apache社区的顶级项目,并在国内外电商,金融,互联网等各行各业的广大客户落地验证,得到广泛认可。 ApacheRocketMQ社区在2022年10月正式对外发布了全新的5.0版本,腾讯云消息队列团队也和社区紧密合作,支持了5.0的商业化版本,现在将整个落地过程的经验教训做个总结,回馈社区。 什么是RocketMQ5.0? 一个新版本号? 一套新设计的API? 一系列新的特性实现? 一个存算分离的新架构? 一种新的商业化产品形态? RocketMQ面向云原生的新思考? ApacheRocketMQ社区过去一年对5.0新架构从不同的角度进行了分享介绍,导致很多用户对5.0新架构认识不一致,其实从以上不同角度理解都对,本文尝试从多个维度做一个较全面的解释和回顾,帮助用户全面理解RocketMQ5.0架构演进背后的思考逻辑。 RocketMQ5.0的演进目标 基础架构适应云原生化 RocketMQ运行依赖的环境过去十年发生了巨...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- MySQL8.0.19开启GTID主从同步CentOS8
- Mario游戏-低调大师作品
- Linux系统CentOS6、CentOS7手动修改IP地址
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS7安装Docker,走上虚拟化容器引擎之路
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题