Kafka 架构及原理分析
设计架构
数据多写支持
Kafka 的进阶功能
原理分析
生产者原理
服务端响应
分区存储
Leader 选举
消费者原理
性能分析
技术总结
REFERENCES
简介
Kafka适合什么样的场景?
它可以用于两大类别的应用:
构造实时流数据管道,它可以在系统或应用之间可靠地获取数据。(相当于message queue)
构建实时流式应用程序,对这些流数据进行转换或者影响。(就是流处理,通过kafka stream topic和topic之间内部进行变化)
为了理解 Kafka 是如何做到以上所说的功能,从下面开始,我们将深入探索Kafka 的特性。
定位
手机用户请横屏获取最佳阅读体验,REFERENCES中是本文参考的链接,如需要链接和更多资源,可以关注公众号后回复『知识星球』加入并获取长期知识分享服务。
设计架构
使用场景
消息导入 MaxCompute、OOS、RDS、Hadoop、HBase 等离线数据仓库
架构
依赖 Zookeeper 实现配置和节点管理
如上图所示,一个 Kafka 集群架构中:
两个 Topic : Topic0 和 Topic1 。
Topic0 有 2 个分区:partition0 和 partition1 ,每个分区一共 3 个副本。
Topic1 只有 1 个分区:partition0 ,每个分区一共 3 个副本。
图中红色字体的副本代表是 Ieader ,黑色字体的副本代表是 follower 。
绿色的线代表是数据同步。蓝色的线是写消息,橙色的线是读消息,都是针对 leader 节点。
第 1 个消费者,消费 topic0 的 partition0 ,还消费 topic1 的 partition0 。
第 2 个消费者,消费 topic0 的 partition1。
第 3 个消费者,没有 partition 可以消费。
第一个消费者组,消费了 topic0 的两个分区。
第二个消费者组,既消费 topic0 ,又消费 topic1 :
Broker
Topic
使用 Topic的订阅和发送,来实现生产者和消费者的关联, 多对多
副本机制
副本同步主节点数据,但是不允许读 follow 节点,避免读写不一致的问题,降低延迟。
存储
数据分段(针对文件过大,超出 1G)
消费分组
消费编号
consumer_offset-[0~49] 保存消费者消费的偏移量
数据多写支持
业务场景:数据同步存储到 mysql、ES
基于 binlog 实现主从复制
canal
伪装为 slave 节点,进行数据同步,解析 binlog,可以对接 kafka,实现数据的多写。
MYSQL的数据修改,通过 kafka 完成数据变更的自动推送,实现多写操作。
Kafka 的进阶功能
消息幂等性
enable.idempotence=true
事务
操作 API
初始化 producer.initTransactions()
开启 producer.beginTransaction()
提交 producer.commitTransaction()
场景:
发送消息到多个 topic 或多个 partition
消费以后发出消息 consume-process-produce
实现原理:
Transaction Coordinator 协调者
事务日志:topic记录 __transaction_state
生产者事务 ID:transaction.id 发送前进行参数设置,可以使用 UUID
特性
原理分析
生产者原理
KafkaProducer
分区路由
累加器
服务端响应
奇数副本节点,确保投票、同步成功(半数工作正常的节点确定 )
哪些节点等待同步完成?
如何判断消息发送成功?
=> 见下一节 服务端 ACK
和 Leader 节点保持同步的最大时间间隔 replica.lag.time.max.ms
服务端 ISR: in-sync replica set
服务端 ACK
不等待 ACK
=> props.put("acks","0")
效率最高,可靠性低
默认:Leader 落盘成功则返回 ACK
=> props.put("acks","1")
可靠性较低,还是有丢失消息风险
Leader 和全部 Follpwers 落盘返回 ACK
=> props.put("acks","-1") 或 props.put("acks","all")
性能最差,可靠性高,但是还是有可能会带来问题,最后环节响应 ACK 失败,发送端如果设置了 retries 重发参数,会发生消息重复的问题。
分区存储
特点
副本的存储
如图:topic 对应 3 个分区,每个分区一共 3 个副本。
查看副本情况
ISR: 当前和 Leader 节点保持同步的节点集合
如图:topic 对应 4 个分区,每个分区 2 个副本
查看副本情况
副本分配规则
AdminUtils.scala -> assignReplicasToBrokers
第一个分区(编号为0)的第一个副本放置位置式 随机从 brokerList 选择的
其他分区的第一个副本放置位置相对于第 0 个分区依次往后移 nextReplicaShift
日志存储格式
segment 分段
分割方式
大小分割:log.segment.bytes 单个日志段的最大大小,默认 1073741824 -> 1G
log.roll.hours 新日志段轮转时间间隔(小时为单位),次要配置为 log.roll.ms
log.roll.ms 新日志段轮转时间间隔(毫秒为单位),如果未设置,则使用 log.roll.hours
索引写满:log.index.size.max.bytes offset 索引的最大字节数,默认10485760 -> 10M
索引
offset index 偏移量索引
如上图,通过kafka-dump-log.sh脚本查看索引文件。
索引特点:
设置的越大,代表扫描的速度越快,索引越稀疏,也更加耗内存(查找数据和维护索引的开销增大)
设置的越小,代表扫描的速度越慢,索引越密集,也更加省内存
时间复杂度:O(log2n)+O(m) n 表示文件个数,m 表示稀疏程度
建立条件: log.index.interval.bytes 添加 offset 索引字段大小间隔,默认 4096, 4KB
time index 时间戳索引
定义消息的时间戳类型: log.message.timestamp.type=CreateTime/logAppendTime
索引检索过程
根据 index 索引文件中的 offset 找到消息的 postion
根据 position 从 log 文件中比较,最终找到消息
为什么不用 B+tree 做索引结构?
消息清理策略
开关:log.cleaner.enable=true 默认为true。这意味着cleanup.policy = compact的主题默认被压缩,根据 log.cleaner.dedupe.buffer.size,128 MB的堆将被分配给清理进程。您可以根据您使用的压缩主题来查看 log.cleaner.dedupe.buffer.size和其他log.cleaner配置值。(0.9.0.1中的显著变化)
策略:log.cleanup.policy=delete/compact 超出保留窗口期的日志段的默认清理策略。用逗号隔开有效策略列表。有效策略:delete和compact
压紧,删除重复的 key:Log Cleaner默认启用。这会启动清理的线程池。
周期:log.retention.check.interval.ms=300 000 日志清理器检查是否有日志符合删除的频率(以毫秒为单位)
log.retention.hours 日志删除的时间阈值(小时为单位) 默认 168 小时,即 1 个星期
log.retention.minutes 日志删除的时间阈值(分钟为单位),如果未设置,将使用log.retention.hours的值。
log.retention.ms 日志删除的时间阈值(毫秒为单位),如果未设置,将使用log.retention.minutes的值。
log.retention.bytes 日志删除的大小阈值。
log.segment.bytes 单个日志段文件最大大小。
Leader 选举
参数配置: unclean.leader.election.enable 指定副本是否能够不在 ISR 中选举为 Leader,会导致数据丢失,默认为 false。
PacificA
优先算法:默认设置 ISR 的第一个副本为 Leader
主从同步
LEO:Log End Offset,下一条等待写入的消息的 offset(最新的 offset + 1)
HW:High Watermark,ISR 中最小的 LEO => 限制消费者最后可以消费的消息,小于 HW 的消息才可以被消费,确保一致性
主从同步
Follower 节点会向 Leader 发送一个 fetch 请求,leader 向 follower 发送数据后,需要更新 follower 的 LEO
Follower 接收到数据响应后,依次写入消息并更新 LEO
Leader 更新 HW (ISR 最小的 LEO)
故障处理
Follower 故障
之前记录的 HW,删除 高于 HW 标识的数据,恢复后重新从 HW 之前的 offset 开始同步数据
Leader 故障
之前记录的 HW,删除 高于 HW 标识的数据,重新从新 Leader 同步数据
消费者原理
根据 offset 和时间戳进行消费
offset 的存储
__consumer_offsets => topic 的存储结构
GroupMetadata:保存了消费组中各个消费者的信息(每个消费者有编号)
OffsetAndMetadata:保存了消费组和各个 partition 的 offset 位移信息元数据
group.id 取 hash 后 ,对50取模,获取消费组对应绑定分区的下标
消费策略:auto.offset.reset,默认值为 lastest
none:当前没有找到之前的 offset 时抛出异常
提交偏移量,commit 后更新消费组的 offset
API:consumer.commitSync();
自动:enable.auto.commit=true
手动:enable.auto.commit=false
消费者分配
消费者分配
RangeAssignor:默认分配原则,范围固定分配
RoundRobinAssignor:轮询方式分配
StickyAssignor :粘滞策略(相对均匀策略,每次基本都不一样)
分区重分配
rebalance 针对分区少,消费者多的情况
性能分析
磁盘 I/O 本身速度很慢,Kafka 如何优化实现低延迟、高吞吐的目标?
磁盘寻址
Kafka 日志文件顺序存放 -> 磁盘顺序读写。
Sequential disk (磁盘顺序读写) 比 Random SSD (固态的随机读写)要更快。
零拷贝
内核空间、用户空间
DMA (Direct Memory Access) 直接内存访问
2次 CPU 数据拷贝
零拷贝:Linux sendfile 方法,省去 CPU 拷贝的过程,提升至少一倍的性能。
零拷贝实现代码
传输层封装代码:PlaintextTransportLayer
技术总结
主从选举:会选择一个 broker 作为 “controller”节点。controller 节点负责检测 brokers 级别故障,并负责在 broker 故障的情况下更改这个故障 Broker 中的 partition 的 leadership 。这种方式可以批量的通知主从关系的变化,使得对于拥有大量partition 的broker ,选举过程的代价更低并且速度更快。如果 controller 节点挂了,其他 存活的 broker 都可能成为新的 controller 节点。
日志的分区partition (分布)在Kafka集群的服务器上。每个服务器在处理数据和请求时,共享这些分区。每一个分区都会在已配置的服务器上进行备份,确保容错性。
每个分区都有一台 server 作为 “leader”,零台或者多台server作为 follwers 。leader server 处理一切对 partition (分区)的读写请求,而follwers只需被动的同步leader上的数据。当leader宕机了,followers 中的一台服务器会自动成为新的 leader。每台 server 都会成为某些分区的 leader 和某些分区的 follower,因此集群的负载是平衡的。
消费者使用一个 消费组 名称来进行标识,发布到topic中的每条记录被分配给订阅消费组中的一个消费者实例.消费者实例可以分布在多个进程中或者多个机器上。
如果所有的消费者实例在同一消费组中,消息记录会负载平衡到每一个消费者实例。
如果所有的消费者实例在不同的消费组中,每条消息记录会广播到所有的消费者进程。
对象的内存开销非常高,通常是所存储的数据的两倍(甚至更多)。
随着堆中数据的增加,Java 的垃圾回收变得越来越复杂和缓慢。
Kafka 对消息的存储和缓存严重依赖于文件系统:
实际上顺序磁盘访问在某些情况下比随机内存访问还要快
当旧的数据保留时间超过指定时间、日志大达到规定大小后就丢弃
至少保证日志包含每一个key的最终值而不只是最近变更的完整快照。这意味着下游的消费者可以获得最终的状态而无需拿到所有的变化的消息信息。
网络层相当于一个 NIO 服务, sendfile(零拷贝) 的实现是通过 MessageSet 接口的 writeTo 方法完成的.这样的机制允许 file-backed 集使用更高效的 transferTo 实现,而不在使用进程内的写缓存。线程模型是一个单独的接受线程和 N 个处理线程,每个线程处理固定数量的连接。这种设计方式经过大量的测试,发现它是实现简单而且快速的。协议保持简单以允许未来实现其他语言的客户端.
REFERENCES
整理不易,如果觉得有帮助的话,欢迎点击收藏 和在看 ,顺便帮小编点点广告 哦,感谢^_^!