Drill-on-YARN之源码解析
1. 概要
前面介绍了如何把Drill部署在YARN上,然后通过Drill-on-YARN客户端,你可以启动、停止、调整、清零命令操作Drill。但是在这么命令背后,到底是如何执行的呢,下面会对Drill-on-YARN的源码进行详细的解析,重点解析启动过程,其他命令简单介绍。
说明:下面涉及到的代码,以drill 1.14.0为准,并且为了减少篇幅,进行了删减。
2. Drill-on-YARN start
2.1 drill-on-yarn.sh
通过查看drill-on-yarn.sh脚本,很容易发现最终执行的java类是CLIENT_CMD="$JAVA $VM_OPTS -cp $CP org.apache.drill.yarn.client.DrillOnYarn ${args[@]}"
。 org.apache.drill.yarn.client.DrillOnYarn
便是启动Drill-on-YARN的入口。我们可以总览一下这个类:
public class DrillOnYarn { public static void main(String argv[]) { BasicConfigurator.configure(); ClientContext.init(); run(argv); } public static void run(String argv[]) { ClientContext context = ClientContext.instance(); CommandLineOptions opts = new CommandLineOptions(); if (!opts.parse(argv)) { opts.usage(); context.exit(-1); } if (opts.getCommand() == null) { opts.usage(); context.exit(-1); } try { DrillOnYarnConfig.load().setClientPaths(); } catch (DoyConfigException e) { ClientContext.err.println(e.getMessage()); context.exit(-1); } ClientCommand cmd; switch (opts.getCommand()) { case UPLOAD: cmd = new StartCommand(true, false); break; case START: cmd = new StartCommand(true, true); break; case DESCRIBE: cmd = new PrintConfigCommand(); break; case STATUS: cmd = new StatusCommand(); break; case STOP: cmd = new StopCommand(); break; case CLEAN: cmd = new CleanCommand(); break; case RESIZE: cmd = new ResizeCommand(); break; default: cmd = new HelpCommand(); } cmd.setOpts(opts); try { cmd.run(); } catch (ClientException e) { displayError(opts, e); context.exit(1); } } }
可以看到入口main方法,其中最关键的便是run方法,包含了很多的命令,我们重点看start命令,代码如下:
public void run() throws ClientException { checkExistingApp(); dryRun = opts.dryRun; config = DrillOnYarnConfig.config(); FileUploader uploader = upload(); if (launch) { launch(uploader); } }
概括的来说,它主要包含以下流程:
- 检查application是否已经存在,如果已经存在,便不允许启动,否则执行启动操作(此处检查的application是YARN的application,启动成功会将YARN的applicationId写入本地磁盘的一个文件,通过此文件来检查)。
-
上传Drill二方包和site目录下的内容至DFS上,其中site目录下的内容会被打包为site.tar.gz
public void run() throws ClientException { setup(); uploadDrillArchive(); if (hasSiteDir()) { uploadSite(); } }
-
启动ApplicationMaster,主要流程为:
-
创建YARN客户端,并启动
// AMRunner#connectToYarn private void connectToYarn() { System.out.print("Loading YARN Config..."); client = new YarnRMClient(); System.out.println(" Loaded."); }
-
创建ApplicationMaster
// AMRunner#createApp private void createApp() throws ClientException { try { appResponse = client.createAppMaster(); } catch (YarnClientException e) { throw new ClientException("Failed to allocate Drill application master", e); } appId = appResponse.getApplicationId(); System.out.println("Application ID: " + appId.toString()); }
- 设置ApplicationMaster上下文,包括:Heap memory、Class Path、启动的命令(dirll-am.sh)、启动am容器使用的资源(memory、vCores、disks)
- 校验资源,主要是ApplicationMaster使用资源是否超过了YARN的设置
-
提交ApplicationMaster
private void launchApp(AppSpec master) throws ClientException { try { client.submitAppMaster(master); } catch (YarnClientException e) { throw new ClientException("Failed to start Drill application master", e); } }
- 等待启动,并打印启动日志
- 将ApplicationMaster的appid写入文件(在第1步,检测Application是否存在,就是使用这个文件)
-
ApplicationMaster启动后,会向RM申请资源,启动Drillbits,下面详细介绍ApplicationMaster启动后的操作
2.2 drill-am.sh
通过查看drill-am.sh脚本,很容易发现最终执行的java类是AMCMD="$JAVA $AM_JAVA_OPTS ${args[@]} -cp $CP org.apache.drill.yarn.appMaster.DrillApplicationMaster"
。org.apache.drill.yarn.appMaster.DrillApplicationMaste
表示ApplicationMaster执行的入口,下面总览一下这个类:
public class DrillApplicationMaster { public static void main(String[] args) { LOG.trace("Drill Application Master starting."); try { DrillOnYarnConfig.load().setAmDrillHome(); } catch (DoyConfigException e) { System.err.println(e.getMessage()); System.exit(-1); } Dispatcher dispatcher; try { dispatcher = (new DrillControllerFactory()).build(); } catch (ControllerFactoryException e) { LOG.error("Setup failed, exiting: " + e.getMessage(), e); System.exit(-1); return; } try { if (!dispatcher.start()) { return; } } catch (Throwable e) { LOG.error("Fatal error, exiting: " + e.getMessage(), e); System.exit(-1); } WebServer webServer = new WebServer(dispatcher); try { webServer.start(); } catch (Exception e) { LOG.error("Web server setup failed, exiting: " + e.getMessage(), e); System.exit(-1); } try { dispatcher.run(); } catch (Throwable e) { LOG.error("Fatal error, exiting: " + e.getMessage(), e); System.exit(-1); } finally { try { webServer.close(); } catch (Exception e) { } } } }
概况的来说,它主要包含以下流程:
- 加载Drill-on-YARN的配置,并设置AM的DirllHome,比如
/home/admin/tmp2/hadoop/nm-local-dir/usercache/admin/appcache/application_1534698866098_0022/container_1534698866098_0022_01_000001/drill/apache-drill-1.14.0
-
构造Dispatcher,Dispatcher用于分配YARN、timer、ZooKeeper事件给给集群控制器,它是轻量级多线程的,用于响应RM、NM、timer线程的事件,对于某一个事件,它是连续的,所以需要同步,但是不同类型的事件不需要同步。整个的构造流程如下:
-
准备资源,包括:drill二方包、site压缩包的目录
private Map<String, LocalResource> prepareResources() { ... drillArchivePath = drillConfig.getDrillArchiveDfsPath(); siteArchivePath = drillConfig.getSiteArchiveDfsPath(); ... }
-
定义任务启动的规格(TaskSpec),包括:运行时环境、YARN container的规格、dirllbit的规格
private TaskSpec buildDrillTaskSpec(Map<String, LocalResource> resources) throws DoyConfigException { ... ContainerRequestSpec containerSpec = new ContainerRequestSpec(); containerSpec.memoryMb = config.getInt(DrillOnYarnConfig.DRILLBIT_MEMORY); ... LaunchSpec drillbitSpec = new LaunchSpec(); ... TaskSpec taskSpec = new TaskSpec(); taskSpec.name = "Drillbit"; taskSpec.containerSpec = containerSpec; taskSpec.launchSpec = drillbitSpec; }
-
设置Dispatcher的控制器:实现类为ClusterControllerImpl,它主要通过状态来控制Drill集群、调整整个集群的任务(Drill启动、停止等任务)、处理container的回调
public void setYarn(AMYarnFacade yarn) throws YarnFacadeException { this.yarn = yarn; controller = new ClusterControllerImpl(yarn); }
-
为控制器注册Scheduler,比如DrillbitScheduler,此外Scheduler配置来源于之前drill-on-yarn.conf
cluster: [ { name: "drill-group1" type: "basic" count: 1 } ]
... ClusterDef.ClusterGroup pool = ClusterDef.getCluster(config, 0); Scheduler testGroup = new DrillbitScheduler(pool.getName(), taskSpec, pool.getCount(), requestTimeoutSecs, maxExtraNodes); dispatcher.getController().registerScheduler(testGroup); ...
-
创建ZooKeeper集群协调器
String zkConnect = config.getString(DrillOnYarnConfig.ZK_CONNECT); String zkRoot = config.getString(DrillOnYarnConfig.ZK_ROOT); String clusterId = config.getString(DrillOnYarnConfig.CLUSTER_ID);
-
-
启动Dispatcher,主要启动AMRMClientAsync、NMClientAsync、YarnClient
... yarn.start(new ResourceCallback(), new NodeCallback()); String url = trackingUrl.replace("<port>", Integer.toString(httpPort)); if (DrillOnYarnConfig.config().getBoolean(DrillOnYarnConfig.HTTP_ENABLE_SSL)) { url = url.replace("http:", "https:"); } yarn.register(url); controller.started(); ...
... resourceMgr = AMRMClientAsync.createAMRMClientAsync(pollPeriodMs, resourceCallback); resourceMgr.init(conf); resourceMgr.start(); ... nodeMgr = NMClientAsync.createNMClientAsync(nodeCallback); nodeMgr.init(conf); nodeMgr.start(); ... client = YarnClient.createYarnClient(); client.init(conf); client.start(); ...
-
启动dirll运维界面
WebServer webServer = new WebServer(dispatcher); webServer.start();
-
运行Dispatcher,主要是启动一个线程,此线程会不断的轮询当前的任务队列中的任务情况,比如启动、停止、resize等类型的任务,然后执行相应的动作,拿启动来说
-
添加一个启动任务,然后放入pendingTask队列中
if (state == State.LIVE) { adjustTasks(curTime); requestContainers(); }
-
向RM请求container:创建一个ContainerRequest
ContainerRequest request = containerSpec.makeRequest(); resourceMgr.addContainerRequest(containerSpec.makeRequest()); return request;
-
ResourceCallback监听container分配,然后启动container
private class ResourceCallback implements AMRMClientAsync.CallbackHandler { @Override public void onContainersAllocated(List<Container> containers) { controller.containersAllocated(containers); } }
public void containerAllocated(EventContext context, Container container) { Task task = context.task; LOG.info(task.getLabel() + " - Received container: " + DoYUtil.describeContainer(container)); context.group.dequeueAllocatingTask(task); // No matter what happens below, we don't want to ask for this // container again. The RM async API is a bit bizarre in this // regard: it will keep asking for container over and over until // we tell it to stop. context.yarn.removeContainerRequest(task.containerRequest); // The container is need both in the normal and in the cancellation // path, so set it here. task.container = container; if (task.cancelled) { context.yarn.releaseContainer(container); taskStartFailed(context, Disposition.CANCELLED); return; } task.error = null; task.completionStatus = null; transition(context, LAUNCHING); // The pool that manages this task wants to know that we have // a container. The task manager may want to do some task- // specific setup. context.group.containerAllocated(context.task); context.getTaskManager().allocated(context); // Go ahead and launch a task in the container using the launch // specification provided by the task group (pool). try { context.yarn.launchContainer(container, task.getLaunchSpec()); task.launchTime = System.currentTimeMillis(); } catch (YarnFacadeException e) { LOG.error("Container launch failed: " + task.getContainerId(), e); // This may not be the right response. RM may still think // we have the container if the above is a local failure. task.error = e; context.group.containerReleased(task); task.container = null; taskStartFailed(context, Disposition.LAUNCH_FAILED); } }
-
NodeCallback监听container启动
public class NodeCallback implements NMClientAsync.CallbackHandler { @Override public void onStartContainerError(ContainerId containerId, Throwable t) { controller.taskStartFailed(containerId, t); } @Override public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) { controller.containerStarted(containerId); } @Override public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) { } @Override public void onGetContainerStatusError(ContainerId containerId, Throwable t) { } @Override public void onStopContainerError(ContainerId containerId, Throwable t) { controller.stopTaskFailed(containerId, t); } @Override public void onContainerStopped(ContainerId containerId) { controller.containerStopped(containerId); } }
-
2.3 fail over
Drill-on-YARN除了提供start、stop、resize功能外,还提供了fail over功能,当前某个drillbit挂掉后,Drill-on-YARN会尝试再次启动drillbit,目前重试的次数为2。此外,如果一个drillbit所在的节点频繁挂掉,会被列入黑名单。
我们可以通过手动kill drillbit来模拟drillbit挂掉的情况,然后等待一会儿,可以看到,drillbit进程重新启动了。下面我们看看,代码的执行流程
- drillbit挂掉,container结束
private class ResourceCallback implements AMRMClientAsync.CallbackHandler { @Override public void onContainersCompleted(List<ContainerStatus> statuses) { controller.containersCompleted(statuses); } }
- retry task:重新将这个task加入pendingTasks,然后轮询的线程检测到pendingTasks不为空,执行启动操作
protected void taskTerminated(EventContext context) { Task task = context.task; context.getTaskManager().completed(context); context.group.containerReleased(task); assert task.completionStatus != null; // container结束的状态不是0,说明不是正常结束 if (task.completionStatus.getExitStatus() == 0) { taskEnded(context, Disposition.COMPLETED); context.group.taskEnded(context.task); } else { taskEnded(context, Disposition.RUN_FAILED); retryTask(context); } }
private void retryTask(EventContext context) { Task task = context.task; assert task.state == END; if (!context.controller.isLive() || !task.retryable()) { context.group.taskEnded(task); return; } if (task.tryCount > task.taskGroup.getMaxRetries()) { LOG.error(task.getLabel() + " - Too many retries: " + task.tryCount); task.disposition = Disposition.TOO_MANY_RETRIES; context.group.taskEnded(task); return; } LOG.info(task.getLabel() + " - Retrying task, try " + task.tryCount); context.group.taskRetried(task); task.reset(); transition(context, START); context.group.enqueuePendingRequest(task); }
3. 停止
除了前面详情介绍的start命令外,Drill-on-YARN也提供了stop命令,其中stop分两种:
- 强制停止:直接调用yarn客户端的killApplication api
yarnClient.killApplication(appId);
- 优雅停止:先清理所有的任务,包括pending、running的,然后调用yarn的api杀死容器,关闭controller,然后通知am运行结束
... for (Task task : getStartingTasks()) { context.setTask(task); context.getState().cancel(context); } for (Task task : getActiveTasks()) { context.setTask(task); context.getState().cancel(context); } ...
... context.yarn.killContainer(task.container); ...
public void run() throws YarnFacadeException { ... boolean success = controller.waitForCompletion(); ... ... finish(success, null); ... }
public boolean waitForCompletion() { start(); synchronized (completionMutex) { try { completionMutex.wait(); } catch (InterruptedException e) { } } return succeeded(); }
public void finish(boolean succeeded, String msg) throws YarnFacadeException { nodeMgr.stop(); String appMsg = "Drill Cluster Shut-Down"; FinalApplicationStatus status = FinalApplicationStatus.SUCCEEDED; if (!succeeded) { appMsg = "Drill Cluster Fatal Error - check logs"; status = FinalApplicationStatus.FAILED; } if (msg != null) { appMsg = msg; } try { resourceMgr.unregisterApplicationMaster(status, appMsg, ""); } catch (YarnException | IOException e) { throw new YarnFacadeException("Deregister AM failed", e); } resourceMgr.stop(); }
4. resize
resize流程为:调整quantity(保留多少个container),之后轮询线程会根据quantity,调整任务,执行resize操作
public int resize(int level) { int limit = quantity + state.getController().getFreeNodeCount() +maxExtraNodes; return super.resize( Math.min( limit, level ) ); }
5. 总结
总的来说,Drill-on-YARN分为两大模块,drill-on-yarn.sh和drill-am.sh。drill-on-yarn.sh用于启动ApplicationMaster,drill-am.sh用于向ResourceManager申请资源并启动Drill集群。其中Drill的启动、停止、缩容、扩容,都被封装为一个任务,在执行这些命令时,会构建一个任务,放入任务队列中。有一个线程会一直轮询此队列,根据队列中的任务执行不同的操作,从而达到启动、停止、缩容、扩容Drill集群的功能。此外,相比独立部署,Drill-on-YARN提供的failover功能强化了Drill的稳定性。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
高级开发必须理解的Java中SPI机制
本文通过探析JDK提供的,在开源项目中比较常用的Java SPI机制,希望给大家在实际开发实践、学习开源项目提供参考。 一、SPI是什么 SPI全称Service Provider Interface,是Java提供的一套用来被第三方实现或者扩展的API,它可以用来启用框架扩展和替换组件。 整体机制图如下: Java SPI 实际上是“基于接口的编程+策略模式+配置文件”组合实现的动态加载机制。 系统设计的各个抽象,往往有很多不同的实现方案,在面向的对象的设计里,一般推荐模块之间基于接口编程,模块之间不对实现类进行硬编码。 一旦代码里涉及具体的实现类,就违反了可拔插的原则,如果需要替换一种实现,就需要修改代码。为了实现在模块装配的时候能不在程序里动态指明,这就需要一种服务发现机制。 Java SPI就是提供这样的一个机制:为某个接口寻找服务实现的机制。有点类似IOC的思想,就是将装配的控制权移到程序之外,在模块化设计中这个机制尤其重要。所以SPI的核心思想就是解耦。 二、使用场景 概括地说,适用于:调用者根据实际使用需要,启用、扩展、或者替换框架的实现策略 比较常见的例子: 数据库驱动...
- 下一篇
ThinkSNS Plus PHP开发概述
Plus (读音:[plʌs],全称:ThinkSNS+[θɪŋk es en es plʌs],是 ThinkSNS 系列产品一个重要版本,其软件识别名称为Plus即+) 是一个基于Latest Laravel框架进行开发的一个功能强大、易于开发和强拓展的社交系统。与其他开源社交程序不同的是 Plus 拥有多年社交系统经验,不仅易于上手,还便于应用拓展。另一方面,程序采用 PHP 7 严格模式,从根本上尽量避免弱级错误的产生。同时因为从零开始选择较好的带有较好 ORM 的原因,Plus 允许你更具你的需求使用不同数据库。 如果你想深入学习 Plus,我们为你准备了大量教程级文档。哪怕你不会 Laravel 框架,也能让你入门框架基础,并胜任 Plus 应用开发。 如果你是有经验的 PHPer,那么你可以了解现代流行框架差异,Laravel 就是现代留下框架的佼佼者之一。 #PHP 环境要求 重点 你可能还没有很好的 Liunx 知识,没关系,后面的教程会拟定你是零基础的前提下教学,但是下面的环境要求限制,你需要重点记忆,这是程序能否运行的关键所在! #PHP 版本 您的 ...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
-
Docker使用Oracle官方镜像安装(12C,18C,19C)
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8编译安装MySQL8.0.19
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
推荐阅读
最新文章
- 2048小游戏-低调大师作品
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- Mario游戏-低调大师作品
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- CentOS7,CentOS8安装Elasticsearch6.8.6
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- CentOS6,CentOS7官方镜像安装Oracle11G
- Docker使用Oracle官方镜像安装(12C,18C,19C)