您现在的位置是:首页 > 文章详情

datax源码阅读三:JobContainer

日期:2019-04-03点击:1427

前面介绍的python文件和Engine都是用于做初始化准备,真正的执行都是在这里完成,start代码如下:

/** * jobContainer主要负责的工作全部在start()里面,包括init、prepare、split、scheduler、 * post以及destroy和statistics */ @Override public void start() { LOG.info("DataX jobContainer starts job."); boolean hasException = false; boolean isDryRun = false; try { this.startTimeStamp = System.currentTimeMillis(); isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false); if(isDryRun) { LOG.info("jobContainer starts to do preCheck ..."); this.preCheck(); } else { userConf = configuration.clone(); LOG.debug("jobContainer starts to do preHandle ..."); this.preHandle(); LOG.debug("jobContainer starts to do init ..."); this.init(); LOG.info("jobContainer starts to do prepare ..."); this.prepare(); LOG.info("jobContainer starts to do split ..."); this.totalStage = this.split(); LOG.info("jobContainer starts to do schedule ..."); this.schedule(); LOG.debug("jobContainer starts to do post ..."); this.post(); LOG.debug("jobContainer starts to do postHandle ..."); this.postHandle(); LOG.info("DataX jobId [{}] completed successfully.", this.jobId); this.invokeHooks(); } } catch (Throwable e) { LOG.error("Exception when job run", e); hasException = true; if (e instanceof OutOfMemoryError) { this.destroy(); System.gc(); } if (super.getContainerCommunicator() == null) { // 由于 containerCollector 是在 scheduler() 中初始化的,所以当在 scheduler() 之前出现异常时,需要在此处对 containerCollector 进行初始化 AbstractContainerCommunicator tempContainerCollector; // standalone tempContainerCollector = new StandAloneJobContainerCommunicator(configuration); super.setContainerCommunicator(tempContainerCollector); } Communication communication = super.getContainerCommunicator().collect(); // 汇报前的状态,不需要手动进行设置 // communication.setState(State.FAILED); communication.setThrowable(e); communication.setTimestamp(this.endTimeStamp); Communication tempComm = new Communication(); tempComm.setTimestamp(this.startTransferTimeStamp); Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage); super.getContainerCommunicator().report(reportCommunication); throw DataXException.asDataXException( FrameworkErrorCode.RUNTIME_ERROR, e); } finally { if(!isDryRun) { this.destroy(); this.endTimeStamp = System.currentTimeMillis(); if (!hasException) { //最后打印cpu的平均消耗,GC的统计 VMInfo vmInfo = VMInfo.getVmInfo(); if (vmInfo != null) { vmInfo.getDelta(false); LOG.info(vmInfo.totalString()); LogUtil.logVmInfo(vmInfo); } LOG.info(PerfTrace.getInstance().summarizeNoException()); this.logStatistics(); } } } } 

主要执行流程为:

1、preHandle():job前置操作 2、init():初始化reader和writer 3、prepare():执行插件的prepare操作 4、split():切分任务 5、schedule():执行任务 6、post():执行插件的post操作 7、postHandle():job后置操作 8、invokeHooks():调用hook 9、输出统计结果

上述任务流是顺序执行的,第5步会将切分的task分配到多个taskGroup中并发执行,其中preHandle、postHandle、invokeHooks不影响整体执行,可以先忽略,下面主要介绍关键步骤
1)init()关键代码主要是初始化reader和writer插件

/** * reader和writer的初始化 */ private void init() { this.jobReader = this.initJobReader(jobPluginCollector); this.jobWriter = this.initJobWriter(jobPluginCollector); } /** * reader job的初始化,返回Reader.Job * * @return */ private Reader.Job initJobReader( JobPluginCollector jobPluginCollector) { this.readerPluginName = this.configuration.getString( CoreConstant.DATAX_JOB_CONTENT_READER_NAME); classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( PluginType.READER, this.readerPluginName)); Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin( PluginType.READER, this.readerPluginName); // 设置reader的jobConfig jobReader.setPluginJobConf(this.configuration.getConfiguration( CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER)); // 设置reader的readerConfig jobReader.setPeerPluginJobConf(this.configuration.getConfiguration( CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER)); jobReader.setJobPluginCollector(jobPluginCollector); jobReader.init(); classLoaderSwapper.restoreCurrentThreadClassLoader(); return jobReader; }

reader插件初始化的时候使用了自定义classLoader,这样可以做到插件级别的隔离,插件初始化完成之后调用了job的init函数,用于初始化插件的Job,writer插件初始化同理
2)prepare():prepare操作比较简单,分别执行reader和writer插件Job中的prepare函数即可,同样,每次执行前都会先加载对应的classLoader用于隔离

