Flink JDBC Connector:Flink 与数据库集成最佳实践
摘要:Flink 1.11 引入了 CDC,在此基础上, JDBC Connector 也发生比较大的变化,本文由 Apache Flink Contributor,阿里巴巴高级开发工程师徐榜江(雪尽)分享,主要介绍 Flink 1.11 JDBC Connector 的最佳实践。大纲如下:
JDBC connector
JDBC Catalog
JDBC Dialect
Demo
Tips:点击下方链接可查看作者原版 PPT 及分享视频:
https://flink-learning.org.cn/developers/flink-training-course3/
JDBC-Connector 的重构
JDBC-Connector 的重构
FLINK-15782 :Rework JDBC Sinks[1] (重写 JDBC Sink)
FLINK-17537:Refactor flink-jdbc connector structure[2] (重构 flink-jdbc 连接器的结构)
FLIP-95: New TableSource and TableSink interfaces[3] (新的 TableSource 和 TableSink 接口)
FLIP-122:New Connector Property Keys for New Factory[4](新的连接器参数)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
FLIP-87:Primary key Constraints in Table API[5] (Table API 接口中的主键约束问题)
JDBC Catalog
JDBC Catalog
// The supported methods by Postgres Catalog.PostgresCatalog.databaseExists(String databaseName)PostgresCatalog.listDatabases()PostgresCatalog.getDatabase(String databaseName)PostgresCatalog.listTables(String databaseName)PostgresCatalog.getTable(ObjectPath tablePath)PostgresCatalog.tableExists(ObjectPath tablePath)
JDBC Dialect
JDBC Dialect
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#data-type-mapping
|
|
|
|
|
|
|
|
|
实践 Demo
实践 Demo
-
Flink standalone 环境准备并在提供的地址下载好对应的安装包和 connector jar。 -
测试数据准备,通过拉起容器运行已经打包好的镜像。其中 Kafka 中的 changelog 数据是通过 debezium connector 抓取的 MySQL orders表 的 binlog。 -
通过 SQL Client 编写 SQL 作业,分别创建 Flink 订单表,维表,用户表,产品表,并创建 Function UDF。从 PG Catalog 获取结果表信息之后,把作业提交至集群执行运行。 -
测试 CDC 数据同步和维表 join,通过新增订单、修改订单、删除订单、维表数据更新等一系列操作验证 CDC 在 Flink 上如何运行以及写入结果表。
https://github.com/leonardBang/flink-sql-etl
问答环节
问答环节
https://issues.apache.org/jira/browse/FLINK-16681
总结
总结
参考链接:

(点击可了解更多议题投递详情)
本文分享自微信公众号 - Flink 中文社区(gh_5efd76d10a8d)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。



