Spring for Apache Kafka实战
背景介绍
Kafka是一个分布式的、可分区的、可复制的消息系统,在现在的互联网公司,应用广泛,在我们公司在主要运用在定时推送业务,批量数据处理,日志上传等方面,我发现网上大部分博客,在使用上还只是对Apache 官方提供的client,进行运用开发,在这里推荐使用 Spring for Apache Kafka(简称spring-kafka) ,更新维护稳定,方法众多,并且强大,现已加入Spring豪华大礼包。
大坑预警
spring-kafka实际上也是对apache的kafka-client进行了包装和开发,所以使用的时候一定注意,你引入的spring-kafka里封装的kafka-client的版本要和服务器上的kafka服务器版本要对应,不然就会产生问题,比如消费失败。官网上在首页就贴出了SpringKafka和kafka-client版本(它的版本号要和kafka服务器的版本保持一致)的对应关系Spring for Apache Kafka
生产者的使用
spring-kafka对于生产者提供了两个模板类,分别是
- KafkaTemplate :包装了一个生产者,并提供方便的方法将数据发送到kafka的topics。
- ReplyingKafkaTemplate :KafkaTemplate的子类,增加了一个请求/应答功能,在发送数据后会返回一个future ,里面封装了消费者的应答信息。
这里主要介绍KafkaTemplate。对于使用,官方提供了这样一个案例
To use the template, configure a producer factory and provide it in the template’s constructor:
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
官方的文档只是给出了,如何配置和通过工厂构造出一个模板。而我们需要的是一个完整的能在项目中使用的案例。在这里我给出一个与Spring boot集成,完整的生产者案例。
首先构造一个模板类
@Configuration
public class TestTemplateConfig{
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// kafka.metadata.broker.list=10.16.0.214:9092,10.16.0.215:9092,10.16.0.216:9092
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.ACKS_CONFIG,"-1");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,5048576);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
/** 获取工厂 */
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
/** 注册实例 */
@Bean(name="testTemplate")
public KafkaTemplate<String, String> testTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
运用@Configuration注解定义配置类,在内部通过spring-kafka提供的KafkaProducerFactory,来构造出KafkaTemplate,并在方法上通过@Bean注解将其注册,这样在初始化Spring容器时,就会将KafkaTemplate注入到容器中进行实例化。
发送消息
@Resource
private KafkaTemplate testTemplate;
//同步发送
public void syncSend(){
testTemplate.send("topic",result.toString()).get(10, TimeUnit.SECONDS);
}
//异步发送
public void asyncSend() {
ListenableFuture<SendResult<Integer, String>> future = testTemplate.send("topic",result.toString());
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
System.out.println("success");
}
@Override
public void onFailure(Throwable ex) {
System.out.println("failure");
}
});
}
消费者
对于消费者的使用,spring-kafka提供了,两种方式
Messages can be received by configuring a MessageListenerContainer and providing a Message Listener, or by using the @KafkaListener annotation.。 —— Spring for Apache Kafka
也就是通过配置MessageListenerContainer,可以提供消息侦听器或使用@KafkaListener注释来接收消息。
而这个官方的消息侦听器,其实就是接口,官方提供了8个接口来满足不同的应用场景,这里我们使用最常用的MessageListener接口
public class KafkaConsumerSerivceImpl implements MessageListener<String, String> {
@Override
public void onMessage(ConsumerRecord<String, String> data) {
//根据不同的主题进行消费
if("主题1".equals(data.topic())){
}else if("主题2".equals(data.topic())){
}
}
}
另一种通过@KafkaListener注解极为方便
@Component
public class KafkaConsumer {
@KafkaListener(topics = {"testTopic"})
public void receive(String message){
System.out.println("消费消息:" + message);
}
}
无论是哪种方法,都需要创建一个MessageListenerContainer,对于这个容器的构造官方给的案例比较详细。
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
...
return props;
}
}
构造过程
Notice that to set container properties, you must use the getContainerProperties() method on the factory. It is used as a template for the actual properties injected into the container. —— Spring for Apache Kafka
说明,要设置监听容器的属性,比如轮询时间,只能是通过getContainerProperties().set()方法,来设置。
关键配置
生产者配置
bootstrap.servers:brokers集群地址
acks:
- -1 :producer在所有follower副本确认接收到数据后才算一次发送完成
- 0 :producer不等待来自broker同步完成的确认继续发送下一条
- 1 :producer在leader已成功收到的数据并得到确认后发送下一条
retries:发送失败重试次数
batch.size:当多个记录被发送到同一个分区时,生产者会尝试将记录合并到更少的请求中
linger.ms:批处理延迟时间上限
buffer.memory:批处理缓冲区
max.request.size:发送数据最大大小
消费者配置
concurrency:消费监听器容器并发数
session.timeout.ms:检测消费者故障的超时
auto.offset.reset:
- earliest :当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
- latest :当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
- none :topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
enable.auto.commit:
- true : 自动提交位移
- false :关闭自动提交位移, 在消息被完整处理之后再手动提交位移
auto.commit.interval.ms:若enable.auto.commit=true,这里设置自动提交周期
备注:
对数据可靠性较高的场景建议 offset 手动提交并将acks 设置为 "all" 即所有副本都同步到数据时send方法才返回, 以此来完全判断数据是否发送成功, 理论上来讲数据不会丢失。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
Python实现MySQL连接池
python编程中可以使用MySQLdb进行数据库的连接及诸如查询/插入/更新等操作,但是每次连接mysql数据库请求时,都是独立的去请求访问,相当浪费资源,而且访问数量达到一定数量时,对mysql的性能会产生较大的影响。因此,实际使用中,通常会使用数据库的连接池技术,来访问数据库达到资源复用的目的。 数据库连接池 python数据库连接池包 DBUtils: DBUtils是一套Python数据库连接池包,并允许对非线程安全的数据库接口进行线程安全包装。DBUtils来自Webware for Python。 DBUtils提供两种外部接口: PersistentDB :提供线程专用的数据库连接,并自动管理连接。 PooledDB :提供线程间可共享的数据库连接,并自动管理连接。下载地址:DBUtils 下载解压后,使用python s
-
下一篇
Numpy入门
标题中的英文首字母大写比较规范,但在python实际使用中均为小写。 2018年7月23日笔记 0. 学习内容: Python科学计算库:Numpy需要掌握的知识: 1.Numpy简介;2.Numpy程序包;3.简单的Numpy程序;4.为什么使用Numpy; 5.Numpy是什么;6.Numpy数据溢出; 1. Numpy简介 Numpy是python语言中的科学计算库。 下文主要介绍数据科学工具包Numpy的基本用法,内容包括: 1.Numpy的ndarray多维数组创建 2.Numpy的ndarray多维数组索引切片访问 3.Numpy的ndarray多维数组的组合分割 2. Numpy程序包 集成开发环境为Jupyter notebook 语言及其版本为python3.6 安装numpy在cmd中运行命令:pip install numpy,如果电脑安装了最新版的anaconda,则自带jupyter notebook和numpy库。 集成开发环境如下图所示: image_1cj3s95tl1rsg1hlm1fatkhl12bj9.png-24kB 3.简单的Numpy程序 两...
相关文章
文章评论
共有0条评论来说两句吧...