// 提交最后一个stage submitStage(finalStage) // 提交其他正在等待的stage submitWaitingStages()
        从代码我们可以看出,Stage提交的逻辑顺序,是由后往前,即先提交最后一个finalStage,即ResultStage,然后再提交其parent stages,但是实际物理顺序是否如此呢?我们首先看下finalStage的提交,方法submitStage()代码如下:

/** Submits stage, but first recursively submits any missing parents. */ // 提交stage,但是首先要递归的提交所有的missing父stage private def submitStage(stage: Stage) { // 根据stage获取jobId val jobId = activeJobForStage(stage) if (jobId.isDefined) {// 如果jobId已定义 // 记录Debug日志信息:submitStage(stage) logDebug("submitStage(" + stage + ")") // 如果在waitingStages、runningStages或 // failedStages任意一个中,不予处理 // 既不在waitingStages中,也不在runningStages中,还不在failedStages中 // 说明未处理过 if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { // 调用getMissingParentStages()方法,获取stage还没有提交的parent val missing = getMissingParentStages(stage).sortBy(_.id) logDebug("missing: " + missing) if (missing.isEmpty) { // 如果missing为空,说明是没有parent的stage或者其parent stages已提交, // 则调用submitMissingTasks()方法,提交tasks logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents") submitMissingTasks(stage, jobId.get) } else { // 否则,说明其parent还没有提交,递归,循环missing,提交每个stage for (parent <- missing) { submitStage(parent) } // 将该stage加入到waitingStages中 waitingStages += stage } } } else { // 放弃该Stage abortStage(stage, "No active job for stage " + stage.id, None) } }
        代码逻辑比较简单。根据stage获取到jobId,如果jobId未定义,说明该stage不属于明确的Job,则调用abortStage()方法放弃该stage。如果jobId已定义的话,则需要判断该stage属于waitingStages、runningStages、failedStages中任意一个,则该stage忽略,不被处理。顾名思义,waitingStages为等待处理的stages,spark采取由后往前的顺序处理stage提交,即先处理child stage,然后再处理parent stage,所以位于waitingStages中的stage,由于其child stage尚未处理,所以必须等待,runningStages为正在运行的stages,正在运行意味着已经提交了,所以无需再提交,而最后的failedStages就是失败的stages,既然已经失败了,再提交也还是会失败,徒劳无益啊~


        首先调用getMissingParentStages()方法,获取stage还没有提交的parent,即missing;如果missing为空,说明该stage要么没有parent stage,要么其parent stages都已被提交,此时该stage就可以被提交,用于提交的方法submitMissingTasks()我们稍后分析。

        如果missing不为空,则说明该stage还存在尚未被提交的parent stages,那么,我们就需要遍历missing,循环提交每个stage,并将该stage添加到waitingStages中,等待其parent stages都被提交后再被提交。


