Meetup 回顾|Data Infra 研究社第十六期(含资料发布)
本文整理于上周六(10月21日)Data Infra 第 16 期的活动内容。本次活动由 Databend 研发工程师-王旭东为大家带来了一场主题为《Databend hash join spill 设计与实现》的分享,让我们一起回顾一下吧~
以下是本次活动的相关视频、资料及文字:
通过本次分享,我们能更加了解 Databend 的 hash join spill 的设计与实现,以及学习如何使用 spill 功能。
本次活动回放也可在 B 站上找到:
🔗 https://www.bilibili.com/video/BV1pC4y137sN?t=3.7
《 Databend hash join spill 设计与实现 》
此次活动的讲稿和相关资料都可以在 Data Infra 第 16 期的 PDF 文件中找到:🔗 https://github.com/databendcn/data-infra/tree/main/第16期-20231021
Hash join 在 pipeline 架构下的设计
左侧是一个典型的两表 join plan,通过 pipeline builder 会生成右侧的 pipeline,包括 main pipeline 和一条子 pipeline ( build pipeline )。
probe pipeline 和 build pipeline 之间通过 bridge 结构关联,hash table 以及 build 和 probe 共用的一些 states 都会存在 bridge 里面,等 hash join build 侧生成 hash table 后会通过 bridge 把 hash table 给 probe 侧用。
Hash join 是多线程的,假设 build side 有 N 个 threads,probe side 有 M 个 threads。Probe 需要等待 build 完成后才能开始。因为两条 pipeline 是同时开始的,我们没法确定 build 先到达还是 probe 先到达,所以 probe 可能先于 build 发生,又因为是多线程执行,可能所有 probe 的线程都先与build 线程到达,也可能发生交错,这时提前到达的 probe 线程需要异步等待状态。
最直观的想法是用 notify 来控制 build 和 probe 之间的等待,因为是多线程的,所以考虑 notify waiters()
,但是 notify 不知道预知有多少 waiters,它只会唤醒 register 过的 waiters,在 build 和 probe 这种模式下找到合适的地方进行注册不太可能的,所以不考虑 notify 而是用 tokio 的 watch channel 来解决 Hash join 的多线程模型。
channel 中的初始值是 0,当 build 侧完成后,最后一个 build 线程把 1 发送到 channel 中来唤醒所有的 probe 线程。probe 在开始等待 build 的时候会订阅 watcher channel,得到一个 receiver,如果此时已经是 1,可以直接进行 probe, 否则就要等待 channel 中发生 change,及 build 的最后一个线程把 1 写到 channel 里。
pub async fn wait_first_round_build_done(&self) -> Result<()> { let mut rx = self.build_done_watcher.subscribe(); if *rx.borrow() == 1_u8 { return Ok(()); } rx.changed() .await .map_err(|_| ErrorCode::TokioError("build_done_watcher's sender is dropped"))?; debug_assert!(*rx.borrow() == 1_u8); Ok(()) }
梳理完 build 和 probe 之间的交互后,看一下 build 的状态。不考虑 spill 的时候,它的状态比较简单,只有三个 steps,不同的 step 对应不同的 event,触发不同的行为,有异步的有同步的,一些比较重的 IO 会进行异步,还有线程之间的等待也会异步,比如在 finalize 之前需要等所有的 threads 都完成 running step (即搜集完所有的 data )。
enum HashJoinBuildStep { // The running step of the build phase. Running, // The finalize step is waiting all build threads to finish and build the hash table. Finalize, // The fast return step indicates there is no data in build side, // so we can directly finish the following steps for hash join and return empty result. FastReturn, // Wait to spill WaitSpill, // Start the first spill FirstSpill, // Following spill after the first spill FollowSpill, // Wait probe WaitProbe, // The whole build phase is finished. Finished, }
首先所有的线程都开始运行,进入第一个 step—running,这一步主要收集 input data,到 chunk 里面,一个线程完成当前任务后需要等待其他完成,这里我们可以用 Barrier 这个 sync 结构。最后一个线程负责切分 finalize tasks 和初始化 hash table,之后所有的线程进入 finalize 阶段,并行的写 hash table。
FastReturn 是一个 fast path,如果 build side 数据为空,那么对于一些特定的 join 类型,probe 可以直接返回,不需要 probe 一个 空的 hash table。
接下来看下 probe 的状态
enum HashJoinProbeStep { // The step is to wait build phase finished. WaitBuild, // The running step of the probe phase. Running, // The final scan step is used to fill missing rows for non-inner join. FinalScan, // The fast return step indicates we can directly finish the probe phase. FastReturn, // Spill step is used to spill the probe side data. Spill, // Async running will read the spilled data, then go to probe AsyncRunning, }
第一个 step 就是我们之前提到的:等待 build 的阶段。这个阶段完成后,进入 probe 阶段。等所有的线程都完成了 probe,对于 non-inner join 要进行 Final Scan,来进行 补 NULL。
Spiller 模块的设计
Spiller 是一个比较独立的模块,也就是说不局限在某一个 operator 上,所有有 spill 需求的 operator 都可以利用 Spiller 模块完成 spill 操作。
具体来说,spiller 负责以下工作:
- 收集需要 spill 的数据
- partition 需要 spill 的数据
- 序列化和反序列化数据
- 与存储进行读写交互
每一个 partition 都有一个 file lists,通过 opendal 把对应的 files 写到存储上。
Hash join spill 设计与实现
首先看一下 build 侧,80% 的工作量都在 build 侧,probe 只需要根据 build 的 spill 信息进行 spill 就可以。
enum HashJoinBuildStep { // The running step of the build phase. Running, // The finalize step is waiting all build threads to finish and build the hash table. Finalize, // The fast return step indicates there is no data in build side, // so we can directly finish the following steps for hash join and return empty result. FastReturn, // Wait to spill WaitSpill, // Start the first spill FirstSpill, // Following spill after the first spill FollowSpill, // Wait probe WaitProbe, // The whole build phase is finished. Finished, }
引入 spill 后,build step 多了四个主要的 step,WaitSpill、FirstSpill 以及 FollowSpill 和 WaitProbe。
每个线程都有自己的 Spiller,否则这个线程的 spill 工作,不同线程的 spill 通过 BuildSpillCoordinator
来协调。
如果一个线程对当前内存数据大小进行判断,发现需要 spill 后,会进入 WaitSpill 状态,BuildSpillCoordinator
会记录当前等待 spill 的线程数量,最后一个线程不会进入等待状态,而是直接作为 coordinator,来协调第一次 spill,它会把 buffer 中所有等待 spill 的数据收集起来,进行 partition,均匀的生成 tasks,分发给每个线程,每个线程的 partition set 都是一样的。完成第一次 spill 后,之后的 spill 不需要再 buffer 数据,如果数据有对应的 partition 可以直接进行 spill,否则 buffer 起来,看后续是否还需要 spill,如果内存够用,可以直接生成 hash table。
等所有的 spill 工作完成后对内存中的数据,进行正常的 hash join build 过程,生成 hash table,通过 bridge 发给 probe 后进入 wait probe 状态。
接下来先看下 hash join probe 侧 spill 的工作,然后再回到 build。
probe 和 build 一样,每个线程都有一个 Spiller。
enum HashJoinProbeStep { // The step is to wait build phase finished. WaitBuild, // The running step of the probe phase. Running, // The final scan step is used to fill missing rows for non-inner join. FinalScan, // The fast return step indicates we can directly finish the probe phase. FastReturn, // Spill step is used to spill the probe side data. Spill, // Async running will read the spilled data, then go to probe AsyncRunning, }
有了 spill 后,当 WaitBuild 阶段结束后,就要进入 Spill 阶段了。
build 会通过 bridge 把它的 partition set 发过来,比如 {0, 1, 2 3},probe 也会利用 Spiller 对数据计算 partition,如果 partition id 在 build 的 partition set 中,会下刷,对于不在的数据,如果是第一轮,会跟 build 发送过来的 hash table 进行 probe。
spill 完成后,会选出一个 partition id,发送给 build,build 拿到 id 后,会把相关 partition 的数据读上来,进行正常的 hash join build 流程,生成 hash table 给 probe,probe 也会读取对应 id 的数据进行 probe,这就是正常的 hash join 过程。每完成一轮,就取一个 partition id,直到没有需要读取的 partition。
未来规划
- 支持递归 spill
- 应用具体的场景
- 进一步优化
Connect With Us
Databend 是一款开源、弹性、低成本,基于对象存储也可以做实时分析的新式数仓。期待您的关注,一起探索云原生数仓解决方案,打造新一代开源 Data Cloud。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
云图说|华为云CodeArts Build,云端化的编译构建平台
阅识风云是华为云信息大咖,擅长将复杂信息多元化呈现,其出品的一张图(云图说)、深入浅出的博文(云小课)或短视频(云视厅)总有一款能让您快速上手华为云。更多精彩内容请单击此处。 本文分享自华为云社区《云图说|华为云CodeArts Build,云端化的编译构建平台》,作者:阅识风云。 互联网企业业务种类多,业务跨平台,多语言编程成为常态? 传统应用软件为本地应用,业务复杂,软件规模大,编译构建耗时长? 移动终端APP业务变化快,交付要求短平快? 对于以上的问题,又该如何解决呢? 华为云编译构建 CodeArtsBuild又放大招啦!隆重上线了适用于Web应用前台、后端应用程序,支持按需分配编译构建资源,支持Android系列移动终端APP的编译构建服务,快来解锁您的编译构建之旅吧! 点击我了解更多,华为云编译构建CodeArts Build服务等着您! 点击关注,第一时间了解华为云新鲜技术~
- 下一篇
灵活、可用、高扩展,EasyMR 带来全新 Yarn 的队列管理功能及可视化配置
YARN(Yet Another Resource Negotiator)是 Hadoop 生态系统中的资源调度器,主要用于资源管理和作业调度。YARN 自身具备队列管理功能,通过对 YARN 资源队列进行配置和管理,实现集群资源的分配,以满足不同应用和用户的需求。YARN 的引入为集群在利用率、资源统一管理和数据共享等方面带来了巨大好处。 在大数据环境下,企业通常会有多个应用程序同时运行,这些应用程序可能具有不同的资源需求和优先级。为了合理分配和管理资源,避免资源争夺和冲突,需要对资源进行划分和调度。 本文将为大家介绍各类资源划分和队列管理方式,以及 EasyMR 新上线的 YARN 的队列管理功能,如何通过可视化界面管理,给广大用户带来更高效和便捷的队列管理体验。 资源划分方式 在大数据领域中,常见的资源划分方式通常有以下几种: 按照应用程序的类型或特性进行分类 例如,可以将 CPU 密集型的应用程序放置在一个队列中,将内存密集型的应用程序放置在另一个队列中。通过这种方式,可以确保不同类型的应用程序获得各自所需的资源,并避免资源浪费和不均衡的情况发生。 按照应用程序的优先级进行分类...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- CentOS7,8上快速安装Gitea,搭建Git服务器
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- CentOS8编译安装MySQL8.0.19
- CentOS7,CentOS8安装Elasticsearch6.8.6