利用 Addax 异构迁移数据到 Databend
作者:邰翀
(https://github.com/TCeason) Databend 研发工程师
现在互联网应用越来越复杂,每个公司都会有多种多样的数据库。通常是用最好的硬件来跑 OLTP,甚至还在 OLTP 中进行分库分表来足业务,这样对于一些分析,聚合,排序操作非常麻烦。这也有了异构数据库的数据同步需求,今天重点给大家介绍两个利器 :异构数据迁移:Addax 结合云原生数仓 Databend 实现异构数据库数据合并及分析。
Addax (https://github.com/wgzhao/Addax) 是一个异构数据源离线同步工具,最初来源于阿里的 DataX ,致力于实现包括关系型数据库(MySQL、PostgreSQL、Oracle等)、HDFS、Hive、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
Databend (https://github.com/datafuselabs/databend) 是一个开源、弹性、负载感知的现代云数仓库,赋能企业降本增效。在之前的文章中介绍了 如何快速部署 Databend (https://www.databend.cn/blog/deploy-databend-on-minio)。
为什么是 Addax 没有选用 Datax, 实际这次发起这个项目的原因是一个用户原来的环境中有 Clickhouse, Datax 不支持 Clickhose 读取和写入,所以我们优先支持了 Addax 这个异构迁移工具。下面我们通过一个简单练习,让你学习使用 Addax , 另外通过几个进阶案例给你展示一下 Addax 的魅力。
本文中仅以 Addax 的 mysqlreader plugin 为例进行实验,databendwriter 支持所有 Addax 提供的 reader plugin。
1. Addax 基本使用
1.1. 安装 Addax
# more info https://wgzhao.github.io/Addax/4.0.11/ wget https://github.com/wgzhao/Addax/releases/download/4.0.11/addax-4.0.11.tar.gz; tar xvf addax-4.0.11.tar.gz;
Addax 也支持 Docker 安装 (https://github.com/wgzhao/Addax#use-docker-image)和编译安装 (https://github.com/wgzhao/Addax#compile-and-package)。
1.2. Demo (from MySQL to Databend)
在 MySQL Server 中建立迁移用户。(本例中待迁移的表为 db.tb01)
mysql> create user 'mysqlu1'@'%' identified by '123'; mysql> grant all on *.* to 'mysqlu1'@'%'; mysql> create database db; mysql> create table db.tb01(id int, col1 varchar(10)); mysql> insert into db.tb01 values(1, 'test1'), (2, 'test2'), (3, 'test3');
在 Databend 中建立对应的表结构。(将 MySQL 的 db.tb01 数据迁移至 Databend 的 migrate_db.tb01)
databend> create database migrate_db; databend> create table migrate_db.tb01(id int null, col1 String null);
进行如下配置后,即可开始迁移。
$ cd addax-4.0.11/bin; $ cat <<EOF > ./mysql2databend.json { "job": { "setting": { "speed": { "channel": 4 } }, "content": { "writer": { "name": "databendwriter", "parameter": { "preSql": [ "truncate table @table" ], "postSql": [ ], "username": "u1", "password": "123", "database": "migrate_db", "table": "tb01", "jdbcUrl": "jdbc:mysql://127.0.0.1:3307/migrate_db", "loadUrl": ["127.0.0.1:8000","127.0.0.1:8000"], "fieldDelimiter": "\\x01", "lineDelimiter": "\\x02", "column": ["*"], "format": "csv" } }, "reader": { "name": "mysqlreader", "parameter": { "username": "mysqlu1", "password": "123", "column": [ "*" ], "connection": [ { "jdbcUrl": [ "jdbc:mysql://127.0.0.1:3306/db" ], "driver": "com.mysql.jdbc.Driver", "table": [ "tb01" ] } ] } } } } } EOF $ ./addax.sh -L debug ./mysql2databend.json
1.3. 校验数据
databend> select * from migrate_db.tb01; +------+-------+ | id | col1 | +------+-------+ | 1 | test1 | | 2 | test2 | | 3 | test3 | +------+-------+
更多使用方式参见:https://wgzhao.github.io/Addax/4.0.11/writer/databendwriter/
1.4. 小结
上面的例子是通过 Addax 跑通一个表的迁移到 Databend , 通过一个简单的例子也可以感受一下 Addax 大概的流程。
但 Addax 远比这个 Demo 强大。另外 Addax 强大之处可能通过参数来控制配置文件,这样比轻松地实现一个配置迁移, 甚至可以传入 SQL 这样来读取指定区间做数据的迁移。
2. Addax 进阶使用
Addax 配置框架可以参考:https://wgzhao.github.io/Addax/4.0.11/setupJob/
如果只是使用源端和目标,这块在配置中主要需要关注:
"content": { "reader": {}, "writer": {}, }
另外 Addax 参考了 DataX 的设计和使用习惯,对于 DataX 支持语法在 Addax 都可以使用。
下面我举几个生产中可能会用到例子,来给大家参考一下:
Case1: 生产中 10 张表的数据合并到 Databend 中一张表
Case2: 把 MySQL 中所有的表都迁移到 Databend 中
Case3: 指定 SQL 读取原表的数据,迁移到指定的表中
Case 1: 生产中 10 张表的数据合并到 Databend 中一张表
这个需求在生产中比较常见,需要把线上的数据汇聚一个地方进行分析, 这块正好可以利用 Databend 基于对象存储及高压缩的能力。假设源端是 MySQL 数据库, 目标端是 Databend, 下面是一个简化的配置:
"reader": { "name": "mysqlreader", ... "connection": [ ... "table": [ '${dst_table}' ] ] }, "writer": { "name": "databendwriter", ... "table": '${src_table}', ... }
基于这配置我们需要写一个脚本来调用, 例如要迁移的前缀是 sbtest 从 1 到 10 ,最终合并为 sbtest 调用方法如下:
pre_tb="sbtest" dst_tb="sbtest" for t in `seq 1 10` do tb=$pre_tb$t echo $tb ./bin/addax.sh ./job/my2databend.json -p "-Dsrc_table=$tb -Ddst_table=$dst_tb" done
mysql2databend.json 配置参考:databend-workshop/mysql2databend.json at main · wubx/databend-workshop (https://github.com/wubx/databend-workshop/blob/main/addax_mysql2databend/mysql2databend.json)
Case 2: 把 MySQL 中所有的表都迁移到 Databend 中
基于上面的案例,估计大概已经明白其中的套路了。如果 MySQL 到 Databend 还可以基于上面的配置文件,只需要把要迁移的表整理成一个 list 控制。全库迁移难点在于表结构生成,对于 Databend 表结构的生成只需要字段名和类型即可。
表结构生成的脚本参考:https://github.com/wubx/databend-workshop/blob/main/addax_mysql2databend/mysql_str2databend.py
调用方法:
#python3 g_mysql.py -H MySQL_IP -P MySQL_PORT -u MySQL_User -p mysql_password -d dbname |mysql -h databend_ip -Pdatabend_port -udatabend_user dbname python3 g_mysql.py -H 172.21.16.9 -P 3306 -u root -p vgypH8nc -d wubx |mysql -h 127.0.0.1 -P3307 -uroot wubx
定制迁移配置
"reader": { "name": "mysqlreader", ... "connection": [ ... "table": [ '${src_table}' ] ] }, "writer": { "name": "databendwriter", ... "table": '${src_table}', ... }
调用方式
pre_tb="sbtest" for t in `seq 1 10` do tb=$pre_tb$t echo $tb ./bin/addax.sh ./job/mysql2databend.json -p "-Dsrc_table=$tb" done
Case 3: 指定 SQL 读取原表的数据,迁移到指定的表中
这种场景适合定期小批量迁移的,但原始表里需要有时间字段,比如订单类数据迁移。这里我们要使用到 Addax 数据读取中指定 querySQL 这个特性
"content": { "reader": { "name": "mysqlreader", "parameter": { ... "connection": [ { "querySql": ["$DS"], ... } ] } }, "writer": { "name": "databendwriter", ... "table": '${DT}', ... }
调用方法:
# 利用 DS 传入读取的 SQL ,通过 DT 指定写入的表名 python3 ./bin/addax.py job/msql.json -p "-DDS='select * from sbtest1 limit 10' -DDT=sbtest"
小结
本部分通过几个案例分展示了 Addax 的数据读取能力,通过灵活的组合基本可以满足生产中的各种需求。实际使用中如果为了提速,需要调整任务的配置和 autoPK 的这个功能。另外也可以使用 Addax 和 Databend 也可以担负起数据库的归档需求。
Databend 兼容 MySQL 协议 (https://databend.rs/doc/integrations/api/mysql-handler),如果使用 Databend 做 ODS 层,又想结合原来的大数据生态使用,也可以使用 Addax 直接使用 mysql (https://wgzhao.github.io/Addax/4.0.11/reader/mysqlreader/) reader (https://wgzhao.github.io/Addax/4.0.11/reader/mysqlreader/) 插件读取 Databend 中的数据。
3. 使用中注意事项
Addax 使用中对于具体的数据库,建议多阅读官方的 reader 或是 writer 的说明,根据实际情况决定合理的配置。这里的案例中 Databend 使用的是 Streaming_load 接口进行的数据写入,如果单节点 databend-query 有压力的情况下,也可以考虑配置多个 databend-query ,Addax 也是支持多个 databend-query 写入。
以上都是参考,如果还有不能满足你的使用,或是你对 Addax 和 Databend 有进一步需求,也可以 wx 小D : Databend 约一些线上的交流。
关于 Databend
Databend 是一款开源、弹性、低成本,基于对象存储也可以做实时分析的新式数仓。期待您的关注,一起探索云原生数仓解决方案,打造新一代开源 Data Cloud。
Databend 文档:https://databend.rs/
Wechat:Databend

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
总结了6种卷积神经网络压缩方法
摘要:神经网络的压缩算法是,旨在将一个庞大而复杂的预训练模型(pre-trained model)转化为一个精简的小模型。 本文分享自华为云社区《卷积神经网络压缩方法总结》,作者:嵌入式视觉 。 我们知道,在一定程度上,网络越深,参数越多,模型越复杂,其最终效果越好。神经网络的压缩算法是,旨在将一个庞大而复杂的预训练模型(pre-trained model)转化为一个精简的小模型。 按照压缩过程对网络结构的破坏程度,我们将模型压缩技术分为“前端压缩”和“后端压缩”两部分。 前端压缩,是指在不改变原网络结构的压缩技术,主要包括知识蒸馏、轻量级网络(紧凑的模型结构设计)以及滤波器(filter)层面的剪枝(结构化剪枝)等; 后端压缩,是指包括低秩近似、未加限制的剪枝(非结构化剪枝/稀疏)、参数量化以及二值网络等,目标在于尽可能减少模型大小,会对原始网络结构造成极大程度的改造。 总结:前端压缩几乎不改变原有网络结构(仅仅只是在原模型基础上减少了网络的层数或者滤波器个数),后端压缩对网络结构有不可逆的大幅度改变,造成原有深度学习库、甚至硬件设备不兼容改变之后的网络。其维护成本很高。 一,低秩近...
- 下一篇
一次JSF上线问题引发的MsgPack深入理解,保证对你有收获
作者: 京东零售 肖梦圆 前序 某一日晚上上线,测试同学在回归项目黄金流程时,有一个工单项目接口报JSF序列化错误,马上升级对应的client包版本,编译部署后错误消失。 线上问题是解决了,但是作为程序员要了解问题发生的原因和本质。但这都是为什么呢? 第一个问题:为什么测试的时候没有发现问题呢? 首先预发环境中,所有项目中的JSF别名和client包都是beta,每天都有项目进行编译部署,这样每个项目获取的都是最新的client包,所以在预发环境测试没有发现 第二个问题:为什么会出现序列化问题? JDer的开发们都知道JSF接口如果添加字段需要在类的最后进行添加,对此我检查了自己的代码发现我添加的代码也是在类的最后进行添加的,但是特殊之处在于这是一个父类,有子类进行继承 第三个问题:如果在父类上添加一个字段有什么影响呢? 说实话,猛的这么一问,我犹豫了,JDer们都知道JSF的默认序列化使用的是MsgPack,一直都是口口相传说如果client类添加字段必须在类的最后,但是也没人告诉父类添加字段咋办呀,父子类这种场景MsgPack是如何处理序列化和反序列化的? 第四...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS8编译安装MySQL8.0.19
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- CentOS8安装Docker,最新的服务器搭配容器使用
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库