private def getMissingParentStages(stage: Stage): List[Stage] = { // 存储尚未提交的parent stages,用于最后结果的返回 val missing = new HashSet[Stage] // 已被处理的RDD集合 val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting // 待处理RDD栈,后入先出 val waitingForVisit = new Stack[RDD[_]] // 定义函数visit def visit(rdd: RDD[_]) { // 通过visited判断rdd是否已处理 if (!visited(rdd)) { // 添加到visited,下次不会再处理 visited += rdd val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil) if (rddHasUncachedPartitions) { // 循环rdd的dependencies for (dep <- rdd.dependencies) { dep match { // 宽依赖 case shufDep: ShuffleDependency[_, _, _] => // 调用getShuffleMapStage,获取ShuffleMapStage val mapStage = getShuffleMapStage(shufDep, stage.firstJobId) if (!mapStage.isAvailable) { missing += mapStage } // 窄依赖,直接将RDD压入waitingForVisit栈 case narrowDep: NarrowDependency[_] => waitingForVisit.push(narrowDep.rdd) } } } } } // 将stage的rdd压入到waitingForVisit顶部 waitingForVisit.push(stage.rdd) // 循环处理waitingForVisit,对弹出的每个rdd调用函数visit while (waitingForVisit.nonEmpty) { visit(waitingForVisit.pop()) } // 返回stage列表 missing.toList }
        有没有些似曾相识的感觉呢?对了,和 《Spark源码分析之Stage划分》一文中getParentStages()方法、getAncestorShuffleDependencies()方法结构类似,也是定义了三个数据结构和一个visit()方法。三个数据结构分别是:

        1、missing:HashSet[Stage]类型,存储尚未提交的parent stages,用于最后结果的返回;





        那么,整个missing的获取就一目了然,将final stage即ResultStage的RDD压入到waitingForVisit顶部,循环处理即可得到missing。


        submitStage()方法已分析完毕,go on,我们再回归到handleJobSubmitted()方法,在调用submitStage()方法提交finalStage之后,实际上只是将最原始的parent stage提交,其它child stage均存储在了waitingStages中,那么,接下来,我们就要调用submitWaitingStages()方法提交其中的stage。代码如下:

/** * Check for waiting or failed stages which are now eligible for resubmission. * Ordinarily run on every iteration of the event loop. */ private def submitWaitingStages() { // TODO: We might want to run this less often, when we are sure that something has become // runnable that wasn't before. logTrace("Checking for newly runnable parent stages") logTrace("running: " + runningStages) logTrace("waiting: " + waitingStages) logTrace("failed: " + failedStages) // 将waitingStages转换为数组 val waitingStagesCopy = waitingStages.toArray // 清空waitingStages waitingStages.clear() // 循环waitingStagesCopy,挨个调用submitStage()方法进行提交 for (stage <- waitingStagesCopy.sortBy(_.firstJobId)) { submitStage(stage) } }



/** * Returns true if the map stage is ready, i.e. all partitions have shuffle outputs. * This should be the same as `outputLocs.contains(Nil)`. * 如果map stage已就绪的话返回true,即所有分区均有shuffle输出。这个将会和outputLocs.contains保持一致。 */ def isAvailable: Boolean = _numAvailableOutputs == numPartitions
        它是通过判断_numAvailableOutputs和numPartitions是否相等来确定stage是否已被提交(或者说准备就绪可以提交is ready)的,而numPartitions很好理解,就是stage中的全部分区数目,那么_numAvailableOutputs是什么呢?

private[this] var _numAvailableOutputs: Int = 0 /** * Number of partitions that have shuffle outputs. * When this reaches [[numPartitions]], this map stage is ready. * This should be kept consistent as `outputLocs.filter(!_.isEmpty).size`. * * 拥有shuffle的分区数量。 * 当这个numAvailableOutputs达到numPartitions时,这个map stage也就准备好了。 * 这个应与outputLocs.filter(!_.isEmpty).size保持一致 */ def numAvailableOutputs: Int = _numAvailableOutputs
        可以看出,_numAvailableOutputs就是拥有shuffle outputs的分区数量,当这个numAvailableOutputs达到numPartitions时,这个map stage也就准备好了。


def addOutputLoc(partition: Int, status: MapStatus): Unit = { val prevList = outputLocs(partition) outputLocs(partition) = status :: prevList if (prevList == Nil) { _numAvailableOutputs += 1 } } def removeOutputLoc(partition: Int, bmAddress: BlockManagerId): Unit = { val prevList = outputLocs(partition) val newList = prevList.filterNot(_.location == bmAddress) outputLocs(partition) = newList if (prevList != Nil && newList == Nil) { _numAvailableOutputs -= 1 } }

if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) { // 如果mapOutputTracker中存在 // 根据shuffleId从mapOutputTracker中获取序列化的多个MapOutputStatus对象 val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId) // 反序列化 val locs = MapOutputTracker.deserializeMapStatuses(serLocs) // 循环 (0 until locs.length).foreach { i => if (locs(i) ne null) { // locs(i) will be null if missing // 将 stage.addOutputLoc(i, locs(i)) } } } else { // 如果mapOutputTracker中不存在,注册一个 // Kind of ugly: need to register RDDs with the cache and map output tracker here // since we can't do it in the RDD constructor because # of partitions is unknown logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")") // 注册的内容为 // 1、根据shuffleDep获取的shuffleId; // 2、rdd中分区的个数 mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length) }



/** Called when stage's parents are available and we can now do its task. */ private def submitMissingTasks(stage: Stage, jobId: Int) { logDebug("submitMissingTasks(" + stage + ")") // Get our pending tasks and remember them in our pendingTasks entry // 清空stage的pendingPartitions stage.pendingPartitions.clear() // First figure out the indexes of partition ids to compute. // 首先确定该stage需要计算的分区ID索引 val partitionsToCompute: Seq[Int] = stage.findMissingPartitions() // Create internal accumulators if the stage has no accumulators initialized. // Reset internal accumulators only if this stage is not partially submitted // Otherwise, we may override existing accumulator values from some tasks if (stage.internalAccumulators.isEmpty || stage.numPartitions == partitionsToCompute.size) { stage.resetInternalAccumulators() } // Use the scheduling pool, job group, description, etc. from an ActiveJob associated // with this Stage val properties = jobIdToActiveJob(jobId).properties // 将stage加入到runningStages中 runningStages += stage // SparkListenerStageSubmitted should be posted before testing whether tasks are // serializable. If tasks are not serializable, a SparkListenerStageCompleted event // will be posted, which should always come after a corresponding SparkListenerStageSubmitted // event. // 开启一个stage时,需要调用outputCommitCoordinator的stageStart()方法, stage match { // 如果为ShuffleMapStage case s: ShuffleMapStage => outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1) // 如果为ResultStage case s: ResultStage => outputCommitCoordinator.stageStart( stage = s.id, maxPartitionId = s.rdd.partitions.length - 1) } // 创建一个Map:taskIdToLocations,存储的是id->Seq[TaskLocation]的映射关系 // 对stage中指定RDD的每个分区获取位置信息,映射成id->Seq[TaskLocation]的关系 val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try { stage match { // 如果是ShuffleMapStage case s: ShuffleMapStage => partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap // 如果是ResultStage case s: ResultStage => val job = s.activeJob.get partitionsToCompute.map { id => val p = s.partitions(id) (id, getPreferredLocs(stage.rdd, p)) }.toMap } } catch { case NonFatal(e) => stage.makeNewStageAttempt(partitionsToCompute.size) listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties)) abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e)) runningStages -= stage return } // 标记新的stage attempt stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq) // 发送一个SparkListenerStageSubmitted事件 listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties)) // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times. // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast // the serialized copy of the RDD and for each task we will deserialize it, which means each // task gets a different copy of the RDD. This provides stronger isolation between tasks that // might modify state of objects referenced in their closures. This is necessary in Hadoop // where the JobConf/Configuration object is not thread-safe. // 对stage进行序列化,如果是ShuffleMapStage,序列化rdd和shuffleDep,如果是ResultStage,序列化rdd和func var taskBinary: Broadcast[Array[Byte]] = null try { // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep). // 对于ShuffleMapTask,序列化并广播,广播的是rdd和shuffleDep // For ResultTask, serialize and broadcast (rdd, func). // 对于ResultTask,序列化并广播,广播的是rdd和func val taskBinaryBytes: Array[Byte] = stage match { case stage: ShuffleMapStage => // 序列化ShuffleMapStage closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array() case stage: ResultStage => // 序列化ResultStage closureSerializer.serialize((stage.rdd, stage.func): AnyRef).array() } // 通过sc广播序列化的task taskBinary = sc.broadcast(taskBinaryBytes) } catch { // In the case of a failure during serialization, abort the stage. case e: NotSerializableException => abortStage(stage, "Task not serializable: " + e.toString, Some(e)) runningStages -= stage // Abort execution return case NonFatal(e) => abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}", Some(e)) runningStages -= stage return } // 针对stage的每个分区构造task,形成tasks:ShuffleMapStage生成ShuffleMapTasks,ResultStage生成ResultTasks val tasks: Seq[Task[_]] = try { stage match { // 如果是ShuffleMapStage case stage: ShuffleMapStage => partitionsToCompute.map { id => // 位置信息 val locs = taskIdToLocations(id) val part = stage.rdd.partitions(id) // 创建ShuffleMapTask,其中包括位置信息 new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, stage.internalAccumulators) } // 如果是ResultStage case stage: ResultStage => val job = stage.activeJob.get partitionsToCompute.map { id => val p: Int = stage.partitions(id) val part = stage.rdd.partitions(p) val locs = taskIdToLocations(id) // 创建ResultTask new ResultTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, id, stage.internalAccumulators) } } } catch { case NonFatal(e) => abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e)) runningStages -= stage return } // 如果存在tasks,则利用taskScheduler.submitTasks()提交task,否则标记stage已完成 if (tasks.size > 0) { logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")") // 赋值pendingPartitions stage.pendingPartitions ++= tasks.map(_.partitionId) logDebug("New pending partitions: " + stage.pendingPartitions) // 利用taskScheduler.submitTasks()提交task taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties)) // 记录提交时间 stage.latestInfo.submissionTime = Some(clock.getTimeMillis()) } else { // Because we posted SparkListenerStageSubmitted earlier, we should mark // the stage as completed here in case there are no tasks to run // 标记stage已完成 markStageAsFinished(stage, None) val debugString = stage match { case stage: ShuffleMapStage => s"Stage ${stage} is actually done; " + s"(available: ${stage.isAvailable}," + s"available outputs: ${stage.numAvailableOutputs}," + s"partitions: ${stage.numPartitions})" case stage : ResultStage => s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})" } logDebug(debugString) } }






        6、标记新的stage attempt,并发送一个SparkListenerStageSubmitted事件;















