搞懂分布式技术23:SpringBoot Kafka 整合使用
Spring Boot系列文章(一):SpringBoot Kafka 整合使用
2018-01-05前提
假设你了解过 SpringBoot 和 Kafka。
1、SpringBoot
如果对 SpringBoot 不了解的话,建议去看看 DD 大佬 和 纯洁的微笑 的系列博客。
2、Kafka
Kafka 的话可以看看我前两天写的博客 : Kafka 安装及快速入门 学习的话自己开台虚拟机自己手动搭建环境吧,有条件的买服务器。
注意:一定要亲自自己安装实践,接下来我们将这两个进行整合。
创建项目
项目整体架构:
使用 IDEA 创建 SpringBoot 项目,这个很简单了,这里不做过多的讲解。
1、pom 文件代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 | "1.0" encoding="UTF-8" xml version= <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.zhisheng</groupId> <artifactId>kafka-learning</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>kafka-learning</name> <description>Demo project for Spring Boot + kafka</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.9.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.1.1.RELEASE</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.2</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project> |
主要引入了 spring-kafka 、lombok 、 gson 依赖。
2、消息实体类 Message.java 如下:
1 2 3 4 5 6 7 8 9 | public class Message { private Long id; //id private String msg; //消息 private Date sendTime; //时间戳 } |
3、消息发送类 KafkaSender.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | 4jpublic class KafkaSender { private KafkaTemplate<String, String> kafkaTemplate; private Gson gson = new GsonBuilder().create(); //发送消息方法 public void send() { Message message = new Message(); message.setId(System.currentTimeMillis()); message.setMsg(UUID.randomUUID().toString()); message.setSendTime(new Date()); log.info("+++++++++++++++++++++ message = {}", gson.toJson(message)); kafkaTemplate.send("zhisheng", gson.toJson(message)); } } |
就这样,发送消息代码就实现了。
这里关键的代码为 kafkaTemplate.send()
方法,zhisheng
是 Kafka 里的 topic ,这个 topic 在 Java 程序中是不需要提前在 Kafka 中设置的,因为它会在发送的时候自动创建你设置的 topic, gson.toJson(message)
是消息内容,这里暂时先说这么多了,不详解了,后面有机会继续把里面源码解读写篇博客出来(因为中途碰到坑,老子跟了几遍源码)。
4、消息接收类 KafkaReceiver.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | 4jpublic class KafkaReceiver { "zhisheng"}) (topics = { public void listen(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); log.info("----------------- record =" + record); log.info("------------------ message =" + message); } } } |
客户端 consumer 接收消息特别简单,直接用 @KafkaListener
注解即可,并在监听中设置监听的 topic
,topics
是一个数组所以是可以绑定多个主题的,上面的代码中修改为 @KafkaListener(topics = {"zhisheng","tian"})
就可以同时监听两个 topic
的消息了。需要注意的是:这里的 topic 需要和消息发送类 KafkaSender.java 中设置的 topic 一致。
5、启动类 KafkaApplication.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | public class KafkaApplication { public static void main(String[] args) { ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args); KafkaSender sender = context.getBean(KafkaSender.class); for (int i = 0; i < 3; i++) { //调用消息发送类中的消息发送方法 sender.send(); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } } |
6、配置文件 application.properties
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | #============== kafka =================== # 指定kafka 代理地址,可以多个 spring.kafka.bootstrap-servers=192.168.153.135:9092 #=============== provider ======================= spring.kafka.producer.retries=0 # 每次批量发送消息的数量 spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 # 指定消息key和消息体的编解码方式 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #=============== consumer ======================= # 指定默认消费者group id spring.kafka.consumer.group-id=test-consumer-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=100 # 指定消息key和消息体的编解码方式 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer |
spring.kafka.bootstrap-servers 后面设置你安装的 Kafka 的机器 IP 地址和端口号 9092。
如果你只是简单整合下,其他的几个默认就好了。
Kafka 设置
在你安装的 Kafka 目录文件下:
启动 ZK
使用安装包中的脚本启动单节点 Zookeeper 实例:
1 | bin/zookeeper-server-start.sh -daemon config/zookeeper.properties |
启动 KAFKA 服务
使用 kafka-server-start.sh
启动 kafka 服务:
1 | bin/kafka-server-start.sh config/server.properties |
启动成功后!
千万注意: 记得将你的虚拟机或者服务器关闭防火墙或者开启 Kafka 的端口 9092。
运行
出现这就代表整合成功了!
我们看下 Kafka 中的 topic 列表就
1 | bin/kafka-topics.sh --list --zookeeper localhost:2181 |
就会发现刚才我们程序中的 zhisheng
已经自己创建了。
微信公众号【Java技术江湖】一位阿里 Java 工程师的技术小站。(关注公众号后回复”Java“即可领取 Java基础、进阶、项目和架构师等免费学习资料,更有数据库、分布式、微服务等热门技术学习视频,内容丰富,兼顾原理和实践,另外也将赠送作者原创的Java学习指南、Java程序员面试指南等干货资源)
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
基于 Python 自建分布式高并发 RPC 服务
RPC(Remote Procedure Call)服务,也即远程过程调用,在互联网企业技术架构中占据了举足轻重的地位,尤其在当下微服务化逐步成为大中型分布式系统架构的主流背景下,RPC 更扮演了重要角色。 Google 开源了 gRPC,Facebook 开源了 Thrift,Twitter 开源了 Finagle,百度开源了 bRPC,腾讯开源了 Tars,阿里开源了 Dubbo 和 HSF,新浪开源了 Motan 等,一线互联网大厂们纷纷亮出自己研制的 RPC 框架武器,在解决分布式高并发业务问题的同时,也向外界展示自己的技术实力。 互联网公司百花齐放,貌似高深复杂的 RPC 框架解决了哪些业务难题?其技术含量究竟高在哪里?后端开发者该如何深入掌握 RPC 开发?这些都是本小册想要解读的问题。 分布式高并发服务是只有高手才能涉足的领域 虽然大部分后端开发者在日常开发中都会隐式或显式的使用 RPC,但对初级开发者来说 RPC 总是略显神秘而有距离,而即便有多年 RPC 使用经验的程序员,也可能依然对 RPC 背后的原理不甚了解,难以精通,遇到复杂问题时难免误用。 RPC 本身理论繁...
- 下一篇
爬虫入门之Scrapy 框架基础功能(九)
Scrapy是用纯Python实现一个为了爬取网站数据、提取结构性数据而编写的应用框架,用途非常广泛。 框架的力量,用户只需要定制开发几个模块就可以轻松的实现一个爬虫,用来抓取网页内容以及各种图片,非常之方便。 Scrapy 使用了 Twisted(其主要对手是Tornado)多线程异步网络框架来处理网络通讯,可以加快我们的下载速度,不用自己去实现异步框架,并且包含了各种中间件接口,可以灵活的完成各种需求。 1 Scrapy架构图(绿线是数据流向) Scrapy Engine(引擎): 负责Spider、ItemPipeline、Downloader、Scheduler中间通讯,信号、数据传递等。 Scheduler(调度器): 它负责接受引擎发送过来的Request请求,并按照一定的方式进行整理排队,当引擎需要时,交还给引擎。 Downloader(下载器):负责下载Scrapy Engine(引擎)发送的所有Requests请求,并将其获取到的Responses交还给Scrapy Engine(引擎),由引擎交给Spider来处理. Spider(爬虫):它负责处理所有Respon...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
-
Docker使用Oracle官方镜像安装(12C,18C,19C)
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8编译安装MySQL8.0.19
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
推荐阅读
最新文章
- MySQL8.0.19开启GTID主从同步CentOS8
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Windows10,CentOS7,CentOS8安装Nodejs环境
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- Docker安装Oracle12C,快速搭建Oracle学习环境
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- CentOS7设置SWAP分区,小内存服务器的救世主