您现在的位置是:首页 > 文章详情

MapReduce深入源码分析job提交的整个过程

日期:2021-06-01点击:543

/**    * Submit the job to the cluster and wait for it to finish.    * @param verbose print the progress to the user    * @return true if the job succeeded    * @throws IOException thrown if the communication with the     *         JobTracker is lost    */   public boolean waitForCompletion(boolean verbose                                    ) throws IOException, InterruptedException,                                             ClassNotFoundException {     if (state == JobState.DEFINE) {       submit(); //提交作业     }     if (verbose) {       monitorAndPrintJob();     } else {       // get the completion poll interval from the client.       int completionPollIntervalMillis =          Job.getCompletionPollInterval(cluster.getConf());       while (!isComplete()) {         try {           Thread.sleep(completionPollIntervalMillis);         } catch (InterruptedException ie) {         }       }     }     return isSuccessful();   }

进入submit

/**    * Submit the job to the cluster and return immediately.    * @throws IOException    */   public void submit()           throws IOException, InterruptedException, ClassNotFoundException {     ensureState(JobState.DEFINE);     setUseNewAPI(); //老旧的API进行转换 属性兼容性的考虑     connect(); //客户端和集群之间的连接,连接可以是本地的,也可以是yarn     final JobSubmitter submitter =          getJobSubmitter(cluster.getFileSystem(), cluster.getClient());     status = ugi.doAs(new PrivilegedExceptionAction() {       public JobStatus run() throws IOException, InterruptedException,        ClassNotFoundException {         return submitter.submitJobInternal(Job.this, cluster);       }     });     state = JobState.RUNNING;     LOG.info("The url to track the job: " + getTrackingURL());    }

连接时判断集群,如果为空,

private synchronized void connect()           throws IOException, InterruptedException, ClassNotFoundException {     if (cluster == null) {       cluster =          ugi.doAs(new PrivilegedExceptionAction() {                    public Cluster run()                           throws IOException, InterruptedException,                                   ClassNotFoundException {                      return new Cluster(getConfiguration());                    }                  });     }   }

获取切片信息

  private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,       Path jobSubmitDir) throws IOException,       InterruptedException, ClassNotFoundException {     JobConf jConf = (JobConf)job.getConfiguration();     int maps;     if (jConf.getUseNewMapper()) {       maps = writeNewSplits(job, jobSubmitDir);     } else {       maps = writeOldSplits(jConf, jobSubmitDir);     }     return maps;   }

获取块的大小信息goalSize Long的长度大小

protected long computeSplitSize(long goalSize, long minSize,                                        long blockSize) {     return Math.max(minSize, Math.min(goalSize, blockSize));   }      调整切片任务的大小

切片信息

 long bytesRemaining = length;           while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {             String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,                 length-bytesRemaining, splitSize, clusterMap);             splits.add(makeSplit(path, length-bytesRemaining, splitSize,                 splitHosts[0], splitHosts[1]));             bytesRemaining -= splitSize;           }

如果是相除大于1.1倍就不切片,小于1.1倍切片

private static final double SPLIT_SLOP = 1.1;

一个个文件的切片

大概流程

客户端代码

waitForCompletion()

源码

submit();

1. 建立连接

 connect();

1)创建提交job的代理

 new Cluster(getConfiguration());

(1)判断是本地yarn还是远程

 initialize(jobTrackAddr, conf)

2. 提交job

submitter.submitJobInternal(Job.this, cluster)

1)创建给集群提交数据的Stag路径

Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

  1. 获取jobId 并创建job路径

     JobID jobId = submitClient.getNewJobID();

3)拷贝jar包到集群

copyAndConfigureFiles(job, submitJobDir); rUploader.uploadFiles(job, jobSubmitDir);

4)计算切片,生成切片规划文件

writeSplits(job, submitJobDir); maps = writeNewSplits(job, jobSubmitDir); input.getSplits(job);

5)向Stag路径写xml配置文件

writeConf(conf, submitJobFile); conf.writeXml(out);

6)提交job,返回提交状态

status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

               

原文链接:https://blog.51cto.com/bigdata/2842231
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章