datax常见问题
1、如何调整jvm的参数?
调整datax.py文件中DEFAULT_JVM的值即可
2、插件对应的参数具体含义以及报错之后该如何解决,以MysqlReader为例
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"column": [
"id",
"name"
],
"splitPk": "db_id",
"connection": [
{
"table": [
"table"
],
"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/database"]
}
]
}
}
参数说明:splitPk代表切分主键,对于单表进行切分的时候使用 ,如对于表a,存在自增主键id,可以通过id < 1000,id>=1000 & id<2000 ,id>=2000切分成3个sql进行读取,另外,根据connection中的table和jdbcUrl配置项是list可知,这里可以配置多个,多个之间会去笛卡尔积,也就是可以配置任意多的数据库和表,这些库的密码必须保持一致,下面说明一下上述参数在哪里使用
根据前面代码分析可以知道,整个datax的task会被切分多少个是由reader端的split函数决定的,查看mysqlReader的的split函数
public List<Configuration> split(int adviceNumber) {
return this.commonRdbmsReaderJob.split(this.originalConfig, adviceNumber);
}
继续深入进去可以看到实际执行为
public static List<Configuration> doSplit(
Configuration originalSliceConfig, int adviceNumber) {
boolean isTableMode = originalSliceConfig.getBool(Constant.IS_TABLE_MODE).booleanValue();
int eachTableShouldSplittedNumber = -1;
if (isTableMode) {
// adviceNumber这里是channel数量大小, 即datax并发task数量
// eachTableShouldSplittedNumber是单表应该切分的份数, 向上取整可能和adviceNumber没有比例关系了已经
eachTableShouldSplittedNumber = calculateEachTableShouldSplittedNumber(
adviceNumber, originalSliceConfig.getInt(Constant.TABLE_NUMBER_MARK));
}
String column = originalSliceConfig.getString(Key.COLUMN);
String where = originalSliceConfig.getString(Key.WHERE, null);
List<Object> conns = originalSliceConfig.getList(Constant.CONN_MARK, Object.class);
List<Configuration> splittedConfigs = new ArrayList<Configuration>();
for (int i = 0, len = conns.size(); i < len; i++) {
Configuration sliceConfig = originalSliceConfig.clone();
Configuration connConf = Configuration.from(conns.get(i).toString());
String jdbcUrl = connConf.getString(Key.JDBC_URL);
sliceConfig.set(Key.JDBC_URL, jdbcUrl);
// 抽取 jdbcUrl 中的 ip/port 进行资源使用的打标,以提供给 core 做有意义的 shuffle 操作
sliceConfig.set(CommonConstant.LOAD_BALANCE_RESOURCE_MARK, DataBaseType.parseIpFromJdbcUrl(jdbcUrl));
sliceConfig.remove(Constant.CONN_MARK);
Configuration tempSlice;
// 说明是配置的 table 方式
if (isTableMode) {
// 已在之前进行了扩展和`处理,可以直接使用
List<String> tables = connConf.getList(Key.TABLE, String.class);
Validate.isTrue(null != tables && !tables.isEmpty(), "您读取数据库表配置错误.");
String splitPk = originalSliceConfig.getString(Key.SPLIT_PK, null);
//最终切分份数不一定等于 eachTableShouldSplittedNumber
boolean needSplitTable = eachTableShouldSplittedNumber > 1
&& StringUtils.isNotBlank(splitPk);
if (needSplitTable) {
if (tables.size() == 1) {
//原来:如果是单表的,主键切分num=num*2+1
// splitPk is null这类的情况的数据量本身就比真实数据量少很多, 和channel大小比率关系时,不建议考虑
//eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 2 + 1;// 不应该加1导致长尾
//考虑其他比率数字?(splitPk is null, 忽略此长尾)
eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 5;
}
// 尝试对每个表,切分为eachTableShouldSplittedNumber 份
for (String table : tables) {
tempSlice = sliceConfig.clone();
tempSlice.set(Key.TABLE, table);
List<Configuration> splittedSlices = SingleTableSplitUtil
.splitSingleTable(tempSlice, eachTableShouldSplittedNumber);
splittedConfigs.addAll(splittedSlices);
}
} else {
for (String table : tables) {
tempSlice = sliceConfig.clone();
tempSlice.set(Key.TABLE, table);
String queryColumn = HintUtil.buildQueryColumn(jdbcUrl, table, column);
tempSlice.set(Key.QUERY_SQL, SingleTableSplitUtil.buildQuerySql(queryColumn, table, where));
splittedConfigs.add(tempSlice);
}
}
} else {
// 说明是配置的 querySql 方式
List<String> sqls = connConf.getList(Key.QUERY_SQL, String.class);
// TODO 是否check 配置为多条语句??
for (String querySql : sqls) {
tempSlice = sliceConfig.clone();
tempSlice.set(Key.QUERY_SQL, querySql);
splittedConfigs.add(tempSlice);
}
}
}
return splittedConfigs;
}
在这里可以看到整个reader端是如何切分task,针对个connection下的每个表都切分成一个task(task的配置通过Configuration保存),即求笛卡尔积,同时,这里也考虑了splitPk的情况,通过源码可以直接看到每个参数的作用,在遇到插件的相关问题时,可以直接找到对应的插件函数进行排查

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
Python线性表——单链表
Python线性表——单链表 线性表简介 线性表是一种线性结构,它是由零个或多个数据元素构成的有限序列。线性表的特征是在一个序列中,除了头尾元素,每个元素都有且只有一个直接前驱,有且只有一个直接后继,而序列头元素没有直接前驱,序列尾元素没有直接后继。 数据结构中常见的线性结构有数组、单链表、双链表、循环链表等。线性表中的元素为某种相同的抽象数据类型。可以是C语言的内置类型或结构体,也可以是C++自定义类型。 2. 数组 数组在实际的物理内存上也是连续存储的,数组有上界和下界。C语言中定义一个数组: 数组下标是从0开始的,a[0]对应第一个元素。其中,a[0]称为数组a的下界,a[6]称为数组a的上届。超过这个范围的下标使用数组,将造成数组越界错误。 数组的特点是:数据连续,支持快速随机访问。 数组分为固定数组与动态数组。其中固定数组的大小必须在编译时就能够确认,动态数组允许在运行时申请数组内存。复杂点的数组是多维数组,多维数组实际上也是通过一维数组来实现的。在C语言中,可以通过malloc来分配动态数组,C++使用new。另外,C++的标准模板库提供了动态数组类型vector以及内置有...
-
下一篇
vim 编辑器基本使用
VIM的基本使用 插入 i 在光标前插入 I(大写字母i)在行首插入 a在光标之后插入 A在行尾插入 o在下一行插入 O在上一行插入 r输入替换光标位置字符 R进入输入替换模式 -(减号)大小写转换 ESC 退出编辑模式 搜索 /str 从光标位置开始向下搜索字符 str ?str 从光标位置开始向上搜索字符 str n 找下一个 N 找上一个 复制 yy 复制当前行 nyy 复制从当前开始的n行 n为数字 如 5yy 粘贴 p 粘贴到光标下一行 P 粘贴到光标上一行 替换 :%s/abc/123/g 将所有的abc替换为123 :%s/abc/123/gc 含义同上 但是要一个一个的确认 删除 D当前光标位置开始删除到行尾 dd 删除光标当前行 ndd 删除从光标行开始向后删除n行 n为数字 如 5dd dnG 删除从第n行到当前行的数据 n为数字 d1G 从第一行到当前行的数据 dG 删除从当前行到最后一行的数据 x向后删除一个字符 nx向后删除n个字符 X向前删除1个字符 撤销 u退回前一个动作 Ctrl+r 重复前一个动作 :e! 文档还原到最原始的状态 保存 :w 保存 :w...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS8编译安装MySQL8.0.19
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- MySQL8.0.19开启GTID主从同步CentOS8
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- Dcoker安装(在线仓库),最新的服务器搭配容器使用
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- MySQL数据库在高并发下的优化方案
- Docker容器配置,解决镜像无法拉取问题