datax源码阅读一:python文件
一、前面主要是怎么使用datax和datax的插件编写,后面主要说明源码阅读部分,python相关文件
二、datax关键代码(python datax.py test.json)
1、datax.py文件
printCopyright()
parser = getOptionParser(sys.argv[1:])
options, args = parser.parse_args(sys.argv[1:])
if options.reader is not None and options.writer is not None:
generateJobConfigTemplate(options.reader,options.writer)
sys.exit(RET_STATE['OK'])
if len(args) == 0:
parser.print_help()
sys.exit(RET_STATE['FAIL'])
startCommand = buildStartCommand(options, args)
child_process = subprocess.Popen(startCommand, shell=True)
上面代码关键部分是函数buildStartCommand(options, args)拼出来的java命令,然后启动一个新的子进程去执行java命令
def buildStartCommand(options, args):
commandMap = {}
tempJVMCommand = DEFAULT_JVM
if options.jvmParameters:
tempJVMCommand = tempJVMCommand + " " + options.jvmParameters
if options.remoteDebug:
tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIG
print 'local ip: ', getLocalIp()
if options.loglevel:
tempJVMCommand = tempJVMCommand + " " + ("-Dloglevel=%s" % (options.loglevel))
if options.mode:
commandMap["mode"] = options.mode
# jobResource 可能是 URL,也可能是本地文件路径(相对,绝对)
jobResource = args[0]
if not isUrl(jobResource):
jobResource = os.path.abspath(jobResource)
if jobResource.lower().startswith("file://"):
jobResource = jobResource[len("file://"):]
jobParams = ("-Dlog.file.name=%s") % (jobResource[-20:].replace('/', '_').replace('.', '_'))
if options.params:
jobParams = jobParams + " " + options.params
if options.jobid:
commandMap["jobid"] = options.jobid
commandMap["jvm"] = tempJVMCommand
commandMap["params"] = jobParams
commandMap["job"] = jobResource
return Template(ENGINE_COMMAND).substitute(**commandMap)
java命令是通过模板ENGINE_COMMAND和commandMap拼接而成,其中ENGINE_COMMAND是主要的java脚本命令
ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (DEFAULT_PROPERTY_CONF, CLASS_PATH)
commandMap中包含jvm参数、任务动态参数和job文件信息(json配置文件),根据这个python文件,用户可以自行修改jvm相关参数,如
DEFAULT_JVM = "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
2、上述python文件执行完毕之后会启动java命令,主函数是com.alibaba.datax.core.Engine,至此python文件主要流程完毕

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
python 多重类继承__init__
目的 项目中遇到多重类继承的问题,想调用父类构造函数中的内容,调试了一两个小时,遇到两个问题。 说不存在某个父类的函数; 报MRO列表错误; 查询了相关的文档,大致是讲解父类的继承,没有涉及到多重继承,以及多重继承构造函数的问题,这里总结一下。 调用父类方法 想在子类中调用父类的某个已经被覆盖的方法: 解决方案 为了调用父类(超类)的一个方法,可以使用 super() 函数,比如 class A: def spam(self): print('A.spam') class B(A): def spam(self): print('B.spam') super().spam() # Call parent spam() super() 函数的一个常见用法是在 __init__() 方法中确保父类被正确的初始化了: class A: def __init__(self): self.x = 0 class B(A): def __init__(self): super().__init__() self.y = 1 super() 的另外一个常见用法出现在覆盖Python特殊方法的代码中,...
-
下一篇
datax源码阅读二:Engine流程
一、根据前面python文件知道,java的main函数是com.alibaba.datax.core.Engine public static void main(String[] args) throws Exception { int exitCode = 0; try { Engine.entry(args); } catch (Throwable e) { exitCode = 1; String trace = ExceptionTracker.trace(e); String errDesc = "未知datax错误,参考堆栈内容分析。"; LOG.error("\n\n经DataX智能分析,该任务最可能的错误原因是:\n" + trace); if (e instanceof DataXException) { DataXException tempException = (DataXException) e; ErrorCode errorCode = tempException.getErrorCode(); errDesc = errorCode.getDescr...
相关文章
文章评论
共有0条评论来说两句吧...