kafka故障排查-consumer处理超时导致的异常
最近遇到一个kafka方面的问题,大致就是由于consumer处理业务超时,导致无法正常提交Offset,进而导致无法消费新消息的问题。下面我想从以下几个方面对此次故障排查进行复盘分析:业务背景、问题描述、排查思路、经验教训。
一、业务背景
先简单描述一下业务背景吧。我们有个业务需要严格按顺序消费Topic消息,所以针对该topic设置了唯一的partition,以及唯一的副本。当同一个消费组的多个consumer启动时,只会有一个consumer订阅到该Topic,进行消费,保证同一个消费组内的消费顺序。
注:消费组的groupId名称为“smart-building-consumer-group”,订阅的Topic名称为“gate_contact_modify”。
二、问题描述
有一天我们突然收到一个问题反馈:producer侧的业务产生消息后,consumer侧并没有得到预期的结果。经过排查,排除了业务逻辑出现问题的可能性,我们判断最有可能是因为kafka消息没有被消费到。为了印证这个猜测,我们查看了consumer消费日志,发现日志中存在这样几处问题:
(1)日志偶尔会打印出一条Kafka的警告日志,内容为:org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.maybeAutoCommitOffsetsSync:648 - Auto-commit of offsets {gate_contact_modify-0=OffsetAndMetadata{offset=2801, metadata=''}} failed for group smart-building-consumer-group: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
(2)接着进行了一次rebalance;
(3)consumer侧输出了Topic消费者的业务日志,表明正常获取到了Topic消息。
接着我们查看kafka 消费组中该Topic对应的Offset的变化情况,发现Offset一直没有变化。
三、排查思路
日志中的异常信息很明确的告知我们,topic消息消费完成后,由于group发生了一次rebalance,导致Commit没有被提交,这表明两次poll消息的间隔时间超过了max.poll.interval.ms定义的最大间隔,这也意味着一次poll后处理消息的过程超时了,正是由于poll间隔时间超时,导致了一次rebalance。同时建议我们要么增加间隔时间,要么减少每次拉取的最大消息数。
另外,由于Commit没有被提交,导致OffSet值没有变化,那么每次拉取到的消息都是同一批重复消息。具体的异常流程如下图:
根据上述信息,我们进一步检查了consumer的max.poll.records配置、max.poll.interval.ms配置,并统计了每条Topic消息的处理耗时,发现max.poll.records使用了默认配置值500,max.poll.interval.ms使用了默认配置值为300s,而每条Topic消息的处理耗时为10S。这进一步证实了我们的推论:
由于每次拉取的消息数太多,而每条消息处理时间又较长,导致每次消息处理时间超过了拉取时间间隔,从而使得group进行了一次rebalance,导致commit失败,并最终导致下次拉取重复的消息、继续处理超时,进入一个死循环状态。
知道问题根源后,我们结合业务特点,更改了max.poll.records=1,每次仅拉取一条消息进行处理,最终解决了这个问题。
四、经验教训
这次故障排查,使我们对Kafka消息poll机制、rebalance和commit之间的相互影响等有了更深的理解。
(1)kafka每次poll可以指定批量消息数,以提高消费效率,但批量的大小要结合poll间隔超时时间和每条消息的处理时间进行权衡;
(2)一旦两次poll的间隔时间超过阈值,group会认为当前consumer可能存在故障点,会触发一次rebalance,重新分配Topic的partition;
(3)如果在commit之前进行了一次rebalance,那么本次commit将会失败,下次poll会拉取到旧的数据(重复消费),因此要保证好消息处理的幂等性;
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
python脚本调用iftop 统计业务应用流量
因公司服务器上部署应用较多,在有大并发访问、业务逻辑有问题的情况下反复互相调用或者有异常流量访问的时候,需要对业务应用进行故障定位,所以利用python调用iftop命令来获取应用进程流量,结合zabbix,可帮助定位分析问题。,以下是脚本内容,大概思路是: 利用iftop命令 iftop -t -P -N -n -s 2 来获取流量信息 对获取的流量信息进行处理,单位换算,同一个应用程序的所有链接流量进行合计(因为一个应用会有很多链接,每一个链接都有流量,全部相加即可得出这个应用的总流量) #!/usr/bin/python #coding=utf-8 #针对业务监听的端口流量进行统计,忽略对随机端口流量统计 #若针对突然流量增大,找到其进程进行告警,可以不做统计,获取到流量进行判断,若大于多少阀值,则输出 import os def change_unit(unit): if "Mb" in unit: flow = float(unit.strip("Mb")) * 1024 return flow elif "Kb" in unit: flow = float(unit.str...
- 下一篇
通过实例理解Java网络IO模型
网络IO模型里有多个概念比较难理解,本文通过用Java实现一个简单的redis,从最简单的单线程单连接到NIO与netty,介绍不同的IO模型。 网络IO模型及分类 网络IO模型是一个经常被提到的问题,不同的书或者博客说法可能都不一样,所以没必要死抠字眼,关键在于理解。 Socket连接 不管是什么模型,所使用的socket连接都是一样的。以下是一个典型的应用服务器上的连接情况。客户的各种设备通过Http协议与Tomcat进程交互,Tomcat需要访问Redis服务器,它与Redis服务器也建了好几个连接。虽然客户端与Tomcat建的是短连接,很快就会断开,Tomcat与Redis是长连接,但是它们本质上都是一样的。建立一个Socket后,就是"本地IP+port与远端IP+port"的一个配对,这个Socket由应用进程调用操作系统的系统调用创建,在内核空间会有一个与之对应的结构体,而应用程序拿到的是一个文件描述符(File Describer),就跟打开一个普通的文件一样,可以读写。不同的进程有自己的文件描述符空间,比如进程1中有个socket的fd为100,进程2中也有一个soc...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- CentOS7,CentOS8安装Elasticsearch6.8.6
- Linux系统CentOS6、CentOS7手动修改IP地址
- SpringBoot2全家桶,快速入门学习开发网站教程
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS关闭SELinux安全模块
- CentOS8安装Docker,最新的服务器搭配容器使用
- 设置Eclipse缩进为4个空格,增强代码规范