Flink 1.11 新特性之 SQL Hive Streaming 简单示例
7月7日,Flink 1.11 版本发布,与 1.10 版本相比,1.11 版本最为显著的一个改进是 Hive Integration 显著增强,也就是真正意义上实现了基于 Hive 的流批一体。
本文用简单的本地示例来体验 Hive Streaming 的便利性并跟大家分享体验的过程以及我的心得,希望对大家上手使用有所帮助。
添加相关依赖
测试集群上的 Hive 版本为 1.1.0,Hadoop 版本为 2.6.0,Kafka 版本为 1.0.1。
<properties> <scala.bin.version>2.11</scala.bin.version> <flink.version>1.11.0</flink.version> <flink-shaded-hadoop.version>2.6.5-10.0</flink-shaded-hadoop.version> <hive.version>1.1.0</hive.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.bin.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.bin.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_${scala.bin.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.bin.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-hive_${scala.bin.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-sql-connector-kafka_${scala.bin.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-shaded-hadoop-2-uber</artifactId> <version>${flink-shaded-hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>${hive.version}</version> </dependency>
另外,别忘了找到 hdfs-site.xml 和 hive-site.xml,并将其加入项目。
创建执行环境
Flink 1.11 的 Table/SQL API 中,FileSystem Connector 是靠增强版 StreamingFileSink 组件实现,在源码中名为 StreamingFileWriter。我们知道,只有在 Checkpoint 成功时,StreamingFileSink 写入的文件才会由 Pending 状态变成 Finished 状态,从而能够安全地被下游读取。所以,我们一定要打开 Checkpointing,并设定合理的间隔。
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) streamEnv.setParallelism(3) val tableEnvSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build() val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings) tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE) tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))
注册 HiveCatalog
val catalogName = "my_catalog" val catalog = new HiveCatalog( catalogName, // catalog name "default", // default database "/Users/lmagic/develop", // Hive config (hive-site.xml) directory "1.1.0" // Hive version ) tableEnv.registerCatalog(catalogName, catalog) tableEnv.useCatalog(catalogName)
创建 Kafka 流表
Kafka Topic 中存储的是 JSON 格式的埋点日志,建表时用计算列生成事件时间与水印。1.11 版本 SQL Kafka Connector 的参数相比 1.10 版本有一定简化。
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS stream_tmp") tableEnv.executeSql("DROP TABLE IF EXISTS stream_tmp.analytics_access_log_kafka") tableEnv.executeSql( """ |CREATE TABLE stream_tmp.analytics_access_log_kafka ( | ts BIGINT, | userId BIGINT, | eventType STRING, | fromType STRING, | columnType STRING, | siteId BIGINT, | grouponId BIGINT, | partnerId BIGINT, | merchandiseId BIGINT, | procTime AS PROCTIME(), | eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000,'yyyy-MM-dd HH:mm:ss')), | WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND |) WITH ( | 'connector' = 'kafka', | 'topic' = 'ods_analytics_access_log', | 'properties.bootstrap.servers' = 'kafka110:9092,kafka111:9092,kafka112:9092' | 'properties.group.id' = 'flink_hive_integration_exp_1', | 'scan.startup.mode' = 'latest-offset', | 'format' = 'json', | 'json.fail-on-missing-field' = 'false', | 'json.ignore-parse-errors' = 'true' |) """.stripMargin )
前面已经注册了 HiveCatalog,故在 Hive 中可以观察到创建的 Kafka 流表的元数据(注意该表并没有事实上的列)。
hive> DESCRIBE FORMATTED stream_tmp.analytics_access_log_kafka; OK # col_name data_type comment # Detailed Table Information Database: stream_tmp Owner: null CreateTime: Wed Jul 15 18:25:09 CST 2020 LastAccessTime: UNKNOWN Protect Mode: None Retention: 0 Location: hdfs://sht-bdmq-cls/user/hive/warehouse/stream_tmp.db/analytics_access_log_kafka Table Type: MANAGED_TABLE Table Parameters: flink.connector kafka flink.format json flink.json.fail-on-missing-field false flink.json.ignore-parse-errors true flink.properties.bootstrap.servers kafka110:9092,kafka111:9092,kafka112:9092 flink.properties.group.id flink_hive_integration_exp_1 flink.scan.startup.mode latest-offset flink.schema.0.data-type BIGINT flink.schema.0.name ts flink.schema.1.data-type BIGINT flink.schema.1.name userId flink.schema.10.data-type TIMESTAMP(3) flink.schema.10.expr TO_TIMESTAMP(FROM_UNIXTIME(`ts` / 1000, 'yyyy-MM-dd HH:mm:ss')) flink.schema.10.name eventTime flink.schema.2.data-type VARCHAR(2147483647) flink.schema.2.name eventType # 略...... flink.schema.9.data-type TIMESTAMP(3) NOT NULL flink.schema.9.expr PROCTIME() flink.schema.9.name procTime flink.schema.watermark.0.rowtime eventTime flink.schema.watermark.0.strategy.data-type TIMESTAMP(3) flink.schema.watermark.0.strategy.expr `eventTime` - INTERVAL '15' SECOND flink.topic ods_analytics_access_log is_generic true transient_lastDdlTime 1594808709 # Storage Information SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe InputFormat: org.apache.hadoop.mapred.TextInputFormat OutputFormat: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat Compressed: No Num Buckets: -1 Bucket Columns: [] Sort Columns: [] Storage Desc Params: serialization.format 1 Time taken: 1.797 seconds, Fetched: 61 row(s)
创建 Hive 表
Flink SQL 提供了兼容 HiveQL 风格的 DDL,指定 SqlDialect.HIVE 即可( DML 兼容还在开发中)。
为了方便观察结果,以下的表采用了天/小时/分钟的三级分区,实际应用中可以不用这样细的粒度(10分钟甚至1小时的分区可能更合适)。
tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE) tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS hive_tmp") tableEnv.executeSql("DROP TABLE IF EXISTS hive_tmp.analytics_access_log_hive") tableEnv.executeSql( """ |CREATE TABLE hive_tmp.analytics_access_log_hive ( | ts BIGINT, | user_id BIGINT, | event_type STRING, | from_type STRING, | column_type STRING, | site_id BIGINT, | groupon_id BIGINT, | partner_id BIGINT, | merchandise_id BIGINT |) PARTITIONED BY ( | ts_date STRING, | ts_hour STRING, | ts_minute STRING |) STORED AS PARQUET |TBLPROPERTIES ( | 'sink.partition-commit.trigger' = 'partition-time', | 'sink.partition-commit.delay' = '1 min', | 'sink.partition-commit.policy.kind' = 'metastore,success-file', | 'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00' |) """.stripMargin )
Hive 表的参数复用了 SQL FileSystem Connector 的相关参数,与分区提交(Partition Commit)密切相关。仅就上面出现的4个参数简单解释一下。
- sink.partition-commit.trigger:触发分区提交的时间特征。默认为 processing-time,即处理时间,很显然在有延迟的情况下,可能会造成数据分区错乱。所以这里使用 partition-time,即按照分区时间戳(即分区内数据对应的事件时间)来提交。
- partition.time-extractor.timestamp-pattern:分区时间戳的抽取格式。需要写成 yyyy-MM-dd HH:mm:ss 的形式,并用 Hive 表中相应的分区字段做占位符替换。显然,Hive 表的分区字段值来自流表中定义好的事件时间,后面会看到。
- sink.partition-commit.delay:触发分区提交的延迟。在时间特征设为 partition-time 的情况下,当水印时间戳大于分区创建时间加上此延迟时,分区才会真正提交。此值最好与分区粒度相同,例如若 Hive 表按1小时分区,此参数可设为 1 h,若按 10 分钟分区,可设为 10 min。
- sink.partition-commit.policy.kind:分区提交策略,可以理解为使分区对下游可见的附加操作。 metastore 表示更新 Hive Metastore 中的表元数据, success-file 则表示在分区内创建 _SUCCESS 标记文件。
当然,SQL FileSystem Connector 的功能并不限于此,还有很大自定义的空间(如可以自定义分区提交策略以合并小文件等)。具体可参见官方文档。
流式写入 Hive
注意将流表中的事件时间转化为 Hive 的分区。
tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT) tableEnv.executeSql( """ |INSERT INTO hive_tmp.analytics_access_log_hive |SELECT | ts,userId,eventType,fromType,columnType,siteId,grouponId,partnerId,merchandiseId, | DATE_FORMAT(eventTime,'yyyy-MM-dd'), | DATE_FORMAT(eventTime,'HH'), | DATE_FORMAT(eventTime,'mm') |FROM stream_tmp.analytics_access_log_kafka |WHERE merchandiseId > 0 """.stripMargin )
来观察一下流式 Sink 的结果吧。
上文设定的 Checkpoint Interval 是 20 秒,可以看到,上图中的数据文件恰好是以 20 秒的间隔写入的。由于并行度为 3,所以每次写入会生成 3 个文件。分区内所有数据写入完毕后,会同时生成 _SUCCESS 文件。如果是正在写入的分区,则会看到 .inprogress 文件。
通过 Hive 查询一下,确定数据的时间无误。
hive> SELECT from_unixtime(min(cast(ts / 1000 AS BIGINT))),from_unixtime(max(cast(ts / 1000 AS BIGINT))) > FROM hive_tmp.analytics_access_log_hive > WHERE ts_date = '2020-07-15' AND ts_hour = '23' AND ts_minute = '23'; OK 2020-07-15 23:23:00 2020-07-15 23:23:59 Time taken: 1.115 seconds, Fetched: 1 row(s)
流式读取 Hive
要将 Hive 表作为流式 Source,需要启用 Dynamic Table Options,并通过 Table Hints 来指定 Hive 数据流的参数。以下是简单地通过 Hive 计算商品 PV 的例子。
tableEnv.getConfig.getConfiguration.setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true) val result = tableEnv.sqlQuery( """ |SELECT merchandise_id,count(1) AS pv |FROM hive_tmp.analytics_access_log_hive |/*+ OPTIONS( | 'streaming-source.enable' = 'true', | 'streaming-source.monitor-interval' = '1 min', | 'streaming-source.consume-start-offset' = '2020-07-15 23:30:00' |) */ |WHERE event_type = 'shtOpenGoodsDetail' |AND ts_date >= '2020-07-15' |GROUP BY merchandise_id |ORDER BY pv DESC LIMIT 10 """.stripMargin ) result.toRetractStream[Row].print().setParallelism(1) streamEnv.execute()
三个 Table Hint 参数的含义解释如下。
- streaming-source.enable:设为 true,表示该 Hive 表可以作为 Source。
- streaming-source.monitor-interval:感知 Hive 表新增数据的周期,以上设为 1 分钟。对于分区表而言,则是监控新分区的生成,以增量读取数据。
- streaming-source.consume-start-offset:开始消费的时间戳,同样需要写成 yyyy-MM-dd HH:mm:ss 的形式。
更加具体的说明仍然可参见官方文档。
最后,由于 SQL 语句中有 ORDER BY 和 LIMIT 逻辑,所以需要调用 toRetractStream() 方法转化为回撤流,即可输出结果。
The End
Flink 1.11 的 Hive Streaming 功能大大提高了 Hive 数仓的实时性,对 ETL 作业非常有利,同时还能够满足流式持续查询的需求,具有一定的灵活性。
感兴趣的同学也可以自己上手测试。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
解决问题 1474 个,Flink 1.11 究竟有哪些易用性上的改善?
作者 | 王治江,Apache Flink PMC 7月7日,Flink 1.11.0 正式发布了,作为这个版本的 release manager 之一,我想跟大家分享一下其中的经历感受以及一些代表性 feature 的解读。在进入深度解读前,我们先简单了解下社区发布的一般流程,帮助大家更好的理解和参与 Flink 社区的工作。 首先在每个版本的规划初期,会从志愿者中选出 1-2 名作为 Release Manager。1.11.0 版本我作为中国这边的 Release Manager,同时还有一名来自 Ververica 的 Piotr Nowojski 作为德国方的 Release Manager,这在某种程度上也说明中国的开发者和贡献度在整个社区的占比很重要。 接下来会进行这个版本的 Feature Kickoff。在一些大的方向上,社区的规划周期可能比较久,会分阶段、分步骤跨越多个版本完成,确保质量。每个版本的侧重点也会有所不同,比如前两个版本侧重于批处理的加强,而这个版本更侧重于流处理易用性的提升。社区规划的 Feature 列表会在邮件列表中发起讨论,以收集更多的用户/开发...
- 下一篇
MaxCompute资源管理—— 包年包月资源分时
背景概述 在数据开发过程中,普遍有这样的场景:不同项目(开发、生产)对资源使用的时段不同,如数仓团队有离线生产project在夜间是计算高峰期,开发project和业务分析团队的project计算高峰在白天。即不同project在不同的时段对CU的消耗需求不同,日常峰谷时间段比较固定。 MaxCompute管家之前提供的自定义配额组,只能对配额进行二级切分,但无法进行时段切分,2020年7月23日开始,MaxCompute 管家陆续对各个区域进行升级新版本,新版本支持的功能之一——分时配额,将支持对预留计算资源(预留CU)进行按时段切分,满足时段的差异化资源需求,满足对资源隔离(生产/开发/自助分析)不同工作负载的能力,避免相互干扰,同时更大化提高资源使用率。 本文将介绍如何进行分时配额,协助您更快上手使用分时配额功能,提高资源使用率。 注意事项 基本原则: 只有包年包月的预留计算资源(预留CU)支持分时配置,非预留计算资源不支持分时。 开启分时配置,是对当前region下所有配额组都开启,但默认quota组不支持编辑。 配额组开启分时后,默认是一个时段(00:00:00—23:59:...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Red5直播服务器,属于Java语言的直播服务器
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- CentOS7设置SWAP分区,小内存服务器的救世主
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- SpringBoot2全家桶,快速入门学习开发网站教程
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS7安装Docker,走上虚拟化容器引擎之路
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装