private void prepare() { this.prepareJobReader(); this.prepareJobWriter(); }

3)split():切分任务task

/** * 执行reader和writer最细粒度的切分,需要注意的是,writer的切分结果要参照reader的切分结果, * 达到切分后数目相等,才能满足1:1的通道模型,所以这里可以将reader和writer的配置整合到一起, * 然后,为避免顺序给读写端带来长尾影响,将整合的结果shuffler掉 */ private int split() { this.adjustChannelNumber(); if (this.needChannelNumber <= 0) { this.needChannelNumber = 1; } List<Configuration> readerTaskConfigs = this .doReaderSplit(this.needChannelNumber); int taskNumber = readerTaskConfigs.size(); List<Configuration> writerTaskConfigs = this .doWriterSplit(taskNumber); List<Configuration> transformerList = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER); LOG.debug("transformer configuration: "+ JSON.toJSONString(transformerList)); /** * 输入是reader和writer的parameter list,输出是content下面元素的list */ List<Configuration> contentConfig = mergeReaderAndWriterTaskConfigs( readerTaskConfigs, writerTaskConfigs, transformerList); LOG.debug("contentConfig configuration: "+ JSON.toJSONString(contentConfig)); this.configuration.set(CoreConstant.DATAX_JOB_CONTENT, contentConfig); return contentConfig.size(); }

上面函数主要分为3步:

1、计算限速和并发,即实际的channel数和每个channel的限速,主要在adjustChannelNumber()中,这里不做过多说明 2、根据实际的channel数,切分reader端,具体的切分逻辑reader插件可以自行实现 3、根据reader端切分的数目切分writer端,达到reader:writer=1:1,这样每个task中都包含一个reader和一个writer

4)schedule():执行切分出来的task

/** * schedule首先完成的工作是把上一步reader和writer split的结果整合到具体taskGroupContainer中, * 同时不同的执行模式调用不同的调度策略,将所有任务调度起来 */ private void schedule() { /** * 这里的全局speed和每个channel的速度设置为B/s */ int channelsPerTaskGroup = this.configuration.getInt( CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, 5); int taskNumber = this.configuration.getList( CoreConstant.DATAX_JOB_CONTENT).size(); this.needChannelNumber = Math.min(this.needChannelNumber, taskNumber); PerfTrace.getInstance().setChannelNumber(needChannelNumber); /** * 通过获取配置信息得到每个taskGroup需要运行哪些tasks任务 */ List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration, this.needChannelNumber, channelsPerTaskGroup); LOG.info("Scheduler starts [{}] taskGroups.", taskGroupConfigs.size()); ExecuteMode executeMode = null; AbstractScheduler scheduler; try { executeMode = ExecuteMode.STANDALONE; scheduler = initStandaloneScheduler(this.configuration); //设置 executeMode for (Configuration taskGroupConfig : taskGroupConfigs) { taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, executeMode.getValue()); } if (executeMode == ExecuteMode.LOCAL || executeMode == ExecuteMode.DISTRIBUTE) { if (this.jobId <= 0) { throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, "在[ local | distribute ]模式下必须设置jobId,并且其值 > 0 ."); } } LOG.info("Running by {} Mode.", executeMode); this.startTransferTimeStamp = System.currentTimeMillis(); scheduler.schedule(taskGroupConfigs); this.endTransferTimeStamp = System.currentTimeMillis(); } catch (Exception e) { LOG.error("运行scheduler 模式[{}]出错.", executeMode); this.endTransferTimeStamp = System.currentTimeMillis(); throw DataXException.asDataXException( FrameworkErrorCode.RUNTIME_ERROR, e); } /** * 检查任务执行情况 */ this.checkLimit(); }

schedule执行过程主要分为以下几步:

1、计算taskGroup个数 2、将切分的task分配到taskGroup中 3、启动线程池执行taskGroup,具体代码流程为scheduler.schedule(taskGroupConfigs) -> AbstractScheduler.schedule -> startAllTaskGroup -> ProcessInnerScheduler.startAllTaskGroup -> this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner) -> TaskGroupContainerRunner.run() -> this.taskGroupContainer.start() 4、收集taskGroup汇报的信息

5)post():执行插件的post操作

private void post() { this.postJobWriter(); this.postJobReader(); }
原文链接:https://yq.aliyun.com/articles/696860
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章