教你如何搞定springboot集成kafka
本文分享自华为云社区《手拉手入门springboot+kafka》,作者:QGS。
安装kafka
启动Kafka本地环境需Java 8+以上
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。
Kafka启动方式有Zookeeper和Kraft,两种方式只能选择其中一种启动,不能同时使用。
Kafka下载https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
解压tar -xzf kafka_2.13-3.7.0.tgz
一、Zookeeper启动Kafka(kafka内置zookeeper)
Kafka依赖Zookeeper
1、启动Zookeeper 2、启动Kafka
使用kafka自带Zookeeper启动
./zookeeper-server-start.sh ../config/zookeeper.properties &
./zookeeper-server-stop.sh ../config/zookeeper.properties
./kafka-server-start.sh ../config/server.properties &
./kafka-server-stop.sh ../config/server.properties
二、Zookeeper服务器启动Kafka
Zookeeper服务器安装
https://zookeeper.apache.org/
https://dlcdn.apache.org/zookeeper/zookeeper-3.9.2/apache-zookeeper-3.9.2-bin.tar.gz
tar zxvf apache-zookeeper-3.9.2-bin.tar.gz
配置Zookeeper服务器
cp zoo_sample.cfg zoo.cfg
启动Zookeeper服务器
./zkServer.sh start
修改Zookeeper端口
Zoo.cfg添加内容
admin.serverPort=8099
apache-zookeeper-3.9.2-bin/bin目录下重启Zookeeper
Zookeeper服务器启动kafka
/opt/kafka_2.13-3.7.0/bin目录下
./kafka-server-start.sh ../config/server.properties &
Kafka配置文件server.properties
三、使用KRaft启动Kafka
UUID通用唯一识别码(Universally Unique Identifier)
1、生成Cluster UUID(集群UUID):./kafka-storage.sh random-uuid
2.格式化kafka日志目录:./kafka-storage.sh format -t 3pMJGNJcT0uLIBsZhbucjQ -c ../config/kraft/server.properties
3.启动kafka:./kafka-server-start.sh ../config/kraft/server.properties &
springboot集成kafka
创建topic时,若不指定topic的分区(partition)数量使,则默认为1个分区(partition)
修改server.properties文件
vim server.properties
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.68.133:9092
springboot加入依赖kafka
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
加入spring-kafka依赖后,springboot自动装配好kafkaTemplate的Bean
application.yml配置连接kafka
spring: kafka: bootstrap-servers: 192.168.68.133:9092
生产者
发送消息
@Resource private KafkaTemplate<String,String> kafkaTemplate; @Test void kafkaSendTest(){ kafkaTemplate.send("kafkamsg01","hello kafka"); }
消费者
接收消息
@Component public class KafkaConsumer { @KafkaListener(topics = {"kafkamsg01","test"},groupId = "123") public void consume(String message){ System.out.println("接收到消息:"+message); } }
若没有配置groupid
Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.
@Component public class KafkaConsumer { @KafkaListener(topics = {"kafkamsg01","test"},groupId = "123") public void consume(String message){ System.out.println("接收到消息:"+message); } }

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
地理数据可视化的神奇组合:Python和Geopandas
本文分享自华为云社区《Python与Geopandas:地理数据可视化与分析指南》,作者:柠檬味拥抱。 地理数据可视化在许多领域都是至关重要的,无论是研究地理空间分布、城市规划、环境保护还是商业决策。Python语言以其强大的数据处理和可视化库而闻名,而Geopandas作为其地理信息系统(GIS)领域的扩展,为处理地理空间数据提供了方便的工具。本文将介绍如何使用Python和Geopandas进行地理数据可视化,并提供实用的代码示例。 1. 准备工作 在开始之前,确保已经安装了Python和Geopandas库。可以使用pip来安装Geopandas: pip install geopandas 2. 加载地理数据 首先,我们需要加载地理数据。Geopandas支持多种地理数据格式,包括Shapefile、GeoJSON、Geopackage等。在本示例中,我们将使用一个Shapefile格式的地图数据。 import geopandas as gpd # 读取Shapefile格式的地图数据 world = gpd.read_file(gpd.datasets.get_pa...
- 下一篇
原理剖析| 一文搞懂 Kafka Producer(上)
01 前言 今天给大家带来的是 Kafka Producer 的全方位解析(基于 Apache Kafka 3.7[2])。考虑到篇幅限制,本文分为上下两篇,上篇将介绍 Kafka Producer 的使用方法与实现原理,下篇将介绍 Kafka Producer 的实现细节与常见问题。 02 使用方法 在介绍 Kafka Producer 的具体实现前,首先看一下如何使用。用 Kafka Producer 向指定 topic 发送一条消息的示例代码如下: // 配置并创建一个 Producer Properties kafkaProps = new Properties(); kafkaProps.put("bootstrap.servers", "localhost:9092"); kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); kafkaProps.put("value.serializer", "org.apache.kafka.common.s...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- Red5直播服务器,属于Java语言的直播服务器
- CentOS7,8上快速安装Gitea,搭建Git服务器
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库