Spark Sort Based Shuffle内存分析
前言
- 剖析哪些环节,哪些代码可能会让内存产生问题
- 控制相关内存的参数
Shuffle 概览
- Write 对应的是ShuffleMapTask,具体的写操作ExternalSorter来负责
- Read 阶段由ShuffleRDD里的HashShuffleReader来完成。如果拉来的数据如果过大,需要落地,则也由ExternalSorter来完成的
- 所有Write 写完后,才会执行Read。 他们被分成了两个不同的Stage阶段。
Shuffle Write 内存消耗分析
org.apache.spark.scheduler.ShuffleMapTask ---> org.apache.spark.shuffle.sort.SortShuffleWriter ---> org.apache.spark.util.collection.ExternalSorter
private var map = new PartitionedAppendOnlyMap[K, C]
private var data = new Array[AnyRef](2 * capacity)
spark.shuffle.file.buffer=32k
C * 32k + C * 10000个Record + C * PartitionedAppendOnlyMap
C * PartitionedAppendOnlyMap < ExecutorHeapMemeory * 0.2 * 0.8
estimatedSize = map.estimateSize() if (maybeSpill(map, estimatedSize)) { map = new PartitionedAppendOnlyMap[K, C] }
elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold
spark.shuffle.spill.initialMemoryThreshold = 5 * 1024 * 1024
ExecutorHeapMemeory * 0.2 * 0.8
spark.shuffle.memoryFraction=0.2 spark.shuffle.safetyFraction=0.8
estimatedSize = map.estimateSize()
spark.shuffle.safetyFraction=0.8
org.apache.spark.rdd.ShuffledRDD ---> org.apache.spark.shuffle.sort.HashShuffleReader ---> org.apache.spark.util.collection.ExternalAppendOnlyMap ---> org.apache.spark.util.collection.ExternalSorter
- 获取待拉取数据的迭代器
- 使用AppendOnlyMap/ExternalAppendOnlyMap 做combine
- 如果需要对key排序,则使用ExternalSorter
spark.shuffle.spill=true
private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
private val mergeHeap = new mutable.PriorityQueue[StreamBuffer]
private class StreamBuffer( val iterator: BufferedIterator[(K, C)], val pairs: ArrayBuffer[(K, C)])
C * 32k + C * mergeHeap + C * SizeTrackingAppendOnlyMap
C * SizeTrackingAppendOnlyMap < ExecutorHeapMemeory * 0.2 * 0.8
C * SizeTrackingAppendOnlyMap + C * PartitionedAppendOnlyMap
C * SizeTrackingAppendOnlyMap < ExecutorHeapMemeory * 0.2 * 0.8
后话

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Spark Streaming Direct Approach (No Receivers) 分析
前言 这个算是Spark Streaming 接收数据相关的第三篇文章了。 前面两篇是: Spark Streaming 数据产生与导入相关的内存分析 Spark Streaming 数据接收优化 Spark Streaming 接受数据的方式有两种: Receiver-based Approach Direct Approach (No Receivers) 上面提到的两篇文章讲的是 Receiver-based Approach 。 而这篇文章则重点会分析Direct Approach (No Receivers) 。 个人认为,DirectApproach 更符合Spark的思维。我们知道,RDD的概念是一个不变的,分区的数据集合。我们将kafka数据源包裹成了一个KafkaRDD,RDD里的partition 对应的数据源为kafka的partition。唯一的区别是数据在Kafka里而不是事先被放到Spark内存里。其实包括FileInputStream里也是把每个文件映射成一个RDD,比较好奇,为什么一开始会有Receiver-based Approach,额外添加了Rec...
- 下一篇
Spark Shuffle Write阶段磁盘文件分析
前言 上篇写了 Spark Shuffle 内存分析后,有不少人提出了疑问,大家也对如何落文件挺感兴趣的,所以这篇文章会详细介绍,Sort Based Shuffle Write 阶段是如何进行落磁盘的 流程分析。 入口处: org.apache.spark.scheduler.ShuffleMapTask.runTask runTask对应的代码为: val manager = SparkEnv.get.shuffleManager writer = manager.getWriter[Any, Any]( dep.shuffleHandle, partitionId, context) writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) writer.stop(success = true).get 这里manager 拿到的是 org.apache.spark.shuffle.sort.SortShuffleWriter 我们看他是如何...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS7,CentOS8安装Elasticsearch6.8.6
- Windows10,CentOS7,CentOS8安装Nodejs环境
- CentOS关闭SELinux安全模块
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS7设置SWAP分区,小内存服务器的救世主
- Docker安装Oracle12C,快速搭建Oracle学习环境