Spark作业调度

    Spark在任务提交时,主要存在于Driver和Executor的两个节点.

(1)Driver的作用: 用于将所有要处理的RDD的操作转化为DAG,并且根据RDD DAG将JBO分割为多个Stage,最后生成相应的task,分发到各个Executor执行.

流程:sc.runJob -> DAGScheduler.runJob ->submitJob ->DAGEventProcessActor ->dagScheduler.handleJobSubmitted ->submitStage ->submitMissingTasks ->taskScheduler.submitTasks -> schedulerBackend.reviveOffers ->ReviveOffers ->DriverActor ->makeOffers -> resourceOffers ->launchTasks ->CoarseGrainedExecutorBackend(Executor)

其中handleJobSubmitted和submitStage主要负责依赖性分析,生成finalStage,根据finalStage来生成job.

源码newStage用来创建一个新的Stage

private def newStage(
        rdd:RDD[],
        numTasks: Int,
        shuffleDep: Option[ShuffleDependency[_,_,_]],
        jobId:Int,
        callSite:CallSite)
    :stage =
    {
        val id = nextStageId.getAndIncrement()
        val stage = new Stage(id,rdd,numTasks,shuffleDep,getParentStages(rdd,jobId),jobId,callSite)
        stageIdToStage(id) = stage 
        updateJobIdStageIdMaps(jobId,stage)
        stageToInfos(stage) = StageInfo.fromStage(stage)
        stage
}

spark在创建一个Stage之前,必须知道该Stage需要从多少个Partition读入数据,据此来创建Task数。源码Stage:

private[spark] class stage(
    val id:Int //stage的序号越大,数值越大
    val rdd: RDD[_], //归属于本stage的最后一个rdd
    val numTasks:Int, //创建的Task的数目,等于父rdd的输出Partition数目
    
    val shuffleDep:Option[ShuffleDependency[_,_,_]],//是否存在shuffle
    val parents:List[Stage],//父stage列表
    val jobId:Int,//作业id
    val callSite:CallSite)

Stage的划分的重要依据就在于是否有Shuffle操作,既宽依赖(RDD的宽依赖和窄依赖请参考前文,或者百度- -),如果有,则创建一个新的stage.Stage的划分完毕就明确了很多内容了,如下:

(1)产生的stage需要从多少个Partition中读取数据

(2)产生的stage会生成多少个Partition

(3)产生的stage是否属于shuffle

当确认了有多少个Partition,其实就确认了有多少个task。

 

当作业提交及执行期间,Spark集群中存在大量的消息的交互,所以使用AKKA 进行消息的接收,消息的处理和消息的发送。

下面开始在各个Executor中执行Task。然而Task又被分为ShuffleMapTask和ResultTask两种,相当于Hadoop的Map和Reduce.每个Stage根据isShuffleMap来标记确定Task类型,来区分ShuffleMapTask和ResultTask.一旦task类型和数量确定,下来就分发到各个executor,由Executor启动县城来执行。(从计划到执行)

TaskschedulerImple发送ReviveOffers消息给DriverActor,DriverActor在收到ReviveOffers消息后,调用makeOffers函数进行处理。源码如下:

def makeOffers(){
    launchTasks(scheduler.resourceOffers(
    executorHost.toArray.map{case(id,host)=>new WorkerOffer(id,host,freeCores(id))}))

makeOffers函数主要用来找寻空闲的Executor,随机分发,尽可能的将任务平摊到各个executor中。发现有空闲的Executor,将任务列表中的部分任务利用launchTasks发送给制定的Executor.Task执行完毕.

优秀的个人博客,低调大师

微信关注我们

原文链接:https://yq.aliyun.com/articles/609167

转载内容版权归作者及来源网站所有!

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

相关文章

发表评论

资源下载

更多资源
Mario,低调大师唯一一个Java游戏作品

Mario,低调大师唯一一个Java游戏作品

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

Eclipse(集成开发环境)

Eclipse(集成开发环境)

Eclipse 是一个开放源代码的、基于Java的可扩展开发平台。就其本身而言,它只是一个框架和一组服务,用于通过插件组件构建开发环境。幸运的是,Eclipse 附带了一个标准的插件集,包括Java开发工具(Java Development Kit,JDK)。

Java Development Kit(Java开发工具)

Java Development Kit(Java开发工具)

JDK是 Java 语言的软件开发工具包,主要用于移动设备、嵌入式设备上的java应用程序。JDK是整个java开发的核心,它包含了JAVA的运行环境(JVM+Java系统类库)和JAVA工具。

Sublime Text 一个代码编辑器

Sublime Text 一个代码编辑器

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。