Apache Hudi 在袋鼠云数据湖平台的设计与实践
在大数据处理中,实时数据分析是一个重要的需求。随着数据量的不断增长,对于实时分析的挑战也在不断加大,传统的批处理方式已经不能满足实时数据处理的需求,需要一种更加高效的技术来解决这个问题。Apache Hudi(Hadoop Upserts Deletes and Incremental Processing)就是这样一种技术,提供了高效的实时数据仓库管理功能。
本文将介绍袋鼠云基于 Hudi 构建数据湖的整体方案架构及其在实时数据仓库处理方面的特点,并且为大家展示一个使用 Apache Hudi 的简单示例,便于新手上路。
Apache Hudi 介绍
Apache Hudi 是一个开源的数据湖存储系统,可以在 Hadoop 生态系统中提供实时数据仓库处理功能。Hudi 最早由 Uber 开发,后来成为 Apache 顶级项目。
Hudi 主要特性
· 支持快速插入和更新操作,以便在数据仓库中实时处理数据;
· 提供增量查询功能,可有效提高数据分析效率;
· 支持时间点查询,以便查看数据在某一时刻的状态;
· 与 Apache Spark、Hive 等大数据分析工具兼容。
Hudi 架构
Apache Hudi 的架构包括以下几个主要组件:
· Hudi 数据存储:Hudi 数据存储是 Hudi 的核心组件,负责存储数据,数据存储有两种类型:Copy-On-Write(COW)和 Merge-On-Read(MOR);
· Copy-On-Write:COW 存储类型会在对数据进行更新时,创建一个新的数据文件副本,将更新的数据写入副本中,之后,新的数据文件副本会替换原始数据文件;
· Merge-On-Read:MOR 存储类型会在查询时,将更新的数据与原始数据进行合并,这种方式可以减少数据存储的写入延迟,但会增加查询的计算量;
· Hudi 索引:Hudi 索引用于维护数据记录的位置信息,索引有两种类型:内置索引(如 Bloom 过滤器)和外部索引(如 HBase 索引);
· Hudi 查询引擎:Hudi 查询引擎负责处理查询请求,Hudi 支持多种查询引擎,如 Spark SQL、Hive、Presto 等。
Hudi 的使用场景
Apache Hudi 可以帮助企业和组织实现实时数据处理和分析。实时数据处理需要快速地处理和查询数据,同时还需要保证数据的一致性和可靠性。
Apache Hudi 的增量数据处理、ACID 事务性保证、写时合并等技术特性可以帮助企业更好地实现实时数据处理和分析,基于 Hudi 的特性可以在一定程度上在实时数仓的构建过程中承担上下游数据链路的对接(类似 Kafka 的角色)。既能实现增量的数据处理,也能为批流一体的处理提供存储基础。
Hudi 的优势和劣势
● 优势
· 高效处理大规模数据集;
· 支持实时数据更新和查询;
· 实现了增量写入机制,提高了数据访问效率;
· Hudi 可以与流处理管道集成;
· Hudi 提供了时间旅行功能,允许回溯数据的历史版本。
● 劣势
· 在读写数据时需要付出额外的代价;
· 操作比较复杂,需要使用专业的编程语言和工具。
Hudi 在袋鼠云数据湖平台上的实践
Hudi 在袋鼠云数据湖的技术架构
· 元数据的接入,让用户可以快速的对表进行管理;
· 数据快速接入,包括对符合条件的原有表数据进行转换,快速搭建数据湖能力;
· 湖表的管理,监控小文件定期进行合并,提升表的查询性能,内在丰富的表操作功能,包括 time travel ,孤儿文件清理,过期快照清理等;
· 索引构建,提供多种索引包括 bloom filter,zorder 等,提升计算引擎的查询性能。
Hudi 使用示例
在介绍了 Hudi 的基本信息和袋鼠云数据湖平台的结构之后,我们来看一个使用示例,替换 Flink 在内存中的 join 过程。
在 Flink 中对多流 join 往往是比较头疼的场景,需要考虑 state ttl 时间设置,设置太小数据经常关联不上,设置太大内存又需要很高才能保留,我们通过 Hudi 的方式来换个思路实现。
● 构建 catalog
public String createCatalog(){ String createCatalog = "CREATE CATALOG hudi_catalog WITH (\n" + " 'type' = 'hudi',\n" + " 'mode' = 'hms',\n" + " 'default-database' = 'default',\n" + " 'hive.conf.dir' = '/hive_conf_dir',\n" + " 'table.external' = 'true'\n" + ")"; return createCatalog; }
● 创建 hudi 表
public String createHudiTable(){ String createTable = "CREATE TABLE if not exists hudi_catalog.flink_db.test_hudi_flink_join_2 (\n" + " id int ,\n" + " name VARCHAR(10),\n" + " age int ,\n" + " address VARCHAR(10),\n" + " dt VARCHAR(10),\n" + " primary key(id) not enforced\n" + ")\n" + "PARTITIONED BY (dt)\n" + "WITH (\n" + " 'connector' = 'hudi',\n" + " 'table.type' = 'MERGE_ON_READ',\n" + " 'changelog.enabled' = 'true',\n" + " 'index.type' = 'BUCKET',\n" + " 'hoodie.bucket.index.num.buckets' = '2',\n" + String.format(" '%s' = '%s',\n", FlinkOptions.PRECOMBINE_FIELD.key(), FlinkOptions.NO_PRE_COMBINE) + " 'write.payload.class' = '" + PartialUpdateAvroPayload.class.getName() + "'\n" + ");"; return createTable; }
● 更新 hudi 表的 flink_db.test_hudi_flink_join_2 的 id, name, age, dt 列
01 从 kafka 中读取 topic1
public String createKafkaTable1(){ String kafkaSource1 = "CREATE TABLE source1\n" + "(\n" + " id INT,\n" + " name STRING,\n" + " age INT,\n" + " dt String,\n" + " PROCTIME AS PROCTIME()\n" + ") WITH (\n" + " 'connector' = 'kafka'\n" + " ,'topic' = 'join_topic1'\n" + " ,'properties.bootstrap.servers' = 'localhost:9092'\n" + " ,'scan.startup.mode' = 'earliest-offset'\n" + " ,'format' = 'json'\n" + " ,'json.timestamp-format.standard' = 'SQL'\n" + " )"; return kafkaSource1; }
02 从 kafka 中读取 topic2
public String createKafkaTable2(){ String kafkaSource2 = "CREATE TABLE source2\n" + "(\n" + " id INT,\n" + " name STRING,\n" + " address string,\n" + " dt String,\n" + " PROCTIME AS PROCTIME()\n" + ") WITH (\n" + " 'connector' = 'kafka'\n" + " ,'topic' = 'join_topic2'\n" + " ,'properties.bootstrap.servers' = 'localhost:9092'\n" + " ,'scan.startup.mode' = 'earliest-offset'\n" + " ,'format' = 'json'\n" + " ,'json.timestamp-format.standard' = 'SQL'\n" + " )"; return kafkaSource2; }
● 执行插入逻辑1
String insertSQL = "insert into hudi_catalog.flink_db.test_hudi_flink_join_2(id,name,age,dt) " + "select id, name,age,dt from source1";
● 通过 spark 查询数据
20230323090605515 20230323090605515_1_186 45 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 45 xc 45 NULL 1
20230323090605515 20230323090605515_1_179 30 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 30 xc 30 NULL 1
● 执行插入逻辑2
String insertSQL = "insert into hudi_catalog.flink_db.test_hudi_flink_join_2(id,name,address,dt) " + "select id, name, address,dt from source2";
● 运行成功
运行成功后在 spark 中查询对应的表数据:
20230323090605515 20230323090605515_1_186 45 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 45 xc 45 xc:address45 1
20230323090605515 20230323090605515_1_179 30 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 30 xc 30 xc:address30 1
可以发现在第二次数据运行之后,表数据的对应字段 address 已经更新,达到了类似在 Flink 中直接执行 join 的效果。
insert into hudi_catalog.flink_db.test_hudi_flink_join_2 select a.id, a.name, a.age,b.address a.dt from source1 a left join source2 b on a.id = b.id
《数栈产品白皮书》:https://www.dtstack.com/resources/1004?src=szsm
《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001?src=szsm 想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=szkyzg
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术qun」,交流最新开源技术信息,qun号码:30537511,项目地址:https://github.com/DTStack

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
开发最佳实践|集成声网 iOS SDK,实现语音聊天室
大家好,我是声网 RTE 开发者社区作者 @小曾同学。本次主要分享集成声网SDK实现语音聊天室。 01 前言 在日常生活中经常会看到一些聊天场景,比如在线KTV、连麦开黑、多人相亲、娱乐聊天室等应用场景,随着移动应用开发的需求不断增加,多人语音聊天室成为了一个热门的应用领域。那么聊天室该如何实现呢?你是想从0到1,还是集成第三方SDK呢?答案当然是集成第三方SDK,那么我们这篇文章就来教大家集成声网SDK实现一个语音聊天室Demo。 02 设计思路 在做产品之前需要明确需求,本次需求:实现语音聊天室Demo; 在确定需求之后,还需要对音视频这块有一定的了解,可以参考声网官网提供的音视频时序图,本次我们要实现的是多人语聊房,实现原理可以参考音视频的实现,音频通话不区分主播和观众,所有用户都是主播角色。 了解上述逻辑之后,设计Demo原型图,一个聊天室的构成基本上包含:输入房间名、加房、麦克风、用户界面等。我们本次主要实现一个简易聊天室demo,用户输入房间加入房间后,即可和远端用户保持通话,并可mute/unmute本地麦克风。设计如下 另外,在实现demo之前你需要一些准备工作,可参见...
- 下一篇
百度工程师移动开发避坑指南——Swift语言篇
作者 | 启明星小组 上一篇我们介绍了移动开发常见的内存泄漏问题,见《百度工程师移动开发避坑指南——内存泄漏篇》。本篇我们将介绍Swift语言部分常见问题。 对于Swift开发者,Swift较于OC一个很大的不同就是引入了可选类型(Optional),刚接触Swift的开发者很容易在相关代码上踩坑。 本期我们带来与Swift可选类型相关的几个避坑指南:可选类型要判空;避免使用隐式解包可选类型;合理使用Objective-C标识符;谨慎使用强制类型转换。希望能对Swift开发者有所帮助。 一、可选类型(Optional)要判空 在Objective-C中,可以使用nil来表示对象为空,但是使用一个为nil的对象通常是不安全的,如果使用不慎会出现崩溃或者其它异常问题。在Swift中,开发者可以使用可选类型表示变量有值或者没有值,可以更加清晰的表达类型是否可以安全的使用。如果一个变量可能为空,那么在声明时可以使用?来表示,使用前需要进行解包。例如: var optionalString: String? 在使用可选类型对象时,需要进行解包操作,有两种解包方式:强制解包与可选绑定。 强制解包使...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2整合Redis,开启缓存,提高访问速度
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- MySQL8.0.19开启GTID主从同步CentOS8
- Mario游戏-低调大师作品
- Linux系统CentOS6、CentOS7手动修改IP地址
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS7安装Docker,走上虚拟化容器引擎之路
- Docker快速安装Oracle11G,搭建oracle11g学习环境