推荐一款数据同步工具:FlinkX
FlinkX
1 什么是FlinkX
- FlinkX是基于flink的分布式离线数据同步框架,实现了多种异构数据源之间高效的数据迁移。
不同的数据源头被抽象成不同的Reader插件,不同的数据目标被抽象成不同的Writer插件。理论上,FlinkX框架可以支持任意数据源类型的数据同步工作。作为一套生态系统,每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。
2 工作原理
在底层实现上,FlinkX依赖Flink,数据同步任务会被翻译成StreamGraph在Flink上执行,工作原理如下图:
3 快速起步
3.1 运行模式
- 单机模式:对应Flink集群的单机模式
- standalone模式:对应Flink集群的分布式模式
- yarn模式:对应Flink集群的yarn模式
3.2 执行环境
- Java: JDK8及以上
- Flink集群: 1.4及以上(单机模式不需要安装Flink集群)
- 操作系统:理论上不限,但是目前只编写了shell启动脚本,用户可以可以参考shell脚本编写适合特定操作系统的启动脚本。
3.3 打包
进入项目根目录,使用maven打包:
mvn clean package -Dmaven.test.skip
打包结束后,项目根目录下会产生bin目录和plugins目录,其中bin目录包含FlinkX的启动脚本,plugins目录下存放编译好的数据同步插件包
3.4 启动
3.4.1 命令行参数选项
-
model
-
描述:执行模式,也就是flink集群的工作模式
- local: 本地模式
- standalone: 独立部署模式的flink集群
- yarn: yarn模式的flink集群,需要提前在yarn上启动一个flink session,使用默认名称"Flink session cluster"
- 必选:否
- 默认值:local
-
-
job
- 描述:数据同步任务描述文件的存放路径;该描述文件中使用json字符串存放任务信息。
- 必选:是
- 默认值:无
-
plugin
- 描述:插件根目录地址,也就是打包后产生的plugins目录。
- 必选:是
- 默认值:无
-
flinkconf
- 描述:flink配置文件所在的目录(单机模式下不需要),如/hadoop/flink-1.4.0/conf
- 必选:否
- 默认值:无
-
yarnconf
- 描述:Hadoop配置文件(包括hdfs和yarn)所在的目录(单机模式下不需要),如/hadoop/etc/hadoop
- 必选:否
- 默认值:无
3.4.2 启动数据同步任务
- 以本地模式启动数据同步任务
bin/flinkx -mode local -job /Users/softfly/company/flink-data-transfer/jobs/task_to_run.json -plugin /Users/softfly/company/flink-data-transfer/plugins -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*
- 以standalone模式启动数据同步任务
bin/flinkx -mode standalone -job /Users/softfly/company/flink-data-transfer/jobs/oracle_to_oracle.json -plugin /Users/softfly/company/flink-data-transfer/plugins -flinkconf /hadoop/flink-1.4.0/conf -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*
- 以yarn模式启动数据同步任务
bin/flinkx -mode yarn -job /Users/softfly/company/flinkx/jobs/mysql_to_mysql.json -plugin /opt/dtstack/flinkplugin/syncplugin -flinkconf /opt/dtstack/myconf/conf -yarnconf /opt/dtstack/myconf/hadoop -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*
4 数据同步任务模版
从最高空俯视,一个数据同步的构成很简单,如下:
{ "job": { "setting": {...}, "content": [...] } }
数据同步任务包括一个job元素,而这个元素包括setting和content两部分。
- setting: 用于配置限速、错误控制和脏数据管理
- content: 用于配置具体任务信息,包括从哪里来(Reader插件信息),到哪里去(Writer插件信息)
4.1 setting
"setting": { "speed": {...}, "errorLimit": {...}, "dirty": {...} }
setting包括speed、errorLimit和dirty三部分,分别描述限速、错误控制和脏数据管理的配置信息
4.1.1 speed
"speed": { "channel": 3, "bytes": 0 }
- channel: 任务并发数
- bytes: 每秒字节数,默认为 Long.MAX_VALUE
4.1.2 errorLimit
"errorLimit": { "record": 10000, "percentage": 100 }
- record: 出错记录数超过record设置的条数时,任务标记为失败
- percentage: 当出错记录数超过percentage百分数时,任务标记为失败
4.1.3 dirty
"dirty": { "path": "/tmp", "hadoopConfig": { "fs.default.name": "hdfs://ns1", "dfs.nameservices": "ns1", "dfs.ha.namenodes.ns1": "nn1,nn2", "dfs.namenode.rpc-address.ns1.nn1": "node02:9000", "dfs.namenode.rpc-address.ns1.nn2": "node03:9000", "dfs.ha.automatic-failover.enabled": "true", "dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider", "fs.hdfs.impl.disable.cache": "true" } }
- path: 脏数据存放路径
- hadoopConfig: 脏数据存放路径对应hdfs的配置信息(hdfs高可用配置)
4.1.4 restore
"restore": { "isRestore": false, "restoreColumnName": "", "restoreColumnIndex": 0 }
restore配置请参考断点续传
4.2 content
"content": [ { "reader": { "name": "...", "parameter": { ... } }, "writer": { "name": "...", "parameter": { ... } } } ]
- reader: 用于读取数据的插件的信息
- writer: 用于写入数据的插件的信息
reader和writer包括name和parameter,分别表示插件名称和插件参数
4.3 数据同步任务例子
详见flinkx-examples子工程
5. 数据同步插件
5.1 读取插件
- 关系数据库读取插件
- 分库分表读取插件
- HDFS读取插件
- HBase读取插件
- Elasticsearch读取插件
- Ftp读取插件
- Odps读取插件
- MongoDB读取插件
- Stream读取插件
- Carbondata读取插件
- MySQL binlog读取插件
- KafKa读取插件
5.2 写入插件
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
MaxCompute问答整理之9月
本文是基于本人对MaxCompute产品的学习进度,再结合开发者社区里面的一些问题,进而整理成文。希望对大家有所帮助。 问题一、如何查看information_schema的tables?在使用ODPS建表时,有可能会建出几千张表,那我们寻找需要的表时就需要知道表名称,可以在数据地图中查看表,也可以使用Pyodps批量获取表名称。具体可参考文档:https://help.aliyun.com/document_detail/90412.html 问题二、不小心drop删除表可以恢复吗?不可以。在客户端和IDE中drop表是一个不可逆操作。表操作要谨慎。 问题三、在哪里可以看到所有执行的SQL?通过Information_Schema元数据的TASKS_HISTORY明细来查,元数据服务Information_Schema已经全面开放,大
- 下一篇
Linux系统:centos7下搭建ElasticSearch中间件,常用接口演示
本文源码:GitHub·点这里 || GitEE·点这里 一、中间件简介 1、基础概念 ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。 2、分布式数据库 分布式数据库系统通常使用较小的计算机系统,每台计算机可单独放在一个地方,每台计算机中都可能有DBMS的一份完整拷贝副本,或者部分拷贝副本,并具有自己局部的数据库,位于不同地点的许多计算机通过网络互相连接,共同组成一个完整的、全局的逻辑上集中、物理上分布的大型数据库。 3、核心角色 1)节点和集群 cluster代表一个集群,集群中有多个节点,其中有一个为主节点,这个主节点是可以通过选举产生的,主从节点是对于集群内部来说的。es的一个概念就是去中心化,字面上理解就是无中心节点,这是对于集群外部来说的,因为从外部来看es集群,在逻辑上是个整体。单个 Elastic 实例称为一个节点(node)。一组节点构成一个集群(cluster)...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7,8上快速安装Gitea,搭建Git服务器
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS关闭SELinux安全模块
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Hadoop3单机部署,实现最简伪集群
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- CentOS7安装Docker,走上虚拟化容器引擎之路