Flink-Table-&-SQL
简介
Apache Flink具有两个关系API - 表API和SQL - 用于统一流和批处理。Table API是Scala和Java的语言集成查询API,允许以非常直观的方式组合来自关系运算符的查询,Table API和SQL接口彼此紧密集成,以及Flink的DataStream和DataSet API。您可以轻松地在基于API构建的所有API和库之间切换。例如,您可以使用CEP库从DataStream中提取模式,然后使用Table API分析模式,或者可以在预处理上运行Gelly图算法之前使用SQL查询扫描,过滤和聚合批处理表数据。
Flink SQL的编程模型
创建一个TableEnvironment
TableEnvironment是Table API和SQL集成的核心概念,它主要负责:
1、在内部目录中注册一个Table
2、注册一个外部目录
3、执行SQL查询
4、注册一个用户自定义函数(标量、表及聚合)
5、将DataStream或者DataSet转换成Table
6、持有ExecutionEnvironment或者StreamExecutionEnvironment的引用
一个Table总是会绑定到一个指定的TableEnvironment中,相同的查询不同的TableEnvironment是无法通过join、union合并在一起。
TableEnvironment有一个在内部通过表名组织起来的表目录,Table API或者SQL查询可以访问注册在目录中的表,并通过名称来引用它们。
在目录中注册表
TableEnvironment允许通过各种源来注册一个表:
1、一个已存在的Table对象,通常是Table API或者SQL查询的结果
Table projTable = tableEnv.scan("X").select(...);
2、TableSource,可以访问外部数据如文件、数据库或者消息系统
TableSource csvSource = new CsvTableSource("/path/to/file", ...);
3、DataStream或者DataSet程序中的DataStream或者DataSet
//将DataSet转换为Table
Table table= tableEnv.fromDataSet(tableset);
注册TableSink
注册TableSink可用于将 Table API或SQL查询的结果发送到外部存储系统,例如数据库,键值存储,消息队列或文件系统(在不同的编码中,例如,CSV,Apache [Parquet] ,Avro,ORC],......):
TableSink csvSink = new CsvTableSink("/path/to/file", ...);
2、 String[] fieldNames = {"a", "b", "c"};
TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);
实战案例一
基于Flink SQL的WordCount:
public class WordCountSQL {
public static void main(String[] args) throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
List list = new ArrayList();
String wordsStr = "Hello Flink Hello TOM";
String[] words = wordsStr.split("\\W+");
for(String word : words){
WC wc = new WC(word, 1);
list.add(wc);
}
DataSet<WC> input = env.fromCollection(list);
tEnv.registerDataSet("WordCount", input, "word, frequency");
Table table = tEnv.sqlQuery(
"SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");
DataSet<WC> result = tEnv.toDataSet(table, WC.class);
result.print();
}//main
public static class WC {
public String word;//hello
public long frequency;//1
// public constructor to make it a Flink POJO
public WC() {}
public WC(String word, long frequency) {
this.word = word;
this.frequency = frequency;
}
@Override
public String toString() {
return "WC " + word + " " + frequency;
}
}
}
输出如下:
WC TOM 1
WC Hello 2
WC Flink 1
实战案例二
本例稍微复杂,首先读取一个文件中的内容进行统计,并写入到另外一个文件中:
public class SQLTest {
public static void main(String[] args) throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
env.setParallelism(1);
DataSource<String> input = env.readTextFile("test.txt");
input.print();
//转换成dataset
DataSet<Orders> topInput = input.map(new MapFunction<String, Orders>() {
@Override
public Orders map(String s) throws Exception {
String[] splits = s.split(" ");
return new Orders(Integer.valueOf(splits[0]), String.valueOf(splits[1]),String.valueOf(splits[2]), Double.valueOf(splits[3]));
}
});
//将DataSet转换为Table
Table order = tableEnv.fromDataSet(topInput);
//orders表名
tableEnv.registerTable("Orders",order);
Table tapiResult = tableEnv.scan("Orders").select("name");
tapiResult.printSchema();
Table sqlQuery = tableEnv.sqlQuery("select name, sum(price) as total from Orders group by name order by total desc");
//转换回dataset
DataSet<Result> result = tableEnv.toDataSet(sqlQuery, Result.class);
//将dataset map成tuple输出
/*result.map(new MapFunction<Result, Tuple2<String,Double>>() {
@Override
public Tuple2<String, Double> map(Result result) throws Exception {
String name = result.name;
Double total = result.total;
return Tuple2.of(name,total);
}
}).print();*/
TableSink sink = new CsvTableSink("SQLTEST.txt", "|");
//writeToSink
/*sqlQuery.writeToSink(sink);
env.execute();*/
String[] fieldNames = {"name", "total"};
TypeInformation[] fieldTypes = {Types.STRING, Types.DOUBLE};
tableEnv.registerTableSink("SQLTEST", fieldNames, fieldTypes, sink);
sqlQuery.insertInto("SQLTEST");
env.execute();
}
/**
* 源数据的映射类
*/
public static class Orders {
/**
* 序号,姓名,书名,价格
*/
public Integer id;
public String name;
public String book;
public Double price;
public Orders() {
super();
}
public Orders(Integer id, String name, String book, Double price) {
this.id = id;
this.name = name;
this.book = book;
this.price = price;
}
}
/**
* 统计结果对应的类
*/
public static class Result {
public String name;
public Double total;
public Result() {}
}
}//

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
Flink-Kafka-Connector Flink结合Kafka实战
简介 Flink-kafka-connector用来做什么? Kafka中的partition机制和Flink的并行度机制结合,实现数据恢复Kafka可以作为Flink的source和sink任务失败,通过设置kafka的offset来恢复应用 kafka简单介绍 关于kafka,我们会有专题文章介绍,这里简单介绍几个必须知道的概念。 1.生产者(Producer) 顾名思义,生产者就是生产消息的组件,它的主要工作就是源源不断地生产出消息,然后发送给消息队列。生产者可以向消息队列发送各种类型的消息,如狭义的字符串消息,也可以发送二进制消息。生产者是消息队列的数据源,只有通过生产者持续不断地向消息队列发送消息,消息队列才能不断处理消息。 2.消费者(Consumer) 所谓消费者,指的是不断消费(获取)消息的组件,它获取消息的来源就是消息队列(即Kafka本身)。换句话说,生产者不断向消息队列发送消息,而消费者则不断从消息队列中获取消息。 3.主题(Topic) 主题是Kafka中一个极为重要的概念。首先,主题是一个逻辑上的概念,它用于从逻辑上来归类与存储消息本身。多个生产者可以向一个T...
-
下一篇
Flink实战项目之实时热销排行
需求 某个图书网站,希望看到双十一秒杀期间实时的热销排行榜单。我们可以将“实时热门商品”翻译成程序员更好理解的需求:每隔5秒钟输出最近一小时内点击量最多的前 N 个商品/图书. 需求分解 将这个需求进行分解我们大概要做这么几件事情: 告诉 Flink 框架基于时间做窗口,我们这里用processingTime,不用自带时间戳 过滤出图书点击行为数据 按一小时的窗口大小,每5秒钟统计一次,做滑动窗口聚合(Sliding Window) 聚合,输出窗口中点击量前N名的商品 代码实现 向Kafka发消息模拟购买事件 public class KafkaProducer { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> text = env.addSource(new MyNoParalleSo...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- Dcoker安装(在线仓库),最新的服务器搭配容器使用
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- MySQL数据库在高并发下的优化方案
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作