datax源码阅读一:python文件
一、前面主要是怎么使用datax和datax的插件编写,后面主要说明源码阅读部分,python相关文件
二、datax关键代码(python datax.py test.json)
1、datax.py文件
printCopyright() parser = getOptionParser(sys.argv[1:]) options, args = parser.parse_args(sys.argv[1:]) if options.reader is not None and options.writer is not None: generateJobConfigTemplate(options.reader,options.writer) sys.exit(RET_STATE['OK']) if len(args) == 0: parser.print_help() sys.exit(RET_STATE['FAIL']) startCommand = buildStartCommand(options, args) child_process = subprocess.Popen(startCommand, shell=True)
上面代码关键部分是函数buildStartCommand(options, args)拼出来的java命令,然后启动一个新的子进程去执行java命令
def buildStartCommand(options, args): commandMap = {} tempJVMCommand = DEFAULT_JVM if options.jvmParameters: tempJVMCommand = tempJVMCommand + " " + options.jvmParameters if options.remoteDebug: tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIG print 'local ip: ', getLocalIp() if options.loglevel: tempJVMCommand = tempJVMCommand + " " + ("-Dloglevel=%s" % (options.loglevel)) if options.mode: commandMap["mode"] = options.mode # jobResource 可能是 URL,也可能是本地文件路径(相对,绝对) jobResource = args[0] if not isUrl(jobResource): jobResource = os.path.abspath(jobResource) if jobResource.lower().startswith("file://"): jobResource = jobResource[len("file://"):] jobParams = ("-Dlog.file.name=%s") % (jobResource[-20:].replace('/', '_').replace('.', '_')) if options.params: jobParams = jobParams + " " + options.params if options.jobid: commandMap["jobid"] = options.jobid commandMap["jvm"] = tempJVMCommand commandMap["params"] = jobParams commandMap["job"] = jobResource return Template(ENGINE_COMMAND).substitute(**commandMap)
java命令是通过模板ENGINE_COMMAND和commandMap拼接而成,其中ENGINE_COMMAND是主要的java脚本命令
ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (DEFAULT_PROPERTY_CONF, CLASS_PATH)
commandMap中包含jvm参数、任务动态参数和job文件信息(json配置文件),根据这个python文件,用户可以自行修改jvm相关参数,如
DEFAULT_JVM = "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
2、上述python文件执行完毕之后会启动java命令,主函数是com.alibaba.datax.core.Engine,至此python文件主要流程完毕

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
python 多重类继承__init__
目的 项目中遇到多重类继承的问题,想调用父类构造函数中的内容,调试了一两个小时,遇到两个问题。 说不存在某个父类的函数; 报MRO列表错误; 查询了相关的文档,大致是讲解父类的继承,没有涉及到多重继承,以及多重继承构造函数的问题,这里总结一下。 调用父类方法 想在子类中调用父类的某个已经被覆盖的方法: 解决方案 为了调用父类(超类)的一个方法,可以使用 super() 函数,比如 class A: def spam(self): print('A.spam') class B(A): def spam(self): print('B.spam') super().spam() # Call parent spam() super() 函数的一个常见用法是在 __init__() 方法中确保父类被正确的初始化了: class A: def __init__(self): self.x = 0 class B(A): def __init__(self): super().__init__() self.y = 1 super() 的另外一个常见用法出现在覆盖Python特殊方法的代码中,...
- 下一篇
datax源码阅读二:Engine流程
一、根据前面python文件知道,java的main函数是com.alibaba.datax.core.Engine public static void main(String[] args) throws Exception { int exitCode = 0; try { Engine.entry(args); } catch (Throwable e) { exitCode = 1; String trace = ExceptionTracker.trace(e); String errDesc = "未知datax错误,参考堆栈内容分析。"; LOG.error("\n\n经DataX智能分析,该任务最可能的错误原因是:\n" + trace); if (e instanceof DataXException) { DataXException tempException = (DataXException) e; ErrorCode errorCode = tempException.getErrorCode(); errDesc = errorCode.getDescr...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- 设置Eclipse缩进为4个空格,增强代码规范
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS7,CentOS8安装Elasticsearch6.8.6
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS7设置SWAP分区,小内存服务器的救世主
- Docker安装Oracle12C,快速搭建Oracle学习环境