MapReduce深入源码分析job提交的整个过程
/** * Submit the job to the cluster and wait for it to finish. * @param verbose print the progress to the user * @return true if the job succeeded * @throws IOException thrown if the communication with the * JobTracker is lost */ public boolean waitForCompletion(boolean verbose ) throws IOException, InterruptedException, ClassNotFoundException { if (state == JobState.DEFINE) { submit(); //提交作业 } if (verbose) { monitorAndPrintJob(); } else { // get the completion poll interval from the client. int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf()); while (!isComplete()) { try { Thread.sleep(completionPollIntervalMillis); } catch (InterruptedException ie) { } } } return isSuccessful(); }
进入submit
/** * Submit the job to the cluster and return immediately. * @throws IOException */ public void submit() throws IOException, InterruptedException, ClassNotFoundException { ensureState(JobState.DEFINE); setUseNewAPI(); //老旧的API进行转换 属性兼容性的考虑 connect(); //客户端和集群之间的连接,连接可以是本地的,也可以是yarn final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient()); status = ugi.doAs(new PrivilegedExceptionAction() { public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException { return submitter.submitJobInternal(Job.this, cluster); } }); state = JobState.RUNNING; LOG.info("The url to track the job: " + getTrackingURL()); }
连接时判断集群,如果为空,
private synchronized void connect() throws IOException, InterruptedException, ClassNotFoundException { if (cluster == null) { cluster = ugi.doAs(new PrivilegedExceptionAction() { public Cluster run() throws IOException, InterruptedException, ClassNotFoundException { return new Cluster(getConfiguration()); } }); } }
获取切片信息
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException { JobConf jConf = (JobConf)job.getConfiguration(); int maps; if (jConf.getUseNewMapper()) { maps = writeNewSplits(job, jobSubmitDir); } else { maps = writeOldSplits(jConf, jobSubmitDir); } return maps; }
获取块的大小信息goalSize Long的长度大小
protected long computeSplitSize(long goalSize, long minSize, long blockSize) { return Math.max(minSize, Math.min(goalSize, blockSize)); } 调整切片任务的大小
切片信息
long bytesRemaining = length; while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length-bytesRemaining, splitSize, clusterMap); splits.add(makeSplit(path, length-bytesRemaining, splitSize, splitHosts[0], splitHosts[1])); bytesRemaining -= splitSize; }
如果是相除大于1.1倍就不切片,小于1.1倍切片
private static final double SPLIT_SLOP = 1.1;
一个个文件的切片
大概流程
客户端代码
waitForCompletion()
源码
submit();
1. 建立连接
connect();
1)创建提交job的代理
new Cluster(getConfiguration());
(1)判断是本地yarn还是远程
initialize(jobTrackAddr, conf)
2. 提交job
submitter.submitJobInternal(Job.this, cluster)
1)创建给集群提交数据的Stag路径
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
获取jobId 并创建job路径
JobID jobId = submitClient.getNewJobID();
3)拷贝jar包到集群
copyAndConfigureFiles(job, submitJobDir); rUploader.uploadFiles(job, jobSubmitDir);
4)计算切片,生成切片规划文件
writeSplits(job, submitJobDir); maps = writeNewSplits(job, jobSubmitDir); input.getSplits(job);
5)向Stag路径写xml配置文件
writeConf(conf, submitJobFile); conf.writeXml(out);
6)提交job,返回提交状态
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
快速入门Kafka系列(6)——Kafka的JavaAPI操作
作为快速入门Kafka系列的第六篇博客,本篇为大家带来的是Kafka的JavaAPI操作~ 码字不易,先赞后看! 文章目录 1. 创建Maven工程并添加jar包 2. 生产者代码 4. Kafka Streams API开发 3.1 自动提交offset 3.2 手动提交offset 3.3 消费完每个分区之后手动提交offset 3.4 指定分区数据进行消费 3.5 重复消费与数据丢失 1. 使用生产者,生产数据 2. kafka当中的数据分区 3. 消费者代码 4.1 创建一个Topic 4.2 开发StreamsAPI Kafka的JavaAPI操作 Kafka的JavaAPI操作 1. 创建Maven工程并添加jar包 首先在IDEA中我们创建一个maven工程,并添加以下依赖的jar包的坐标到pom.xml <dependencies> <dependency> <groupId>org.apache.kafkagroupId> <artifactId>kafka-clientsartifactId> <v...
- 下一篇
从零开始学keras之使用预训练的卷积神经网络
想要将深度学习应用于小型图像数据集,一种常用且非常高效的方法是使用预训练网络。 预训练网络(pretrained network)是一个保存好的网络,之前已在大型数据集(通常是大规模图像分类任务)上训练好。如果这个原始数据集足够大且足够通用,那么预训练网络学到的特征的空间层次结构可以有效地作为视觉世界的通用模型,因此这些特征可用于各种不同的计算机视觉问题,即使这些新问题涉及的类别和原始任务完全不同。举个例子,你在 ImageNet 上训练了一个网络(其类别主要是动物和日常用品),然后将这个训练好的网络应用于某个不相干的任务,比如在图像中识别家具。这种学到的特征在不同问题之间的可移植性,是深度学习与许多早期浅层学习方法相比的重要优势,它使得深度学习对小数据问题非常有效。 本例中,假设有一个在 ImageNet 数据集(140 万张标记图像,1000 个不同的类别)上训练好的大型卷积神经网络。ImageNet 中包含许多动物类别,其中包括不同种类的猫和狗,因此可以认为它在猫狗分类问题上也能有良好的表现。 我们将使用 VGG16 架构,它由 Karen Simonyan 和 Andrew Z...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS6,CentOS7官方镜像安装Oracle11G
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- CentOS关闭SELinux安全模块
- CentOS7设置SWAP分区,小内存服务器的救世主
- Docker安装Oracle12C,快速搭建Oracle学习环境