datax源码阅读二:Engine流程
一、根据前面python文件知道,java的main函数是com.alibaba.datax.core.Engine
public static void main(String[] args) throws Exception {
int exitCode = 0;
try {
Engine.entry(args);
} catch (Throwable e) {
exitCode = 1;
String trace = ExceptionTracker.trace(e);
String errDesc = "未知datax错误,参考堆栈内容分析。";
LOG.error("\n\n经DataX智能分析,该任务最可能的错误原因是:\n" + trace);
if (e instanceof DataXException) {
DataXException tempException = (DataXException) e;
ErrorCode errorCode = tempException.getErrorCode();
errDesc = errorCode.getDescription();
if (errorCode instanceof FrameworkErrorCode) {
FrameworkErrorCode tempErrorCode = (FrameworkErrorCode) errorCode;
exitCode = tempErrorCode.toExitValue();
}
}
System.exit(exitCode);
}
System.exit(exitCode);
}
main函数主要catch了一下异常,并将异常信息打印出来,实际执行在entry函数中
public static void entry(final String[] args) throws Throwable {
Options options = new Options();
options.addOption("job", true, "Job config.");
options.addOption("jobid", true, "Job unique id.");
options.addOption("mode", true, "Job runtime mode.");
BasicParser parser = new BasicParser();
CommandLine cl = parser.parse(options, args);
String jobPath = cl.getOptionValue("job");
// 如果用户没有明确指定jobid, 则 datax.py 会指定 jobid 默认值为-1
String jobIdString = cl.getOptionValue("jobid");
RUNTIME_MODE = cl.getOptionValue("mode");
Configuration configuration = ConfigParser.parse(jobPath);
long jobId;
if (!"-1".equalsIgnoreCase(jobIdString)) {
jobId = Long.parseLong(jobIdString);
} else {
// only for dsc & ds & datax 3 update
String dscJobUrlPatternString = "/instance/(\\d{1,})/config.xml";
String dsJobUrlPatternString = "/inner/job/(\\d{1,})/config";
String dsTaskGroupUrlPatternString = "/inner/job/(\\d{1,})/taskGroup/";
List<String> patternStringList = Arrays.asList(dscJobUrlPatternString,
dsJobUrlPatternString, dsTaskGroupUrlPatternString);
jobId = parseJobIdFromUrl(patternStringList, jobPath);
}
boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);
if (!isStandAloneMode && jobId == -1) {
// 如果不是 standalone 模式,那么 jobId 一定不能为-1
throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, "非 standalone 模式必须在 URL 中提供有效的 jobId.");
}
configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);
//打印vmInfo
VMInfo vmInfo = VMInfo.getVmInfo();
if (vmInfo != null) {
LOG.info(vmInfo.toString());
}
LOG.info("\n" + Engine.filterJobConfiguration(configuration) + "\n");
LOG.debug(configuration.toJSON());
ConfigurationValidate.doValidate(configuration);
Engine engine = new Engine();
engine.start(configuration);
}
entry函数主要功能:
1、解析了java命令行的三个参数,分别是job、jobid和mode,其中job是用户配置的json文件路径,jobid和mode是python文件带进来的,单机模式下可以忽略改参数
2、读取用户配置的json文件,转化为内部的configuration配置
3、打印相关信息,并校验json文件的合法性
4、启动engine执行
entry执行完毕之后,进入start函数,关键代码如下:
public void start(Configuration allConf) {
// 绑定column转换信息
ColumnCast.bind(allConf);
/**
* 初始化PluginLoader,可以获取各种插件配置
*/
LoadUtil.bind(allConf);
*************
container = new JobContainer(allConf);
*************
container.start();
}
start函数中主要包括:
1、列转换默认值,即动态在configuration中注入默认值
2、初始化插件的LoadUtil,后面classLoader相关操作都会依赖这个函数
3、初始化JobContainer并启动

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
datax源码阅读一:python文件
一、前面主要是怎么使用datax和datax的插件编写,后面主要说明源码阅读部分,python相关文件 二、datax关键代码(python datax.py test.json) 1、datax.py文件 printCopyright() parser = getOptionParser(sys.argv[1:]) options, args = parser.parse_args(sys.argv[1:]) if options.reader is not None and options.writer is not None: generateJobConfigTemplate(options.reader,options.writer) sys.exit(RET_STATE['OK']) if len(args) == 0: parser.print_help() sys.exit(RET_STATE['FAIL']) startCommand = buildStartCommand(options, args) child_process = subprocess.Popen...
-
下一篇
datax源码阅读三:JobContainer
前面介绍的python文件和Engine都是用于做初始化准备,真正的执行都是在这里完成,start代码如下: /** * jobContainer主要负责的工作全部在start()里面,包括init、prepare、split、scheduler、 * post以及destroy和statistics */ @Override public void start() { LOG.info("DataX jobContainer starts job."); boolean hasException = false; boolean isDryRun = false; try { this.startTimeStamp = System.currentTimeMillis(); isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false); if(isDryRun) { LOG.info("jobContainer starts to do preCheck ..."); this.preCh...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker安装Oracle12C,快速搭建Oracle学习环境
- 面试大杂烩
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Windows10,CentOS7,CentOS8安装Nodejs环境
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS关闭SELinux安全模块
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- SpringBoot2全家桶,快速入门学习开发网站教程
- MySQL数据库在高并发下的优化方案