A Complete Example
A Complete Example
这个例子将关于人员的记录流作为输入,并将其过滤为只包含成人。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.api.common.functions.FilterFunction; public class Example { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Person> flintstones = env.fromElements( new Person("Fred", 35), new Person("Wilma", 35), new Person("Pebbles", 2) ); DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() { @Override public boolean filter(Person person) throws Exception { return person.age >= 18; } }); adults.print(); env.execute(); } public static class Person { public String name; public Integer age; public Person() {}; public Person(String name, Integer age) { this.name = name; this.age = age; }; public String toString() { return this.name.toString() + ": age" + this.age.toSrting(); }; } }
流执行环境
每个Flink应用程序都需要一个执行环境,在这个例子中是 env
。流式应用需要使用 StreamExecutionEnvironment
。
在你的应用程序中DataStream API的调用会建立一个关联到StreamExecutionEnvironment
的作业图。当env.execute()
被调用这个作业图就会被打包并发送给 Job Manager(作业管理器),作业管理器将作业并行化并将其片段分发给Task Manager(任务管理器)用于执行。每个作业的并行切片将会在task slot(任务槽)中执行。
需要注意的是,如果你不调用 execute()你的应用就不会跑。
此分布式运行时取决于您的应用程序是否可序列化。它还要求集群中的每个节点都可以使用所有依赖项。
基本流源
在上面的例子中我们通过env.fromElements(...)
构建了一个DataStream<Person>
。这是将简单流集合在一起以便在原型或测试中使用的便捷方式。在StreamExecutionEnvironment
上同样有一个方法fromCollection(Collection)
。我们可以这样实现:
List<Person> people = new ArrayList<Person>(); people.add(new Person("Fred", 35)); people.add(new Person("Wilma", 35)); people.add(new Person("Pebbles", 2)); DataStream<Person> flintstones = env.fromCollection(people);
在原型设计时将一些数据放入流中的另一种简单方式是使用套接字
DataStream<String> lines = env.socketTextStream("localhost", 9999)
或文件
DataStream<String> lines = env.readTextFile("file:///path")
在实际应用中,最常用的数据源是那些支持低延迟,高吞吐量并行度去以及倒带和重放的数据源 - 高性能和容错的先决条件 - 例如Apache Kafka, Kinesis 以及各种文件系统。REST APIs和数据库也经常用于丰富流。
基本流下沉
上例使用adults.print()
来显示结果到任务管理器的日志中(如果运行在IDE上则会出现在IDE的控制台中)。这个方法会为流中的每个元素调用toString()
。
输出看上去是这样的:
1> Fred: age 35 2> Wilma: age 35
1> 和 2> 指出了产生输出的子任务
你也可以写到文本文件
stream.writeAsText("/path/to/file")
或者CSV文件
stream.writeAsCsv("/path/to/file")
或者套接字
stream.writeToSocket(host, port, SerializationSchema)
在生产中,常用的接收器包括Kafka以及各种数据库和文件系统。
调试
在生产中,你将向应用程序运行的远程集群提交应用程序JAR文件。如果失败,远程也会失败。作业管理器和任务管理器日志在调试此类故障时非常游泳,但在IDE内部进行本地调试要容易的多,这是Flink支持的。你也可以设置断点,检查局部变量,并逐步执行代码。你也可以进入Flink代码,如果你想了解Flink的工作原理,这可能是了解更多内部信息的好方法。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
What can be Streamed
What can be Streamed Flink用于Java和Scala的DataStream APIs将允许传输他们可以序列化的任何内容。 Flink的序列化器用于: 简单类型:String,Long,integer,Boolean,Array 复合类型:Tuples,POJOs,Scala case classes 而Flink对于其他类型则回归于Kryo。 Java Tuples 对于Java而言,Flink定义了Tuple1到Tuple25类型。 Tuple2<String, Integer> person = new Tuple2<>("Fred", 35); // zero based index! String name = person.f0; Integer age = person.f1 POJOs 一个POJOs(普通的旧Java对象)是任何Java类: 有一个空的默认构造函数 所有域都是以下之一: public 有一个默认的getter和setter 例如: public class Person { public String na...
- 下一篇
云HBase X-Pack解决传统数据仓库瓶颈,赋能客户计算分析业务
某游戏公司随着业务快速发展,用户行为日志快速增长,需要从海量的点击流日志和激活日志中挖掘数据的价值,比如广告转化率、激活率,每日安装用户成本等等。 业务挑战 原来使用GreenPlum做实时计算和统计分析。但是GreenPlum存在以下缺陷,难以应对业务的快速发展: GreenPlum架构难以应对日益复杂的计算任务; Greenplum的单表分区数目有限制,同时多级分区支持不够友好,不适用单表数据量比较大且需要永久保存的日志,如果单个分区表数据量比较大的时候查询性能无法满足业务性能需求; Greenplum扩容时由于数据要重分布会比较慢; Greenplum不适合处理非结构化的数据。 解决方案 在这样的背景下选择了阿里云HBase SQL服务(Phoenix)+ Spark服务构建实时计算和数据仓库解决方案,其中HBase SQL提供
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- 设置Eclipse缩进为4个空格,增强代码规范
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- Mario游戏-低调大师作品
- CentOS8编译安装MySQL8.0.19
- CentOS7,8上快速安装Gitea,搭建Git服务器
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长