Kafka 消息队列 Java版
消费者
apache kafka工具类,消费者Consumer类
public class Consumer { private ConsumerHandler handler; private ConsumerConfig config; private KafkaConsumer<String, String> consumer; private boolean startFlag = false; /** * 创建消费者 * * @param handler * 消费者处理类 * @param config * 消费者处理配置 */ public Consumer(ConsumerHandler handler, ConsumerConfig config) { this.handler = handler; this.config = config; init(); } /** * 初始化接收器 */ private void init() { Properties props = new Properties(); props.put("bootstrap.servers", config.getBootstrapServers());// 服务器ip:端口号,集群用逗号分隔 props.put("group.id", config.getGroupID()); /* 是否自动确认offset */ props.put("enable.auto.commit", "true"); /* 自动确认offset的时间间隔 */ props.put("auto.commit.interval.ms", config.getAutoCommitInterVal()); props.put("session.timeout.ms", config.getSessionTimeOut()); /* key的序列化类 */ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); /* value的序列化类 */ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer<>(props); if (config.isProcessBeforeData()) { /* 消费者订阅的topic, 可同时订阅多个 */ consumer.subscribe(config.getTopicList(), new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { } @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { for (TopicPartition partition : partitions) { long offset = handler.getSeek(partition.topic(), partition.partition()); if (offset >= 0) { if (consumer != null) { consumer.seek(partition, offset + 1); } } else { consumer.seekToBeginning(partitions); } } } }); start(); } else { consumer.subscribe(config.getTopicList()); } } public void start() { startFlag = true; while (startFlag) { /* 读取数据,读取超时时间为XXms */ ConsumerRecords<String, String> records = consumer.poll(config.getPollTime()); if (records.count() > 0) { long offset = 0; int partition = 0; for (ConsumerRecord<String, String> record : records) { if (record != null) { offset = record.offset(); partition = record.partition(); try { handler.processObject(record.topic(), record.partition(), record.offset(), record.value()); } catch (Exception e) { e.printStackTrace(); } } } } try { Thread.currentThread(); Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } } consumer.close(); } public void stop() { startFlag = false; } }
消费者配置ConsumerConfig类
public class ConsumerConfig { private String bootstrapServers; private String groupID; private int autoCommitInterVal =1000; private int sessionTimeOut = 30000; private List<String> topicList; private boolean processBeforeData; private long pollTime = 100; public ConsumerConfig() { super(); } /** * 创建消费者配置 * @param bootstrapServers 服务器配合 格式为服务器ip:端口号,集群用逗号分隔 例如 192.168.1.1:9092,192.168.1.2:9092 * @param groupID groupID * @param autoCommitInterVal 自动提交时间单位毫秒, 默认1000 * @param sessionTimeOut 超时时间单位毫秒 , 默认30000 * @param topicList topicList列表 * @param processBeforeData 是否处理启动之前的数据,该开关需要配置consumerHandler的跨步存储使用 * @param pollTime 每次获取数据等待时间单位毫秒,默认100毫秒 */ public ConsumerConfig(String bootstrapServers, String groupID, int autoCommitInterVal, int sessionTimeOut ,List<String> topicList,boolean processBeforeData,long pollTime) { this.bootstrapServers = bootstrapServers; this.groupID = groupID; this.autoCommitInterVal = autoCommitInterVal; this.sessionTimeOut = sessionTimeOut; this.topicList = topicList; this.processBeforeData = processBeforeData; this.pollTime = pollTime; } public String getBootstrapServers() { return bootstrapServers; } public void setBootstrapServers(String bootstrapServers) { this.bootstrapServers = bootstrapServers; } public String getGroupID() { return groupID; } public void setGroupID(String groupID) { this.groupID = groupID; } public int getAutoCommitInterVal() { return autoCommitInterVal; } public void setAutoCommitInterVal(int autoCommitInterVal) { this.autoCommitInterVal = autoCommitInterVal; } public int getSessionTimeOut() { return sessionTimeOut; } public void setSessionTimeOut(int sessionTimeOut) { this.sessionTimeOut = sessionTimeOut; } public List<String> getTopicList() { return topicList; } public void setTopicList(List<String> topicList) { this.topicList = topicList; }
public boolean isProcessBeforeData() { return processBeforeData; } public void setProcessBeforeData(boolean processBeforeData) { this.processBeforeData = processBeforeData; } public long getPollTime() { return pollTime; } public void setPollTime(long pollTime) { this.pollTime = pollTime; } }
消费者处理ConsumerHandler类
public interface ConsumerHandler { /** * 处理收到的消息 * @param topic 收到消息的topic名称 * @param partition 收到消息的partition内容 * @param offset 收到消息在队列中的编号 * @param value 收到的消息 */ void processObject(String topic,int partition,long offset,String value); /** * 获取跨步 * @param topic 接受消息的topic * @param partition 接受消息的partition * @return 当前topic,partition下的seek */ long getSeek(String topic , int partition); }
生产者
kafka生产者,工具Producer类
public class Producer { private ProducerConfig config ; private org.apache.kafka.clients.producer.Producer<String,String> producer; public Producer(ProducerConfig config){ this.config = config; init(); } private void init(){ Properties props = new Properties(); props.put("bootstrap.servers",config.getBootstrapServers()); props.put("acks", "all"); props.put("retries", config.getRetries()); props.put("batch.size", config.getBatchSize()); props.put("linger.ms", config.getLingerMs()); props.put("buffer.memory", config.getBufferMemory()); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<>(props); }
/** * 发送消息 * @param topic 要发送的topic * @param msg */ public void sendMessage(String topic,String msg){ try { producer.send(new ProducerRecord<String, String>(config.getTopic(), String.valueOf(new Date().getTime()), msg)).get(); } catch (InterruptedException e) { // TODO 自动生成的 catch 块 e.printStackTrace(); } catch (ExecutionException e) { // TODO 自动生成的 catch 块 e.printStackTrace(); } producer.flush(); } public void close(){ producer.close(); } }
kafka生产者配置ProducerConfig类
public class ProducerConfig { private String bootstrapServers; private String topic; private int retries = 0; private int batchSize = 16384; private int lingerMs=1; private int bufferMemory=33554432; public ProducerConfig() { super(); } /** * 创建生产者配置文件 * @param bootstrapServers 服务器配合 格式为服务器ip:端口号,集群用逗号分隔 例如 192.168.1.1:9092,192.168.1.2:9092 * @param retries * @param batchSize * @param lingerMs * @param bufferMemory */ public ProducerConfig(String bootstrapServers,int retries, int batchSize, int lingerMs, int bufferMemory) { this.bootstrapServers = bootstrapServers; this.retries = retries; this.batchSize = batchSize; this.lingerMs = lingerMs; this.bufferMemory = bufferMemory; } public String getBootstrapServers() { return bootstrapServers; } public void setBootstrapServers(String bootstrapServers) { this.bootstrapServers = bootstrapServers; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public int getRetries() { return retries; } public void setRetries(int retries) { this.retries = retries; } public int getBatchSize() { return batchSize; } public void setBatchSize(int batchSize) { this.batchSize = batchSize; } public int getLingerMs() { return lingerMs; } public void setLingerMs(int lingerMs) { this.lingerMs = lingerMs; } public int getBufferMemory() { return bufferMemory; } public void setBufferMemory(int bufferMemory) { this.bufferMemory = bufferMemory; } }
测试
消费者处理实现ConsumerHandlerImpl类
public class ConsumerHandlerImpl implements ConsumerHandler{ /** * 处理收到的消息 * @param topic 收到消息的topic名称 * @param partition 收到消息的partition内容 * @param offset 收到消息在队列中的编号 * @param value 收到的消息 */ public void processObject(String topic,int partition,long offset,String value) { System.out.println(topic+"从kafka接收"+partition+"到"+offset+"的消息是:"+value); } /** * 获取跨步 * @param topic 接受消息的topic * @param partition 接受消息的partition * @return 当前topic,partition下的seek */ public long getSeek(String topic , int partition) { return 1; } }
main方法类
public class AppResourceTest{ public static void main(String[] args){ BeanDefinitionRegistry reg=new DefaultListableBeanFactory(); PropertiesBeanDefinitionReader reader=new PropertiesBeanDefinitionReader(reg); reader.loadBeanDefinitions(new ClassPathResource("resources/kafka-consumer.properties")); reader.loadBeanDefinitions(new ClassPathResource("resources/kafka-producer.properties")); BeanFactory factory=(BeanFactory)reg; ConsumerConfig consumerConfig=(ConsumerConfig)factory.getBean("consumerConfig"); System.out.println(consumerConfig.getPollTime()); ProducerConfig producerConfig=(ProducerConfig)factory.getBean("producerConfig"); System.out.println(producerConfig.getBatchSize()); Producer producer = new Producer(producerConfig); producer.sendMessage(producerConfig.getTopic(),"s4335453453454"); producer.close(); System.out.println("consumer"); Consumer consumer = new Consumer(new ConsumerHandlerImpl(),consumerConfig); try{ Thread.currentThread(); Thread.sleep(10000); }catch(Exception e){ e.printStackTrace(); } } }
运行结果
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Java堆内存又溢出了!教你一招必杀技
JAVA堆内存管理是影响性能主要因素之一。堆内存溢出是JAVA项目非常常见的故障,在解决该问题之前,必须先了解下JAVA堆内存是怎么工作的。 先看下JAVA堆内存是如何划分的,如图: JVM内存划分为堆内存和非堆内存,堆内存分为年轻代(Young Generation)、老年代(Old Generation),非堆内存就一个永久代(Permanent Generation)。 年轻代又分为Eden和Survivor区。Survivor区由FromSpace和ToSpace组成。Eden区占大容量,Survivor两个区占小容量,默认比例是8:1:1。 堆内存用途:存放的是对象,垃圾收集器就是收集这些对象,然后根据GC算法回收。 非堆内存用途:永久代,也称为方法区,存储程序运行时长期存活的对象,比如类的元数据、方法、常量、属性等。 在JDK1.8版本废弃了永久代,替代的是元空间(MetaSpace),元空间与永久代上类似,都是方法区的实现,他们最大区别是:元空间并不在JVM中,而是使用本地内存。元空间有注意有两个参数: MetaspaceSize :初始化元空间大小,控制发生GC阈值 M...
- 下一篇
一对一直播交友源码实现即时通讯非常“有一套”
在这个物欲横流的时代,心浮气躁、急功近利更是成为社会的普遍共性。大多数人都承受着巨大的压力,在这个时代小心翼翼的行走,而一对一直播交友源码的出现,带领他们找到了压力宣泄的出口,即陌生人与陌生人之间的社交。回归正题,在直播间里我们通常都会看到网友们在线上互相交流和发礼物,在这里,主要是用到了即时通讯技术。本文主要想给大家分享一下关于搭建即时通讯服务器的相关内容。 1.即时通讯是什么?即时通讯简称IM,是一个终端服务,允许两人或多人使用网络即时的传递文字讯息、档案、语音与视频交流。2.即时通讯技术的原理(socket)是什么?Socket即用于描述IP地址和端口号,是一种网络的通信机制。网络通信底层都是通过socket建立连接的,因为它包含IP和端口,只要有这两个就能准确找到一台主机上的某个应用。3.IM通信原理是什么?举个简单的例子,客户端A要想和客户端B产生通信,但是无法直接进行,这个时候就需要通过IM服务器,从而使两者之间产生通信。客户端A通过socket与IM服务器产生链接,客户端B也通过socket与IM服务器产生链接,客户端A把信息发送给IM应用服务器并且指定发送给客户端B,服...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2整合Redis,开启缓存,提高访问速度
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- 设置Eclipse缩进为4个空格,增强代码规范
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- CentOS8编译安装MySQL8.0.19
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7