Spark FinalStage处理(Stage划分)
Spark FinalStage处理(Stage划分)
更多资源
- github: https://github.com/opensourceteams/spark-scala-maven
- csdn(汇总视频在线看): https://blog.csdn.net/thinktothings/article/details/84726769
Youtube视频
- Spark FinalStage处理(Stage划分)(Youtube视频) : https://youtu.be/yFJugOV0Fak
BiliBili视频
- Spark FinalStage处理(Stage划分)(bilibili视频) : https://www.bilibili.com/video/av37445057/
说明
- 由于DAGScheduler进行stage提交传的参数为FinalStage,所以对FinalStage的构成进行分析
- RDD依赖为shuffleDep的stage已经进行了缓存,(这个时候已经对Stage进行明显的划分,只是没有提交) shuffleToMapStage.get(shuffleDep.shuffleId)
DAGScheduler事件处理JobSubmitted
- 调用newResultStage()方法
private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties) { var finalStage: ResultStage = null try { // New stage creation may throw an exception if, for example, jobs are run on a // HadoopRDD whose underlying HDFS files have been deleted. finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite) } catch { case e: Exception => logWarning("Creating new stage failed due to exception - job: " + jobId, e) listener.jobFailed(e) return } val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) clearCacheLocs() logInfo("Got job %s (%s) with %d output partitions".format( job.jobId, callSite.shortForm, partitions.length)) logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") logInfo("Parents of final stage: " + finalStage.parents) logInfo("Missing parents: " + getMissingParentStages(finalStage)) val jobSubmissionTime = clock.getTimeMillis() jobIdToActiveJob(jobId) = job activeJobs += job finalStage.setActiveJob(job) val stageIds = jobIdToStageIds(jobId).toArray val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) listenerBus.post( SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) submitStage(finalStage) submitWaitingStages() }
- 调用方法getParentStagesAndId()得到上级stage列表
/** * Create a ResultStage associated with the provided jobId. */ private def newResultStage( rdd: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = { val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId) val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite) stageIdToStage(id) = stage updateJobIdStageIdMaps(jobId, stage) stage }
- 调用方法getParentStages()
/** * Helper function to eliminate some code re-use when creating new stages. */ private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = { val parentStages = getParentStages(rdd, firstJobId) val id = nextStageId.getAndIncrement() (parentStages, id) }
- 该方法计算上级stage
- 根据当前RDD=rdd4 的依赖类型判断是不是ShuffleDependency
- 不是,找上级RDD,再继续判断上级RDD的依赖类型
- 是,创建ShuffleMapStage并还回,此stage的RDD为rdd4的上级RDD
- 注意只要有上级stage,就会一直先找上级stage,这样找到根上的stage的id为0,依次子stage的id加1
/** * Get or create the list of parent stages for a given RDD. The new Stages will be created with * the provided firstJobId. */ private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = { val parents = new HashSet[Stage] val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting val waitingForVisit = new Stack[RDD[_]] def visit(r: RDD[_]) { if (!visited(r)) { visited += r // Kind of ugly: need to register RDDs with the cache here since // we can't do it in its constructor because # of partitions is unknown for (dep <- r.dependencies) { dep match { case shufDep: ShuffleDependency[_, _, _] => parents += getShuffleMapStage(shufDep, firstJobId) case _ => waitingForVisit.push(dep.rdd) } } } } waitingForVisit.push(rdd) while (waitingForVisit.nonEmpty) { visit(waitingForVisit.pop()) } parents.toList }
图解FinalStage
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Spark DAG调度器事件循环处理器
Spark DAG调度器事件循环处理器 更多资源 github: https://github.com/opensourceteams/spark-scala-maven csdn(汇总视频在线看): https://blog.csdn.net/thinktothings/article/details/84726769 Youtube 视频 Spark DAG调度器事件循环处理器(Youtube视频) : https://youtu.be/fT-dpf0KFOA Bilibili 视频 Spark DAG调度器事件循环处理器(bilibili视频) : https://www.bilibili.com/video/av37445034/ DAGSchedulerEventProcessLoop.scala DAGSchedulerEventProcessLoop(DAG调度器事件循环处理器)继承抽象类EventLoop(事件循环器) 当调用 post方法增加事件时,实际上是往EventLoop中的列表阻塞队列eventQueue增加元素 EventLoop在DAGScheduler类...
- 下一篇
Spark stage提交
Spark stage提交 更多资源 github: https://github.com/opensourceteams/spark-scala-maven csdn(汇总视频在线看): https://blog.csdn.net/thinktothings/article/details/84726769 Youtube 视频 Spark Stage提交(Youtube视频) : https://youtu.be/NI8-_X6mbl4 BiliBili 视频 Spark Stage提交(bilibili视频) : https://www.bilibili.com/video/av37445077/ 作业提交事件处理 DAGScheduler 处事作业提交事件 用参数finalStage 调用 submitStage() 方法 private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Arr...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Hadoop3单机部署,实现最简伪集群
- CentOS7,8上快速安装Gitea,搭建Git服务器
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2整合Redis,开启缓存,提高访问速度
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS8编译安装MySQL8.0.19