您现在的位置是:首页 > 文章详情

datax源码阅读二:Engine流程

日期:2019-04-03点击:355

一、根据前面python文件知道,java的main函数是com.alibaba.datax.core.Engine

 public static void main(String[] args) throws Exception { int exitCode = 0; try { Engine.entry(args); } catch (Throwable e) { exitCode = 1; String trace = ExceptionTracker.trace(e); String errDesc = "未知datax错误,参考堆栈内容分析。"; LOG.error("\n\n经DataX智能分析,该任务最可能的错误原因是:\n" + trace); if (e instanceof DataXException) { DataXException tempException = (DataXException) e; ErrorCode errorCode = tempException.getErrorCode(); errDesc = errorCode.getDescription(); if (errorCode instanceof FrameworkErrorCode) { FrameworkErrorCode tempErrorCode = (FrameworkErrorCode) errorCode; exitCode = tempErrorCode.toExitValue(); } } System.exit(exitCode); } System.exit(exitCode); } 

main函数主要catch了一下异常,并将异常信息打印出来,实际执行在entry函数中

 public static void entry(final String[] args) throws Throwable { Options options = new Options(); options.addOption("job", true, "Job config."); options.addOption("jobid", true, "Job unique id."); options.addOption("mode", true, "Job runtime mode."); BasicParser parser = new BasicParser(); CommandLine cl = parser.parse(options, args); String jobPath = cl.getOptionValue("job"); // 如果用户没有明确指定jobid, 则 datax.py 会指定 jobid 默认值为-1 String jobIdString = cl.getOptionValue("jobid"); RUNTIME_MODE = cl.getOptionValue("mode"); Configuration configuration = ConfigParser.parse(jobPath); long jobId; if (!"-1".equalsIgnoreCase(jobIdString)) { jobId = Long.parseLong(jobIdString); } else { // only for dsc & ds & datax 3 update String dscJobUrlPatternString = "/instance/(\\d{1,})/config.xml"; String dsJobUrlPatternString = "/inner/job/(\\d{1,})/config"; String dsTaskGroupUrlPatternString = "/inner/job/(\\d{1,})/taskGroup/"; List<String> patternStringList = Arrays.asList(dscJobUrlPatternString, dsJobUrlPatternString, dsTaskGroupUrlPatternString); jobId = parseJobIdFromUrl(patternStringList, jobPath); } boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE); if (!isStandAloneMode && jobId == -1) { // 如果不是 standalone 模式,那么 jobId 一定不能为-1 throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, "非 standalone 模式必须在 URL 中提供有效的 jobId."); } configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId); //打印vmInfo VMInfo vmInfo = VMInfo.getVmInfo(); if (vmInfo != null) { LOG.info(vmInfo.toString()); } LOG.info("\n" + Engine.filterJobConfiguration(configuration) + "\n"); LOG.debug(configuration.toJSON()); ConfigurationValidate.doValidate(configuration); Engine engine = new Engine(); engine.start(configuration); } 

entry函数主要功能:

1、解析了java命令行的三个参数,分别是job、jobid和mode,其中job是用户配置的json文件路径,jobid和mode是python文件带进来的,单机模式下可以忽略改参数 2、读取用户配置的json文件,转化为内部的configuration配置 3、打印相关信息,并校验json文件的合法性 4、启动engine执行 

entry执行完毕之后,进入start函数,关键代码如下:

 public void start(Configuration allConf) { // 绑定column转换信息 ColumnCast.bind(allConf); /** * 初始化PluginLoader,可以获取各种插件配置 */ LoadUtil.bind(allConf); ************* container = new JobContainer(allConf); ************* container.start(); } 

start函数中主要包括:

1、列转换默认值,即动态在configuration中注入默认值 2、初始化插件的LoadUtil,后面classLoader相关操作都会依赖这个函数 3、初始化JobContainer并启动
原文链接:https://yq.aliyun.com/articles/696855
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章