【DataX】Java中集成DataX开发
步骤
先说总体步骤:
- 下载源码,并编译到本地
maven
仓库[上传私服(可选)]; -
pom
文件依赖datax-core
和需要的reader
和writer
- 环境变量设置
datax.home
(或者利用System#setProperty(String)
)和一些需要替换脚本中的变量:脚本中${}
占位符的变量将被系统变量替换。 - 将datax.tar.gz中解压出来的
conf
、plugin
等文件放到datax.home目录中。 - 构造参数数组:
{"-job", "xxx.json", "-mode", "standalone", "-jobid", "-1"}
- 调用
Engin#main(String[])
或者Engine#entry(String[])
引言
目前官方的使用指南里都是利用python来调用dataX执行任务。而且现有的博客基本上也是利用java来调用python命令Runtime.getRuntime().exec()
来执行。
个人感觉,dataX未提供java集成开发的方法,应该是定位生产系统,运维需要吧?!
我们的业务场景:执行完dataX的job之后,还有一定的业务逻辑,所以希望在java应用里调用dataX执行完job之后,再执行后续逻辑。
DataX分析
笔者简单的看了一下午的DataX的逻辑,完全以使用者的视角分析DataX,必然不能完全了解DataX的整个执行过程。
本文仅分析如果能够在java代码里集成DataX进行开发。
集成准备
DataX没有将代码上传到maven服务器上,所以需要自己先pull代码到本地,编译,才能在集成开发的使用通过pom引用。有条件的可以上传到自己的私服上。
代码地址
代码依赖
通过pom文件加入datax-core
:
<dependency> <groupId>com.alibaba.datax</groupId> <artifactId>datax-core</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>
如果需要对应的reader
和writer
的话,加入到pom文件中,比如需要streamreader和streamwriter:
<dependency> <groupId>com.alibaba.datax</groupId> <artifactId>streamreader</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> <dependency> <groupId>com.alibaba.datax</groupId> <artifactId>streamwriter</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>
要依赖datax一定要保证有对应的源码或者编译到本机的maven repository或者在对应的私服上有上传相应的编译版本,不然pom文件是找不到依赖的。
为了集成开发,可能需要一口气引用所有的reader和writer,目前所知,就得一个一个写,如果大家有好办法,麻烦告知!
准备相应的文件
从com.alibaba.datax.core.util.container.CoreConstant
中可以看到,datax.home
很重要,很多文件的读取都是在datax.home
里面获取的。就如我们在安装版的datax中可以看到里面一些目录一样
$ ll total 4 drwxr-xr-x 2 mcbadm mcb 56 Sep 20 18:28 bin drwxr-xr-x 2 mcbadm mcb 65 Sep 20 18:28 conf drwxr-xr-x 2 mcbadm mcb 21 Sep 20 18:28 job drwxr-xr-x 2 mcbadm mcb 4096 Sep 20 18:28 lib drwxr-xr-x 4 mcbadm mcb 32 Sep 20 18:28 plugin drwxr-xr-x 2 mcbadm mcb 22 Sep 20 18:28 script drwxr-xr-x 2 mcbadm mcb 23 Sep 20 18:28 tmp
目前所知的,Engine#entry
在解析配置的时候会读取conf目录下的文件,还有对应plugin/reader/xxxreader、plugin/writer/xxxwriter的plugin.json文件:
{ "name": "streamreader", "class": "com.alibaba.datax.plugin.reader.streamreader.StreamReader", "description": { "useScene": "only for developer test.", "mechanism": "use datax framework to transport data from stream.", "warn": "Never use it in your real job." }, "developer": "alibaba" }
编写代码
编写job代码:
{ "job": { "content": [ { "reader": { "name": "streamreader", "parameter": { "sliceRecordCount": 1, "column": [ { "type": "long", "value": "10" }, { "type": "string", "value": "hello,你好,世界-DataX" } ] } }, "writer": { "name": "streamwriter", "parameter": { "encoding": "UTF-8", "print": true } } } ], "setting": { "speed": { "channel": 1 } } } }
写个测试类吧:
import com.alibaba.datax.core.Engine; public class EngineTest { public static void main(String[] args) { System.setProperty("datax.home", getCurrentClasspath()); String[] datxArgs = {"-job", getCurrentClasspath() + "/job/stream2stream.json", "-mode", "standalone", "-jobid", "-1"}; try { Engine.entry(datxArgs); } catch (Throwable e) { e.printStackTrace(); } } public static String getCurrentClasspath() { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); String currentClasspath = classLoader.getResource("").getPath(); // 当前操作系统 String osName = System.getProperty("os.name"); if (osName.startsWith("Windows")) { // 删除path中最前面的/ currentClasspath = currentClasspath.substring(1); } return currentClasspath; } }
datax在解析完配置后,会将core.json,job.json,plugin.json合并在一起:
{ "common": { "column": { "dateFormat": "yyyy-MM-dd", "datetimeFormat": "yyyy-MM-dd HH:mm:ss", "encoding": "utf-8", "extraFormats": [ "yyyyMMdd" ], "timeFormat": "HH:mm:ss", "timeZone": "GMT+8" } }, "core": { "container": { "job": { "id": -1, "reportInterval": 10000 }, "taskGroup": { "channel": 5 }, "trace": { "enable": "false" } }, "dataXServer": { "address": "http://localhost:7001/api", "reportDataxLog": false, "reportPerfLog": false, "timeout": 10000 }, "statistics": { "collector": { "plugin": { "maxDirtyNumber": 10, "taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector" } } }, "transport": { "channel": { "byteCapacity": 67108864, "capacity": 512, "class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel", "flowControlInterval": 20, "speed": { "byte": -1, "record": -1 } }, "exchanger": { "bufferSize": 32, "class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger" } } }, "entry": { "jvm": "-Xms1G -Xmx1G" }, "job": { "content": [ { "reader": { "name": "streamreader", "parameter": { "column": [ { "type": "long", "value": "10" }, { "type": "string", "value": "hello,你好,世界-DataX" } ], "sliceRecordCount": 1 } }, "writer": { "name": "streamwriter", "parameter": { "encoding": "UTF-8", "print": true } } } ], "setting": { "speed": { "channel": 1 } } }, "plugin": { "reader": { "streamreader": { "class": "com.alibaba.datax.plugin.reader.streamreader.StreamReader", "description": { "mechanism": "use datax framework to transport data from stream.", "useScene": "only for developer test.", "warn": "Never use it in your real job." }, "developer": "alibaba", "name": "streamreader", "path": "D:/workspace/datax-test/target/test-classes/\\plugin\\reader\\streamreader" } }, "writer": { "streamwriter": { "class": "com.alibaba.datax.plugin.writer.streamwriter.StreamWriter", "description": { "mechanism": "use datax framework to transport data to stream.", "useScene": "only for developer test.", "warn": "Never use it in your real job." }, "developer": "alibaba", "name": "streamwriter", "path": "D:/workspace/datax-test/target/test-classes/\\plugin\\writer\\streamwriter" } } } }
说说插件原理
每个reader和writer都有自己的plugin.json文件,里面最重要的就是class配置了,这个类的全路径配置用于classloader将其加载进来并通过反射将其实例化。加载代码可看com.alibaba.datax.core.util.container.LoadUtil
所以我们在集成的时候,plugin目录下面不需要有jar包了,只需要放json文件就行,因为我们通过pom文件依赖了对应的reader和writer,说白了,就是classpath下面有对应的reader和writer即可。
结束语
文章有点长,记录了一个下午的研究结果,应该有很多不完善的地方,希望可以和大家多交流。如果觉得有帮助,可以点个赞。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Java 调用树莓派硬件资源
使用 Java 和 Pi4J 调用树莓派上的硬件资源。使用树莓派 3B,系统是RASPBIAN STRETCH LITE version June 2018 环境安装 安装Java sudo apt-get install oracle-java8-jdk 安装 WiringPi 使用里面的方法1WiringPi 安装 安装Pi4J库最新 Pi4J 库(1.2)需要最新库的原因 下载后传到树莓派文件系统里,使用如下命令安装: sudo dpkg -i pi4j-1.2-SNAPSHOT.deb (待定)
- 下一篇
Coding and Paper Letter(二十三)
资源整理。 1 Coding: 1.开源项目aom,基于Fotran90编写的水光学蒙特卡洛模型。可以模拟辐射量。 aomc 2.开源项目proSR,一种全新的单图像超分辨率重建方法。 proSR 3.开源项目conditional neural process,条件神经网络的jupyter notebook实现。 conditional neural process 4.如何将自己开发的R包推送到cran上,需要准备的东西。 prepare for cran 5.Python库flask ipywidgets,flask框架里的ipywidgets支持。 flask ipywidgets 6.针对生态学家的贝叶斯数据分析介绍。 bayes course 7.R中关于回归,GLM,混合效应模型和GLMM的wokrshop练习。 glmm course 8.R语言包bayesdfa,bayesdfa与Stan实施贝叶斯动态因子分析(DFA)。 bayesdfa 9.开放交通数据的资源。 opentransportdata 10.R语言包rangeModelMetadata,提供了创建与R...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS关闭SELinux安全模块
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- Hadoop3单机部署,实现最简伪集群
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- CentOS6,CentOS7官方镜像安装Oracle11G
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS8编译安装MySQL8.0.19
- CentOS6,7,8上安装Nginx,支持https2.0的开启