Flink的DataSource三部曲之二:内置connector
欢迎访问我的GitHub
https://github.com/zq2599/blog_demos
内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;
本篇概览
本文是《Flink的DataSource三部曲》系列的第二篇,上一篇《Flink的DataSource三部曲之一:直接API》学习了StreamExecutionEnvironment的API创建DataSource,今天要练习的是Flink内置的connector,即下图的红框位置,这些connector可以通过StreamExecutionEnvironment的addSource方法使用:
今天的实战选择Kafka作为数据源来操作,先尝试接收和处理String型的消息,再接收JSON类型的消息,将JSON反序列化成bean实例;
Flink的DataSource三部曲文章链接
源码下载
如果您不想写代码,整个系列的源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):
名称 | 链接 | 备注 |
---|---|---|
项目主页 | https://github.com/zq2599/blog_demos | 该项目在GitHub上的主页 |
git仓库地址(https) | https://github.com/zq2599/blog_demos.git | 该项目源码的仓库地址,https协议 |
git仓库地址(ssh) | git@github.com:zq2599/blog_demos.git | 该项目源码的仓库地址,ssh协议 |
这个git项目中有多个文件夹,本章的应用在flinkdatasourcedemo文件夹下,如下图红框所示:
环境和版本
本次实战的环境和版本如下:
- JDK:1.8.0_211
- Flink:1.9.2
- Maven:3.6.0
- 操作系统:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
- IDEA:2018.3.5 (Ultimate Edition)
- Kafka:2.4.0
- Zookeeper:3.5.5
请确保上述内容都已经准备就绪,才能继续后面的实战;
Flink与Kafka版本匹配
- Flink官方对匹配Kafka版本做了详细说明,地址是:https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
- 要重点关注的是官方提到的通用版(universal Kafka connector ),这是从Flink1.7开始推出的,对于Kafka1.0.0或者更高版本都可以使用:
3. 下图红框中是我的工程中要依赖的库,蓝框中是连接Kafka用到的类,读者您可以根据自己的Kafka版本在表格中找到适合的库和类:
实战字符串消息处理
- 在kafka上创建名为test001的topic,参考命令:
./kafka-topics.sh \ --create \ --zookeeper 192.168.50.43:2181 \ --replication-factor 1 \ --partitions 2 \ --topic test001
- 继续使用上一章创建的flinkdatasourcedemo工程,打开pom.xml文件增加以下依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.10.0</version> </dependency>
- 新增类Kafka240String.java,作用是连接broker,对收到的字符串消息做WordCount操作:
package com.bolingcavalry.connector; import com.bolingcavalry.Splitter; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import java.util.Properties; import static com.sun.tools.doclint.Entity.para; public class Kafka240String { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度 env.setParallelism(2); Properties properties = new Properties(); //broker地址 properties.setProperty("bootstrap.servers", "192.168.50.43:9092"); //zookeeper地址 properties.setProperty("zookeeper.connect", "192.168.50.43:2181"); //消费者的groupId properties.setProperty("group.id", "flink-connector"); //实例化Consumer类 FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>( "test001", new SimpleStringSchema(), properties ); //指定从最新位置开始消费,相当于放弃历史消息 flinkKafkaConsumer.setStartFromLatest(); //通过addSource方法得到DataSource DataStream<String> dataStream = env.addSource(flinkKafkaConsumer); //从kafka取得字符串消息后,分割成单词,统计数量,窗口是5秒 dataStream .flatMap(new Splitter()) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1) .print(); env.execute("Connector DataSource demo : kafka"); } }
- 确保kafka的topic已经创建,将Kafka240运行起来,可见消费消息并进行单词统计的功能是正常的:
5. 接收kafka字符串消息的实战已经完成,接下来试试JSON格式的消息;
实战JSON消息处理
- 接下来要接受的JSON格式消息,可以被反序列化成bean实例,会用到JSON库,我选择的是gson;
- 在pom.xml增加gson依赖:
<dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.5</version> </dependency>
- 增加类Student.java,这是个普通的Bean,只有id和name两个字段:
package com.bolingcavalry; public class Student { private int id; private String name; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } }
- 增加类StudentSchema.java,该类是DeserializationSchema接口的实现,将JSON反序列化成Student实例时用到:
ackage com.bolingcavalry.connector; import com.bolingcavalry.Student; import com.google.gson.Gson; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import java.io.IOException; public class StudentSchema implements DeserializationSchema<Student>, SerializationSchema<Student> { private static final Gson gson = new Gson(); /** * 反序列化,将byte数组转成Student实例 * @param bytes * @return * @throws IOException */ @Override public Student deserialize(byte[] bytes) throws IOException { return gson.fromJson(new String(bytes), Student.class); } @Override public boolean isEndOfStream(Student student) { return false; } /** * 序列化,将Student实例转成byte数组 * @param student * @return */ @Override public byte[] serialize(Student student) { return new byte[0]; } @Override public TypeInformation<Student> getProducedType() { return TypeInformation.of(Student.class); } }
- 新增类Kafka240Bean.java,作用是连接broker,对收到的JSON消息转成Student实例,统计每个名字出现的数量,窗口依旧是5秒:
package com.bolingcavalry.connector; import com.bolingcavalry.Splitter; import com.bolingcavalry.Student; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import java.util.Properties; public class Kafka240Bean { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度 env.setParallelism(2); Properties properties = new Properties(); //broker地址 properties.setProperty("bootstrap.servers", "192.168.50.43:9092"); //zookeeper地址 properties.setProperty("zookeeper.connect", "192.168.50.43:2181"); //消费者的groupId properties.setProperty("group.id", "flink-connector"); //实例化Consumer类 FlinkKafkaConsumer<Student> flinkKafkaConsumer = new FlinkKafkaConsumer<>( "test001", new StudentSchema(), properties ); //指定从最新位置开始消费,相当于放弃历史消息 flinkKafkaConsumer.setStartFromLatest(); //通过addSource方法得到DataSource DataStream<Student> dataStream = env.addSource(flinkKafkaConsumer); //从kafka取得的JSON被反序列化成Student实例,统计每个name的数量,窗口是5秒 dataStream.map(new MapFunction<Student, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(Student student) throws Exception { return new Tuple2<>(student.getName(), 1); } }) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1) .print(); env.execute("Connector DataSource demo : kafka bean"); } }
- 在测试的时候,要向kafka发送JSON格式字符串,flink这边就会给统计出每个name的数量:
至此,内置connector的实战就完成了,接下来的章节,我们将要一起实战自定义DataSource;
欢迎关注公众号:程序员欣宸
微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界... https://github.com/zq2599/blog_demos

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
前端学数据结构与算法(十一):看似简单又让人抓狂的二分查找算法
前言 二分查找法是一种高效的查找算法,它的思想非常好理解,但编写正确的二分查找并不简单。本章从最基础的二分查找实现开始,讲解其编写技巧,接下来介绍它的四个常见变种的编写,最后应用二分查找来解决真正的面试题,学以致用的同时更加深对其的理解,真正掌握它。 什么是二分查找? 这是一种在有序的数组里快速找到某个元素的高效算法。例如第一章举过的例子:你借了一摞书准备走出书店,其中有一本忘了登记,如何快速找出那本书?你可以一本本的尝试,也可以每一次直接就检测半摞书,再剩下的半摞书依然重复这个过程,这样12本书,你只用4次即可。像这样每次将数据一分为二的进行查找的算法,就是二分查找法。 二分查找的局限性 虽然说查找的效率很高,跟所有的算法一样,也有其局限性。 1. 必须使用数组 需要借助数组通过下标访问元素只需要O(1)的特性。这一点链表就无法做到,因为访问某个元素必须要遍历,会徒增复杂度。 2. 数据必须是有序的 因为每次是与中间元素进行比较,结果是舍弃一半的查找范围,所以这个中间元素需要起到临界值的作用。 3. 数据量太小没必要使用 数据量太小,和遍历的速度区别不大,那就没必要使用二分查找。也有...
- 下一篇
ConcatAdapter 顺序连接其他 Adapter
ConcatAdapter 是 recyclerview: 1.2.0-alpha 04 中提供的一个新组件,它可以帮我们顺序地组合多个 Adapter,并让它们显示在同一个 RecyclerView 中。这使您可以更好地封装 Adapter。您不必再将许多数据源组合到一个 Adapter 中,从而在减少 Adapter 复杂度的同时也让它们可以被复用。 这方面的一个用例,是在列表头部和底部显示加载状态:当列表从网络中检索数据时,我们想显示一个加载中的图标;如果出现错误,我们要显示错误信息和重试按钮。 △ 一个带有底部的 RecyclerView,底部显示了加载状态:加载进度或错误信息 ConcatAdapter 简介 ConcatAdapter 让我们可以顺序显示多个 Adapter 中的内容。例如,假设我们有下面三个 Adapter: val firstAdapter: FirstAdapter = … val secondAdapter: SecondAdapter = … val thirdAdapter: ThirdAdapter = … val concatAdapter=...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Windows10,CentOS7,CentOS8安装Nodejs环境
- MySQL8.0.19开启GTID主从同步CentOS8
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Hadoop3单机部署,实现最简伪集群
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- 设置Eclipse缩进为4个空格,增强代码规范
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- CentOS7设置SWAP分区,小内存服务器的救世主