Iceberg 在基于 Flink 的流式数据入库场景中的应用
本文以流式数据入库的场景为基础,介绍引入 Iceberg 作为落地格式和嵌入 Flink sink 的收益,并分析了当前可实现的框架及要点。
应用场景
流式数据入库,是大数据和数据湖的典型应用场景。上游的流式数据,如日志,或增量修改,通过数据总线,经过必要的处理后,汇聚并存储于数据湖,供下游的应用(如报表或者商业智能分析)使用。
上述的应用场景通常有如下的痛点,需要整个流程不断的优化:
- 支持流式数据写入,并保证端到端的不重不丢(即 exactly-once);
- 尽量减少中间环节,能支持更实时(甚至是 T+0)的读取或导出,给下游提供更实时更准确的基础数据;
- 支持 ACID,避免脏读等错误发生;
- 支持修改已落地的数据,虽然大数据和数据湖长于处理静态的或者缓慢变化的数据,即读多写少的场景,但方便的修改功能可以提升用户体验,避免用户因为极少的修改,手动更换整个数据文件,甚至是重新导出;
- 支持修改表结构,如增加或者变更列;而且变更不要引起数据的重新组织。
引入 Iceberg 作为 Flink sink
为了解决上述痛点,我们引入了 Iceberg 作为数据落地的格式。Iceberg 支持 ACID 事务、修改和删除、独立于计算引擎、支持表结构和分区方式动态变更等特性,很好的满足我们的需求。
同时,为了支持流式数据的写入,我们引入 Flink 作为流式处理框架,并将 Iceberg 作为 Flink sink。
下文主要介绍 Flink Iceberg sink 的实现框架和要点。但在这之前,需要先介绍一些实现中用到的 Flink 基本概念。
Flink 基本概念
从 Flink 的角度如何理解"流"和"批"
Flink 使用 DataFrame API 来统一的处理流和批数据。
Stream, Transformation 和 Operator
一个 Flink 程序由 stream 和 transformation 组成:
- Stream: Transformation 之间的中间结果数据;
- Transformation:对(一个或多个)输入 stream 进行操作,输出(一个或多个)结果 stream。
当 Flink 程序执行时,其被映射成 Streaming Dataflow,由如下的部分组成:
- Source (operator):接收外部输入给 Flink;
- Transformation (operator):中间对 stream 做的任何操作;
- Sink (operator):Flink 输出给外部。
下图为 Flink 官网的示例,展示了一个以 Kafka 作为输入 Source,经过中间两个 transformation,最终通过 sink 输出到 Flink 之外的过程。
State, Checkpoint and Snapshot
Flink 依靠 checkpoint 和基于 snapshot 的恢复机制,保证程序 state 的一致性,实现容错。
Checkpoint 是对分布式的数据流,以及所有 operator 的 state,打 snapshot 的过程。
■ State
一个 operator 的 state,即它包含的所有用于恢复当前状态的信息,可分为两类:
- 系统 state:如 operator 中对数据的缓存。
- 用户自定义 state:和用户逻辑相关,可以利用 Flink 提供的 managed state,如 ValueState、ListState,来存储。
State 的存储位置,可以分为:
- Local:内存,或者本地磁盘
- State backend:远端的持久化存储,如 HDFS。
如下图所示:
■ Checkpoint
Flink 做 checkpoint 的过程如下:
- Checkpoint coordinator 首先发送 barrier 给 source。
- Source 做 snapshot,完成后向 coordinator 确认。
- Source 向下游发送 barrier。
- 下游 operator 收到所有上游的 barrier 后,做 snapshot,完成后向 coordinator 确认。
- 继续往下游发送 barrier,直到 sink。
- Sink 通知 coordinator 自己完成 checkpoint。
- Coordinator 确认本周期 snapshot 做完。
如下图所示:
■ Barrier
Barrier 是 Flink 做分布式 snapshot 的重要概念。它作为一个系统标记,被插入到数据流中,随真实数据一起,按照数据流的方向,从上游向下游传递。
由于每个 barrier 唯一对应 checkpoint id,所以数据流中的 record 实际被 barrier 分组,如下图所示,barrier n 和 barrier n-1 之间的 record,属于 checkpoint n。
Barrier 的作用是在分布式的数据流中,将 operator 的多个输入流按照 checkpoint对齐(align),如下图所示:
Flink Iceberg sink
了解了上述 Flink 的基本概念,这些概念又是如何被应用和映射到 Flink Iceberg sink 当中的呢?
总体框架
如图,Flink Iceberg sink 有两个主要模块和两个辅助模块组成:
实现要点
■ Writer
- 在当前的实现中,Java 的 Map 作为每条记录,输入给 writer。内部逻辑先将其转化为作为中间格式的 Avro IndexedRecord,而后通过 Iceberg 里的 Parquet 相关 API,累积的写入 DataFile。
- 使用 Avro 作为中间格式是一个临时方案,为简化适配,并最大限度的利用现有逻辑。但长期来看,使用中间格式会影响处理效率,社区也在试图通过 ISSUE-870 来去掉 Avro,进而使用 Iceberg 内建的数据类型作为输入,同时也需要加入一个到 Flink 内建数据类型的转换器。
- 在做 checkpoint 的过程中,发送 writer 自己的 barrier 到下游的 committer 之前,关闭单个 Parquet 文件,构建 DataFile,并发送 DataFile 的信息给下游。
■ Committer
- 全局唯一的 Committer 在收到上游所有 writer 的 barrier 以后,将收到的 DataFile 的信息填入 manifest file,并使用 ListState 把 manifest file 作为用户自定义的 state,保存于 snapshot 中。
- 当 checkpoint 完成以后,通过 merge append 将 manifest file 提交给 Iceberg。Iceberg 内部通过后续的一系列操作完成 commit。最终让新加入的数据对其他的读任务可见。
试用 Flink Iceberg sink
社区上 https://github.com/apache/incubator-iceberg/pull/856 提供了可以试用的原型代码。下载该 patch 放入 master 分支,编译并构建即可。如下的程序展示了如何将该 sink 嵌入到 Flink 数据流中:
// Configurate catalog org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration(); hadoopConf.set( org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS.varname, META_STORE_URIS); hadoopConf.set( org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname, META_STORE_WAREHOUSE); Catalog icebergCatalog = new HiveCatalog(hadoopConf); // Create Iceberg table Schema schema = new Schema( ... ); PartitionSpec partitionSpec = builderFor(schema)... TableIdentifier tableIdentifier = TableIdentifier.of(DATABASE_NAME, TABLE_NAME); // If needed, check the existence of table by loadTable() and drop it // before creating it icebergCatalog.createTable(tableIdentifier, schema, partitionSpec); // Obtain an execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Enable checkpointing env.enableCheckpointing(...); // Add Source DataStream<Map<String, Object>> dataStream = env.addSource(source, typeInformation); // Configure Ieberg sink Configuration conf = new Configuration(); conf.setString( org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname, META_STORE_URIS); conf.setString(IcebergConnectorConstant.DATABASE, DATABASE_NAME); conf.setString(IcebergConnectorConstant.TABLE, TABLE_NAME); // Append Iceberg sink to data stream IcebergSinkAppender<Map<String, Object>> appender = new IcebergSinkAppender<Map<String, Object>>(conf, "test") .withSerializer(MapAvroSerializer.getInstance()) .withWriterParallelism(1); appender.append(dataStream); // Trigger the execution env.execute("Sink Test");
后续规划
Flink Iceberg sink 有很多需要完善的地方,例如:上文中提到的去掉 Avro 作为中间格式;以及在各种失败的情况下是否仍能保证端到端的 exactly-once;按固定时长做 checkpoint,在高低峰时生成不同大小的 DataFile,是否对后续读不友好等。这些问题都在我们的后续规划中,也会全数贡献给社区。
参考资料:
[1] Iceberg 官网:
https://iceberg.apache.org/
[2] Flink 1.10文 档:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/
[3] Neflix 提供的 Flink Iceberg connector 原型:
https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg
[4] Flink Iceberg sink 设计文档:
https://docs.google.com/document/d/19M-sP6FlTVm7BV7MM4Om1n_MVo1xCy7GyDl_9ZAjVNQ/edit?usp=sharing
[5] Flink 容错机制(checkpoint) :
https://www.cnblogs.com/starzy/p/11439988.html
普惠全球开发者,这一次,格外与众不同!首个 Apache 顶级项目在线会议 Flink Forward 全球直播中文精华版来啦,聚焦 Alibaba、Google、AWS、Uber、Netflix、新浪微博等海内外一线厂商,经典 Flink 应用场景,最新功能、未来规划一览无余。点击下方链接可了解更多大会详情:https://developer.aliyun.com/live/2594?spm=a2c6h.14242504.J_6074706160.2.3fca361f4cYyQx
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
大数据就业前景分析
大数据就业前景分析,大数据广泛应用于电网运行、经营管理及优质服务等各大领域,并正在改变着各行各业,也引领了大数据人才的变革。大数据就业前景怎么样?这对于在就业迷途中的我们是一个很重要的信息。随着大数据时代的到来,这次国家教育部也改革动真格了。大军即将进入,全民开始行动了。2019年各大高校都已经开设大数据专业,真正的竞争压力马上就会来了,已经加入大数据行业的同学很幸运,一定要抓住一切可以抓住机会,全身心的投入。人生不只是低头拉车,更要抬头看路。1、大数据人才需求及现状分析随着国家重视大数据,政府扶持大数据,大数据在企业中生根发芽,开花结果。未来三至五年,中国需要180万数据人才,但目前只有约30万人。PS:各大招聘网站的岗位需求前程无忧大数据岗位搜索,共29854个职位满足条件;智联招聘大数据岗位搜索,共27627个职位满足条件;猎聘网大数据岗位搜索,共1000+个职位满足条件;拉勾网大数据岗位搜索,共500+个职位满足条件。高薪只要你敢想,敢付出,还害怕高薪的人不是自己吗?2、人才缺口大,钱途可观1)JavaJava以及基于Java的框架,被发现俨然成为了硅谷最大的那些高科技公司的骨...
- 下一篇
spark面试该准备点啥
本文转载自公众号: Spark学习技巧作者:浪尖原文链接:https://mp.weixin.qq.com/s/xcQOL2HyNB8Ro7QChu6Ngw 最近很多球友都说在准备面试,不知道准备点啥,尤其是spark,实际上浪尖分享的内容真的都掌握了,应对一般面试绝对没问题,但是遗憾的事情是很多人都是处于不会主动搜集资料,主动梳理知识,主动记忆整理知识,而是伸手要粮的境地。浪尖觉得这个是阻止你成长的罪魁祸手。前天跟朋友聚餐就说道这种情况,不努力,不加班给自己喂粮的,没有足够量和时间积累的人很难在一个领域里有所建树。 主动学习,保持激情,不断提高~ 言归正传,大部分面试者被面试的spark问题估计都会集中于spark core,spark streaming和spark sql,至于mllib和graphx这些估计都是了解项,当然也要逐
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- 设置Eclipse缩进为4个空格,增强代码规范
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- Mario游戏-低调大师作品
- CentOS8编译安装MySQL8.0.19
- CentOS7,8上快速安装Gitea,搭建Git服务器
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长