数据湖有新解!Apache Hudi 与 Apache Flink 集成
作者:王祥虎(Apache Hudi 社区)
Apache Hudi 是由 Uber 开发并开源的数据湖框架,它于 2019 年 1 月进入 Apache 孵化器孵化,次年 5 月份顺利毕业晋升为 Apache 顶级项目。是当前最为热门的数据湖框架之一。
1. 为何要解耦
Hudi 自诞生至今一直使用 Spark 作为其数据处理引擎。如果用户想使用 Hudi 作为其数据湖框架,就必须在其平台技术栈中引入 Spark。放在几年前,使用 Spark 作为大数据处理引擎可以说是很平常甚至是理所当然的事。因为 Spark 既可以进行批处理也可以使用微批模拟流,流批一体,一套引擎解决流、批问题。然而,近年来,随着大数据技术的发展,同为大数据处理引擎的 Flink 逐渐进入人们的视野,并在计算引擎领域获占据了一定的市场,大数据处理引擎不再是一家独大。在大数据技术社区、论坛等领地,Hudi 是否支持使用 Flink 计算引擎的的声音开始逐渐出现,并日渐频繁。所以使 Hudi 支持 Flink 引擎是个有价值的事情,而集成 Flink 引擎的前提是 Hudi 与 Spark 解耦。
同时,纵观大数据领域成熟、活跃、有生命力的框架,无一不是设计优雅,能与其他框架相互融合,彼此借力,各专所长。因此将 Hudi 与 Spark 解耦,将其变成一个引擎无关的数据湖框架,无疑是给 Hudi 与其他组件的融合创造了更多的可能,使得 Hudi 能更好的融入大数据生态圈。
2. 解耦难点
Hudi 内部使用 Spark API 像我们平时开发使用 List 一样稀松平常。自从数据源读取数据,到最终写出数据到表,无处不是使用 Spark RDD 作为主要数据结构,甚至连普通的工具类,都使用 Spark API 实现,可以说 Hudi 就是用 Spark 实现的一个通用数据湖框架,它与 Spark 的绑定可谓是深入骨髓。
此外,此次解耦后集成的首要引擎是 Flink。而 Flink 与 Spark 在核心抽象上差异很大。Spark 认为数据是有界的,其核心抽象是一个有限的数据集合。而 Flink 则认为数据的本质是流,其核心抽象 DataStream 中包含的是各种对数据的操作。同时,Hudi 内部还存在多处同时操作多个 RDD,以及将一个 RDD 的处理结果与另一个 RDD 联合处理的情况,这种抽象上的区别以及实现时对于中间结果的复用,使得 Hudi 在解耦抽象上难以使用统一的 API 同时操作 RDD 和 DataStream。
3. 解耦思路
理论上,Hudi 使用 Spark 作为其计算引擎无非是为了使用 Spark 的分布式计算能力以及 RDD 丰富的算子能力。抛开分布式计算能力外,Hudi 更多是把 RDD 作为一个数据结构抽象,而 RDD 本质上又是一个有界数据集,因此,把 RDD 换成 List,在理论上完全可行(当然,可能会牺牲些性能)。为了尽可能保证 Hudi Spark 版本的性能和稳定性。我们可以保留将有界数据集作为基本操作单位的设定,Hudi 主要操作 API 不变,将 RDD 抽取为一个泛型,Spark 引擎实现仍旧使用 RDD,其他引擎则根据实际情况使用 List 或者其他有界数据集。
解耦原则:
1)统一泛型。Spark API 用到的 JavaRDD,JavaRDD,JavaRDD 统一使用泛型 I,K,O 代替;
2)去 Spark 化。抽象层所有 API 必须与 Spark 无关。涉及到具体操作难以在抽象层实现的,改写为抽象方法,引入 Spark 子类实现。
例如:Hudi 内部多处使用到了 JavaSparkContext#map() 方法,去 Spark 化,则需要将 JavaSparkContext 隐藏,针对该问题我们引入了 HoodieEngineContext#map() 方法,该方法会屏蔽 map 的具体实现细节,从而在抽象成实现去 Spark 化。
3)抽象层尽量减少改动,保证 Hudi 原版功能和性能;
4)使用 HoodieEngineContext 抽象类替换 JavaSparkContext,提供运行环境上下文。
4.Flink 集成设计
Hudi 的写操作在本质上是批处理,DeltaStreamer 的连续模式是通过循环进行批处理实现的。为使用统一 API,Hudi 集成 Flink 时选择攒一批数据后再进行处理,最后统一进行提交(这里 Flink 我们使用 List 来攒批数据)。
攒批操作最容易想到的是通过使用时间窗口来实现,然而,使用窗口,在某个窗口没有数据流入时,将没有输出数据,Sink 端难以判断同一批数据是否已经处理完。因此我们使用 Flink 的检查点机制来攒批,每两个 Barrier 之间的数据为一个批次,当某个子任务中没有数据时,mock 结果数据凑数。这样在 Sink 端,当每个子任务都有结果数据下发时即可认为一批数据已经处理完成,可以执行 commit。
DAG 如下:
- source 接收 Kafka 数据,转换成 List;
- InstantGeneratorOperator 生成全局唯一的 instant.当上一个 instant 未完成或者当前批次无数据时,不创建新的 instant;
- KeyBy partitionPath 根据 partitionPath 分区,避免多个子任务写同一个分区;
- WriteProcessOperator 执行写操作,当当前分区无数据时,向下游发送空的结果数据凑数;
- CommitSink 接收上游任务的计算结果,当收到 parallelism 个结果时,认为上游子任务全部执行完成,执行 commit.
注:InstantGeneratorOperator 和 WriteProcessOperator 均为自定义的 Flink 算子,InstantGeneratorOperator 会在其内部阻塞检查上一个 instant 的状态,保证全局只有一个 inflight(或 requested)状态的 instant.WriteProcessOperator 是实际执行写操作的地方,其写操作在 checkpoint 时触发。
5. 实现示例
1) HoodieTable
/** * Abstract implementation of a HoodieTable. * * @param <T> Sub type of HoodieRecordPayload * @param <I> Type of inputs * @param <K> Type of keys * @param <O> Type of outputs */ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implements Serializable { protected final HoodieWriteConfig config; protected final HoodieTableMetaClient metaClient; protected final HoodieIndex<T, I, K, O> index; public abstract HoodieWriteMetadata<O> upsert(HoodieEngineContext context, String instantTime, I records); public abstract HoodieWriteMetadata<O> insert(HoodieEngineContext context, String instantTime, I records); public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, String instantTime, I records, Option<BulkInsertPartitioner<I>> bulkInsertPartitioner); ...... }
HoodieTable 是 Hudi 的核心抽象之一,其中定义了表支持的 insert,upsert,bulkInsert 等操作。以 upsert 为例,输入数据由原先的 JavaRDD inputRdds 换成了 I records, 运行时 JavaSparkContext jsc 换成了 HoodieEngineContext context.
从类注释可以看到 T,I,K,O 分别代表了 Hudi 操作的负载数据类型、输入数据类型、主键类型以及输出数据类型。这些泛型将贯穿整个抽象层。
2) HoodieEngineContext
/** * Base class contains the context information needed by the engine at runtime. It will be extended by different * engine implementation if needed. */ public abstract class HoodieEngineContext { public abstract <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism); public abstract <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism); public abstract <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism); ...... }
HoodieEngineContext 扮演了 JavaSparkContext 的角色,它不仅能提供所有 JavaSparkContext 能提供的信息,还封装了 map,flatMap,foreach 等诸多方法,隐藏了 JavaSparkContext#map(),JavaSparkContext#flatMap(),JavaSparkContext#foreach() 等方法的具体实现。
以 map 方法为例,在 Spark 的实现类 HoodieSparkEngineContext 中,map 方法如下:
@Override public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) { return javaSparkContext.parallelize(data, parallelism).map(func::apply).collect(); }
在操作 List 的引擎中其实现可以为(不同方法需注意线程安全问题,慎用 parallel()):
@Override public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) { return data.stream().parallel().map(func::apply).collect(Collectors.toList()); }
注:map 函数中抛出的异常,可以通过包装 SerializableFunction func 解决.
这里简要介绍下 SerializableFunction:
@FunctionalInterface public interface SerializableFunction<I, O> extends Serializable { O apply(I v1) throws Exception; }
该方法实际上是 java.util.function.Function 的变种,与java.util.function.Function 不同的是 SerializableFunction 可以序列化,可以抛异常。引入该函数是因为 JavaSparkContext#map() 函数能接收的入参必须可序列,同时在hudi的逻辑中,有多处需要抛异常,而在 Lambda 表达式中进行 try catch 代码会略显臃肿,不太优雅。
6.现状和后续计划
6.1 工作时间轴
2020 年 4 月,T3 出行(杨华@vinoyang,王祥虎@wangxianghu)和阿里巴巴的同学(李少锋@leesf)以及若干其他小伙伴一起设计、敲定了该解耦方案;
2020 年 4 月,T3 出行(王祥虎@wangxianghu)在内部完成了编码实现,并进行了初步验证,得出方案可行的结论;
2020 年 7 月,T3 出行(王祥虎@wangxianghu)将该设计实现和基于新抽象实现的 Spark 版本推向社区(HUDI-1089);
2020 年 9 月 26 日,顺丰科技基于 T3 内部分支修改完善的版本在 Apache Flink Meetup(深圳站)公开 PR, 使其成为业界第一个在线上使用 Flink 将数据写 Hudi 的企业。
2020 年 10 月 2 日,HUDI-1089 合并入 Hudi 主分支,标志着 Hudi-Spark 解耦完成。
6.2 后续计划
1)推进 Hudi 和 Flink 集成
将 Flink 与 Hudi 的集成尽快推向社区,初期该特性可能只支持 Kafka 数据源。
2)性能优化
为保证 Hudi-Spark 版本的稳定性和性能,此次解耦没有太多考虑 Flink 版本可能存在的性能问题。
3)类 flink-connector-hudi 第三方包开发
将 Hudi-Flink 的绑定做成第三方包,用户可以在 Flink 应用中以编码方式读取任意数据源,通过这个第三方包写入 Hudi。
更多 Flink 技术交流可加入 Apache Flink 社区钉钉交流群:
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
基于 Flink + Hive 构建流批一体准实时数仓
基于 Hive 的离线数仓往往是企业大数据生产系统中不可缺少的一环。Hive 数仓有很高的成熟度和稳定性,但由于它是离线的,延时很大。在一些对延时要求比较高的场景,需要另外搭建基于 Flink 的实时数仓,将链路延时降低到秒级。但是一套离线数仓加一套实时数仓的架构会带来超过两倍的资源消耗,甚至导致重复开发。 想要搭建流式链路就必须得抛弃现有的 Hive 数仓吗?并不是,借助 Flink 可以实现已有的 Hive 离线数仓准实时化。本文整理自 Apache Flink Committer、阿里巴巴技术专家李劲松的分享,文章将分析当前离线数仓实时化的难点,详解 Flink 如何解决 Hive 流批一体准实时数仓的难题,实现更高效、合理的资源配置。文章大纲如下: 离线数仓实时化的难点 Flink 在流批一体的探索 构建流批一体准实时数仓应用实践 离线数仓实时化的难点 离线数仓 上图是一个典型的离线数仓,假设现在公司有一个需求,目前公司的数据量很大,需要每天出一个报表且输出到业务数据库中。首先是刚入库的业务数据,大致分为两种,一种是 MySQL 的 binlog,另外一种是业务系统中的业务打点...
- 下一篇
问答题:如何构建一套满足GPT-3的存储系统?
这几天GPT-3成为人工智能甚至整个科技圈最为热门的话题。作为著名人工智能科研公司 OpenAI 开发的文字生成 (text generation) 人工智能,GPT-3的相关论文在2020年5月份就已经发表,由于使用了45TB的数据,并采用了天文数字级别的1,750亿参数量而引起极大轰动。现在,GPT-3开始开放申请,获得资格的人将通过API来使用GPT-3。如果说软件定义一切,那么API就在定义软件。一些人在使用了GPT-3之后,对其赞不绝口:嗯,真香! 比如用GPT3做的这个页面生成器,只需要输入“给我一个长得像西瓜的按钮”,GPT3就会很快输出一个看上去真的很像西瓜的按钮。 从目前的应用来说,GPT-3 更像是一个更懂你的新的搜索引擎,传统的搜索引擎只是将信息归类后进行展现,而GPT-3 则是将信息进行了加工。仅就45TB的数据而论,如果一部电影按照2G大小来算的话,那么45T的数据相当于23000多部电影,每次GPT-3都相当于将这23000多部电影看一遍,然后写出一篇“影评”。 而写出这篇“影评”不是依靠作者的构思,而是依靠算力。算力可以看作是单位时间内的计算能力。从计算机...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
-
Docker使用Oracle官方镜像安装(12C,18C,19C)
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8编译安装MySQL8.0.19
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
推荐阅读
最新文章
- CentOS关闭SELinux安全模块
- Hadoop3单机部署,实现最简伪集群
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2全家桶,快速入门学习开发网站教程
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- MySQL8.0.19开启GTID主从同步CentOS8
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2整合Redis,开启缓存,提高访问速度