Kafka 消息队列 Java版
消费者
apache kafka工具类,消费者Consumer类
public class Consumer {
private ConsumerHandler handler;
private ConsumerConfig config;
private KafkaConsumer<String, String> consumer;
private boolean startFlag = false;
/**
* 创建消费者
*
* @param handler
* 消费者处理类
* @param config
* 消费者处理配置
*/
public Consumer(ConsumerHandler handler, ConsumerConfig config) {
this.handler = handler;
this.config = config;
init();
}
/**
* 初始化接收器
*/
private void init() {
Properties props = new Properties();
props.put("bootstrap.servers", config.getBootstrapServers());// 服务器ip:端口号,集群用逗号分隔
props.put("group.id", config.getGroupID());
/* 是否自动确认offset */
props.put("enable.auto.commit", "true");
/* 自动确认offset的时间间隔 */
props.put("auto.commit.interval.ms", config.getAutoCommitInterVal());
props.put("session.timeout.ms", config.getSessionTimeOut());
/* key的序列化类 */
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
/* value的序列化类 */
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
if (config.isProcessBeforeData()) {
/* 消费者订阅的topic, 可同时订阅多个 */
consumer.subscribe(config.getTopicList(), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for (TopicPartition partition : partitions) {
long offset = handler.getSeek(partition.topic(), partition.partition());
if (offset >= 0) {
if (consumer != null) {
consumer.seek(partition, offset + 1);
}
} else {
consumer.seekToBeginning(partitions);
}
}
}
});
start();
} else {
consumer.subscribe(config.getTopicList());
}
}
public void start() {
startFlag = true;
while (startFlag) {
/* 读取数据,读取超时时间为XXms */
ConsumerRecords<String, String> records = consumer.poll(config.getPollTime());
if (records.count() > 0) {
long offset = 0;
int partition = 0;
for (ConsumerRecord<String, String> record : records) {
if (record != null) {
offset = record.offset();
partition = record.partition();
try {
handler.processObject(record.topic(), record.partition(), record.offset(), record.value());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
try {
Thread.currentThread();
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
consumer.close();
}
public void stop() {
startFlag = false;
}
}
消费者配置ConsumerConfig类
public class ConsumerConfig {
private String bootstrapServers;
private String groupID;
private int autoCommitInterVal =1000;
private int sessionTimeOut = 30000;
private List<String> topicList;
private boolean processBeforeData;
private long pollTime = 100;
public ConsumerConfig() {
super();
}
/**
* 创建消费者配置
* @param bootstrapServers 服务器配合 格式为服务器ip:端口号,集群用逗号分隔 例如 192.168.1.1:9092,192.168.1.2:9092
* @param groupID groupID
* @param autoCommitInterVal 自动提交时间单位毫秒, 默认1000
* @param sessionTimeOut 超时时间单位毫秒 , 默认30000
* @param topicList topicList列表
* @param processBeforeData 是否处理启动之前的数据,该开关需要配置consumerHandler的跨步存储使用
* @param pollTime 每次获取数据等待时间单位毫秒,默认100毫秒
*/
public ConsumerConfig(String bootstrapServers, String groupID, int autoCommitInterVal, int sessionTimeOut
,List<String> topicList,boolean processBeforeData,long pollTime) {
this.bootstrapServers = bootstrapServers;
this.groupID = groupID;
this.autoCommitInterVal = autoCommitInterVal;
this.sessionTimeOut = sessionTimeOut;
this.topicList = topicList;
this.processBeforeData = processBeforeData;
this.pollTime = pollTime;
}
public String getBootstrapServers() {
return bootstrapServers;
}
public void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
public String getGroupID() {
return groupID;
}
public void setGroupID(String groupID) {
this.groupID = groupID;
}
public int getAutoCommitInterVal() {
return autoCommitInterVal;
}
public void setAutoCommitInterVal(int autoCommitInterVal) {
this.autoCommitInterVal = autoCommitInterVal;
}
public int getSessionTimeOut() {
return sessionTimeOut;
}
public void setSessionTimeOut(int sessionTimeOut) {
this.sessionTimeOut = sessionTimeOut;
}
public List<String> getTopicList() {
return topicList;
}
public void setTopicList(List<String> topicList) {
this.topicList = topicList;
}
public boolean isProcessBeforeData() {
return processBeforeData;
}
public void setProcessBeforeData(boolean processBeforeData) {
this.processBeforeData = processBeforeData;
}
public long getPollTime() {
return pollTime;
}
public void setPollTime(long pollTime) {
this.pollTime = pollTime;
}
}
消费者处理ConsumerHandler类
public interface ConsumerHandler {
/**
* 处理收到的消息
* @param topic 收到消息的topic名称
* @param partition 收到消息的partition内容
* @param offset 收到消息在队列中的编号
* @param value 收到的消息
*/
void processObject(String topic,int partition,long offset,String value);
/**
* 获取跨步
* @param topic 接受消息的topic
* @param partition 接受消息的partition
* @return 当前topic,partition下的seek
*/
long getSeek(String topic , int partition);
}
生产者
kafka生产者,工具Producer类
public class Producer {
private ProducerConfig config ;
private org.apache.kafka.clients.producer.Producer<String,String> producer;
public Producer(ProducerConfig config){
this.config = config;
init();
}
private void init(){
Properties props = new Properties();
props.put("bootstrap.servers",config.getBootstrapServers());
props.put("acks", "all");
props.put("retries", config.getRetries());
props.put("batch.size", config.getBatchSize());
props.put("linger.ms", config.getLingerMs());
props.put("buffer.memory", config.getBufferMemory());
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
}
/**
* 发送消息
* @param topic 要发送的topic
* @param msg
*/
public void sendMessage(String topic,String msg){
try {
producer.send(new ProducerRecord<String, String>(config.getTopic(), String.valueOf(new Date().getTime()), msg)).get();
} catch (InterruptedException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
} catch (ExecutionException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
}
producer.flush();
}
public void close(){
producer.close();
}
}
kafka生产者配置ProducerConfig类
public class ProducerConfig {
private String bootstrapServers;
private String topic;
private int retries = 0;
private int batchSize = 16384;
private int lingerMs=1;
private int bufferMemory=33554432;
public ProducerConfig() {
super();
}
/**
* 创建生产者配置文件
* @param bootstrapServers 服务器配合 格式为服务器ip:端口号,集群用逗号分隔 例如 192.168.1.1:9092,192.168.1.2:9092
* @param retries
* @param batchSize
* @param lingerMs
* @param bufferMemory
*/
public ProducerConfig(String bootstrapServers,int retries, int batchSize, int lingerMs, int bufferMemory) {
this.bootstrapServers = bootstrapServers;
this.retries = retries;
this.batchSize = batchSize;
this.lingerMs = lingerMs;
this.bufferMemory = bufferMemory;
}
public String getBootstrapServers() {
return bootstrapServers;
}
public void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public int getRetries() {
return retries;
}
public void setRetries(int retries) {
this.retries = retries;
}
public int getBatchSize() {
return batchSize;
}
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
public int getLingerMs() {
return lingerMs;
}
public void setLingerMs(int lingerMs) {
this.lingerMs = lingerMs;
}
public int getBufferMemory() {
return bufferMemory;
}
public void setBufferMemory(int bufferMemory) {
this.bufferMemory = bufferMemory;
}
}
测试
消费者处理实现ConsumerHandlerImpl类
public class ConsumerHandlerImpl implements ConsumerHandler{
/**
* 处理收到的消息
* @param topic 收到消息的topic名称
* @param partition 收到消息的partition内容
* @param offset 收到消息在队列中的编号
* @param value 收到的消息
*/
public void processObject(String topic,int partition,long offset,String value) {
System.out.println(topic+"从kafka接收"+partition+"到"+offset+"的消息是:"+value);
}
/**
* 获取跨步
* @param topic 接受消息的topic
* @param partition 接受消息的partition
* @return 当前topic,partition下的seek
*/
public long getSeek(String topic , int partition) {
return 1;
}
}
main方法类
public class AppResourceTest{
public static void main(String[] args){
BeanDefinitionRegistry reg=new DefaultListableBeanFactory();
PropertiesBeanDefinitionReader reader=new PropertiesBeanDefinitionReader(reg);
reader.loadBeanDefinitions(new ClassPathResource("resources/kafka-consumer.properties"));
reader.loadBeanDefinitions(new ClassPathResource("resources/kafka-producer.properties"));
BeanFactory factory=(BeanFactory)reg;
ConsumerConfig consumerConfig=(ConsumerConfig)factory.getBean("consumerConfig");
System.out.println(consumerConfig.getPollTime());
ProducerConfig producerConfig=(ProducerConfig)factory.getBean("producerConfig");
System.out.println(producerConfig.getBatchSize());
Producer producer = new Producer(producerConfig);
producer.sendMessage(producerConfig.getTopic(),"s4335453453454");
producer.close();
System.out.println("consumer");
Consumer consumer = new Consumer(new ConsumerHandlerImpl(),consumerConfig);
try{
Thread.currentThread();
Thread.sleep(10000);
}catch(Exception e){
e.printStackTrace();
}
}
}
运行结果

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
Java堆内存又溢出了!教你一招必杀技
JAVA堆内存管理是影响性能主要因素之一。堆内存溢出是JAVA项目非常常见的故障,在解决该问题之前,必须先了解下JAVA堆内存是怎么工作的。 先看下JAVA堆内存是如何划分的,如图: JVM内存划分为堆内存和非堆内存,堆内存分为年轻代(Young Generation)、老年代(Old Generation),非堆内存就一个永久代(Permanent Generation)。 年轻代又分为Eden和Survivor区。Survivor区由FromSpace和ToSpace组成。Eden区占大容量,Survivor两个区占小容量,默认比例是8:1:1。 堆内存用途:存放的是对象,垃圾收集器就是收集这些对象,然后根据GC算法回收。 非堆内存用途:永久代,也称为方法区,存储程序运行时长期存活的对象,比如类的元数据、方法、常量、属性等。 在JDK1.8版本废弃了永久代,替代的是元空间(MetaSpace),元空间与永久代上类似,都是方法区的实现,他们最大区别是:元空间并不在JVM中,而是使用本地内存。元空间有注意有两个参数: MetaspaceSize :初始化元空间大小,控制发生GC阈值 M...
-
下一篇
一对一直播交友源码实现即时通讯非常“有一套”
在这个物欲横流的时代,心浮气躁、急功近利更是成为社会的普遍共性。大多数人都承受着巨大的压力,在这个时代小心翼翼的行走,而一对一直播交友源码的出现,带领他们找到了压力宣泄的出口,即陌生人与陌生人之间的社交。回归正题,在直播间里我们通常都会看到网友们在线上互相交流和发礼物,在这里,主要是用到了即时通讯技术。本文主要想给大家分享一下关于搭建即时通讯服务器的相关内容。 1.即时通讯是什么?即时通讯简称IM,是一个终端服务,允许两人或多人使用网络即时的传递文字讯息、档案、语音与视频交流。2.即时通讯技术的原理(socket)是什么?Socket即用于描述IP地址和端口号,是一种网络的通信机制。网络通信底层都是通过socket建立连接的,因为它包含IP和端口,只要有这两个就能准确找到一台主机上的某个应用。3.IM通信原理是什么?举个简单的例子,客户端A要想和客户端B产生通信,但是无法直接进行,这个时候就需要通过IM服务器,从而使两者之间产生通信。客户端A通过socket与IM服务器产生链接,客户端B也通过socket与IM服务器产生链接,客户端A把信息发送给IM应用服务器并且指定发送给客户端B,服...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS6,CentOS7官方镜像安装Oracle11G
- CentOS7,8上快速安装Gitea,搭建Git服务器
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- 面试大杂烩
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS7设置SWAP分区,小内存服务器的救世主
- Docker使用Oracle官方镜像安装(12C,18C,19C)