Databend 源码阅读: Storage 概况和 Read Partitions
作者:张祖前
Databend Labs 成员,数据库研发工程师
❤️ 友情提示:代码演进较快,请注意文档的时效性哦!
引言
Databend 将存储引擎抽象成一个名为 Table
的接口,源码位于 query/catalog/src/table.rs
。
Table
接口定义了 read
、append
、alter
、optimize
、truncate
以及 recluster
等方法,负责数据的读写和变更。解释器(interpreter)通过调用 Table trait
的方法生成物理执行的 pipeline
。
通过实现 Table
接口的方法,可以定义 Databend 的存储引擎,不同的实现对应不同的引擎。
Storage 主要关注 Table
接口的具体实现,涉及表的元信息,索引信息的管理,以及与底层 IO 的交互。
目录
包名 | 作用 |
---|---|
common/cache | 定义与管理缓存,包括磁盘缓存和内存缓存。类型包含表 meta 缓存、查询结果缓存、表数据缓存等。 |
common/index | 定义与使用索引,目前支持 bloom filter index、page index、range index。 |
common/locks | 管理与使用锁,支持表级别的锁。 |
common/pruner | 分区剪裁算法,包括 internal column pruner、limiter pruner、page pruner、topn pruner、range pruner。 |
common/table_meta | 表 meta 的数据结构定义。 |
hive | hive 表的交互 |
iceberg | iceberg 交互 |
information_schema、system | 系统表定义 |
memory、null、random | 用于开发和测试的引擎 |
view | 视图相关 |
stage | stage 数据源的读取 |
parquet | 把 parquet 文件作为数据源 |
fuse | fuse 引擎模块 |
fuse/src/io | table meta、index、block 的读写 IO 交互 |
fuse/src/pruning | fuse 分区裁剪 |
fuse/src/statistics | column statistics 和 cluster statistics 等统计信息 |
fuse/src/table_functions | table function 实现 |
fuse/src/operation | fuse 引擎对 table trait 方法的具体实现。并包含了如 ReadSource、CommitSink 等 processor 算子的定义 |
Read Partitions
以下以 fuse 引擎中 read partitions 的实现流程为例,简要分析 Storage 相关源码。
Partitions 的定义位于 query/catalog/src/plan/partition.rs
。
pub struct Partitions { // partitions 的分发类型。 pub kind: PartitionsShuffleKind, // 一组实现了 PartInfo 接口的 partition, pub partitions: Vec<PartInfoPtr>, // partitions 是否为 lazy。 pub is_lazy: bool, }
Table 接口中的 read_partitions
通过分析查询中的过滤条件,剪裁掉不需要的分区,返回可能满足条件的 Partitions。
#[async_trait::async_trait] impl Table for FuseTable { #[minitrace::trace] #[async_backtrace::framed] async fn read_partitions( &self, ctx: Arc<dyn TableContext>, push_downs: Option<PushDownInfo>, dry_run: bool, ) -> Result<(PartStatistics, Partitions)> { self.do_read_partitions(ctx, push_downs, dry_run).await } }
Fuse 引擎会以 segment 为单位构建 lazy 类型的 FuseLazyPartInfo
。通过这种方式,prune_snapshot_blocks
可以下推到 pipeline 初始化阶段执行,特别是在分布式集群模式下,可以有效提高剪裁执行效率。
pub struct FuseLazyPartInfo { // segment 在 snapshot 中的索引位置。 pub segment_index: usize, pub segment_location: Location, }
分区剪裁流程的实现位于 query/storages/fuse/src/pruning/fuse_pruner.rs
文件中,具体流程如下:
- 基于
push_downs
条件构造各类剪裁器(pruner),并实例化FusePruner
。 - 调用
FusePruner
中的pruning
方法,创建max_concurrency
个分批剪裁任务。每个批次包括多个 segment 位置,首先根据internal_column_pruner
筛选出无需的 segments,再读取SegmentInfo
,并根据 segment 级别的MinMax
索引进行范围剪裁。 - 读取过滤后的
SegmentInfo
中的BlockMetas
,并按照internal_column_pruner
、limit_pruner
、range_pruner
、bloom_pruner
、page_pruner
等算法的顺序,剔除无需的 blocks。 - 执行
TopNPrunner
进行过滤,从而得到最终剪裁后的block_metas
。
pub struct FusePruner { max_concurrency: usize, pub table_schema: TableSchemaRef, pub pruning_ctx: Arc<PruningContext>, pub push_down: Option<PushDownInfo>, pub inverse_range_index: Option<RangeIndex>, pub deleted_segments: Vec<DeletedSegmentInfo>, } pub struct PruningContext { pub limit_pruner: Arc<dyn Limiter + Send + Sync>, pub range_pruner: Arc<dyn RangePruner + Send + Sync>, pub bloom_pruner: Option<Arc<dyn BloomPruner + Send + Sync>>, pub page_pruner: Arc<dyn PagePruner + Send + Sync>, pub internal_column_pruner: Option<Arc<InternalColumnPruner>>, // Other Fields ... } impl FusePruner { pub async fn pruning( &mut self, mut segment_locs: Vec<SegmentLocation>, delete_pruning: bool, ) -> Result<Vec<(BlockMetaIndex, Arc<BlockMeta>)>> { ... } }
剪裁结束后,以 Block 为单位构造 FusePartInfo
,生成 partitions
,接着调用 set_partitions
方法将 partitions
注入 QueryContext
的分区队列中。在执行任务时,可以通过 get_partition
方法从队列中取出。
pub struct FusePartInfo { pub location: String, pub create_on: Option<DateTime<Utc>>, pub nums_rows: usize, pub columns_meta: HashMap<ColumnId, ColumnMeta>, pub compression: Compression, pub sort_min_max: Option<(Scalar, Scalar)>, pub block_meta_index: Option<BlockMetaIndex>, }
Conclusion
Databend 的存储引擎设计采用了抽象接口的方式,具有高度的可扩展性,可以很方便地支持多种不同的存储引擎。Storage 模块的主要职责是实现 Table 接口的方法,其中 Fuse 引擎部分尤为关键。
通过对数据的并行处理,以及数据剪裁等手段,可以有效地提高数据的处理效率。鉴于篇幅限制,本文仅对读取分区的流程进行了简单阐述,更深入的解析将在后续的文章中逐步展开。
关于 Databend
Databend 是一款开源、弹性、低成本,基于对象存储也可以做实时分析的新式数仓。期待您的关注,一起探索云原生数仓解决方案,打造新一代开源 Data Cloud。
👨💻 Databend Cloud:databend.cn
📖 Databend 文档:databend.rs/
💻 Wechat:Databend
✨ GitHub:github.com/datafuselab…
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
微软推出全新“Windows App”
微软在Ignite 2023 上宣布推出一款适用于 iOS、iPadOS、Web 和 Windows 的全新“Windows App”,目前正以预览版形式提供。 用户可以通过该 APP 连接 Windows 365、Azure 虚拟桌面、Microsoft Dev Box 或个人远程桌面 PC。在手机或 iPad 上,用户无需下载或安装任何内容即可启动 Windows 环境。 新的 Windows APP 带来了多项新功能:如支持多显示器、自定义显示分辨率、动态显示缩放、网络摄像头、音频和打印机的设备重定向等。用户可以固定最喜欢的应用程序,以便快速访问,还可以在不同账户之间轻松切换。 虽然微软官方表示该应用程序仅上架 iOS、Windows、macOS 以及网页端,但提供的屏幕截图显示 Android平板上也能运行该应用。或许 Android 版本将会在不久的未来推出。 不过,该 Windows APP 仅面向微软企业用户提供,个人账户无法访问相关服务。但有迹象表明,微软计划通过 Windows 365 为普通消费者提供云 PC。
- 下一篇
你真的了解@Async吗? | 京东云技术团队
使用场景: 开发中会碰到一些耗时较长或者不需要立即得到执行结果的逻辑,比如消息推送、商品同步等都可以使用异步方法,这时我们可以用到@Async。但是直接使用 @Async 会有风险,当我们没有指定线程池时,他会默认使用其Spring自带的 SimpleAsyncTaskExecutor 线程池,会不断的创建线程,当并发大的时候会严重影响性能。所以可以将异步指定线程池使用 简介: @Async是Spring的注解,可以加在类或方法上。通俗的来讲,如果加上了这个注解,那么该类或者该方法在使用时将会进行异步处理,也就是创建一个线程来实现这个类或者方法,实现多线程。 线程池的执行顺序: 两种使用方式: 第一种: 使用的是Spring默认的线程池SimpleAsyncTaskExecutor。 接入步骤: 1.需要在@SpringBootApplication启动类或者@configure注解类上 添加注解@EnableAsync启动多线程注解。 2.在需要异步执行的方法上添加@Async注解。 默认的线程池配置: 如果需要修改默认的配置可以在yaml或者properties中添加,修改默认配置...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7安装Docker,走上虚拟化容器引擎之路
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS8编译安装MySQL8.0.19
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7