玩转kafka中的消费者
上一篇介绍了如何使用Kafka的生产者,这一篇将介绍在实际生产中如何合理地使用Kafka的消费者API。
Kafka中消费者API分为新版和旧版,本章只介绍新版,旧版的就不做介绍了。
首发于我的个人博客:http://www.janti.cn/article/kafkaconsumer
准备工作
kafka版本:2.11-1.1.1
操作系统:centos7
java:jdk1.8
有了以上这些条件就OK了,具体怎么安装和启动Kafka这里就不强调了,可以看上一篇文章。
新建一个maven工程,需要的依赖如下:
<dependency> <groupId>org.apache.kafkagroupId> <artifactId>kafka_2.11artifactId> <version>1.1.1version> dependency> <dependency> <groupId>org.apache.kafkagroupId> <artifactId>kafka-clientsartifactId> <version>1.1.1version> dependency>
简单的消费者例子
Kafka中是封装了KafkaConsumer类,消息接受都是通过该类来进行的。与生产者一样,在实例化消费者之前都是要进行配置的。
首先介绍这里面的配置:- bootstrap.servers:配置连接代理列表,不必包含Kafka集群的所有代理地址,当连接上一个代理后,会从集群元数据信息中获取其他存活的代理信息。但为了保证能够成功连上Kafka集群,在多代理集群的情况下,建议至少配置两个代理。(由于电脑配置有限,本文实验的是单机情况)
- key.deserializer : 用于反序列化消息Key的类
- value.deserializer :用于反序列化消息值(Value)的类
- group.id:指定消费者所在的组
- client.id:指定客户端所在组的ID
- enable.auto.commit:设置是否自动提交。在没有指定消费偏移量提交方式时,默认是每隔1S发送一次
- auto.commit.interval.ms:自动提交偏移值的时间间
package kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; /** * 简单的消费者 * * @author tangj */ public class KafkaSimpleDemo { static Properties properties = new Properties(); private static String topic = "MyOrder"; //poll超时时间 private static long pollTimeout = 2000; // 1.消费者配置 static { // bootstarp server 地址 properties.put("bootstrap.servers", "10.0.90.53:9092"); // group.id指定消费者,所在的组 properties.put("group.id", "order"); // 组中client 的ID名称 properties.put("client.id", "consumer"); // 在没有指定消费偏移量提交方式时,默认是每个1s提交一次偏移量,可以通过auto.commit.interval.ms参数指定提交间隔 // 自动提交要设置成true // 手动提交设置成false properties.put("enable.auto.commit", true); // 自动提交偏移值的 时间间隔 properties.put("auto.commit.interval.ms", 2000); // key序列化 properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value序列化 properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); } public static void main(String args[]) { consumeAutoCommit(); } /** * 消费消息自动提交偏移值 */ public static void consumeAutoCommit() { //2\. 实例化消费者 KafkaConsumer kafkaConsumer = new KafkaConsumer(properties); // 3.订阅主题 kafkaConsumer.subscribe(Arrays.asList(topic)); try { // 消费者是一个长期的过程,所以使用永久循环, while (true) { // 4.拉取消息 ConsumerRecords records = kafkaConsumer.poll(pollTimeout); for (ConsumerRecord record : records) { System.out.println("消息总数为: " + records.count()); System.out.println("收到消息: " + String.format("partition = %d, offset= %d, key=%s, value=%s%n", record.partition(), record.offset(), record.key(), record.value())); } } } catch (Exception e) { e.printStackTrace(); } finally { kafkaConsumer.close(); } } }看了简单的消费者模式,来看看消费者中的其他知识:消费模型,分区均衡,偏移量的提交,多线程消费者。
消费模型
可以看到在配置的时候,配置了消费者组这个变量。这里主要讲讲消费模型。消费模型主要分为两种:消费者组模型,发布订阅模型 消费者组中有多个消费者,组内的消费者共享一个group.id,并且有一个单独的client.id。消费者组用来消费一个主题下的所有分区,但是每个分区只能由该组内的一个消费者消费,不会被重复消费。 发布订阅模型就是所有的消费者都可以通过订阅来获取Kafka中的消息。分区再平衡
介绍一种情况,消费者并不是越多越好的,当消费者大于分区数时,就会有部分消费者一直空闲着。 分区再平衡,即:parition rebanlance。总的来说分区再平衡就是一个消费者原来消费的分区变成由其他消费者消费,它只发生在消费者组中。 再均衡的作用就是为了保证消费者组的高可用和伸缩性,但是再均衡期间消费者会无法读取消息,有短暂的暂停时间。偏移量的处理
偏移量的左右就是记录已经消费的消息。之前的一个简单的demo演示了自动提交的方式,接下来介绍手动提交。 在实际生产中,消费者拉取到消息之后会进行一些业务处理,比如存到数据库,写入缓存,网络请求等,这些都会有失败的可能,所以要对偏移值进行更精细的控制。 手动提交有两种方式: 第一种是同步提交,同步提交是阻塞的,对于提交失败的处理,它会一直提交,直到提交成功。 第二种是异步提交,异步提交是非阻塞的,对于提交失败,也不会重新提交。 当然,对于手动提交的业务设计,还是要结合具体业务进行考虑和设计。手动提交之后,需要设置 enable.auto.commit为false.并且不需要设置 auto.commit.interval.ms。下面给出一个自动提交的例子,设置没消费5次,提交一次偏移值:
public static void consumeHandleCommit() {
KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
try {
int maxcount = 5;
int count = 0;
kafkaConsumer.subscribe(Arrays.asList(topic));
for (; ; ) {
// 拉取消息
ConsumerRecords records = kafkaConsumer.poll(polltimeout);
for (ConsumerRecord record : records) {
System.out.println("收到消息: " + String.format("partition = %d, offset= %d, key=%s, value=%s%n",
record.partition(), record.offset(), record.key(), record.value()));
count++;
}
// 业务逻辑完成后,提交偏移量
if (count >= maxcount) {
kafkaConsumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map offsets, Exception exception) {
if (null != exception) {
exception.printStackTrace();
} else {
System.out.println("偏移值提交成功");
}
}
});
count = 0;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
kafkaConsumer.close();
}
多线程消费者
单线程的消费者效率肯定是低于多线程的消费者的,但是消费者的多线程设计与生产者不同,KafkaConsumer是非线程安全的。
保证线程安全,每个线程,各自实例化一个KafkaConsumer对象,并且多个消费者线程只消费同一个主题,不考虑多个消费者线程消费同一个分区。
线程实体类:
package kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
public class KafkaConsumerThread implements Runnable {
//每个线程私有一个consumer实例
private KafkaConsumer<String,String> consumer;
public KafkaConsumerThread(Map<String,Object> configMap,String topic) {
Properties properties = new Properties();
properties.putAll(configMap);
this.consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList(topic));
}
@Override
public void run() {
try {
for (; ; ) {
// 拉取消息
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println("收到消息: " + String.format("partition = %d, offset= %d, key=%s, value=%s%n",
record.partition(), record.offset(), record.key(), record.value()));
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
}
线程启动类:
package kafka.consumer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class KafkaConsumerExcutor {
/**
* kafkaConsumer是非线程安全的,处理好多线程同步的方案是
* 每个线程实例化一个kafkaConsumer对象
*/
public static void main(String args[]) {
String topic = "hello";
Map<String, Object> configMap = new HashMap<>();
configMap.put("bootstrap.servers", "10.0.90.53:9092");
//group.id指定消费者,所在的组,保证所有线程都在一个消费者组
configMap.put("group.id", "test");
configMap.put("enable.auto.commit", true);
configMap.put("auto.commit.interval.ms", 1000);
// key序列化
configMap.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// value序列化
configMap.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
ExecutorService service = Executors.newFixedThreadPool(6);
// 该主题总共有6个分区,那么保证6个线程
for (int i = 0; i < 6; i++) {
service.submit(new KafkaConsumerThread(configMap, topic));
}
}
}
总结
本文介绍了kafka中的消费者,包括多线程消费者,偏移量的处理,以及分区再平衡。
切记一点,实际的生产中消费者需要根据实际业务来进行设计。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
PHP学习9——MySQL数据库
主要内容: MySQL的启动 MySQL数据库操作 数据库表设计 创建和查看表 修改表结构 MySQL语句操作 数据库备份与恢复 PHP操作MySQL数据库 面向对象的数据库操作 MySQL数据库是目前最流行的数据库之一,因为他是自由的开源的软件,可以免费的使用,功能强大,跨平台,安全性高,成本低(完全免费),存储量大。 MySQL的启动 点击MySQL的start即可,绿色为正常启动 按住win+r,打开命令窗口,输入cmd,使用dos连接和退出 MySQL数据库操作 创建数据库 CREATE DATABASE db_name 注意: 数据库名db_name在windows不区分大小写,而linux是区分的,为了程序的移植,建议使用小写字母名称。 不能与其他数据库同名,否则发生错误。 名称可以包含字母,数字,下划线,美元符号($),但是不能以数字开头,也不能使用MySQL关键字。 名称最长64个字符。 每条命令以分号;结束,按enter提交,也可以将一条命令分为多行写,分号结束。 查看数据库 SHOW DATABASES 选择数据库 USE db_name 删除数据库 DROP...
- 下一篇
C#与C++的发展历程第一 - 由C#3.0起
原文: C#与C++的发展历程第一 - 由C#3.0起 俗话说学以致用,本系列的出发点就在于总结C#和C++的一些新特性,并给出实例说明这些新特性的使用场景。前几篇文章将以C#的新特性为纲领,并同时介绍C++中相似的功能的新特性,最后一篇文章将总结之前几篇没有介绍到的C++11的新特性。 C++从11开始被称为现代C++(Modern C++)语言,开始越来越不像C语言了。就像C#从3.0开始就不再像Java了。这是一种超越,带来了开发效率的提高。 一种语言的特性一定是与这种语言的类型和运行环境是分不开的,所以文章中说C#的新特性其中也包括新的.NET Framework和CLR(DLR)对C#的支持。 系列文章目录 1.C#与C++的发展历程第一 - 由C#3.0起 2.C#与C++的发展历程第二 - C#4.0再接再厉 3. C#与C++的发展历程第三 - C#5.0异步编程的巅峰 由于C#2.0除了泛型,迭代器yield,foreach等与Java等有所不同,其它没有特别之处,所以本系列将直接从C#3.0开始。 C#3.0 (.NET Framework 3.5, CLR ...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
-
Docker使用Oracle官方镜像安装(12C,18C,19C)
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8编译安装MySQL8.0.19
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
推荐阅读
最新文章
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS6,CentOS7官方镜像安装Oracle11G
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Hadoop3单机部署,实现最简伪集群
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果