Cloud Shuffle Service 在字节跳动 Spark 场景的应用实践
本文整理自字节跳动基础架构的大数据开发工程师魏中佳在 ApacheCon Aisa 2022 「大数据」议题下的演讲,主要介绍 Cloud Shuffle Service(CSS) 在字节跳动 Spark 场景下的设计与实现。
背景介绍
External Shuffle Service
- 由于每次读取的都是这个 Shuffle 文件的 1/R,通常情况下这个数据量是非常非常小的,大概是 KB 级别(从几百 KB 到几 KB 不等),这样会给磁盘(尤其是 HDD )带来大量随机的读请求。
- 同时,大家可以看到,Reduce 进行的 Shuffle Fetch 请求整体看是一个网状结构,也就是说会存在大量的网络请求,量级大概是 M 乘以 R,这个请求的数量级也是非常大的。
Spark 在字节跳动的应用
- 日均 100 万左右个作业
- 日均 300 PB Shuffle 数据
- 大量作业签署 SLA,对稳定性要求非常高,超时严重还会严重影响下游
- 大量 HDD 机器和少量 SSD 机器
- 大量在线业务低峰出让的资源,可用磁盘空间非常小,需要把存储拉远
问题总结
- Chunk Size 过小导致磁盘产生大量随机 IO,降低磁盘的吞吐,引发 Chunk Fetch 请求的堆积、超时甚至引发 Stage Retry;
- 磁盘 IOPS 无法在操作系统层面进行隔离,Shuffle 过程中不同 Application 作业会互相影响;
- 在离线混部场景下,我们希望利用在线服务业务低峰期的 CPU,但缺少对应的磁盘资源。
External Shuffle Service 的优化
参数调优
- 首先,采集 Spark、Yarn 运行时的 Event Log 作为数据源;
- 其次,使用 Flink 对原始数据进行 Join 和计算,得到作业某个 Stage 的 Shuffle 量、Task 数量等指标;
- 针对上述指标,
- 一方面,在计算过程使用可插拔的启发式规则对单个作业进行诊断;
- 另一方面,同时存在着大量的周期作业重复运行生成该作业的历史画像;
-
- 最终,结合历史画像与特征诊断信息对特定作业进行自动调参。
- spark.sql.adaptive.shuffle.targetPostShuffleInputSize: 64M->512M
- spark.sql.files.maxPartitionBytes: 1G->40G
Shuffle 限流
- 正常任务打开限流没有影响,不会触发流量限制;
- 异常任务开启限流,不会让任务变慢或失败,大概率会使得任务变快 (限流减少重试,减轻 Server 压力);
此处有必要解释一下,为什么任务会变得更快呢?原因在于当 Latency 升高时,Chunkr Fetch 开始堆积,大量排队,此时往往容易形成恶性循环,请求过来-开始排队-超时-超时后重试-重试后继续排队-继续超时,Fetch 请求可能永远都得不到正常响应。但当我们开启限流之后,我们主动地让客户端等待,而非发一个请求过来在服务端排队,由此就可以避免大量无效的 Fetch 请求。也正因如此,大概率即便是被限流的作业也会变得更快。
- 不同优先级的任务,在限流情况下,高优先级任务允许更高的流量;
上文提到,我们是根据排队的数量,及作业的优先级综合地划定一个合适的范围。在划定这个范围的时候,更高优的作业大概率是不会被限流的。
- 异常节点快速恢复,2min~5min 能恢复正常。
结合第二点,因为我们让一部分发送大量 Fetch 请求的作业的客户端进行了等待休眠,所以异常节点会得到一个非常快速的恢复,大概 2~5 分钟就能恢复正常,恢复正常后,就可以给所有的 Fetch 继续提供服务。
Cloud Shuffle Service 的设计与实现
基本思路
整体架构
- Zookeeper WorkerList:我们使用 zookeeper 来提供服务发现的功能;
- CSS Worker [Partitions / Disk | HDFS ]:管理磁盘并提供 Shuffle Push 服务节点。每一个机器上都会启动 Worker 进程,当收到启动指令时,它就会向 Zookeeper 进行注册,并定时更新上报信息;
- Spark Driver:集成启动 CSS Master 和 ClusterName + ZK
- CSS Master 的作用是规划和统计,Master 从 Zookeeper 中拉取所有 Worker 的信息,并对 Worker 进行分配,然后把 Worker 和 Shuffle 以及每个 Partition 的对应关系通知到 Executor
- ClusterName + ZK:通过配置的 ClusterName 在 ZK 中寻找对应的 Workerlist
-
- CSS ShuffleClient:Writer 和 Read 的集合,负责跟 Worker通信,读取数据或写入数据。
读写过程
性能分析
Cloud Shuffle Service 的应用实践
- CSS Worker 数量 1000+,对应1000多台机器
- 部署模式灵活:Shell、Yarn、K8S
- 支持作业类型众多:Spark、MR、Flink Batch
- 接入作业数 6w+
- 单日 Shuffle 量 9PB+
集群部署&作业接入
构建运维接入管理平台(CSS-Coordinator)
- 提供用户作业无感知接入功能:直接帮用户注入 CSS 相关的参数;
- 提供 Cluster|Queue|Job等维度的灰度模式:支持以各种纬度接入作业,用户仅需配置对应的接入纬度,该维度下的所有作业都会接入到 CSS 中;
- 异常作业的监控告警:作业运行结果会上报到 Coordinator 平台,对于运行失败的作业会进行报警
- 历史 Shuffle 作业的 HBO 优化:平台在作业接入过程中会针对作业历史的 Shuffle 数据量进行评估,当 CSS 集群资源不足时会拒绝大 Shuffle 的作业接入 CSS;
- 设置 spark.yarn.maxAppAttempts=2
- 保留用户原始配置
- 作业 CSS 失败自动 FallBack 到原生 Shuffle
踩坑记录
CSS 服务相关
- 超大 Register Shuffle 启动缓慢在最初的设计中,Register Shuffle 会对所有 Worker 进行初始化工作。因此,在规模比较大的 Shuffle 的场景下,Register 就会非常慢,用户启动一个 Stage 可能需要 2-3 分钟。后来,我们对 Register Shuffle 进行了精简,把 Worker 的初始化动作改成了 Lazy 模式,即只有第一次数据 Push 过来的时候,Worker 才针对这一个作业的 Partition 进行对应的初始化工作。在 Register Shuffle 的时候,只进行 Worker 和 Partition 之间的分配,大大缓解了超大 Register Shuffle 启动缓慢的问题。
- Client 发送速率过快因为我们是一个有状态的服务,无法把 QPS 通过负载均衡的方式降下来,只能通过一些负反馈的方式让 Client 降速,即当 Server 的服务能力无法满足请求时,就让请求在客户端等待。后续我们尝试了很多方法,包括 Spark 原生的 Max Inflight 等,但效果都不太好,最终我们选择了 Netflix 的一个三方库。大致原理是,针对最近一段时间的 RTT 做一个 Smoth 处理,得到一个理论的 RTT,然后拿当前的 RTT 与理论 RTT 做比较,如果小于这个值的话,就在 QPS 上做爬坡。如果大于这个值的话,系统就认为现在的 Server 有排队现象,然后就启动限流。
- 服务热上线,用户如何不感知在 CSS beta 的过程中,每天都会有新的 Commit 合到主分支,每天也会产生新的问题。但是公司内部的 Spark 发展周期是比较长,跟 CSS 的迭代周期无法 Match。最终,我们在 Spark 里只集成了一个最简单的接口,其他的实现都放到 HDFS 上,这样就把公司内 Spark 版本的周期与 CSS 的版本周期做了解耦,CSS 就可以做到小步快跑。在小步快跑的过程中,那我们解决了大量的问题。
Spark 集成相关
- AQE Skew-Join 读放大问题AQE Skew-Join 原理图上图是 Spark 社区提供的 AQE Skew-Join 原理图,根据这个原理,当 Spark 发现某一个 Partition 数据非常大,远超其他 Partition 的时候,它会主动把该 Partition 的数据拆分成多份数据,然后分别去做 Join。这样最终每个 Task 处理的数据量就会更平均,整体作业的运营时间也会变短。设想一下,当我们把 Map 的数据全部聚合起来后会发生什么?一个文件会读很多遍,每次读的时候还会 Skip 很多无效数据。举个例子,一个倾斜的 Partition 上有 1T 数据,Spark 想把它拆成十份去读,这时会发现什么呢?就是这个被聚合后 1T 的文件要读 10 遍,且每次有 1/9 读到的数据都是 Skip 的。面对这个问题,我们的解决办法也非常朴素,就是不再盲目地追求生成一个非常大的连续文件。实际上我们要解决的就是随机读的问题,所以只要文件足够大就可以。因此,我们把文件默认按照 512G 的大小进行切分,一个大的 Partition 数据最终会被切分成若干个小文件。比如上文的例子,1T 的数据会被切分成很多份 512G 的文件,当 AQE Skew-Join 触发时,就不必把一个超大文件读很多遍,只需把这些 512G 的文件按需分配给不同的 Task 进行 Join 就可以。
- Task Huge Partition 导致 Executor 内存占用过大在最初的设计中,基于 Push 的特性,我们是不想做排序的。最初的思路类似于 By Pass 的实现思路,给每一个 Partition 准备一个 64k 的 Buffer,一旦这个 Partition 的 Buffer 写满,就发送出去。后来发现当 Partition 数量非常大的时候,Buffer 就会占非常大的空间。假设一个极端的场景,当有 10 万个Partition 时,如果一个 Partition 的 Buffer 是 64k,那占用的内存还是非常大的。所以最终我们还是回到了 Sort 的路线,即把数据整体在内存里写满之后,再进行 Source Build, 那么 Spill 也不会再写到磁盘里,Spill 之后也不需要 Merge 把 Spill 的数据发送出去。这样做还可以降低 Push 的请求数,同一个 Worker 不同的 Partition push 数据的时候,就可以把它们放到一起放到 Push Request 里。
收益分析
未来展望
此前,Cloud Shuffle Service 已在 Github 上开源,基于字节跳动大规模实践的火山引擎批式计算 Spark 版也已经上线火山引擎,支持公有云、混合云及多云部署,全面贴合企业上云策略,欢迎了解:https://www.volcengine.com/product/spark
更多资讯欢迎关注公众号【字节跳动云原生计算】

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
MASA MAUI Plugin (四)条形码、二维码扫描功能
背景 MAUI的出现,赋予了广大.Net开发者开发多平台应用的能力,MAUI 是Xamarin.Forms演变而来,但是相比Xamarin性能更好,可扩展性更强,结构更简单。但是MAUI对于平台相关的实现并不完整。所以MASA团队开展了一个实验性项目,意在对微软MAUI的补充和扩展 项目地址https://github.com/BlazorComponent/MASA.Blazor/tree/main/src/Masa.Blazor.Maui.Plugin, 每个功能都有单独的demo演示项目,考虑到app安装文件体积(虽然MAUI已经集成裁剪功能,但是该功能对于代码本身有影响),届时每一个功能都会以单独的nuget包的形式提供,方便测试,现在项目才刚刚开始,但是相信很快就会有可以交付的内容啦。 前言 本系列文章面向移动开发小白,从零开始进行平台相关功能开发,演示如何参考平台的官方文档使用MAUI技术来开发相应功能。 介绍 移动端的扫描条形码、二维码的功能已经随处可见,已经很难找到一个不支持扫描的App了,但是微软的MAUI竟然没有提供,那么我们应该如何实现呢? 其实早在 Xamari...
- 下一篇
区服分析丨更透彻的游戏营运数据解读,助力高效增长
全民买量时代,新服和新区持续增开,对玩家长线留存及付费提升显得尤为重要。在分析游戏活动效果和玩家营运数据时,相信大家都曾有过这样的疑问: 不同区服玩家的表现如何对比分析? 怎样合理评估新开区服对玩家回流的持续吸引力? 高性价比的新服激励是否有效提升了ARPU? … HMS Core分析服务全新上线6.8.0版本,提供更多维度的游戏行业指标解读与埋点方案,支持区服分析,满足开发者对游戏玩家更多深度洞察诉求。 一、开箱即用的埋点方案到核心指标的解读闭环,深度分析游戏赛道的用户表现 在游戏行业中,数据采集不全面、缺乏挖掘能力等痛点是厂商自建数据中台一直难以解决的技术难题。为进一步满足更多细分游戏品类的精益运营诉求,HMS Core分析服务在原有卡牌、MMO等细分游戏品类行业报告的基础上,新增通用游戏行业报告。提供完整的游戏指标数据体系搭建,并配套对应的埋点模板和代码样例,帮助开发者随时掌握游戏的核心数据表现。 *图中数据为虚拟 开发者可通过开箱即用的代码样例,灵活选择代码复制、可视化埋点等快捷方式完成数据采集。当数据成功上报后,游戏行业报告将呈现付费分析、玩家分析、区服分析等不同维度的指标看...
相关文章
文章评论
共有0条评论来说两句吧...