Flink 1.11:更好用的流批一体 SQL 引擎
许多的数据科学家,分析师和 BI 用户依赖交互式 SQL 查询分析数据。Flink SQL 是 Flink 的核心模块之一。作为一个分布式的 SQL 查询引擎。Flink SQL 提供了各种异构数据源的联合查询。开发者可以很方便地在一个程序中通过 SQL 编写复杂的分析查询。通过 CBO 优化器、列式存储、和代码生成技术,Flink SQL 拥有非常高的查询效率。同时借助于 Flink runtime 良好的容错和扩展性,Flink SQL 可以轻松处理海量数据。
在保证优秀性能的同时,易用性是 1.11 版本 Flink SQL 的重头戏。易用性的提升主要体现在以下几个方面:
- 更方便的追加或修改表定义
- 灵活的声明动态的查询参数
- 加强和统一了原有 TableEnv 上的 SQL 接口
- 简化了 connector 的属性定义
- 对 Hive 的 DDL 做了原生支持
- 加强了对 python UDF 的支持
下面逐一为大家介绍 ~
Create Table Like
在生产中,用户常常有调整现有表定义的需求。例如用户想在一些外部的表定义(例如 Hive metastore)基础上追加 Flink 特有的一些定义比如 watermark。在 ETL 场景中,将多张表的数据合并到一张表,目标表的 schema 定义其实是上游表的合集,需要一种方便合并表定义的方式。
从 1.11 版本开始,Flink 提供了 LIKE 语法,用户可以很方便的在已有的表定义上追加新的定义。
例如我们可以使用下面的语法给已有表 base_table 追加 watermark 定义:
CREATE [TEMPORARY] TABLE base_table ( id BIGINT, name STRING, tstmp TIMESTAMP, PRIMARY KEY(id) ) WITH ( 'connector': 'kafka' ) CREATE [TEMPORARY] TABLE derived_table ( WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND ) LIKE base_table;
这里 derived_table 表定义等价于如下定义:
CREATE [TEMPORARY] TABLE derived_table ( id BIGINT, name STRING, tstmp TIMESTAMP, PRIMARY KEY(id), WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND ) WITH ( ‘connector’: ‘kafka’ )
对比之下,新的语法省去了重复的 schema 定义,用户只需要定义追加属性,非常方便简洁。
多属性策略
有的小伙伴会问,原表和新表的属性只是新增或追加吗?如果我想覆盖或者排除某些属性该如何操作?这是一个好问题,Flink LIKE 语法提供了非常灵活的表属性操作策略。
LIKE 语法支持使用不同的 keyword 对表属性分类:
- ALL:完整的表定义
- CONSTRAINTS: primary keys, unique key 等约束
- GENERATED: 主要指计算列和 watermark
- OPTIONS: WITH (...) 语句内定义的 table options
- PARTITIONS: 表分区信息
在不同的属性分类上可以追加不同的属性行为:
- INCLUDING:包含(默认行为)
- EXCLUDING:排除
- OVERWRITING:覆盖
下面这张表格说明了不同的分类属性允许的行为:
例如下面的语句:
CREATE [TEMPORARY] TABLE base_table ( id BIGINT, name STRING, tstmp TIMESTAMP, PRIMARY KEY(id) ) WITH ( 'connector': 'kafka', 'scan.startup.specific-offsets': 'partition:0,offset:42;partition:1,offset:300', 'format': 'json' ) CREATE [TEMPORARY] TABLE derived_table ( WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND ) WITH ( 'connector.starting-offset': '0' ) LIKE base_table (OVERWRITING OPTIONS, EXCLUDING CONSTRAINTS);
等价的表属性定义为:
CREATE [TEMPORARY] TABLE derived_table ( id BIGINT, name STRING, tstmp TIMESTAMP, WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND ) WITH ( 'connector': 'kafka', 'scan.startup.specific-offsets': 'partition:0,offset:42;partition:1,offset:300', 'format': 'json' )
细节参见:https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
Dynamic Table Options
在生产中,调整参数是一个常见需求,很多的时候是临时修改(比如通过终端查询和展示),比如下面这张 Kafka 表:
create table kafka_table ( id bigint, age int, name STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'employees', 'scan.startup.mode' = 'timestamp', 'scan.startup.timestamp-millis' = '123456', 'format' = 'csv', 'csv.ignore-parse-errors' = 'false' )
在之前的版本,如果用户有如下需求:
- 用户需要指定特性的消费时间戳,即修改 scan.startup.timestamp-millis 属性
- 用户想忽略掉解析错误,需要将 format.ignore-parse-errors 改为 true
只能使用 ALTER TABLE 这样的语句修改表的定义,从 1.11 开始,用户可以通过动态参数的形式灵活地设置表的属性参数,覆盖或者追加原表的 WITH (...) 语句内定义的 table options。
基本语法为:
table_name /*+ OPTIONS('k1'='v1', 'aa.bb.cc'='v2') */
OPTIONS 内的键值对会覆盖原表的 table options,用户可以在各种 SQL 语境中使用这样的语法,例如:
CREATE TABLE kafka_table1 (id BIGINT, name STRING, age INT) WITH (...); CREATE TABLE kafka_table2 (id BIGINT, name STRING, age INT) WITH (...); -- override table options in query source select id, name from kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */; -- override table options in join select * from kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t1 join kafka_table2 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t2 on t1.id = t2.id; -- override table options for INSERT target table insert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select * from kafka_table2;
动态参数的使用没有语境限制,只要是引用表的地方都可以追加定义。在指定的表后面追加的动态参数会自动追加到原表定义中,是不是很方便呢 :)
由于可能对查询结果有影响,动态参数功能默认是关闭的, 使用下面的方式开启该功能:
// instantiate table environment TableEnvironment tEnv = ... // access flink configuration Configuration configuration = tEnv.getConfig().getConfiguration(); // set low-level key-value options configuration.setString("table.dynamic-table-options.enabled", "true");
细节参见:https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/hints.html
SQL API 改进
随着 Flink SQL 支持的语句越来越丰富,老的 API 容易引起一些困惑:
- 原先的 sqlUpdate() 方法传递 DDL 语句会立即执行,而 INSERT INTO 语句在调用 execute 方法时才会执行
- Table 程序的执行入口不够清晰,像 TableEnvironment.execute() 和 StreamExecutionEnvironment.execute() 都可以触发 table 程序执行
- execute 方法没有返回值。像 SHOW TABLES 这样的语句没有很好地方式返回结果。另外,sqlUpdate 方法加入了越来越多的语句导致接口定义不清晰,sqlUpdate 可以执行 SHOW TABLES 就是一个反例
- 在 Blink planner 一直提供多 sink 优化执行的能力,但是在 API 层没有体现出来
1.11 重新梳理了 TableEnv 上的 sql 相关接口,提供了更清晰的执行语义,同时执行任意 sql 语句现在都有返回值,用户可以通过新的 API 灵活的组织多行 sql 语句一起执行。
更清晰的执行语义
新的接口 TableEnvironment#executeSql 统一返回抽象 TableResult,用户可以迭代 TableResult 拿到执行结果。根据执行语句的不同,返回结果的数据结构也有变化,比如 SELECT 语句会返回查询结果,而 INSERT 语句会异步提交作业到集群。
组织多条语句一起执行
新的接口 TableEnvironment#createStatementSet 允许用户添加多条 INSERT 语句并一起执行,在多 sink 场景,Blink planner 会针对性地对执行计划做优化。
新旧 API 对比
一张表格感受新老 API 的变化:
详情参见:https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
Hive 语法兼容加强
从 1.11 开始,Flink SQL 将 Hive parser 模块独立出来,用以兼容 Hive 的语法,目前 DDL 层面,DB、Table、View、Function 相关的语法均已支持。搭配 HiveCatalog,Hive 的同学可以直接使用 Hive 的语法来进行相关的操作。
在使用 hive 语句之前需要设置正确的 Dialect:
EnvironmentSettings settings = EnvironmentSettings.newInstance()...build(); TableEnvironment tableEnv = TableEnvironment.create(settings); // to use hive dialect tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); // use the hive catalog tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog); tableEnv.useCatalog(hiveCatalog.getName());
之后我们便可以使用 Hive 的语法来执行一些 DDL,例如最常见的建表操作:
create external table tbl1 ( d decimal(10,0), ts timestamp) partitioned by (p string) location '%s' tblproperties('k1'='v1'); create table tbl2 (s struct<ts:timestamp,bin:binary>) stored as orc; create table tbl3 ( m map<timestamp,binary> ) partitioned by (p1 bigint, p2 tinyint) row format serde 'org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe'; create table tbl4 ( x int, y smallint) row format delimited fields terminated by '|' lines terminated by '\n';
对于 DQL 的 Hive 语法兼容已经在规划中,1.12 版本会兼容更多 query 语法 ~
详情参见:https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_dialect.html
更简洁的 connector 属性
1.11 重新规范了 connector 的属性定义,新的属性 key 更加直观简洁,和原有的属性 key 相比主要做了如下改动:
- 使用 connector 作为 connector 的类型 key,connector 版本信息直接放到 value 中,比如 0.11 的 kafka 为 kafka-0.11
- 去掉了其余属性中多余的 connector 前缀
- 使用 scan 和 sink 前缀标记 source 和 sink 专有属性
- format.type 精简为 format ,同时 format 自身属性使用 format 的值作为前缀,比如 csv format 的自身属性使用 csv 统一作前缀
例如,1.11 Kafka 表的定义如下:
CREATE TABLE kafkaTable ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3) ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'csv', 'scan.startup.mode' = 'earliest-offset' )
JDBC catalog
在之前的版本中,用户只能通过显示建表的方式创建关系型数据库的镜像表。用户需要手动追踪 Flink SQL 的表 schema 和数据库的 schema 变更。在 1.11,Flink SQL 提供了一个 JDBC catalog 接口对接各种外部的数据库系统,例如 Postgres、MySQL、MariaDB、AWS Aurora、etc。
当前 Flink 内置了 Postgres 的 catalog 实现,使用下面的代码配置 JDBC catalog:
CREATE CATALOG mypg WITH( 'type' = 'jdbc', 'default-database' = '...', 'username' = '...', 'password' = '...', 'base-url' = '...' ); USE CATALOG mypg;
用户也可以实现 JDBCCatalog 接口定制其他数据库的 catalog ~
Python UDF 增强
1.11 版本的 py-flink 在 python UDF 方面提供了很多增强,包括 DDL 的定义方式、支持了标量的向量化 python UDF,支持全套的 python UDF metrics 定义,以及在 SQL-CLI 中定义 python UDF。
DDL 定义 python UDF
1.10.0 版本引入了对 python UDF 的支持。但是仅仅支持 python table api 的方式。1.11 提供了 SQL DDL 的方式定义 python UDF, 用户可以在 Java/Scala table API 以及 SQL-CLI 场景下使用。
例如,现在用户可以使用如下方式定义 Java table API 程序使用 python UDF:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tEnv = BatchTableEnvironment.create(env); tEnv.getConfig().getConfiguration().setString("python.files", "/home/my/test1.py"); tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3"); tEnv.sqlUpdate("create temporary system function func1 as 'test1.func1' language python"); Table table = tEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str").select("func1(str)"); tEnv.toDataSet(table, String.class).collect();
向量化支持
向量化 Python UDF 相较于普通函数大大提升了性能。用户可以使用流行的 python 库例如 Pandas、Numpy 来实现向量化的 python UDF。用户只需在装饰器 udf 中添加额外的参数 udf_type="pandas" 即可。
例如,下面的样例展示了如何定义向量化的 Python 标量函数以及在 python table api 中的应用:
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT(), udf_type="pandas") def add(i, j): return i + j table_env = BatchTableEnvironment.create(env) # register the vectorized Python scalar function table_env.register_function("add", add) # use the vectorized Python scalar function in Python Table API my_table.select("add(bigint, bigint)") # use the vectorized Python scalar function in SQL API table_env.sql_query("SELECT add(bigint, bigint) FROM MyTable")
详情参见:https://ci.apache.org/projects/flink/flink-docs-master/dev/table/python/vectorized_python_udfs.html
另外,1.11 对 python UDF 的 metrics 做了全面支持,现在用户可以在 UDF 中方便地定义各种类型的 metrics,由于篇幅关系,这里不作详细描述,见 python UDF metrics。
详情参见:https://ci.apache.org/projects/flink/flink-docs-master/dev/table/python/metrics.html
展望后续
在后续版本,易用性仍然是 Flink SQL 的核心主题,比如 schema 的易用性增强,Descriptor API 简化以及更丰富的流 DDL 将会是努力的方向,让我们拭目以待 ~
了解更多 Flink 1.11 重大变更与新增功能特性可点击「阅读原文」~

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
MaxCompute/Dataworks云数仓高可用最佳实践
学习工具通过MaxCompute官方文档和DataWorks官方文档了解相关产品的最新功能和使用方式MaxCompute官方文档:MaxCompute官方文档DataWorks官方文档:DataWorks官方文档一、logview排查作业在日常的开发过程中我们偶尔会发现某些任务突然耗时比较长,或者某些任务突然挂掉需要排查原因。Logview将用来协助我们完成这件事情。Logview是MaxCompute Job提交后查看和Debug任务的工具。通过Logview可看到一个Job的运行状态、运行结果以及运行细节和每个步骤的进度。当Job提交到MaxCompute后,会生成Logview的链接,用户可以直接在浏览器上打开Logview链接,进入查看Job的信息,而对于Logview上的诸多参数信息,究竟应该怎么发现问题所在呢?又如何通过Logview了解每个instance、task运行状态及资源占用情况,如何分析执行计划,分析query存在问题。下面主要介绍如何去使用Logview。1、Logview参数详解主要的信息包括ODPS Instance,其涵盖了队列信息以及子状态信息,另外一...
- 下一篇
字节跳动基于 Flink 的 MQ-Hive 实时数据集成
在数据中台建设过程中,一个典型的数据集成场景是将 MQ (Message Queue,例如 Kafka、RocketMQ 等)的数据导入到 Hive 中,以供下游数仓建设以及指标统计。由于 MQ-Hive 是数仓建设第一层,因此对数据的准确性以及实时性要求比较高。 本文主要围绕 MQ-Hive 场景,针对目前字节跳动内已有解决方案的痛点,提出基于 Flink 的实时解决方案,并介绍新方案在字节跳动内部的使用现状。 已有方案及痛点 字节跳动内已有解决方案如下图所示,主要分了两个步骤: 通过 Dump 服务将 MQ 的数据写入到 HDFS 文件 再通过 Batch ETL 将 HDFS 数据导入到 Hive 中,并添加 Hive 分区 痛点 任务链较长,原始数据需要经过多次转换最终才能进入 Hive 实时性比较差,Dump Service、Batch ETL 延迟都会导致最终数据产出延迟 存储、计算开销大,MQ 数据重复存储和计算 基于原生 Java 打造,数据流量持续增长后,存在单点故障和机器负载不均衡等问题 运维成本较高,架构上无法复用公司内 Hadoop/Flink/Yarn 等现有...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS6,CentOS7官方镜像安装Oracle11G
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- SpringBoot2全家桶,快速入门学习开发网站教程
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- MySQL8.0.19开启GTID主从同步CentOS8
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8编译安装MySQL8.0.19
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程