
Spark FinalStage Processing (Stage Division)

Date: 2018-12-03


More Resources

  • YouTube video
  • BiliBili video

Notes

  • Since the argument DAGScheduler passes when submitting stages is the FinalStage, this article analyzes how the FinalStage is constructed.
  • A stage whose RDD dependency is a ShuffleDependency (shuffleDep) has already been cached via shuffleToMapStage.get(shuffleDep.shuffleId); at that point the stages have in effect already been divided, they just have not been submitted yet (see the toy model after this list).
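The caching is a get-or-create lookup keyed by shuffleId. The sketch below is a standalone toy model of that pattern (hypothetical DemoShuffleMapStage type and simplified signature, not Spark source): the first visit to a ShuffleDependency materializes its ShuffleMapStage, and every later visit reuses the cached instance, which is why the stages are already divided before any of them is submitted.

import scala.collection.mutable

// Toy model of the shuffleToMapStage cache (hypothetical simplified types,
// not Spark source): one ShuffleMapStage per shuffleId, created on first
// visit and reused afterwards.
case class DemoShuffleMapStage(id: Int, shuffleId: Int)

object ShuffleStageCacheDemo {
  private val shuffleToMapStage = new mutable.HashMap[Int, DemoShuffleMapStage]
  private var nextStageId = 0

  // Get-or-create keyed by shuffleId, mirroring shuffleToMapStage.get(...).
  def getShuffleMapStage(shuffleId: Int): DemoShuffleMapStage =
    shuffleToMapStage.getOrElseUpdate(shuffleId, {
      val stage = DemoShuffleMapStage(nextStageId, shuffleId)
      nextStageId += 1
      stage
    })

  def main(args: Array[String]): Unit = {
    val first  = getShuffleMapStage(7)
    val second = getShuffleMapStage(7)  // cache hit: the stage is not re-created
    println(first eq second)            // true -- same cached instance
  }
}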

DAGScheduler handling of the JobSubmitted event

  • Calls the newResultStage() method
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage)
  submitWaitingStages()
}
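handleJobSubmitted is not called directly by user code: an action such as collect() goes through SparkContext.runJob to DAGScheduler.submitJob, which posts a JobSubmitted event that the scheduler's event loop later dispatches to this handler. The standalone toy below models that dispatch pattern (hypothetical Demo* names, not Spark source):

import java.util.concurrent.LinkedBlockingQueue

// Toy model of the event-loop dispatch behind DAGSchedulerEventProcessLoop
// (hypothetical Demo* names, not Spark source).
sealed trait DemoEvent
case class DemoJobSubmitted(jobId: Int) extends DemoEvent

object DemoEventLoop {
  private val queue = new LinkedBlockingQueue[DemoEvent]

  def post(event: DemoEvent): Unit = queue.put(event)  // what submitJob does, in spirit

  def main(args: Array[String]): Unit = {
    post(DemoJobSubmitted(jobId = 0))
    queue.take() match {                               // what onReceive does, in spirit
      case DemoJobSubmitted(id) =>
        println(s"dispatch to handleJobSubmitted(jobId=$id)")
    }
  }
}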
  • Calls getParentStagesAndId() to obtain the list of parent stages
/**
 * Create a ResultStage associated with the provided jobId.
 */
private def newResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
  val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}
  • Calls the getParentStages() method
/**
 * Helper function to eliminate some code re-use when creating new stages.
 */
private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
  val parentStages = getParentStages(rdd, firstJobId)
  val id = nextStageId.getAndIncrement()
  (parentStages, id)
}
  • This method computes the parent stages.
  • For the current RDD (rdd4 in the diagram below), check whether its dependency is a ShuffleDependency.
  • If it is not, move up to the parent RDD and check that RDD's dependency type in turn.
  • If it is, create and return a ShuffleMapStage; the RDD of that stage is rdd4's parent RDD.
  • Note that as long as a parent stage exists, parent stages are always resolved first, so the stage at the root of the lineage gets id 0 and each successive child stage's id is one higher.
/**
 * Get or create the list of parent stages for a given RDD. The new Stages will be created with
 * the provided firstJobId.
 */
private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  val parents = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new Stack[RDD[_]]
  def visit(r: RDD[_]) {
    if (!visited(r)) {
      visited += r
      // Kind of ugly: need to register RDDs with the cache here since
      // we can't do it in its constructor because # of partitions is unknown
      for (dep <- r.dependencies) {
        dep match {
          case shufDep: ShuffleDependency[_, _, _] =>
            parents += getShuffleMapStage(shufDep, firstJobId)
          case _ =>
            waitingForVisit.push(dep.rdd)
        }
      }
    }
  }
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  parents.toList
}
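To make the boundary concrete, here is a small runnable job (an illustrative example added here, not from the original article). mapValues keeps a NarrowDependency, so its RDD stays in the same stage as its parent; reduceByKey introduces a ShuffleDependency, so getParentStages returns one ShuffleMapStage for it (id 0), and the ResultStage built afterwards gets id 1, matching the id ordering noted above.

import org.apache.spark.{SparkConf, SparkContext}

object StageBoundaryDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("StageBoundaryDemo").setMaster("local[2]"))

    val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    val rdd2 = rdd1.mapValues(_ * 2)    // NarrowDependency: same stage as rdd1
    val rdd3 = rdd2.reduceByKey(_ + _)  // ShuffleDependency: stage boundary

    // The lineage printout shows the shuffle boundary that getParentStages walks:
    println(rdd3.toDebugString)

    // The action posts JobSubmitted; getParentStages creates the ShuffleMapStage
    // (id 0) before newResultStage allocates id 1 for the final ResultStage.
    rdd3.collect()
    sc.stop()
  }
}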

FinalStage Diagram

[Figure: FinalStage diagram]

Original article: https://yq.aliyun.com/articles/675428