从零开始搭建Kafka+SpringBoot分布式消息系统
前言 由于kafka强依赖于zookeeper,所以需先搭建好zookeeper集群。由于zookeeper是由java编写的,需运行在jvm上,所以首先应具备java环境。 (ps:默认您的centos系统可联网,本教程就不教配置ip什么的了) (ps2:没有wget的先装一下:yum install wget) (ps3:人啊,就是要条理。东边放一点,西边放一点,过段时间就不知道自己装在哪里了。本教程所有下载均放在/usr/local目录下) (ps4:kafka可能有内置zookeeper,感觉可以越过zookeeper教程,但是这里也配置出来了。我没试过) 文章首发公众号:Java架构师联盟,每日更新技术好文 一、配置jdk 因为oracle 公司不允许直接通过wget 下载官网上的jdk包。所以你直接wget以下地址下载下来的是一个只有5k的网页文件而已,并不是需要的jdk包。(垄断地位就是任性)。 (请通过java -version判断是否自带jdk,我的没带) 1、官网下载 下面是jdk8的官方下载地址: https://www.oracle.com/technetwork/java/javase/downloads/java-archive-javase8u211-later-5573849.html 2、上传解压 这里通过xftp上传到服务器指定位置:/usr/local 对压缩文件进行解压: tar -zxvf jdk-8u221-linux-x64.tar.gz 对解压后的文件夹进行改名: mv jdk1.8.0_221 jdk1.8 3、配置环境变量 vim /etc/profile #java environment export JAVA_HOME=/usr/local/jdk1.8 export CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar export PATH=$PATH:${JAVA_HOME}/bin 操作之后的界面如下: 运行命令使环境生效 source /etc/profile 二、搭建zookeeper集群 1、下载zookeeper 创建zookeeper目录,在该目录下进行下载: mkdir /usr/local/zookeeper 这一步如果出现连接被拒绝时可多试几次,我就是第二次请求才成功的。 wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz 等待下载完成之后解压: tar -zxvf zookeeper-3.4.6.tar.gz 重命名为zookeeper1 mv zookeeper-3.4.6 zookeeper1 cp -r zookeeper1 zookeeper2 cp -r zookeeper1 zookeeper3 2、创建data、logs文件夹 在zookeeper1目录下创建 在data目录下新建myid文件。内容为1 3、修改zoo.cfg文件 cd /usr/local/zookeeper/zookeeper1/conf/ cp zoo_sample.cfg zoo.cfg 进行过上面两步之后,有zoo.cfg文件了,现在修改内容为: dataDir=/usr/local/zookeeper/zookeeper1/data dataLogDir=/usr/local/zookeeper/zookeeper1/logs server.1=192.168.233.11:2888:3888 server.2=192.168.233.11:2889:3889 server.3=192.168.233.11:2890:3890 4、搭建zookeeper2 首先,复制改名。 cd /usr/local/zookeeper/ cp -r zookeeper1 zookeeper2 然后修改具体的某些配置: vim zookeeper2/conf/zoo.cfg 将下图三个地方1改成2 vim zookeeper2/data/myid 同时将myid中的值改成2 5、搭建zookeeper3 同上,复制改名 cp -r zookeeper1 zookeeper3 vim zookeeper3/conf/zoo.cfg 修改为3 vim zookeeper3/data/myid 修改为3 6、测试zookeeper集群 cd /usr/local/zookeeper/zookeeper1/bin/ 由于启动所需代码比较多,这里简单写了一个启动脚本: vim start start的内容如下 cd /usr/local/zookeeper/zookeeper1/bin/ ./zkServer.sh start ../conf/zoo.cfg cd /usr/local/zookeeper/zookeeper2/bin/ ./zkServer.sh start ../conf/zoo.cfg cd /usr/local/zookeeper/zookeeper3/bin/ ./zkServer.sh start ../conf/zoo.cfg 下面是连接脚本: vim login login内容如下: ./zkCli.sh -server 192.168.233.11:2181,192.168.233.11:2182,192.168.233.11:2183 脚本编写完成,接下来启动: sh start sh login 启动集群成功,如下图: 这里zookeeper就告一段落了,由于zookeeper占用着输入窗口,这里可以在xshell右键标签,新建ssh渠道。然后就可以在新窗口继续操作kafka了! 三、搭建kafka集群 1、下载kafka 首先创建kafka目录: mkdir /usr/local/kafka 然后在该目录下载 cd /usr/local/kafka/ wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz 下载成功之后解压: tar -zxvf kafka_2.11-1.1.0.tgz 2、修改集群配置 首先进入conf目录下: cd /usr/local/kafka/kafka_2.11-1.1.0/config 修改server.properties 修改内容: broker.id=0 log.dirs=/tmp/kafka-logs listeners=PLAINTEXT://192.168.233.11:9092 复制两份server.properties cp server.properties server2.properties cp server.properties server3.properties 修改server2.properties vim server2.properties 修改主要内容为: broker.id=1 log.dirs=/tmp/kafka-logs1 listeners=PLAINTEXT://192.168.233.11:9093 如上,修改server3.properties 修改内容为: broker.id=2 log.dirs=/tmp/kafka-logs2 listeners=PLAINTEXT://192.168.233.11:9094 3、启动kafka 这里还是在bin目录编写一个脚本: cd ../bin/ vim start 脚本内容为: ./kafka-server-start.sh ../config/server.properties & ./kafka-server-start.sh ../config/server2.properties & ./kafka-server-start.sh ../config/server3.properties & 通过jps命令可以查看到,共启动了3个kafka。 4、创建Topic cd /usr/local/kafka/kafka_2.11-1.1.0 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic kafka打印了几条日志 在启动的zookeeper中可以通过命令查询到这条topic! ls /brokers/topics 查看kafka状态 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic 可以看到此时有三个节点 1 , 2 , 0 Leader 是1 , 因为分区只有一个 所以在0上面, Replicas:主从备份是 1,2,0, ISR(in-sync):现在存活的信息也是 1,2,0 5、启动生产者 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic 由于不能按删除,不能按左右键去调整,所以语句有些乱啊。em… 6、启动消费者 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic 可以看出,启动消费者之后就会自动消费。 在生产者又造了一条。 消费者自动捕获成功! 四、集成springboot 先贴一张kafka兼容性目录: 不满足的话启动springboot的时候会抛异常的!!!ps:该走的岔路我都走了o(╥﹏╥)o (我的kafka-clients是1.1.0,spring-kafka是2.2.2,中间那列暂时不用管) 回归正题,搞了两个小时,终于搞好了,想哭… 遇到的问题基本就是jar版本不匹配。 上面的步骤我也都会相应的去修改,争取大家按照本教程一遍过!!! 1、pom文件 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.1.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.gzky</groupId> <artifactId>study</artifactId> <version>0.0.1-SNAPSHOT</version> <name>study</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-redis</artifactId> <version>1.3.8.RELEASE</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.0.RELEASE</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project> pom文件中,重点是下面这两个版本。 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.1.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.0.RELEASE</version> </dependency> 2、application.yml spring: redis: cluster: #设置key的生存时间,当key过期时,它会被自动删除; expire-seconds: 120 #设置命令的执行时间,如果超过这个时间,则报错; command-timeout: 5000 #设置redis集群的节点信息,其中namenode为域名解析,通过解析域名来获取相应的地址; nodes: 192.168.233.11:9001,192.168.233.11:9002,192.168.233.11:9003,192.168.233.11:9004,192.168.233.11:9005,192.168.233.11:9006 kafka: # 指定kafka 代理地址,可以多个 bootstrap-servers: 192.168.233.11:9092,192.168.233.11:9093,192.168.233.11:9094 producer: retries: 0 # 每次批量发送消息的数量 batch-size: 16384 buffer-memory: 33554432 # 指定消息key和消息体的编解码方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: # 指定默认消费者group id group-id: test-group auto-offset-reset: earliest enable-auto-commit: true auto-commit-interval: 100 # 指定消息key和消息体的编解码方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer server: port: 8085 servlet: #context-path: /redis context-path: /kafka 没有配置Redis的可以把Redis部分删掉,也就是下图: 想学习配置Redis集群的可以参考:《Redis集群redis-cluster的搭建及集成springboot》 3、生产者 package com.gzky.study.utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; /** * kafka生产者工具类 * * @author biws * @date 2019/12/17 **/ @Component public class KfkaProducer { private static Logger logger = LoggerFactory.getLogger(KfkaProducer.class); @Autowired private KafkaTemplate<String, String> kafkaTemplate; /** * 生产数据 * @param str 具体数据 */ public void send(String str) { logger.info("生产数据:" + str); kafkaTemplate.send("testTopic", str); } } 4、消费者 package com.gzky.study.utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** * kafka消费者监听消息 * * @author biws * @date 2019/12/17 **/ @Component public class KafkaConsumerListener { private static Logger logger = LoggerFactory.getLogger(KafkaConsumerListener.class); @KafkaListener(topics = "testTopic") public void onMessage(String str){ //insert(str);//这里为插入数据库代码 logger.info("监听到:" + str); System.out.println("监听到:" + str); } } 5、对外接口 package com.gzky.study.controller; import com.gzky.study.utils.KfkaProducer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; /** * kafka对外接口 * * @author biws * @date 2019/12/17 **/ @RestController public class KafkaController { @Autowired KfkaProducer kfkaProducer; /** * 生产消息 * @param str * @return */ @RequestMapping(value = "/sendKafkaWithTestTopic",method = RequestMethod.GET) @ResponseBody public boolean sendTopic(@RequestParam String str){ kfkaProducer.send(str); return true; } } 6、postman测试 这里首先应该在服务器启动监听器(kafka根目录),下面命令必须是具体的服务器ip,不能是localhost,是我踩过的坑: 推荐此处重启一下集群 关闭kafka命令: cd /usr/local/kafka/kafka_2.11-1.1.0/bin ./kafka-server-stop.sh ../config/server.properties & ./kafka-server-stop.sh ../config/server2.properties & ./kafka-server-stop.sh ../config/server3.properties & 此处应该jps看一下,等待所有的kafka都关闭(关不掉的kill掉),再重新启动kafka: ./kafka-server-start.sh ../config/server.properties & ./kafka-server-start.sh ../config/server2.properties & ./kafka-server-start.sh ../config/server3.properties & 等待kafka启动成功后,启动消费者监听端口: cd /usr/local/kafka/kafka_2.11-1.1.0 bin/kafka-console-consumer.sh --bootstrap-server 192.168.233.11:9092 --from-beginning --topic testTopic 曾经我乱输的测试信息全部被监听过来了! 启动springboot服务 然后用postman生产消息: 然后享受成果,服务器端监听成功。 项目中也监听成功!