Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(一)
我们知道,如果想要在Yarn上运行MapReduce作业,仅需实现一个ApplicationMaster组件即可,而MRAppMaster正是MapReduce在Yarn上ApplicationMaster的实现,由其控制MR作业在Yarn上的执行。如此,随之而来的一个问题就是,MRAppMaster是如何控制MapReduce作业在Yarn上运行的,换句话说,MRAppMaster上MapReduce作业处理总流程是什么?这就是本文要研究的重点。
通过MRAppMaster类的定义我们就能看出,MRAppMaster继承自CompositeService,而CompositeService又继承自AbstractService,也就是说MRAppMaster也是Hadoop中的一种服务,我们看下服务启动的serviceStart()方法中关于MapReduce作业的处理,关键代码如下:
@SuppressWarnings("unchecked") @Override protected void serviceStart() throws Exception { // ......省略部分代码 // 调用createJob()方法创建作业Job实例job // /////////////////// Create the job itself. job = createJob(getConfig(), forcedState, shutDownMessage); // End of creating the job. // ......省略部分代码 // 作业初始化失败标志位initFailed默认为false,即初始化成功,没有错误 boolean initFailed = false; if (!errorHappenedShutDown) { // create a job event for job intialization // 创建一个Job初始化事件initJobEvent JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT); // Send init to the job (this does NOT trigger job execution) // This is a synchronous call, not an event through dispatcher. We want // job-init to be done completely here. // 调用jobEventDispatcher的handle()方法,处理Job初始化事件initJobEvent,即将Job初始化事件交由事件分发器jobEventDispatcher处理, jobEventDispatcher.handle(initJobEvent); // If job is still not initialized, an error happened during // initialization. Must complete starting all of the services so failure // events can be processed. // 获取Job初始化结果initFailed initFailed = (((JobImpl)job).getInternalState() != JobStateInternal.INITED); // JobImpl's InitTransition is done (call above is synchronous), so the // "uber-decision" (MR-1220) has been made. Query job and switch to // ubermode if appropriate (by registering different container-allocator // and container-launcher services/event-handlers). // ......省略部分代码 // Start ClientService here, since it's not initialized if // errorHappenedShutDown is true // 启动客户端服务clientService clientService.start(); } //start all the components // 调用父类的serviceStart(),启动所有组件 super.serviceStart(); // finally set the job classloader // 最终设置作业类加载器 MRApps.setClassLoader(jobClassLoader, getConfig()); if (initFailed) { // 如果作业初始化失败,构造作业初始化失败JOB_INIT_FAILED事件,并交由事件分发器jobEventDispatcher处理 JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED); jobEventDispatcher.handle(initFailedEvent); } else { // All components have started, start the job. // 调用startJobs()方法启动作业 startJobs(); } }
通过MRAppMaster服务启动的serviceStart()方法我们大致知道,MapReduce作业在MRAppMaster中经历了创建--初始化--启动三个主要过程,剪去枝叶,保留主干,具体如下:
1、创建:调用createJob()方法创建作业Job实例job;
2、初始化:
2.1、创建一个Job初始化事件initJobEvent;
2.2、调用jobEventDispatcher的handle()方法,处理Job初始化事件initJobEvent,即将Job初始化事件交由事件分发器jobEventDispatcher处理;
2.3、获取Job初始化结果initFailed;
2.4、如果作业初始化失败,构造作业初始化失败JOB_INIT_FAILED事件,并交由事件分发器jobEventDispatcher处理。
3、启动:调用startJobs()方法启动作业。
实际上,作业启动后不可能永远都不停止,MRAppMaster最终会将作业停止,这也是作业处理流程的第四步,即最后一步,作业停止!在哪里处理的呢?我们先卖个关子,请您暂时忽略这个问题,我们稍后会给出答案!
下面,我们针对MapReduce作业的上述三个主要过程,分别展开描述。
一、创建
首先看作业创建,createJob()方法如下:
/** Create and initialize (but don't start) a single job. * @param forcedState a state to force the job into or null for normal operation. * @param diagnostic a diagnostic message to include with the job. */ protected Job createJob(Configuration conf, JobStateInternal forcedState, String diagnostic) { // create single job // 创建一个作业Job实例newJob,其实现为JobImpl Job newJob = new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(), taskAttemptListener, jobTokenSecretManager, jobCredentials, clock, completedTasksFromPreviousRun, metrics, committer, newApiCommitter, currentUser.getUserName(), appSubmitTime, amInfos, context, forcedState, diagnostic); // 将新创建的作业newJob的jobId与其自身的映射关系存储到应用运行上下文信息context中的jobs集合中 ((RunningAppContext) context).jobs.put(newJob.getID(), newJob); // 异步事件分发器dispatcher注册作业完成事件JobFinishEvent对应的事件处理器,通过createJobFinishEventHandler()方法获得 dispatcher.register(JobFinishEvent.Type.class, createJobFinishEventHandler()); // 返回新创建的作业newJob return newJob; } // end createJob()其主要逻辑如下:
1、创建一个作业Job实例newJob,其实现为JobImpl,传入作业艾迪jobId、应用尝试艾迪appAttemptID、任务尝试监听器taskAttemptListener、输出提交器committer、用户名currentUser.getUserName()、应用运行上下文信息context等关键成员变量;
2、将新创建的作业newJob的jobId与其自身的映射关系存储到应用运行上下文信息context中的jobs集合中;
3、异步事件分发器dispatcher注册作业完成事件JobFinishEvent对应的事件处理器,通过createJobFinishEventHandler()方法获得;
4、返回新创建的作业newJob。
关于作业创建中的一些细节,我们暂时先不做过多关注,留待以后的文章专门进行分析。这里,我们先重点看看第3步,异步事件分发器dispatcher注册作业完成事件JobFinishEvent对应的事件处理器,通过createJobFinishEventHandler()方法获得,而createJobFinishEventHandler()方法代码如下:
/** * create an event handler that handles the job finish event. * @return the job finish event handler. */ protected EventHandler<JobFinishEvent> createJobFinishEventHandler() { return new JobFinishEventHandler(); }也就是说,当作业被创建后,它就被定义了作业完成事件JobFinishEvent的处理器为JobFinishEventHandler,而JobFinishEventHandler的定义如下:
private class JobFinishEventHandler implements EventHandler<JobFinishEvent> { @Override public void handle(JobFinishEvent event) { // Create a new thread to shutdown the AM. We should not do it in-line // to avoid blocking the dispatcher itself. new Thread() { @Override public void run() { shutDownJob(); } }.start(); } }这就是我们上面没有详细介绍的第四步--作业停止,它最终是调用的shutDownJob()方法,并开启一个新的线程来完成作业停止的,我们稍后再做介绍。
二、初始化
我们再来看作业的初始化,它是通过创建一个Job初始化事件JobEvent实例initJobEvent,事件类型为JobEventType.JOB_INIT,然后交由事件分发器jobEventDispatcher处理的。我们先来看下这个jobEventDispatcher的定义及实例化,如下:
// 作业事件分发器 private JobEventDispatcher jobEventDispatcher;jobEventDispatcher是一个JobEventDispatcher类型的作业事件分发器,其实例化为:
this.jobEventDispatcher = new JobEventDispatcher();而JobEventDispatcher的定义如下:
private class JobEventDispatcher implements EventHandler<JobEvent> { @SuppressWarnings("unchecked") @Override public void handle(JobEvent event) { // 从应用运行上下文信息context中根据jobId获取Job实例,即JobImpl对象,调用其handle()方法,处理对应事件 ((EventHandler<JobEvent>)context.getJob(event.getJobId())).handle(event); } }很简单,从应用运行上下文信息context中根据jobId获取Job实例,即JobImpl对象,调用其handle()方法,处理对应事件,而这个Job实例,还记得上面描述的吗,就是在Job最初被创建时,被添加到应用运行上下文信息context中jobs集合中的,key为jobId,value就是JobImpl对象。context的实现RunningAppContext中,根据jobId获取job实例的代码如下:
@Override public Job getJob(JobId jobID) { return jobs.get(jobID); }好了,我们就看下JobImpl中handle()方法是如何对类型为JobEventType.JOB_INIT的JobEvent进行处理的吧!
@Override /** * The only entry point to change the Job. */ public void handle(JobEvent event) { if (LOG.isDebugEnabled()) { LOG.debug("Processing " + event.getJobId() + " of type " + event.getType()); } try { writeLock.lock(); JobStateInternal oldState = getInternalState(); try { getStateMachine().doTransition(event.getType(), event); } catch (InvalidStateTransitonException e) { LOG.error("Can't handle this event at current state", e); addDiagnostic("Invalid event " + event.getType() + " on Job " + this.jobId); eventHandler.handle(new JobEvent(this.jobId, JobEventType.INTERNAL_ERROR)); } //notify the eventhandler of state change if (oldState != getInternalState()) { LOG.info(jobId + "Job Transitioned from " + oldState + " to " + getInternalState()); rememberLastNonFinalState(oldState); } } finally { writeLock.unlock(); } }最核心的就是通过语句getStateMachine().doTransition(event.getType(), event)进行处理,实际上这牵着到了Yarn中MapReduce作业的状态机,为了本文叙述的流畅性、简洁性、重点明确性,我们对于作业状态机先不做解释,这部分内容留待以后的文章专门进行介绍,这里你只要知道作业初始化最终是通过JobImpl静态内部类InitTransition的transition()方法来实现的就行。我们看下InitTransition的transition()方法,如下:
/** * Note that this transition method is called directly (and synchronously) * by MRAppMaster's init() method (i.e., no RPC, no thread-switching; * just plain sequential call within AM context), so we can trigger * modifications in AM state from here (at least, if AM is written that * way; MR version is). */ @Override public JobStateInternal transition(JobImpl job, JobEvent event) { // 调用作业度量指标体系metrics的submittedJob()方法,提交作业 job.metrics.submittedJob(job); // 调用作业度量指标体系metrics的preparingJob()方法,开始作业准备 job.metrics.preparingJob(job); // 新旧API创建不同的作业上下文JobContextImpl实例 if (job.newApiCommitter) { job.jobContext = new JobContextImpl(job.conf, job.oldJobId); } else { job.jobContext = new org.apache.hadoop.mapred.JobContextImpl( job.conf, job.oldJobId); } try { // 调用setup()方法,完成作业启动前的部分初始化工作 setup(job); // 设置作业job对应的文件系统fs job.fs = job.getFileSystem(job.conf); //log to job history // 创建作业已提交事件JobSubmittedEvent实例jse JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId, job.conf.get(MRJobConfig.JOB_NAME, "test"), job.conf.get(MRJobConfig.USER_NAME, "mapred"), job.appSubmitTime, job.remoteJobConfFile.toString(), job.jobACLs, job.queueName, job.conf.get(MRJobConfig.WORKFLOW_ID, ""), job.conf.get(MRJobConfig.WORKFLOW_NAME, ""), job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""), getWorkflowAdjacencies(job.conf), job.conf.get(MRJobConfig.WORKFLOW_TAGS, "")); // 将作业已提交事件JobSubmittedEvent实例jse封装成作业历史事件JobHistoryEvent交由作业的时事件处理器eventHandler处理 job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse)); //TODO JH Verify jobACLs, UserName via UGI? // 调用createSplits()方法,创建分片,并获取任务分片元数据信息TaskSplitMetaInfo数组taskSplitMetaInfo TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId); // 确定Map Task数目numMapTasks:分片元数据信息数组的长度,即有多少分片就有多少numMapTasks job.numMapTasks = taskSplitMetaInfo.length; // 确定Reduce Task数目numReduceTasks,取作业参数mapreduce.job.reduces,参数未配置默认为0 job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0); // 确定作业的map和reduce权重mapWeight、reduceWeight if (job.numMapTasks == 0 && job.numReduceTasks == 0) { job.addDiagnostic("No of maps and reduces are 0 " + job.jobId); } else if (job.numMapTasks == 0) { job.reduceWeight = 0.9f; } else if (job.numReduceTasks == 0) { job.mapWeight = 0.9f; } else { job.mapWeight = job.reduceWeight = 0.45f; } checkTaskLimits(); // 根据分片元数据信息计算输入长度inputLength,也就是作业大小 long inputLength = 0; for (int i = 0; i < job.numMapTasks; ++i) { inputLength += taskSplitMetaInfo[i].getInputDataLength(); } // 根据作业大小inputLength,调用作业的makeUberDecision()方法,决定作业运行模式是Uber模式还是Non-Uber模式 job.makeUberDecision(inputLength); // 根据作业的Map、Reduce任务数目之和,外加10, // 初始化任务尝试完成事件TaskAttemptCompletionEvent列表taskAttemptCompletionEvents job.taskAttemptCompletionEvents = new ArrayList<TaskAttemptCompletionEvent>( job.numMapTasks + job.numReduceTasks + 10); // 根据作业的Map任务数目,外加10, // 初始化Map任务尝试完成事件TaskCompletionEvent列表mapAttemptCompletionEvents job.mapAttemptCompletionEvents = new ArrayList<TaskCompletionEvent>(job.numMapTasks + 10); // 根据作业的Map、Reduce任务数目之和,外加10, // 初始化列表taskCompletionIdxToMapCompletionIdx job.taskCompletionIdxToMapCompletionIdx = new ArrayList<Integer>( job.numMapTasks + job.numReduceTasks + 10); // 确定允许Map、Reduce任务失败百分比, // 取参数mapreduce.map.failures.maxpercent、mapreduce.reduce.failures.maxpercent, // 参数未配置均默认为0,即不允许Map和Reduce任务失败 job.allowedMapFailuresPercent = job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0); job.allowedReduceFailuresPercent = job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0); // create the Tasks but don't start them yet // 创建Map Task createMapTasks(job, inputLength, taskSplitMetaInfo); // 创建Reduce Task createReduceTasks(job); // 调用作业度量指标体系metrics的endPreparingJob()方法,结束作业准备 job.metrics.endPreparingJob(job); // 返回作业内部状态,JobStateInternal.INITED,即已经初始化 return JobStateInternal.INITED; } catch (Exception e) { // 记录warn级别日志信息:Job init failed,并打印出具体异常 LOG.warn("Job init failed", e); // 调用作业度量指标体系metrics的endPreparingJob()方法,结束作业准备 job.metrics.endPreparingJob(job); job.addDiagnostic("Job init failed : " + StringUtils.stringifyException(e)); // Leave job in the NEW state. The MR AM will detect that the state is // not INITED and send a JOB_INIT_FAILED event. // 返回作业内部状态,JobStateInternal.NEW,即初始化失败后的新建 return JobStateInternal.NEW; } }为了主体逻辑清晰,我们去掉部分细节,保留主干,将作业初始化总结如下:
1、调用setup()方法,完成作业启动前的部分初始化工作,实际上最重要的两件事就是:
1.1、获取并设置作业远程提交路径remoteJobSubmitDir;
1.2、获取并设置作业远程配置文件remoteJobConfFile;
2、调用createSplits()方法,创建分片,并获取任务分片元数据信息TaskSplitMetaInfo数组taskSplitMetaInfo:
通过SplitMetaInfoReader的静态方法readSplitMetaInfo(),从作业远程提交路径remoteJobSubmitDir中读取作业分片元数据信息,也就是每个任务的分片元数据信息,以此确定Map任务数、作业运行方式等一些列后续内容;
3、确定Map Task数目numMapTasks:分片元数据信息数组的长度,即有多少分片就有多少numMapTasks;
4、确定Reduce Task数目numReduceTasks,取作业参数mapreduce.job.reduces,参数未配置默认为0;
5、根据分片元数据信息计算输入长度inputLength,也就是作业大小;
6、根据作业大小inputLength,调用作业的makeUberDecision()方法,决定作业运行模式是Uber模式还是Non-Uber模式:
小作业会通过Uber模式运行,相反,大作业会通过Non-Uber模式运行,可参见《Yarn源码分析之MRAppMaster:作业运行方式Local、Uber、Non-Uber》一文!
7、确定允许Map、Reduce任务失败百分比,取参数mapreduce.map.failures.maxpercent、mapreduce.reduce.failures.maxpercent,参数未配置均默认为0,即不允许Map和Reduce任务失败;
8、创建Map Task;
9、创建Reduce Task;
10、返回作业内部状态,JobStateInternal.INITED,即已经初始化;
11、如果出现异常:
11.1、记录warn级别日志信息:Job init failed,并打印出具体异常;
11.2、返回作业内部状态,JobStateInternal.NEW,即初始化失败后的新建;
未完待续,后续作业初始化部分详细描述、作业启动、作业停止等内容,请关注《Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(二)》。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
CCAH-CCA-500-4题:Where are Hadoop task log files stored?
4.Where are Hadoop task log files stored? For each YARN job, the Hadoop framework generates task log file. Where are Hadoop task log files stored? A. Cached by the NodeManager managing the job containers, then written to a log directory on the NameNode B. Cached in the YARN container running the task, then copied into HDFS on job completion C. In HDFS, in the directory of the user who generates the job D. On the local disk of the slave mode running the task 问题: 对于每个yarn job,hadoop框架产生的task日志文件存...
- 下一篇
Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(二)
本文继《Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(一)》,接着讲述MapReduce作业在MRAppMaster上处理总流程,继上篇讲到作业初始化之后的作业启动,关于作业初始化主体流程的详细介绍,请参见《Yarn源码分析之MRAppMaster上MapReduce作业初始化解析》一文。 (三)启动 作业的启动是通过MRAppMaster的startJobs()方法实现的,其代码如下: /** * This can be overridden to instantiate multiple jobs and create a * workflow. * * TODO: Rework the design to actually support this. Currently much of the * job stuff has been moved to init() above to support uberization (MR-1220). * In a typical workflow, one presumably would...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- CentOS7设置SWAP分区,小内存服务器的救世主
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS7安装Docker,走上虚拟化容器引擎之路
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- CentOS关闭SELinux安全模块