消息队列 Kafka学习总结
分享的目的
- 更深入了解消息中间件Kafka的系统架构;
- 更好的使用消息中间件Kafka解决实际项目中的问题;
- 通过Kafka的设计架构原理,和使用场景,能够更快速掌握研究其它类似的消息中间件,如RocketMQ, Notify, ActiviteMQ, 能够在实际的业务中更好使用这些消息中间件
分享大纲
Kafka系统架构;
Kafka开发;
Kafka参数调优;
Kafka系统架构
Kafka介绍
Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性:
- 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以海量的消息存储也能够保持长时间的稳定性能(高性能);
- 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息(高并发);
- 支持通过Kafka服务器和消费机集群来分区消息(高可靠);
- 支持Hadoop并行数据加载;
- 支持各种语言丰富的客户端(java, C++, python, erlang, .Net, go, Clojure, Scala);
Kafka架构
Kafka基本概念介绍
Broker:
Kafka集群包含一个或多个服务器,这种服务器被称为broker;
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,实际开发中通过称为队列, topic名称,即叫做Kafka队名称。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处);
Partition
Partition是物理上的概念,每个Topic包含一个或多个Partition;
Segment
partition物理上由多个segment组成;
Producer
负责发布消息到Kafka broker;
Consumer
消息消费者,向Kafka broker读取消息的客户端;
Consumer Group
每个Consumer属于一个特定的Consumer Group可为每个Consumer指定group name,若不指定group name则属于默认的group, 例如 在电商中发货中心、交易中心分别是两个Kafka consumer group。
Kafka系统核心架构
Kafka topic在服务端的结构
- 一般情况下,一个topic下包含一个或多个Partition, 2-3个复本,这个在创建topic的时候可以指定;
- 多个Partition可以提高读写的吞吐量, 多个副本提高Kafka的可靠性;
- 在数据不需要保序的情况下,创建多个Partition最好; 在局部保序的情况下,同一个Partition消费保序即可; 在合局有消费保序的情况下,一个topic对应一个Partition, 如数据库实时同步场景;
- 每一个Partition理论上是一个无长的队列,包含多个文件,文件以时间戳+最小的offset命名,不能修改,只能以只能是以append的方式写入,会通过NIO内存映射文件的方持久化到硬盘上, 文件的大小固定,大小达到阈值以后,关闭旧文件 ,重新新建一个内存映射文件 ;在 flush硬盘的时候是顺序IO,因此写入Partition的速度会非常非常快, 过期的文件 根据保存时间,后台异步线程定期清除,释放磁盘空间;
Partition offset
- 一个消费集群,即group 对应一个topic下Partition的一个消费offset;
- 两个不同的消费集群对应的同一个Partition 下的消费offset互不影响;
- 可以通过时间戳定位Partition的offset, 这个一般在特殊的情况下才会这样这样做,大部分情况下,zk上都会有记录;
- 同一个Partition 只能严格顺序消费;
Kafka事物
数据传输的事务定义通常有以下三种级别
1.
最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输;
2.
最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输;
3.
精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的,但是很难做到;
Kafka的事物解决方案:
1.
当发布消息时,Kafka有一个committed的概念,一旦消息被提交了,只要消息被写入的分区的所在的副本broker是活动的,数据就不会丢失,但是如果producer发布消息时发生了网络错误, Kafka现在也没一个完美的解决方案;
2.
如果consumer崩溃了,会有另外一个consumer接着消费消息,它需要从一个合适的 offset 继续处理。这种情况下可以有以下选择:
2. 1
consumer可以先读取消息,然后将offset写入日志文件中,然后再处理消息。这存在一种可能就是在存储offset后还没处理消息就crash了,新的consumer继续从这个offset处理那么就会有些消息永远不会被处理,这就是上面说的“最多一次”。
2.2
consumer可以先读取消息,处理消息,最后记录offset,当然如果在记录offset之前就crash了,新的consumer会重复的消费一些消息,这就是上面说的“最少一次”;
2.3
“精确一次”可以通过将提交分为两个阶段来解决:保存了offset后提交一次,消息处理成功之后再提交一次。但是还有个更简单的做法:将消息的offset和消息被处理后的结果保存在一起;
Kafka一次消息的生命周期
Kafka zk结点介绍
broker注册结点
broker/ids/[0…N]
topic结点
broker/topics/[topic]
Broker/topics/[topic]/3->2
broker id 为3的broker为某个topic提供了2个分区进行消息的存储
消费分区与消费者的关系
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
消息消费进度Offset记录
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
消费者注册
/consumers/[group_id]/ids/[consumer_id]
消费者监听:
/consumers/[group_id]/ids
/brokers/ids/[0…N]
Kafka日志文件
Kafka所有日志文件均存储在server.properties文件配置参数log.dirs下,假设 log.dirs配置为/export/kafka/log/, 同一个topic下有多个不同partition,每个partition为一个目录,目录的命名规范为: topic名称+有序序号,第一个partition序号从0开始,序号最大值为partitions数量减1,例如: 一个名为trade为的topic, 有4个partition, 则在/export/kafka/log/目录下有下面的信息:
trade-0 trade-1 trade-2 trade-3
partition中文件存储方式:每个partion目录相当于一个巨型文件被平均分配到多个大小相等segment数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。这样做的好处就是能快速删除无用文件,有效提高磁盘利用率, 下面的partition中文件存储方式:
partition中segment文件存储结构
- segment file组成:由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀”.index”和“.log”分别表示为segment索引文件、数据文件.
- segment文件命名规则:partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充,示例如下。
partition中通过offset查找message
第一步查找segment file;
第二步通过segment file查找message;
示例: 查找为offset=368776的message, 如下图所示;
-
定位segment file: 其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.
第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.第三个文件 00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起 始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件。当 offset=368776时定位到00000000000000368769.index|log
-
在segment file中查找:当offset=368776时,依次定位到00000000000000368769.index的元数据物理
位置和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序 查找直到offset=368776为止;
Kafka 负载均衡
生产者的负载均衡
四层的负载均衡,不常用, 使用zookeeper进行负载均衡, 常 用, 其中partitioner.class 的配置决定了Kafka生产者的负载均衡, 有三种情况:
- kafka.producer.DefaultPartitioner (Hash模式)
- kafka.producer.ByteArrayPartitioner (Hash模式)
- 不指定随机轮询模式
消费者负载均衡
partition和consumer 对应好, 平均
1: 5个partition, 5个consumer, (1, 1, 1, 1, 1)
2: 8个 partition, 5个consumer, (2, 2, 2, 1, 1)
3: 5个 partition, 8个consumer, (1, 1, 1, 1, 1, 0, 0, 0)
Kafka线上部署
1.一般情况下,根据并发量,一个Kafka集群有3-5台broker机器,阿里是一条业务线7-8台物理机,内存是192G,为了减少消息的堆积,一个topic下128+个Partition, 默认是消费集群机器数量的2-3倍, 例如线上消费集群是64台,partition建议设置为128 – 192台之间, Note however that there cannot be more consumer instances in a consumer group than partitions.
2.一主多备部署:一个备的broker在和主的broker在同一个机房,另一个备的broker部署在同城的另一个机房, 进一步增加高可靠性.
- 为防止消息堆积,建议同一个topic的消费集群的消费能力不能小于生产的集群.
- 为了提高网络的利用率,建议一次性发送的消息尽可能的大,避免小包网络传输.
Kafka 保证
- 消息通过生产者被发送出去,将会按消息发送时的顺序追加到到一个指定的topic partition,也是说,记录M1和记录M2被同一个生产者发送,并且M1要先于M2发送,那么在日志文件中M1将会有比M2更小的offset(生产保序)
- 一个消费者实例顺序消费存储在日志中的记录(消费保序)
- 对于一个有N个复本的topic,最多情况下N-1个broker server丢失数据,也可以保证在记录被提交的情况下,不会丢数据(高可用);
Kafka开发
Kafka应用开发
kafka java客户端maven依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.9.2</artifactId> <version>0.8.2.2</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.0.0</version> </dependency>
Kafka producer代码示例
public class KafkaProducerTest { private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerTest.class); private static Properties properties = null; // kafka连续配置 项 static { properties = new Properties(); properties.put("bootstrap.servers", "centos.master:9092,centos.slave1:9092,centos.slave2:9092"); properties.put("producer.type", "sync"); properties.put("request.required.acks", "1"); properties.put("serializer.class", "kafka.serializer.DefaultEncoder"); properties.put("partitioner.class", "kafka.producer.DefaultPartitioner"); properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); } public void produce() { KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<byte[],byte[]>(properties); ProducerRecord<byte[],byte[]> kafkaRecord = new ProducerRecord<byte[],byte[]>( "test", "kkk".getBytes(), "vvv".getBytes()); kafkaProducer.send(kafkaRecord, new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { if(null != e) { LOG.info("the offset of the send record is {}", metadata.offset()); LOG.error(e.getMessage(), e); } LOG.info("complete!"); } }); kafkaProducer.close(); } public static void main(String[] args) { KafkaProducerTest kafkaProducerTest = new KafkaProducerTest(); for (int i = 0; i < 10; i++) { kafkaProducerTest.produce(); } } }
Kafka Consumer代码示例
public class ConsumerSample { public static void main(String[] args) { Properties props = new Properties(); props.put("zk.connect", "localhost:2181"); props.put("zk.connectiontimeout.ms", "1000000"); props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); props.put("groupid", "test_group"); ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); HashMap<String, Integer> map = new HashMap<String, Integer>(); map.put("test-topic", 4); Map<String, List<KafkaStream<Message>>> topicMessageStreams = consumerConnector.createMessageStreams(map); List<KafkaStream<Message>> streams = topicMessageStreams.get("test-topic"); ExecutorService executor = Executors.newFixedThreadPool(4); for (final KafkaStream<Message> stream : streams) { executor.submit(new Runnable() { public void run() { for (MessageAndMetadata msgAndMetadata : stream) { System.out.println("topic: " + msgAndMetadata.topic()); Message message = (Message) msgAndMetadata.message(); ByteBuffer buffer = message.payload(); buffer.get(bytes); String tmp = new String(bytes); System.out.println("message content: " + tmp); } } }); } } }
Kafka开发注意事项
- 生产端注意设置好producer.type和partitioner.class 这两个参数,第一个参数 对写入吞吐量影响巨大,要结合实际的业务场景来设置,第二个参数关系到消费的结果是不是正常的,也要结合实际的业务场景来设置.
- 生产端和消费端的key.serializer 和 value.serializer分别设置一样,否则消费端可能会产生乱码.
- 根据实际的业务场景,设置好生产端和消费端的其它配置参数.
Kafka常见的使用模式
Consumer 消费消息模式
- 推模式
- 拉模式(常见)
Producer 发布消息模式
局部保序模式(hash映射)
全部保序模式 (只有一个Partition)
不保序模式(轮训模式)
Kafka的使用场景
Messaging, 大规模分布式网站
异步,解耦、削峰
Website Activity Tracking
通过agent定时收集每台主机的syslog信息
Metrics
Log Aggregation
Event Sourcing
Commit Log
异地机房的数据近实时同步(全局保序和局部保序)
Kafka参数调优
Broker 参数调优
- log.dirs Kafka数据存放的目录。可以指定多个目录,中间用逗号分隔,当新partition被创建的时会被存放到 当前存放partition最少的目录.
- num.io.threads 服务器用来执行读写请求的IO线程数,此参数的数量至少要等于服务器上磁盘的数量.
- queued.max.requests I/O线程可以处理请求的队列大小,若实际请求数超过此大小,网络线程将停止接收新的请求,建议500-1000.
- num.partitions 默认partition数量,如果topic在创建时没有指定partition数量,默认使用此值,建议修改为consumer 数量的1-3倍.
- log.segment.bytes Segment文件的大小,超过此值将会自动新建一个segment,此值可以被topic级别的参数覆盖, 建议1G ~ 5G.
- default.replication.factor 默认副本数量,建议改为2.
- num.replica.fetchers Leader处理replica fetch消息的线程数量, 建议设置大点2-4.
- offsets.topic.num.partitions offset提交主题分区的数量,建议设置为100 ~ 200.
Producer 参数调优
- request.required.acks :用来控制一个produce请求怎样才能算完成, 主要是来表示写入数据的持久化的,有三个值(0, 1, -1), 持久化的程度依次增高.
- producer.type : 同步异步模式。async表示异步,sync表示同步。如果设置成异步模式,可以允许生产者以batch的形式push数据,这样会极大的提高broker性能,推荐设置为异步.
- partitioner.class : Partition类,默认对key进行hash, 即 kafka.producer.DefaultPartitioner.
- compression.codec :指定producer消息的压缩格式,可选参数为: “none”, “gzip” and “snappy”.
- queue.buffering.max.ms :启用异步模式时,producer缓存消息的时间。比如我们设置成1000时,它会缓存1秒的数据再一次发送出去,这样可以极大的增加broker吞吐量,但也会造成时效性的降低.
- queue.buffering.max.messages:采用异步模式时producer buffer 队列里最大缓存的消息数量,如果超过这个数值,producer就会阻塞或者丢掉消息.
- batch.num.messages:采用异步模式时,一个batch缓存的消息数量。达到这个数量值时producer才会发送消息.
Consumer参数调优
- fetch.message.max.bytes:查询topic-partition时允许的最大消息大小,consumer会为每个partition缓存此大小的消息到内存,因此,这个参数可以控制consumer的内存使用量。这个值应该至少比server允许的最大消息大小大,以免producer发送的消息大于consumer允许的消息.
- num.consumer.fetchers:拉数据的线程数量,为了保序,建议一个,用默认值.
- auto.commit.enable:如果此值设置为true,consumer会周期性的把当前消费的offset值保存到zookeeper,当consumer失败重启之后将会使用此值作为新开始消费的值.
- auto.commit.interval.ms: Consumer提交offset值到zookeeper的周期.
Kafka常见问题总结
客户端消费出现空指针异常
重新设置消费的partition offset
客户端无法消费
网络不通 jar包冲突(netty, slf4j)
同类产品
RocketMq
JMQ
其它消息产品
notify
ActiveMQ(Apache)
Hornet(Jboss)
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Android NDK:/../build/core/setup-app.mk:81: Android NDK: Application t...
异常 D:/Program/android/sdk/ndk-bundle/build//../build/core/setup-app.mk:81: Android NDK: Application targets deprecated ABI(s): mips64 armeabi mips D:/Program/android/sdk/ndk-bundle/build//../build/core/setup-app.mk:82: Android NDK: Support for these ABIs will be removed in a future NDK release. 原因 r16之后已经不支持mips, mips64, armeabi这三种架构 解决方法 编译时仅支持以下架构(去掉armeabi、mips64、mips) APP_ABI := arm64-v8a armeabi-v7a x86 x86_64
- 下一篇
国产Dhyana禅定x86处理器开始启动生产
Dhyana处理器与AMD EPYC处理器很相似,Linux内核开发者指出,二者只是厂商ID与产品序列号有所不同。事实上,Linux维护者甚至将EPYC支持代码转移到Dhyana处理器,发现可以成功运行,由此说明两款处理器差异很小。工控主板 从官方声明看,AMD并没有将最终芯片设计出售给中国伙伴,而是允许合作伙伴为中国服务器市场设计自己的芯片。几十年来,x86架构一直由英特尔、AMD和威盛(VIA)控制。 从Dhyana(禅定)以及AMD Zen(禅)的关系来看,前者就是AMD授权给海光公司的国产X86处理器,2016年AMD与中国天津海光投资集团达成了合作协议,AMD将最新的Zen处理器授权给了中国公司,获得了2.93亿美元的授权费。 根据双方的协议,国产X86处理器主要是针对服务器市场的。 不过此前也有消息称,在此次合作中,AMD与海光成立了两家合资公司,AMD控股51%,这样保证了AMD对国产X86处理器的控制权,而海光无法插手Zen内核,只能做些外围芯片工作。 据此前媒体报道,Linux内核补丁中出现了一款Hygon(海光公司)代号为Dhyana(禅定)的Family 18h处...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
-
Docker使用Oracle官方镜像安装(12C,18C,19C)
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8编译安装MySQL8.0.19
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
推荐阅读
最新文章
- SpringBoot2全家桶,快速入门学习开发网站教程
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS6,CentOS7官方镜像安装Oracle11G
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作