Spark作业调度阶段分析[转]
Spark作为分布式的大数据处理框架必然或涉及到大量的作业调度,如果能够理解Spark中的调度对我们编写或优化Spark程序都是有很大帮助的;
在Spark中存在转换操作(Transformation Operation)与 行动操作(Action Operation)两种;而转换操作只是会从一个RDD中生成另一个RDD且是lazy的,Spark中只有行动操作(Action Operation)才会触发作业的提交,从而引发作业调度;在一个计算任务中可能会多次调用 转换操作这些操作生成的RDD可能存在着依赖关系,而由于转换都是lazy所以当行动操作(Action Operation )触发时才会有真正的RDD生成,这一系列的RDD中就存在着依赖关系形成一个DAG(Directed Acyclc Graph),在Spark中DAGScheuler是基于DAG的顶层调度模块;
相关名词
Application:使用Spark编写的应用程序,通常需要提交一个或多个作业;
Job:在触发RDD Action操作时产生的计算作业
Task:一个分区数据集中最小处理单元也就是真正执行作业的地方
TaskSet:由多个Task所组成没有Shuffle依赖关系的任务集
Stage:一个任务集对应的调度阶段 ,每个Job会被拆分成诺干个Stage
RDD Action作业提交流程
这里根据Spark源码跟踪触发Action操作时触发的Job提交流程,Count()是RDD中的一个Action操作所以调用Count时会触发Job提交;
在RDD源码count()调用SparkContext的runJob,在runJob方法中根据partitions(分区)大小创建Arrays存放返回结果;
RDD.scala /** * Return the number of elements in the RDD. */ def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum SparkContext.scala def runJob[T, U: ClassTag]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit): Unit = { val callSite = getCallSite val cleanedFunc = clean(func) logInfo("Starting job: " + callSite.shortForm) if (conf.getBoolean("spark.logLineage", false)) { logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString) } dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get) }
在SparkContext中将调用DAGScheduler的runJob方法提交作业,DAGScheduler主要任务是计算作业与任务依赖关系,处理调用逻辑;DAGScheduler提供了submitJob与runJob方法用于 提交作业,runJob方法会一直等待作业完成,submitJob则返回JobWaiter对象可以用于判断作业执行结果;
在runJob方法中将调用submitJob,在submitJob中把提交操作放入到事件循环队列(DAGSchedulerEventProcessLoop)中;
def submitJob[T, U]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): JobWaiter[U] = { ...... eventProcessLoop.post(JobSubmitted( jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties))) ...... }
在事件循环队列中将调用eventprocessLoop的onReceive方法;
Stage拆分
提交作业时DAGScheduler会从RDD依赖链尾部开始,遍历整个依赖链划分调度阶段;划分阶段以ShuffleDependency为依据,当没有ShuffleDependency时整个Job 只会有一个Stage;在事件循环队列中将会调用DAGScheduler的handleJobSubmitted方法,此方法会拆分Stage、提交Stage;
private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties) { var finalStage: ResultStage = null ...... finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite) ...... val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) ...... val jobSubmissionTime = clock.getTimeMillis() jobIdToActiveJob(jobId) = job activeJobs += job finalStage.setActiveJob(job) val stageIds = jobIdToStageIds(jobId).toArray val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) listenerBus.post( SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) submitStage(finalStage) submitWaitingStages() }
调度阶段提交
在提交Stage时会先调用getMissingParentStages获取父阶段Stage,迭代该阶段所依赖的父调度阶段如果存在则先提交该父阶段的Stage 当不存在父Stage或父Stage执行完成时会对当前Stage进行提交;
private def submitStage(stage: Stage) { val jobId = activeJobForStage(stage) if (jobId.isDefined) { if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { val missing = getMissingParentStages(stage).sortBy(_.id) if (missing.isEmpty) { submitMissingTasks(stage, jobId.get) } else { for (parent <- missing) { submitStage(parent) } waitingStages += stage } } } ...... }
参考资料:
http://spark.apache.org/docs/latest/
本文链接:http://www.solinx.co/archives/579#comment-47
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
推荐一个linux 技术站 技术资料大全的导航站
推荐一个linux 技术站 技术资料大全的导航站 这里是全手工挑选的精品技术站点集合有需要的可以收藏下,所谓一站到手天下有我。 http://www.ailinux.net
- 下一篇
异步化,高并发大杀器
今天来聊聊如何让项目异步化的一些事。 1.同步和异步,阻塞和非阻塞 同步和异步,阻塞和非阻塞, 这个几个词已经是老生常谈,当时常常还是有很多同学分不清楚,以为同步肯定就是阻塞,异步肯定就是非阻塞,其他他们不是一回事。 同步和异步关注的是结果消息的通信机制 同步:同步的意思就是调用方需要主动等待结果的返回 异步:异步的意思就是不需要主动等待结果的返回,而是通过其他手段比如,状态通知,回调函数等。 阻塞和非阻塞主要关注的是等待结果返回调用方的状态 阻塞:是指结果返回之前,当前线程被挂起,不做任何事 非阻塞:是指结果在返回之前,线程可以做一些其他事,不会被挂起。 可以看见同步和异步,阻塞和非阻塞主要关注的点不同,有人会问同步还能非阻塞,异步还能阻塞?当然是可以的,下面为了更好的说明他们的组合之间的意思,用几个简单的例子说明: 1.同步阻塞:同步阻塞基本也是编程中最常见的模型,打个比方你去商店买衣服,你去了之后发现衣服卖完了,那你就在店里面一直等,期间不做任何事(包括看手机),等着商家进货,直到有货为止,这个效率很低。 2.同步非阻塞:同步非阻塞在编程中可以抽象为一个轮询模式,你去了商店之后,...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS关闭SELinux安全模块
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8编译安装MySQL8.0.19
- CentOS7安装Docker,走上虚拟化容器引擎之路
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- CentOS7,8上快速安装Gitea,搭建Git服务器
- SpringBoot2整合Redis,开启缓存,提高访问速度
- CentOS7设置SWAP分区,小内存服务器的救世主