datax源码阅读三:JobContainer
前面介绍的python文件和Engine都是用于做初始化准备,真正的执行都是在这里完成,start代码如下:
/** * jobContainer主要负责的工作全部在start()里面,包括init、prepare、split、scheduler、 * post以及destroy和statistics */ @Override public void start() { LOG.info("DataX jobContainer starts job."); boolean hasException = false; boolean isDryRun = false; try { this.startTimeStamp = System.currentTimeMillis(); isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false); if(isDryRun) { LOG.info("jobContainer starts to do preCheck ..."); this.preCheck(); } else { userConf = configuration.clone(); LOG.debug("jobContainer starts to do preHandle ..."); this.preHandle(); LOG.debug("jobContainer starts to do init ..."); this.init(); LOG.info("jobContainer starts to do prepare ..."); this.prepare(); LOG.info("jobContainer starts to do split ..."); this.totalStage = this.split(); LOG.info("jobContainer starts to do schedule ..."); this.schedule(); LOG.debug("jobContainer starts to do post ..."); this.post(); LOG.debug("jobContainer starts to do postHandle ..."); this.postHandle(); LOG.info("DataX jobId [{}] completed successfully.", this.jobId); this.invokeHooks(); } } catch (Throwable e) { LOG.error("Exception when job run", e); hasException = true; if (e instanceof OutOfMemoryError) { this.destroy(); System.gc(); } if (super.getContainerCommunicator() == null) { // 由于 containerCollector 是在 scheduler() 中初始化的,所以当在 scheduler() 之前出现异常时,需要在此处对 containerCollector 进行初始化 AbstractContainerCommunicator tempContainerCollector; // standalone tempContainerCollector = new StandAloneJobContainerCommunicator(configuration); super.setContainerCommunicator(tempContainerCollector); } Communication communication = super.getContainerCommunicator().collect(); // 汇报前的状态,不需要手动进行设置 // communication.setState(State.FAILED); communication.setThrowable(e); communication.setTimestamp(this.endTimeStamp); Communication tempComm = new Communication(); tempComm.setTimestamp(this.startTransferTimeStamp); Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage); super.getContainerCommunicator().report(reportCommunication); throw DataXException.asDataXException( FrameworkErrorCode.RUNTIME_ERROR, e); } finally { if(!isDryRun) { this.destroy(); this.endTimeStamp = System.currentTimeMillis(); if (!hasException) { //最后打印cpu的平均消耗,GC的统计 VMInfo vmInfo = VMInfo.getVmInfo(); if (vmInfo != null) { vmInfo.getDelta(false); LOG.info(vmInfo.totalString()); LogUtil.logVmInfo(vmInfo); } LOG.info(PerfTrace.getInstance().summarizeNoException()); this.logStatistics(); } } } }
主要执行流程为:
1、preHandle():job前置操作 2、init():初始化reader和writer 3、prepare():执行插件的prepare操作 4、split():切分任务 5、schedule():执行任务 6、post():执行插件的post操作 7、postHandle():job后置操作 8、invokeHooks():调用hook 9、输出统计结果
上述任务流是顺序执行的,第5步会将切分的task分配到多个taskGroup中并发执行,其中preHandle、postHandle、invokeHooks不影响整体执行,可以先忽略,下面主要介绍关键步骤
1)init()关键代码主要是初始化reader和writer插件
/** * reader和writer的初始化 */ private void init() { this.jobReader = this.initJobReader(jobPluginCollector); this.jobWriter = this.initJobWriter(jobPluginCollector); } /** * reader job的初始化,返回Reader.Job * * @return */ private Reader.Job initJobReader( JobPluginCollector jobPluginCollector) { this.readerPluginName = this.configuration.getString( CoreConstant.DATAX_JOB_CONTENT_READER_NAME); classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( PluginType.READER, this.readerPluginName)); Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin( PluginType.READER, this.readerPluginName); // 设置reader的jobConfig jobReader.setPluginJobConf(this.configuration.getConfiguration( CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER)); // 设置reader的readerConfig jobReader.setPeerPluginJobConf(this.configuration.getConfiguration( CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER)); jobReader.setJobPluginCollector(jobPluginCollector); jobReader.init(); classLoaderSwapper.restoreCurrentThreadClassLoader(); return jobReader; }
reader插件初始化的时候使用了自定义classLoader,这样可以做到插件级别的隔离,插件初始化完成之后调用了job的init函数,用于初始化插件的Job,writer插件初始化同理
2)prepare():prepare操作比较简单,分别执行reader和writer插件Job中的prepare函数即可,同样,每次执行前都会先加载对应的classLoader用于隔离
private void prepare() { this.prepareJobReader(); this.prepareJobWriter(); }
3)split():切分任务task
/** * 执行reader和writer最细粒度的切分,需要注意的是,writer的切分结果要参照reader的切分结果, * 达到切分后数目相等,才能满足1:1的通道模型,所以这里可以将reader和writer的配置整合到一起, * 然后,为避免顺序给读写端带来长尾影响,将整合的结果shuffler掉 */ private int split() { this.adjustChannelNumber(); if (this.needChannelNumber <= 0) { this.needChannelNumber = 1; } List<Configuration> readerTaskConfigs = this .doReaderSplit(this.needChannelNumber); int taskNumber = readerTaskConfigs.size(); List<Configuration> writerTaskConfigs = this .doWriterSplit(taskNumber); List<Configuration> transformerList = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER); LOG.debug("transformer configuration: "+ JSON.toJSONString(transformerList)); /** * 输入是reader和writer的parameter list,输出是content下面元素的list */ List<Configuration> contentConfig = mergeReaderAndWriterTaskConfigs( readerTaskConfigs, writerTaskConfigs, transformerList); LOG.debug("contentConfig configuration: "+ JSON.toJSONString(contentConfig)); this.configuration.set(CoreConstant.DATAX_JOB_CONTENT, contentConfig); return contentConfig.size(); }
上面函数主要分为3步:
1、计算限速和并发,即实际的channel数和每个channel的限速,主要在adjustChannelNumber()中,这里不做过多说明 2、根据实际的channel数,切分reader端,具体的切分逻辑reader插件可以自行实现 3、根据reader端切分的数目切分writer端,达到reader:writer=1:1,这样每个task中都包含一个reader和一个writer
4)schedule():执行切分出来的task
/** * schedule首先完成的工作是把上一步reader和writer split的结果整合到具体taskGroupContainer中, * 同时不同的执行模式调用不同的调度策略,将所有任务调度起来 */ private void schedule() { /** * 这里的全局speed和每个channel的速度设置为B/s */ int channelsPerTaskGroup = this.configuration.getInt( CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, 5); int taskNumber = this.configuration.getList( CoreConstant.DATAX_JOB_CONTENT).size(); this.needChannelNumber = Math.min(this.needChannelNumber, taskNumber); PerfTrace.getInstance().setChannelNumber(needChannelNumber); /** * 通过获取配置信息得到每个taskGroup需要运行哪些tasks任务 */ List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration, this.needChannelNumber, channelsPerTaskGroup); LOG.info("Scheduler starts [{}] taskGroups.", taskGroupConfigs.size()); ExecuteMode executeMode = null; AbstractScheduler scheduler; try { executeMode = ExecuteMode.STANDALONE; scheduler = initStandaloneScheduler(this.configuration); //设置 executeMode for (Configuration taskGroupConfig : taskGroupConfigs) { taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, executeMode.getValue()); } if (executeMode == ExecuteMode.LOCAL || executeMode == ExecuteMode.DISTRIBUTE) { if (this.jobId <= 0) { throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, "在[ local | distribute ]模式下必须设置jobId,并且其值 > 0 ."); } } LOG.info("Running by {} Mode.", executeMode); this.startTransferTimeStamp = System.currentTimeMillis(); scheduler.schedule(taskGroupConfigs); this.endTransferTimeStamp = System.currentTimeMillis(); } catch (Exception e) { LOG.error("运行scheduler 模式[{}]出错.", executeMode); this.endTransferTimeStamp = System.currentTimeMillis(); throw DataXException.asDataXException( FrameworkErrorCode.RUNTIME_ERROR, e); } /** * 检查任务执行情况 */ this.checkLimit(); }
schedule执行过程主要分为以下几步:
1、计算taskGroup个数 2、将切分的task分配到taskGroup中 3、启动线程池执行taskGroup,具体代码流程为scheduler.schedule(taskGroupConfigs) -> AbstractScheduler.schedule -> startAllTaskGroup -> ProcessInnerScheduler.startAllTaskGroup -> this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner) -> TaskGroupContainerRunner.run() -> this.taskGroupContainer.start() 4、收集taskGroup汇报的信息
5)post():执行插件的post操作
private void post() { this.postJobWriter(); this.postJobReader(); }
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
datax源码阅读二:Engine流程
一、根据前面python文件知道,java的main函数是com.alibaba.datax.core.Engine public static void main(String[] args) throws Exception { int exitCode = 0; try { Engine.entry(args); } catch (Throwable e) { exitCode = 1; String trace = ExceptionTracker.trace(e); String errDesc = "未知datax错误,参考堆栈内容分析。"; LOG.error("\n\n经DataX智能分析,该任务最可能的错误原因是:\n" + trace); if (e instanceof DataXException) { DataXException tempException = (DataXException) e; ErrorCode errorCode = tempException.getErrorCode(); errDesc = errorCode.getDescr...
- 下一篇
python实现单向循环链表数据结构及其方法
首先要说明一下研究数据结构有什么用,可能就像高数之类的在生活中并没有多少用处,但是离不开他,很多大公司面试也会问这个东西;但是要落实到某一个具体的业务场景,我也不知道,但并不代表这些东西没用,也可能是这些模型只是为了让我们能理解更多有用的东西。 今天说的是单向循环链表,昨天说了单向链表《python实现单向链表数据结构及其基本方法》,在此基础上我们说单向循环链表,其基本模型示图如下: 只不过在单向链表的基础上,最后一个节点纸箱头部,定义基本节点对象和链条对象。 classNode: def__init__(self,item): self.item=item#该节点值 self.next=None#连接一下一个节点classSinCycLinkedlist:
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS6,CentOS7官方镜像安装Oracle11G
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS7设置SWAP分区,小内存服务器的救世主
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- CentOS8安装Docker,最新的服务器搭配容器使用
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS关闭SELinux安全模块
- CentOS6,7,8上安装Nginx,支持https2.0的开启