MapReduce源码分析之新API作业提交(二):连接集群
MapReduce作业提交时连接集群是通过Job的connect()方法实现的,它实际上是构造集群Cluster实例cluster,代码如下:
private synchronized void connect() throws IOException, InterruptedException, ClassNotFoundException { // 如果cluster为null,构造Cluster实例cluster, // Cluster为连接MapReduce集群的一种工具,提供了一种获取MapReduce集群信息的方法 if (cluster == null) { cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() { public Cluster run() throws IOException, InterruptedException, ClassNotFoundException { return new Cluster(getConfiguration()); } }); } }这个方法用synchronized关键字标识,处理逻辑为:如果cluster为null,构造Cluster实例cluster。
Cluster为连接MapReduce集群的一种工具,提供了一种获取MapReduce集群信息的方法,我们看下它的成员变量,如下所示:
// 客户端通信协议提供者 private ClientProtocolProvider clientProtocolProvider; // 客户端通信协议实例 private ClientProtocol client; // 用户信息 private UserGroupInformation ugi; // 配置信息 private Configuration conf; // 文件系统实例 private FileSystem fs = null; // 系统路径 private Path sysDir = null; // 阶段区域路径 private Path stagingAreaDir = null; // 作业历史路径 private Path jobHistoryDir = null; // 日志 private static final Log LOG = LogFactory.getLog(Cluster.class); // 客户端通信协议提供者加载器 private static ServiceLoader<ClientProtocolProvider> frameworkLoader = ServiceLoader.load(ClientProtocolProvider.class);Cluster最重要的两个成员变量是客户端通信协议提供者ClientProtocolProvider实例clientProtocolProvider,客户端通信协议ClientProtocol实例client,而后者是依托前者的create()方法生成的。
Cluster提供了两个构造函数,如下:
public Cluster(Configuration conf) throws IOException { this(null, conf); } public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException { <span style="white-space:pre"> </span>// 设置配置信息 this.conf = conf; // 获取当前用户 this.ugi = UserGroupInformation.getCurrentUser(); // 调用initialize()方法完成初始化 initialize(jobTrackAddr, conf); }最终会调用initialize()方法完成初始化,代码如下:
// 确定客户端ClientProtocol实例client private void initialize(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException { synchronized (frameworkLoader) { // 取出每个ClientProtocolProvider实例provider,通过其create()方法, // 构造ClientProtocol实例clientProtocol, // 并将两者赋值给对类应成员变量,退出循环 for (ClientProtocolProvider provider : frameworkLoader) { LOG.debug("Trying ClientProtocolProvider : " + provider.getClass().getName()); ClientProtocol clientProtocol = null; try { // 通过ClientProtocolProvider的create()方法,获取客户端与集群通讯ClientProtocol实例clientProtocol if (jobTrackAddr == null) { clientProtocol = provider.create(conf); } else { clientProtocol = provider.create(jobTrackAddr, conf); } // 设置类成员变量clientProtocolProvider、client,并退出循环 if (clientProtocol != null) { clientProtocolProvider = provider; client = clientProtocol; // 记录debug级别日志信息 LOG.debug("Picked " + provider.getClass().getName() + " as the ClientProtocolProvider"); break; } else { // 记录debug级别日志信息 LOG.debug("Cannot pick " + provider.getClass().getName() + " as the ClientProtocolProvider - returned null protocol"); } } catch (Exception e) { LOG.info("Failed to use " + provider.getClass().getName() + " due to error: " + e.getMessage()); } } } // 如果clientProtocolProvider、client任一为空,直接抛出IO异常 if (null == clientProtocolProvider || null == client) { throw new IOException( "Cannot initialize Cluster. Please check your configuration for " + MRConfig.FRAMEWORK_NAME + " and the correspond server addresses."); } }initialize()方法唯一的一个任务就是确定客户端通信协议提供者clientProtocolProvider,并通过其create()方法构造客户端通信协议ClientProtocol实例client。
MapReduce中,ClientProtocolProvider抽象类的实现共有YarnClientProtocolProvider、LocalClientProtocolProvider两种,前者为Yarn模式,而后者为Local模式。
我们先看下Yarn模式,看下YarnClientProtocolProvider的create()方法,代码如下:
@Override public ClientProtocol create(Configuration conf) throws IOException { // 如果参数mapreduce.framework.name配置的为yarn,构造一个YARNRunner实例并返回,否则返回null if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) { return new YARNRunner(conf); } return null; }Yarn模式下,如果参数mapreduce.framework.name配置的为yarn,构造一个YARNRunner实例并返回,否则返回null,关于YARNRunner,我们待会再讲,我们接着再看下Local模式,LocalClientProtocolProvider的create()方法,代码如下:
@Override public ClientProtocol create(Configuration conf) throws IOException { // 初始化framework:取参数mapreduce.framework.name,参数未配置默认为local String framework = conf.get(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME); // 如果framework是local,,则返回LocalJobRunner实例,并设置map任务数量为1,否则返回null if (!MRConfig.LOCAL_FRAMEWORK_NAME.equals(framework)) { return null; } conf.setInt(JobContext.NUM_MAPS, 1); return new LocalJobRunner(conf); }Local模式也是需要看参数mapreduce.framework.name的配置是否为local,是的话,返回LocalJobRunner实例,并设置map任务数量为1,否则返回null,值得一提的是,这里参数mapreduce.framework.name未配置的话,默认为local,也就是说,MapReduce需要看参数mapreduce.framework.name确定连接模式,但默认是Local模式的。
到了这里,我们就能够知道一个很重要的信息,Cluster中客户端通信协议ClientProtocol实例,要么是Yarn模式下的YARNRunner,要么就是Local模式下的LocalJobRunner,记住这点,对透彻了解MapReduce作业提交的整体流程非常重要。
好了,我们继续以Yarn模式来分析MapReduce集群连接,看下YARNRunner的实现,先看下它的成员变量,如下:
// 记录工厂RecordFactory实例 private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); // ResourceManager代理ResourceMgrDelegate实例 private ResourceMgrDelegate resMgrDelegate; // 客户端缓存ClientCache实例 private ClientCache clientCache; // 配置信息Configuration实例 private Configuration conf; // 文件上下文FileContext实例 private final FileContext defaultFileContext;
其中,最重要的一个变量就是ResourceManager代理ResourceMgrDelegate实例resMgrDelegate,Yarn模式下整个MapReduce客户端就是由它负责与Yarn集群进行通信,完成诸如作业提交、作业状态查询等过程,通过它获取集群的信息,其内部有一个YarnClient实例YarnClient,负责与Yarn进行通信,还有ApplicationId、ApplicationSubmissionContext等与特定应用程序相关的成员变量。关于ResourceMgrDelegate的详细介绍,请阅读《MapReduce源码分析ResourceMgrDelegate》一文,这里不再做详细介绍。
另外一个比较重要的变量就是客户端缓存ClientCache实例clientCache,
接下来,我们看下YARNRunner的构造函数,如下:
/** * Yarn runner incapsulates the client interface of * yarn * @param conf the configuration object for the client */ public YARNRunner(Configuration conf) { // 先构造ResourceManager代理ResourceMgrDelegate实例,然后再调用两个参数的构造函数 this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf))); } /** * Similar to {@link #YARNRunner(Configuration)} but allowing injecting * {@link ResourceMgrDelegate}. Enables mocking and testing. * @param conf the configuration object for the client * @param resMgrDelegate the resourcemanager client handle. */ public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) { // 先构造客户端缓存ClientCache实例,然后再调用三个参数的构造函数 this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate)); } /** * Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)} * but allowing injecting {@link ClientCache}. Enable mocking and testing. * @param conf the configuration object * @param resMgrDelegate the resource manager delegate * @param clientCache the client cache object. */ public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate, ClientCache clientCache) { // 成员变量赋值 this.conf = conf; try { this.resMgrDelegate = resMgrDelegate; this.clientCache = clientCache; // 获取文件山下文FileContext实例defaultFileContext this.defaultFileContext = FileContext.getFileContext(this.conf); } catch (UnsupportedFileSystemException ufe) { throw new RuntimeException("Error in instantiating YarnClient", ufe); } }YARNRunner一共提供了三个构造函数,而我们之前说的WordCount作业提交时,其内部调用的是YARNRunner带有一个参数的构造函数,它会先构造ResourceManager代理ResourceMgrDelegate实例,然后再调用两个参数的构造函数,继而构造客户端缓存ClientCache实例,然后再调用三个参数的构造函数,而最终的构造函数只是进行简单的类成员变量赋值,然后通过FileContext的静态getFileContext()方法获取文件山下文FileContext实例defaultFileContext。
总结
MapReduce作业提交时连接集群是通过Job的connect()方法实现的,它实际上是构造集群Cluster实例cluster。Cluster为连接MapReduce集群的一种工具,提供了一种获取MapReduce集群信息的方法。在Cluster内部,有一个与集群进行通信的客户端通信协议ClientProtocol实例client,它由ClientProtocolProvider的静态create()方法构造,而Hadoop2.6.0中提供了两种模式的ClientProtocol,分别为Yarn模式的YARNRunner和Local模式的LocalJobRunner,Cluster实际上是由它们负责与集群进行通信的,而Yarn模式下,ClientProtocol实例YARNRunner对象内部有一个ResourceManager代理ResourceMgrDelegate实例resMgrDelegate,Yarn模式下整个MapReduce客户端就是由它负责与Yarn集群进行通信,完成诸如作业提交、作业状态查询等过程,通过它获取集群的信息。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
基于超出内存可加载范围的数据集的逻辑回归分类器LR分类器
假如你想创建一个机器学习模型,但却发现你的输入数据集与你的计算机内存不相符?对于多机器的计算集群环境中通常可以使用如Hadoop和Apache Spark分布式计算工具。然而,Apache Spark能够在本地机器独立模式上,甚至在当输入数据集大于你的计算机内存时通过创建模型处理你的数据。 1.输入数据和预期结果 在上一篇文章我们讨论了“How To Find Simple And Interesting Multi-Gigabytes Data Set”,本文将使用上文中提及数据集的Posts.xml文件。文件大小是34.6千兆字节,这个xml文件包含stackoverflow.com文章数据作为xml属性: 标题 – 文章标题 主体 – 文章文本 标签 – 文章的标签列表 10+ 更多的xml -我们不需要使用的属性 关于stackoverflow.com的Posts.xml完整数据集信息请点击:https://archive.org/details/stackexchange. 另外我创建一个较小版本的这种文件,里面只有10个条目或文章。此文件包含一个小尺寸的原始数据集,这个数据...
- 下一篇
MapReduce源码分析之JobSubmitter(一)
JobSubmitter,顾名思义,它是MapReduce中作业提交者,而实际上JobSubmitter除了构造方法外,对外提供的唯一一个非private成员变量或方法就是submitJobInternal()方法,它是提交Job的内部方法,实现了提交Job的所有业务逻辑。本文,我们将深入研究MapReduce中用于提交Job的组件JobSubmitter。 首先,我们先看下JobSubmitter的类成员变量,如下: // 文件系统FileSystem实例 private FileSystem jtFs; // 客户端通信协议ClientProtocol实例 private ClientProtocol submitClient; // 提交作业的主机名 private String submitHostName; // 提交作业的主机地址 private String submitHostAddress; 它一共有四个类成员变量,分别为: 1、文件系统FileSystem实例jtFs:用于操作作业运行需要的各种文件等; 2、客户端通信协议ClientP...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8编译安装MySQL8.0.19
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Windows10,CentOS7,CentOS8安装Nodejs环境
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- 设置Eclipse缩进为4个空格,增强代码规范
- MySQL8.0.19开启GTID主从同步CentOS8