Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
作者:SelectDB 技术团队
导读:湖仓一体(Data Lakehouse)融合了数据仓库的高性能、实时性以及数据湖的低成本、灵活性等优势,帮助用户更加便捷地满足各种数据处理分析的需求。在过去多个版本中,Apache Doris 持续加深与数据湖的融合,已演进出一套成熟的湖仓一体解决方案。为便于用户快速入门,我们将通过系列文章介绍 Apache Doris 与各类主流数据湖格式及存储系统的湖仓一体架构搭建指南,包括 Hudi、Iceberg、Paimon、OSS、Delta Lake、Kudu、BigQuery 等,欢迎持续关注。
作为一种全新的开放式的数据管理架构,湖仓一体(Data Lakehouse)融合了数据仓库的高性能、实时性以及数据湖的低成本、灵活性等优势,帮助用户更加便捷地满足各种数据处理分析的需求,在企业的大数据体系中已经得到越来越多的应用。
在过去多个版本中,Apache Doris 持续加深与数据湖的融合,当前已演进出一套成熟的湖仓一体解决方案。
- 自 0.15 版本起,Apache Doris 引入 Hive 和 Iceberg 外部表,尝试在 Apache Iceberg 之上探索与数据湖的能力结合。
- 自 1.2 版本起,Apache Doris 正式引入 Multi-Catalog 功能,实现了多种数据源的自动元数据映射和数据访问、并对外部数据读取和查询执行等方面做了诸多性能优化,完全具备了构建极速易用 Lakehouse 架构的能力。
- 在 2.1 版本中,Apache Doris 湖仓一体架构得到全面加强,不仅增强了主流数据湖格式(Hudi、Iceberg、Paimon 等)的读取和写入能力,还引入了多 SQL 方言兼容、可从原有系统无缝切换至 Apache Doris。在数据科学及大规模数据读取场景上, Doris 集成了 Arrow Flight 高速读取接口,使得数据传输效率实现 100 倍的提升。
Apache Doris + Apache Hudi
Apache Hudi 是目前最主流的开放数据湖格式之一,也是事务性的数据湖管理平台,支持包括 Apache Doris 在内的多种主流查询引擎。Apache Doris 同样对 Apache Hudi 数据表的读取能力进行了增强:
- Copy on Write Table: Snapshot Query
- Merge on Read Table:Snapshot Queries, Read Optimized Queries
- 支持 Time Travel
- 支持 Incremental Read
凭借 Apache Doris 的高性能查询执行以及 Apache Hudi 的实时数据管理能力,可以实现高效、灵活、低成本的数据查询和分析,同时也提供了强大的数据回溯、审计和增量处理功能,当前基于 Apache Doris 和 Apache Hudi 的组合已经在多个社区用户的真实业务场景中得到验证和推广:
-
实时数据分析与处理:比如金融行业交易分析、广告行业实时点击流分析、电商行业用户行为分析等常见场景下,都要求实时的数据更新及查询分析。Hudi 能够实现对数据的实时更新和管理,并保证数据的一致性和可靠性,Doris 则能够实时高效处理大规模数据查询请求,二者结合能够充分满足实时数据分析与处理的需求。
-
数据回溯与审计:对于金融、医疗等对数据安全和准确性要求极高的行业来说,数据回溯和审计是非常重要的功能。Hudi 提供了时间旅行(Time Travel)功能,允许用户查看历史数据状态,结合 Apache Doris 高效查询能力,可快速查找分析任何时间点的数据,实现精确的回溯和审计。
-
增量数据读取与分析:在进行大数据分析时往往面临着数据规模庞大、更新频繁的问题,Hudi 支持增量数据读取,这使得用户可以只需处理变化的数据,不必进行全量数据更新;同时 Apache Doris 的 Incremental Read 功能也可使这一过程更加高效,显著提升了数据处理和分析的效率。
-
跨数据源联邦查询:许多企业数据来源复杂,数据可能存储在不同的数据库中。Doris 的 Multi-Catalog 功能支持多种数据源的自动映射与同步,支持跨数据源的联邦查询。这对于需要从多个数据源中获取和整合数据进行分析的企业来说,极大地缩短了数据流转路径,提升了工作效率。
本文将在 Docker 环境下,为读者介绍如何快速搭建 Apache Doris + Apache Hudi 的测试及演示环境,并对各功能操作进行演示,帮助读者快速入门。
使用指南
本文涉及所有脚本和代码可以从该地址获取:https://github.com/apache/doris/tree/master/samples/datalake/hudi
01 环境准备
本文示例采用 Docker Compose 部署,组件及版本号如下:
02 环境部署
- 创建 Docker 网络
sudo docker network create -d bridge hudi-net
- 启动所有组件
sudo ./start-hudi-compose.sh
- 启动后,可以使用如下脚本,登陆 Spark 命令行或 Doris 命令行:
sudo ./login-spark.sh sudo ./login-doris.sh
03 数据准备
接下来先通过 Spark 生成 Hudi 的数据。如下方代码所示,集群中已经包含一张名为 customer
的 Hive 表,可以通过这张 Hive 表,创建一个 Hudi 表:
-- ./login-spark.sh spark-sql> use default; -- create a COW table spark-sql> CREATE TABLE customer_cow USING hudi TBLPROPERTIES ( type = 'cow', primaryKey = 'c_custkey', preCombineField = 'c_name' ) PARTITIONED BY (c_nationkey) AS SELECT * FROM customer; -- create a MOR table spark-sql> CREATE TABLE customer_mor USING hudi TBLPROPERTIES ( type = 'mor', primaryKey = 'c_custkey', preCombineField = 'c_name' ) PARTITIONED BY (c_nationkey) AS SELECT * FROM customer;
04 数据查询
如下所示,Doris 集群中已经创建了名为 hudi
的 Catalog(可通过 HOW CATALOGS
查看)。以下为该 Catalog 的创建语句:
-- 已经创建,无需再次执行 CREATE CATALOG `hive` PROPERTIES ( "type"="hms", 'hive.metastore.uris' = 'thrift://hive-metastore:9083', "s3.access_key" = "minio", "s3.secret_key" = "minio123", "s3.endpoint" = "http://minio:9000", "s3.region" = "us-east-1", "use_path_style" = "true" );
- 手动刷新该 Catalog,对创建的 Hudi 表进行同步:
-- ./login-doris.sh doris> REFRESH CATALOG hive;
- 使用 Spark 操作 Hudi 中的数据,都可以在 Doris 中实时可见,不需要再次刷新 Catalog。我们通过 Spark 分别给 COW 和 MOR 表插入一行数据:
spark-sql> insert into customer_cow values (100, "Customer#000000100", "jD2xZzi", "25-430-914-2194", 3471.59, "BUILDING", "cial ideas. final, furious requests", 25); spark-sql> insert into customer_mor values (100, "Customer#000000100", "jD2xZzi", "25-430-914-2194", 3471.59, "BUILDING", "cial ideas. final, furious requests", 25);
- 通过 Doris 可以直接查询到最新插入的数据:
doris> use hive.default; doris> select * from customer_cow where c_custkey = 100; doris> select * from customer_mor where c_custkey = 100;
- 再通过 Spark 插入
c_custkey=32
已经存在的数据,即覆盖已有数据:
spark-sql> insert into customer_cow values (32, "Customer#000000032_update", "jD2xZzi", "25-430-914-2194", 3471.59, "BUILDING", "cial ideas. final, furious requests", 15); spark-sql> insert into customer_mor values (32, "Customer#000000032_update", "jD2xZzi", "25-430-914-2194", 3471.59, "BUILDING", "cial ideas. final, furious requests", 15);
- 通过 Doris 可以查询更新后的数据:
doris> select * from customer_cow where c_custkey = 32; +-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+ | c_custkey | c_name | c_address | c_phone | c_acctbal | c_mktsegment | c_comment | c_nationkey | +-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+ | 32 | Customer#000000032_update | jD2xZzi | 25-430-914-2194 | 3471.59 | BUILDING | cial ideas. final, furious requests | 15 | +-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+ doris> select * from customer_mor where c_custkey = 32; +-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+ | c_custkey | c_name | c_address | c_phone | c_acctbal | c_mktsegment | c_comment | c_nationkey | +-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+ | 32 | Customer#000000032_update | jD2xZzi | 25-430-914-2194 | 3471.59 | BUILDING | cial ideas. final, furious requests | 15 | +-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
05 Incremental Read
Incremental Read 是 Hudi 提供的功能特性之一,通过 Incremental Read,用户可以获取指定时间范围的增量数据,从而实现对数据的增量处理。对此, Doris 可对插入c_custkey=100
后的变更数据进行查询。如下所示,我们插入了一条c_custkey=32
的数据:
doris> select * from customer_cow@incr('beginTime'='20240603015018572'); +-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+ | c_custkey | c_name | c_address | c_phone | c_acctbal | c_mktsegment | c_comment | c_nationkey | +-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+ | 32 | Customer#000000032_update | jD2xZzi | 25-430-914-2194 | 3471.59 | BUILDING | cial ideas. final, furious requests | 15 | +-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+ spark-sql> select * from hudi_table_changes('customer_cow', 'latest_state', '20240603015018572'); doris> select * from customer_mor@incr('beginTime'='20240603015058442'); +-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+ | c_custkey | c_name | c_address | c_phone | c_acctbal | c_mktsegment | c_comment | c_nationkey | +-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+ | 32 | Customer#000000032_update | jD2xZzi | 25-430-914-2194 | 3471.59 | BUILDING | cial ideas. final, furious requests | 15 | +-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+ spark-sql> select * from hudi_table_changes('customer_mor', 'latest_state', '20240603015058442');
06 TimeTravel
Doris 支持查询指定快照版本的 Hudi 数据,从而实现对数据的 Time Travel 功能。首先,可以通过 Spark 查询两张 Hudi 表的提交历史:
spark-sql> call show_commits(table => 'customer_cow', limit => 10); 20240603033556094 20240603033558249 commit 448833 0 1 1 183 0 0 20240603015444737 20240603015446588 commit 450238 0 1 1 202 1 0 20240603015018572 20240603015020503 commit 436692 1 0 1 1 0 0 20240603013858098 20240603013907467 commit 44902033 100 0 25 18751 0 0 spark-sql> call show_commits(table => 'customer_mor', limit => 10); 20240603033745977 20240603033748021 deltacommit 1240 0 1 1 0 0 0 20240603015451860 20240603015453539 deltacommit 1434 0 1 1 1 1 0 20240603015058442 20240603015100120 deltacommit 436691 1 0 1 1 0 0 20240603013918515 20240603013922961 deltacommit 44904040 100 0 25 18751 0 0
接着,可通过 Doris 执行 c_custkey=32
,查询数据插入之前的数据快照。如下可看到 c_custkey=32
的数据还未更新:
注:Time Travel 语法暂时不支持新优化器,需要先执行
set enable_nereids_planner=false;
关闭新优化器,该问题将会在后续版本中修复。
doris> select * from customer_cow for time as of '20240603015018572' where c_custkey = 32 or c_custkey = 100; +-----------+--------------------+---------------------------------------+-----------------+-----------+--------------+--------------------------------------------------+-------------+ | c_custkey | c_name | c_address | c_phone | c_acctbal | c_mktsegment | c_comment | c_nationkey | +-----------+--------------------+---------------------------------------+-----------------+-----------+--------------+--------------------------------------------------+-------------+ | 32 | Customer#000000032 | jD2xZzi UmId,DCtNBLXKj9q0Tlp2iQ6ZcO3J | 25-430-914-2194 | 3471.53 | BUILDING | cial ideas. final, furious requests across the e | 15 | | 100 | Customer#000000100 | jD2xZzi | 25-430-914-2194 | 3471.59 | BUILDING | cial ideas. final, furious requests | 25 | +-----------+--------------------+---------------------------------------+-----------------+-----------+--------------+--------------------------------------------------+-------------+ -- compare with spark-sql spark-sql> select * from customer_mor timestamp as of '20240603015018572' where c_custkey = 32 or c_custkey = 100; doris> select * from customer_mor for time as of '20240603015058442' where c_custkey = 32 or c_custkey = 100; +-----------+--------------------+---------------------------------------+-----------------+-----------+--------------+--------------------------------------------------+-------------+ | c_custkey | c_name | c_address | c_phone | c_acctbal | c_mktsegment | c_comment | c_nationkey | +-----------+--------------------+---------------------------------------+-----------------+-----------+--------------+--------------------------------------------------+-------------+ | 100 | Customer#000000100 | jD2xZzi | 25-430-914-2194 | 3471.59 | BUILDING | cial ideas. final, furious requests | 25 | | 32 | Customer#000000032 | jD2xZzi UmId,DCtNBLXKj9q0Tlp2iQ6ZcO3J | 25-430-914-2194 | 3471.53 | BUILDING | cial ideas. final, furious requests across the e | 15 | +-----------+--------------------+---------------------------------------+-----------------+-----------+--------------+--------------------------------------------------+-------------+ spark-sql> select * from customer_mor timestamp as of '20240603015058442' where c_custkey = 32 or c_custkey = 100;
查询优化
Apache Hudi 中的数据大致可以分为两类 —— 基线数据和增量数据。基线数据通常是已经经过合并的 Parquet 文件,而增量数据是指由 INSERT、UPDATE 或 DELETE 产生的数据增量。基线数据可以直接读取,增量数据需要通过 Merge on Read 的方式进行读取。
对于 Hudi COW 表的查询或者 MOR 表的 Read Optimized 查询而言,其数据都属于基线数据,可直接通过 Doris 原生的 Parquet Reader 读取数据文件,且可获得极速的查询响应。而对于增量数据,Doris 需要通过 JNI 调用 Hudi 的 Java SDK 进行访问。为了达到最优的查询性能,Apache Doris 在查询时,会将一个查询中的数据分为基线和增量数据两部分,并分别使用上述方式进行读取。
为验证该优化思路,我们通过 EXPLAIN
语句来查看一个下方示例的查询中,分别有多少基线数据和增量数据。对于 COW 表来说,所有 101 个数据分片均为是基线数据(hudiNativeReadSplits=101/101
),因此 COW 表全部可直接通过 Doris Parquet Reader 进行读取,因此可获得最佳的查询性能。对于 ROW 表,大部分数据分片是基线数据(hudiNativeReadSplits=100/101
),一个分片数为增量数据,基本也能够获得较好的查询性能。
-- COW table is read natively doris> explain select * from customer_cow where c_custkey = 32; | 0:VHUDI_SCAN_NODE(68) | | table: customer_cow | | predicates: (c_custkey[#5] = 32) | | inputSplitNum=101, totalFileSize=45338886, scanRanges=101 | | partition=26/26 | | cardinality=1, numNodes=1 | | pushdown agg=NONE | | hudiNativeReadSplits=101/101 | -- MOR table: because only the base file contains `c_custkey = 32` that is updated, 100 splits are read natively, while the split with log file is read by JNI. doris> explain select * from customer_mor where c_custkey = 32; | 0:VHUDI_SCAN_NODE(68) | | table: customer_mor | | predicates: (c_custkey[#5] = 32) | | inputSplitNum=101, totalFileSize=45340731, scanRanges=101 | | partition=26/26 | | cardinality=1, numNodes=1 | | pushdown agg=NONE | | hudiNativeReadSplits=100/101 |
可以通过 Spark 进行一些删除操作,进一步观察 Hudi 基线数据和增量数据的变化:
-- Use delete statement to see more differences spark-sql> delete from customer_cow where c_custkey = 64; doris> explain select * from customer_cow where c_custkey = 64; spark-sql> delete from customer_mor where c_custkey = 64; doris> explain select * from customer_mor where c_custkey = 64;
此外,还可以通过分区条件进行分区裁剪,从而进一步减少数据量,以提升查询速度。如下示例中,通过分区条件c_nationkey = 15
进行分区裁减,使得查询请求只需要访问一个分区(partition=1/26
)的数据即可。
-- customer_xxx is partitioned by c_nationkey, we can use the partition column to prune data doris> explain select * from customer_mor where c_custkey = 64 and c_nationkey = 15; | 0:VHUDI_SCAN_NODE(68) | | table: customer_mor | | predicates: (c_custkey[#5] = 64), (c_nationkey[#12] = 15) | | inputSplitNum=4, totalFileSize=1798186, scanRanges=4 | | partition=1/26 | | cardinality=1, numNodes=1 | | pushdown agg=NONE | | hudiNativeReadSplits=3/4 |
结束语
以上是基于 Apache Doris 与 Apache Hudi 快速搭建测试 / 演示环境的详细指南,后续我们还将陆续推出 Apache Doris 与各类主流数据湖格式及存储系统构建湖仓一体架构的系列指南,包括 Iceberg、Paimon、OSS、Delta Lake 等,欢迎持续关注。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
基于Rspack实现大仓应用构建提效实践|得物技术
一、实践背景 随着项目的逐步迭代,代码量和依赖的逐渐增长,应用的构建速度逐步进入缓慢期。以目前所在团队的业务应用来看(使用webpack构建),应用整体构建耗时已经普遍偏高,影响日常开发测试的使用效率,其中编译耗时大约占50%。 实际上随着近些年前端的技术发展以及业务对前端交互体验的要求提高,前端整个代码量复杂度和代码量增长飞快。随着这一趋势的变化,服务于前端工程构建方案多年的webpack,在构建效率上已经逐渐成为瓶颈。因此业界也存在不少优化思路和方案,主要分两个方向: 基于原有Node.js语言实现,通过缓存等方案来提升构建效率,主要是缓存、预构建的方式来减少编译。此类方案多数存在条件限制,比如缓存方案前提是第一次先生成缓存来提升二次构建效率,对于发布平台等需要冷启场景无法生效。 另外一类是采用Golang、Rust等语言重新实现耗时较为复杂的编译过程,从语言层面实现编译过程的性能提升。比较有代表的有,基于Golang实现的esbuild、基于Rust实现的SWC,都在对应的场景得到不少的性能提升。 二、业界方案 既然是业界的普遍性问题,那么外界也肯定会存在不少优化案例可以借鉴或者...
- 下一篇
GaussDB关键技术原理:高性能(五)
GaussDB关键技术原理:高性能(四)从USTORE存储引擎、计划缓存计划技术、数据分区与分区剪枝、列式存储和向量化引擎、SMP并行执行等五方面对高性能关键技术进行解读,本篇将从LLVM动态查询编译执行、SQL-BYPASS执行优化、线程池化、多核处理器优化、日志无锁刷新与多级流水等方面继续介绍GaussDB高性能关键技术,并对高斯数据库性能优化进行总结。 3.11 LLVM动态查询编译执行 在传统经典执行器算子中基于遍历树的表达式计算框架,这种框架的好处是清晰明了,但是在性能上却不是最优的,主要有以下几个原因: (1)表达式计算其框架的通用性决定了其执行模式要适配各种不同的操作符和数据类型,因此在运行时要根据其表达式遍历的具体结果来确定其执行的函数和类型,对这些类型的判断要引入非常多的分支判断。 (2)表达式计算在整体的执行过程中要进行多次的函数调用,其调用的深度取决于其树的深度,这一部分也有着非常大的开销。 这两个核心原因,分支判断和函数调用同样在执行算子中也是影响性能的关键因素,为了提升其执行速度,GaussDB引入了业界著名的开源编译框架LLVM(Low Level Virt...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2整合Redis,开启缓存,提高访问速度
- CentOS8安装Docker,最新的服务器搭配容器使用
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS7设置SWAP分区,小内存服务器的救世主
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Windows10,CentOS7,CentOS8安装Nodejs环境