Kafka Producer 异步发送消息居然也会阻塞?
Kafka 一直以来都以高吞吐量的特性而家喻户晓,就在上周,在一个性能监控项目中,需要使用到 Kafka 传输海量消息,在这过程中遇到了一个 Kafka Producer 异步发送消息会被阻塞的问题,导致生产端发送耗时很大。
是的,你没听错,Kafka Producer 异步发送消息也会发生阻塞现象,那究竟是怎么回事呢?
在新版的 Kafka Producer 中,设计了一个消息缓冲池,客户端发送的消息都会被存储到缓冲池中,同时 Producer 启动后还会开启一个 Sender 线程,不断地从缓冲池获取消息并将其发送到 Broker,如下图所示:
这么看来,Kafka 的所有发送,都可以看作是异步发送了,因此在新版的 Kafka Producer 中废弃掉异步发送的方法了,仅保留了一个 send 方法,同时返回一个 Futrue 对象,需要同步等待发送结果,就使用 Futrue#get 方法阻塞获取发送结果。而我在项目中直接调用 send 方法,为何还会发送阻塞呢?
我们在构建 Kafka Producer 时,会有一个自定义缓冲池大小的参数 buffer.memory
,默认大小为 32M,因此缓冲池的大小是有限制的,我们不妨想一下,缓冲池内存资源耗尽了会怎么样?
Kafka 源码的注释是非常详细的,RecordAccumulator 类是 Kafka Producer 缓冲池的核心类,而 RecordAccumulator 类就有那么一段注释:
The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless this behavior is explicitly disabled.
大概的意思是:
当缓冲池的内存块用完后,消息追加调用将会被阻塞,直到有空闲的内存块。
由于性能监控项目每分钟需要发送几百万条消息,只要 Kafka 集群负载很高或者网络稍有波动,Sender 线程从缓冲池捞取消息的速度赶不上客户端发送的速度,就会造成客户端发送被阻塞。
我写个例子让大家直观感受一下被阻塞的现象:
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.ACKS_CONFIG, "0");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 1024);
properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 5242880);
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(properties);
String str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
List<byte[]> bytesList = new ArrayList<>();
Random random = new Random();
for (int j = 0; j < 1024; j++) {
int i1 = random.nextInt(10);
if (i1 == 0) {
i1 = 1;
}
byte[] bytes = new byte[1024 * i1];
for (int i = 0; i < bytes.length - 1; i++) {
bytes[i] = (byte) str.charAt(random.nextInt(62));
}
bytesList.add(bytes);
}
while (true) {
long start = System.currentTimeMillis();
producer.send(new ProducerRecord<>("test_topic", bytesList.get(random.nextInt(1023))));
long end = System.currentTimeMillis() - start;
if (end > 100) {
System.out.println("发送耗时:" + end);
}
// Thread.sleep(10);
}
}
以上例子构建了一个 Kafka Producer 对象,同时使用死循环不断地发送消息,这时如果把 Thread.sleep(10);
注释掉,则会出现发送耗时很长的现象:
使用 JProfiler 可以查看到分配内存的地方出现了阻塞:
跟踪到源码:
发现在 org.apache.kafka.clients.producer.internals.BufferPool#allocate
方法中,如果判断缓冲池没有空闲的内存了,则会阻塞内存分配,直到有空闲内存为止。
如果不注释 Thread.sleep(10);
这段代码则不会发生阻塞现象,打断点到阻塞的地方,也不会被 Debug 到,从现象能够得知,Thread.sleep(10);
使得发送消息的频率变低了,此时 Sender 线程发送的速度超过了客户端的发送速度,缓冲池一直处于未满状态,因此不会产生阻塞现象。
除了以上缓冲池内存满了会发生阻塞之外,Kafka Produer 其它情况都不会发生阻塞了吗?非也,其实还有一个地方,也会发生阻塞!
Kafka Producer 通常在第一次发送消息之前,需要获取该主题的元数据 Metadata,Metadata 内容包括了主题相关分区 Leader 所在节点信息、副本所在节点信息、ISR 列表等,Kafka Producer 获取 Metadata 后,便会根据 Metadata 内容将消息发送到指定的分区 Leader 上,整个获取流程大致如下:
如上图所示,Kafka Producer 在发送消息之前,会检查主题的 Metadata 是否需要更新,如果需要更新,则会唤醒 Sender 线程并发送 Metatadata 更新请求,此时 Kafka Producer 主线程则会阻塞等待 Metadata 的更新。
如果 Metadata 一直无法更新,则会导致客户端一直阻塞在那里。
近期热文
本文分享自微信公众号 - 后端进阶(objcoding)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
同城双活与异地多活架构分析
本文首发于 vivo互联网技术 微信公众号 链接:https://mp.weixin.qq.com/s/OjfFcjnGWV5kutxXndtpMg 作者:vivo官网商城开发团队 采用高可用系统架构支持重要系统,为关键业务提供7x24的不间断服务,已经成为众多企业保障业务稳定、持续运转的主要选择。服务多活是高可用架构重要实施手段,本文介绍了一些业界常用的多活手段例如同城双活、两地三中心、异地多活架构设计方案并详述了各种方案的优缺点。 一、为什么要做多活 随着移动互联网的深入发展,用户增长达到一定规模后,不少企业都会面高并发业务和临海量数据的挑战,传统的单机房在机器容量上存在瓶颈。在一些极端场景下,有可能所有服务器都出现故障,例如机房断电、机房火灾、地震等这些不卡抗拒因素会导致系统所有服务器都故障从而导致业务整体瘫痪,而且即使有其他地区的备份,把备份业务系统全部恢复到能够正常提供业务,花费的时间也比较长。为了满足中心业务连续性,增强抗风险能力,多活作为一种可靠的高可用部署架构,成为各大互联网公司的首要选择。 1、多活场景 多活架构的关键点就是指不同地理位置上的系统都能够提供业务服务,这...
-
下一篇
高手问答第 255 期 —— 如何掌握 Tars 开发,玩转微服务
随着 5G 的到来,各个行业都要面对海量请求,微服务架构在这个时代对企业而言有着举足轻重的作用。微服务架构存在运维难、监控难、实操难等问题, 而 Tars 是实现微服务架构优秀的一个选择。Tars 是高性能、多语言的微服务治理框架 ,在超过 120 家公司的 261,200 台服务器上稳定运行。 OSCHINA本期高手问答(2020 年 9月 14日——2020 年 9月 20日) 请来了俞慧涛老师@kerriganA,如果你对 Tars感兴趣,不管是架构 ,还是适用场景,都可以在这期高手问答得到启发。 问答主题 本次问答主要包括以下方向: Tars 架构、包含协议实现、系统性能 Tars 与其他的开源框架如何结合 ,如 mybatis、 springcloud、springboot Tars 适用的场景 有其他相关的问题,也欢迎提问。 嘉宾简介 俞慧涛 ,阅文集团后台开发专家, 负责阅文集团用户中台服务与 TarsJava 的技术设计工作。在阅文主导了 Tars 微服务在海外项目 、红袖新兴项目的应用。在流量高、 变化频率快的业务场景下也能保证其架构的稳定性。腾讯开源 TARS 项目...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Dcoker安装(在线仓库),最新的服务器搭配容器使用
- MySQL数据库在高并发下的优化方案
- CentOS7,CentOS8安装Elasticsearch6.8.6
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Docker快速安装Oracle11G,搭建oracle11g学习环境