Spark stage提交
Spark stage提交
更多资源
- github: https://github.com/opensourceteams/spark-scala-maven
- csdn(汇总视频在线看): https://blog.csdn.net/thinktothings/article/details/84726769
Youtube 视频
- Spark Stage提交(Youtube视频) : https://youtu.be/NI8-_X6mbl4
BiliBili 视频
- Spark Stage提交(bilibili视频) : https://www.bilibili.com/video/av37445077/
作业提交事件处理
- DAGScheduler 处事作业提交事件
- 用参数finalStage 调用 submitStage() 方法
private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties) { var finalStage: ResultStage = null try { // New stage creation may throw an exception if, for example, jobs are run on a // HadoopRDD whose underlying HDFS files have been deleted. finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite) } catch { case e: Exception => logWarning("Creating new stage failed due to exception - job: " + jobId, e) listener.jobFailed(e) return } val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) clearCacheLocs() logInfo("Got job %s (%s) with %d output partitions".format( job.jobId, callSite.shortForm, partitions.length)) logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") logInfo("Parents of final stage: " + finalStage.parents) logInfo("Missing parents: " + getMissingParentStages(finalStage)) val jobSubmissionTime = clock.getTimeMillis() jobIdToActiveJob(jobId) = job activeJobs += job finalStage.setActiveJob(job) val stageIds = jobIdToStageIds(jobId).toArray val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) listenerBus.post( SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) submitStage(finalStage) submitWaitingStages() }
submitStage 方法处理
- 验证当前stage所在的job是活动的才继续(如Job取消了,再继续也没有意义)
- 对waitingStages,runningStages,failedStages 进行验证(stage不能重复提交)
- stage提交之前先验证当前stage的上级stage是否为空,只有为空的才可以提交
- 当ShuffleMapStage的所有partition处理完成后,会设置isAvailable为真,也就是该stage已被处理完成,不需要再处理了,这时他的子Stage就可以提交了
/** Submits stage, but first recursively submits any missing parents. */ private def submitStage(stage: Stage) { val jobId = activeJobForStage(stage) if (jobId.isDefined) { logDebug("submitStage(" + stage + ")") if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { val missing = getMissingParentStages(stage).sortBy(_.id) logDebug("missing: " + missing) if (missing.isEmpty) { logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents") submitMissingTasks(stage, jobId.get) } else { for (parent <- missing) { submitStage(parent) } waitingStages += stage } } } else { abortStage(stage, "No active job for stage " + stage.id, None) } }
- 查找上级Stage
- 内部会递归一直找到祖先Stage
- (在这里判断)当ShuffleMapStage的所有partition处理完成后,会设置isAvailable为真,也就是该stage已被处理完成,不需要再处理了,这时他的子Stage就可以提交了
- getShuffleMapStage 跟 FinalStage的构建,那时的Stage划分一样,并且在FinalStage已对ShuffleDenpendency的Stage进行了缓存,这时直接根据ShuffleId匹配,直接用
private def getMissingParentStages(stage: Stage): List[Stage] = { val missing = new HashSet[Stage] val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting val waitingForVisit = new Stack[RDD[_]] def visit(rdd: RDD[_]) { if (!visited(rdd)) { visited += rdd val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil) if (rddHasUncachedPartitions) { for (dep <- rdd.dependencies) { dep match { case shufDep: ShuffleDependency[_, _, _] => val mapStage = getShuffleMapStage(shufDep, stage.firstJobId) if (!mapStage.isAvailable) { missing += mapStage } case narrowDep: NarrowDependency[_] => waitingForVisit.push(narrowDep.rdd) } } } } } waitingForVisit.push(stage.rdd) while (waitingForVisit.nonEmpty) { visit(waitingForVisit.pop()) } missing.toList }
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Spark FinalStage处理(Stage划分)
Spark FinalStage处理(Stage划分) 更多资源 github: https://github.com/opensourceteams/spark-scala-maven csdn(汇总视频在线看): https://blog.csdn.net/thinktothings/article/details/84726769 Youtube视频 Spark FinalStage处理(Stage划分)(Youtube视频) : https://youtu.be/yFJugOV0Fak BiliBili视频 Spark FinalStage处理(Stage划分)(bilibili视频) : https://www.bilibili.com/video/av37445057/ 说明 由于DAGScheduler进行stage提交传的参数为FinalStage,所以对FinalStage的构成进行分析 RDD依赖为shuffleDep的stage已经进行了缓存,(这个时候已经对Stage进行明显的划分,只是没有提交) shuffleToMapStage.get(shuffleDep...
- 下一篇
阿里云HBase全新发布X-Pack NoSQL数据库再上新台阶
一、八年双十一,造就国内最大最专业HBase技术团队 阿里巴巴集团早在2010开始研究并把HBase投入生产环境使用,从最初的淘宝历史交易记录,到蚂蚁安全风控数据存储。持续8年的投入,历经8年双十一锻炼。4个PMC,6个committer,造就了国内最大最专业的HBase技术团队,其中HBase内核中超过200+重要的feature是阿里贡献。集团内部超过万台的规模,单集群超过千台,全球领先。 二、HBase技术团队重磅发布X-Pack,NoSQL数据库再上新台阶 阿里云自从17年8月提供HBase云服务以来,到18年12月累计服务了上千大B客户,已经有上千个在线的集群。是阿里云增长最为快速的数据库服务,也是大B客户比例最高的云服务之一。并于6月6日全球第一个推出HBase 2.0,是HBase领域当之无愧的排头兵。 为了满足客户对数据库
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- MySQL8.0.19开启GTID主从同步CentOS8
- Hadoop3单机部署,实现最简伪集群
- CentOS8编译安装MySQL8.0.19
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Mario游戏-低调大师作品