Spark stage提交
Spark stage提交
更多资源
- github: https://github.com/opensourceteams/spark-scala-maven
- csdn(汇总视频在线看): https://blog.csdn.net/thinktothings/article/details/84726769
Youtube 视频
- Spark Stage提交(Youtube视频) : https://youtu.be/NI8-_X6mbl4
BiliBili 视频
- Spark Stage提交(bilibili视频) : https://www.bilibili.com/video/av37445077/
作业提交事件处理
- DAGScheduler 处事作业提交事件
- 用参数finalStage 调用 submitStage() 方法
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)
submitWaitingStages()
}
submitStage 方法处理
- 验证当前stage所在的job是活动的才继续(如Job取消了,再继续也没有意义)
- 对waitingStages,runningStages,failedStages 进行验证(stage不能重复提交)
- stage提交之前先验证当前stage的上级stage是否为空,只有为空的才可以提交
- 当ShuffleMapStage的所有partition处理完成后,会设置isAvailable为真,也就是该stage已被处理完成,不需要再处理了,这时他的子Stage就可以提交了
/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing.isEmpty) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
submitMissingTasks(stage, jobId.get)
} else {
for (parent <- missing) {
submitStage(parent)
}
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)
}
}
- 查找上级Stage
- 内部会递归一直找到祖先Stage
- (在这里判断)当ShuffleMapStage的所有partition处理完成后,会设置isAvailable为真,也就是该stage已被处理完成,不需要再处理了,这时他的子Stage就可以提交了
- getShuffleMapStage 跟 FinalStage的构建,那时的Stage划分一样,并且在FinalStage已对ShuffleDenpendency的Stage进行了缓存,这时直接根据ShuffleId匹配,直接用
private def getMissingParentStages(stage: Stage): List[Stage] = {
val missing = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new Stack[RDD[_]]
def visit(rdd: RDD[_]) {
if (!visited(rdd)) {
visited += rdd
val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
if (rddHasUncachedPartitions) {
for (dep <- rdd.dependencies) {
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
if (!mapStage.isAvailable) {
missing += mapStage
}
case narrowDep: NarrowDependency[_] =>
waitingForVisit.push(narrowDep.rdd)
}
}
}
}
}
waitingForVisit.push(stage.rdd)
while (waitingForVisit.nonEmpty) {
visit(waitingForVisit.pop())
}
missing.toList
}

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
Spark FinalStage处理(Stage划分)
Spark FinalStage处理(Stage划分) 更多资源 github: https://github.com/opensourceteams/spark-scala-maven csdn(汇总视频在线看): https://blog.csdn.net/thinktothings/article/details/84726769 Youtube视频 Spark FinalStage处理(Stage划分)(Youtube视频) : https://youtu.be/yFJugOV0Fak BiliBili视频 Spark FinalStage处理(Stage划分)(bilibili视频) : https://www.bilibili.com/video/av37445057/ 说明 由于DAGScheduler进行stage提交传的参数为FinalStage,所以对FinalStage的构成进行分析 RDD依赖为shuffleDep的stage已经进行了缓存,(这个时候已经对Stage进行明显的划分,只是没有提交) shuffleToMapStage.get(shuffleDep...
-
下一篇
阿里云HBase全新发布X-Pack NoSQL数据库再上新台阶
一、八年双十一,造就国内最大最专业HBase技术团队 阿里巴巴集团早在2010开始研究并把HBase投入生产环境使用,从最初的淘宝历史交易记录,到蚂蚁安全风控数据存储。持续8年的投入,历经8年双十一锻炼。4个PMC,6个committer,造就了国内最大最专业的HBase技术团队,其中HBase内核中超过200+重要的feature是阿里贡献。集团内部超过万台的规模,单集群超过千台,全球领先。 二、HBase技术团队重磅发布X-Pack,NoSQL数据库再上新台阶 阿里云自从17年8月提供HBase云服务以来,到18年12月累计服务了上千大B客户,已经有上千个在线的集群。是阿里云增长最为快速的数据库服务,也是大B客户比例最高的云服务之一。并于6月6日全球第一个推出HBase 2.0,是HBase领域当之无愧的排头兵。 为了满足客户对数据库
相关文章
文章评论
共有0条评论来说两句吧...