Spark Shuffle Write阶段磁盘文件分析
前言
org.apache.spark.scheduler.ShuffleMapTask.runTask runTask对应的代码为: val manager = SparkEnv.get.shuffleManager writer = manager.getWriter[Any, Any]( dep.shuffleHandle, partitionId, context) writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) writer.stop(success = true).get
org.apache.spark.shuffle.sort.SortShuffleWriter
sorter = if (dep.mapSideCombine) { require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") new ExternalSorter[K, V, C]( dep.aggregator, Some(dep.partitioner), dep.keyOrdering, de.serializer)
sorter.insertAll(records)
while (records.hasNext) { addElementsRead() kv = records.next() map.changeValue((getPartition(kv._1), kv._1), update) maybeSpillCollection(usingMap = true)}
"temp_shuffle_" + id
val (blockId, file) = diskBlockManager.createTempShuffleBlock() def createTempShuffleBlock(): (TempShuffleBlockId, File) = { var blockId = new TempShuffleBlockId(UUID.randomUUID()) while (getFile(blockId).exists()) { blockId = new TempShuffleBlockId(UUID.randomUUID()) } (blockId, getFile(blockId)) }
private val spills = new ArrayBuffer[SpilledFile]
(p, mergeWithAggregation( iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"
val partitionLengths = sorter.writePartitionedFile( blockId, context, outputFile)
for ((id, elements) <- this.partitionedIterator) { if (elements.hasNext) { val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize, context.taskMetrics.shuffleWriteMetrics.get) for (elem <- elements) { writer.write(elem._1, elem._2) } writer.commitAndClose() val segment = writer.fileSegment() lengths(id) = segment.length } }
"shuffle_" + shuffleId + "_" + mapId + "_" + NOOP_REDUCE_ID + ".data"
shuffleBlockResolver.writeIndexFile( dep.shuffleId, mapId, partitionLengths)
"shuffle_" + shuffleId + "_" + mapId + "_" + NOOP_REDUCE_ID + ".index"
最终结论
MapNum (注:不包含index文件)
CoreNum

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Spark Sort Based Shuffle内存分析
前言 借用和董神的一段对话说下背景: shuffle共有三种,别人讨论的是hash shuffle,这是最原始的实现,曾经有两个版本,第一版是每个map产生r个文件,一共产生mr个文件,由于产生的中间文件太大影响扩展性,社区提出了第二个优化版本,让一个core上map共用文件,减少文件数目,这样共产生corer个文件,好多了,但中间文件数目仍随任务数线性增加,仍难以应对大作业,但hash shuffle已经优化到头了。为了解决hash shuffle性能差的问题,又引入sort shuffle,完全借鉴mapreduce实现,每个map产生一个文件,彻底解决了扩展性问题 目前Sort Based Shuffle 是作为默认Shuffle类型的。Shuffle 是一个很复杂的过程,任何一个环节都足够写一篇文章。所以这里,我尝试换个方式,从实用的角度出发,让读者有两方面的收获: 剖析哪些环节,哪些代码可能会让内存产生问题 控制相关内存的参数 有时候,我们宁可程序慢点,也不要OOM,至少要先跑步起来,希望这篇文章能够让你达成这个目标。 同时我们会提及一些类名,这些类方便你自己想更深入了解时,...
- 下一篇
Spring Boot + Elasticsearch
spring data elasticsearch elasticsearch 2.0.0.RELEASE 2.2.0 1.4.0.M1 1.7.3 1.3.0.RELEASE 1.5.2 1.2.0.RELEASE 1.4.4 1.1.0.RELEASE 1.3.2 1.0.0.RELEASE https://github.com/helloworldtang/spring-data-elasticsearch 1、None of the configured nodes are available 或者 org.elasticsearch.transport.RemoteTransportException: Failed to deserialize exception response from stream 原因:spring data elasticSearch 的版本与Spring boot、Elasticsearch版本不匹配。 解决: Spring Boot Version (x) Spring Data Elasticsearch Version (y) Ela...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7,CentOS8安装Elasticsearch6.8.6
- Windows10,CentOS7,CentOS8安装Nodejs环境
- CentOS关闭SELinux安全模块
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS7设置SWAP分区,小内存服务器的救世主
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS6,CentOS7官方镜像安装Oracle11G
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS7安装Docker,走上虚拟化容器引擎之路