Flink SQL CDC 上线!我们总结了 13 条生产实践经验
项目背景
解决方案
项目运行环境与现状
具体实现
踩过的坑和学到的经验
总结
https://flink-learning.org.cn/developers/flink-training-course3/
01 项目背景
02 解决方案
03 项目运行环境与现状
04 具体实现
-- 在Flink创建账单实收source表
CREATE TABLE bill_info (
billCode STRING,
serviceCode STRING,
accountPeriod STRING,
subjectName STRING ,
subjectCode STRING,
occurDate TIMESTAMP,
amt DECIMAL(11,2),
status STRING,
proc_time AS PROCTIME() -–使用维表时需要指定该字段
) WITH (
'connector' = 'mysql-cdc', -- 连接器
'hostname' = '******', --mysql地址
'port' = '3307', -- mysql端口
'username' = '******', --mysql用户名
'password' = '******', -- mysql密码
'database-name' = 'cdc', -- 数据库名称
'table-name' = '***'
);
-- 在Flink创建订单实收source表
CREATE TABLE order_info (
orderCode STRING,
serviceCode STRING,
accountPeriod STRING,
subjectName STRING ,
subjectCode STRING,
occurDate TIMESTAMP,
amt DECIMAL(11, 2),
status STRING,
proc_time AS PROCTIME() -–使用维表时需要指定该字段
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '******',
'port' = '3307',
'username' = '******',
'password' = '******',
'database-name' = 'cdc',
'table-name' = '***',
);
-- 创建科目维表
CREATE TABLE subject_info (
code VARCHAR(32) NOT NULL,
name VARCHAR(64) NOT NULL,
PRIMARY KEY (code) NOT ENFORCED --指定主键
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://xxxx:xxxx/spd?useSSL=false&autoReconnect=true',
'driver' = 'com.mysql.cj.jdbc.Driver',
'table-name' = '***',
'username' = '******',
'password' = '******',
'lookup.cache.max-rows' = '3000',
'lookup.cache.ttl' = '10s',
'lookup.max-retries' = '3'
);
-- 创建实收分布结果表,把结果写到 Elasticsearch
CREATE TABLE income_distribution (
serviceCode STRING,
accountPeriod STRING,
subjectCode STRING,
subjectName STRING,
amt DECIMAL(13,2),
PRIMARY KEY (serviceCode, accountPeriod, subjectCode) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://xxxx:9200',
'index' = 'income_distribution',
'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL'
);
INSERT INTO income_distribution
SELECT t1.serviceCode, t1.accountPeriod, t1.subjectCode, t1.subjectName, SUM(amt) AS amt
FROM (
SELECT b.serviceCode, b.accountPeriod, b.subjectCode, s.name AS subjectName, SUM(amt) AS amt
FROM bill_info AS b
JOIN subject_info FOR SYSTEM_TIME AS OF b.proc_time s ON b.subjectCode = s.code
GROUP BY b.serviceCode, b.accountPeriod, b.subjectCode, s.name
UNION ALL
SELECT b.serviceCode, b.accountPeriod, b.subjectCode, s.name AS subjectName, SUM(amt) AS amt
FROM order_info AS b
JOIN subject_info FOR SYSTEM_TIME AS OF b.proc_time s ON b.subjectCode = s.code
GROUP BY b.serviceCode, b.accountPeriod, b.subjectCode, s.name
) AS t1
GROUP BY t1.serviceCode, t1.accountPeriod, t1.subjectCode, t1.subjectName;
05 踩过的坑和学到的经验
1. Flink 作业原来运行在 standalone session 模式下,提交多个 Flink 作业会导致作业失败报错。
原因:因为 standalone session 模式下启动多个作业会导致多个作业的 Task 共享一个 JVM,可能会导致一些不稳定的问题。并且排查问题时,多个作业的日志混在一个 TaskManager 中,增加了排查的难度。
解决方法:采用 YARN 的 per-job 模式启动多个作业,能有更好的隔离性。
2. SELECT elasticsearch table 报以下错误:
原因:Elasticsearch connector 目前只支持了 sink,不支持 source 。所以不能 SELECT elasticsearch table。
解决办法:在使用 SQL Client 时 sql-client-defaults.yaml 中的并行度配置的优先级更高。在 sql-client-defaults.yaml 中修改并行度,或者删除 sql-client-defaults.yaml 中的并行度配置。更建议采用后者。
原因:Flink CDC 在 scan 全表数据(我们的实收表有千万级数据)需要小时级的时间(受下游聚合反压影响),而在 scan 全表过程中是没有 offset 可以记录的(意味着没法做 checkpoint),但是 Flink 框架任何时候都会按照固定间隔时间做 checkpoint,所以此处 mysql-cdc source 做了比较取巧的方式,即在 scan 全表的过程中,会让执行中的 checkpoint 一直等待甚至超时。超时的 checkpoint 会被仍未认为是 failed checkpoint,默认配置下,这会触发 Flink 的 failover 机制,而默认的 failover 机制是不重启。所以会造成上面的现象。
解决办法:在 flink-conf.yaml 配置 failed checkpoint 容忍次数,以及失败重启策略,如下:
execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647
解决方法:在 flink-conf.yaml 中配置 execution.target: yarn-per-job。
原因:因为 SQL Client 默认的 Catalog 是在 in-memory 的,不是持久化 Catalog,所以这属于正常现象,每次启动 Catalog 里面都是空的。
Caused by: org.apache.Flink.elasticsearch7.shaded.org.elasticsearch.ElasticsearchException: Elasticsearch exception [type=illegal_argument_exception, reason=mapper [amt] cannot be changed from type [long] to [float]]
原因:数据库表的字段 amt 的类型是 decimal,DDL 创建输出到 es 的 amt 字段的类型也是 decimal,因为输出到 es 的第一条数据的amt如果是整数,比如是 10,输出到 es 的类型是 long 类型的,es client 会自动创建 es 的索引并且设置 amt 字段为 long 类型的格式,那么如果下一次输出到 es 的 amt 是非整数 10.1,那么输出到 es 的时候就会出现类型不匹配的错误。
解决方法:手动生成 es 索引和 mapping 的信息,指定好 decimal 类型的数据格式是 saclefloat,但是在 DDL 处仍然可以保留该字段类型是 decimal。
原因:因为数据库中别的表做了字段修改,CDC source 同步到了 ALTER DDL 语句,但是解析失败抛出的异常。
解决方法:在 flink-cdc-connectors 最新版本中已经修复该问题(跳过了无法解析的 DDL)。升级 connector jar 包到最新版本 1.1.0:flink-sql-connector-mysql-cdc-1.1.0.jar,替换 flink/lib 下的旧包。
原因:扫描全表阶段慢不一定是 cdc source 的问题,可能是下游节点处理太慢反压了。
解决方法:通过 Web UI 的反压工具排查发现,瓶颈主要在聚合节点上。通过在 sql-client-defaults.yaml 文件配上 MiniBatch 相关参数和开启 distinct 优化(我们的聚合中有 count distinct),作业的 scan 效率得到了很大的提升,从原先的 10 小时,提升到了 1 小时。关于性能调优的参数可以参阅:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/tuning/streaming_aggregation_optimization.html。
configuration:
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 2s
table.exec.mini-batch.size: 5000
table.optimizer.distinct-agg.split.enabled: true
原因:由于使用的 MySQL 用户未授权 RELOAD 权限,导致无法获取全局读锁(FLUSH TABLES WITH READ LOCK), CDC source 就会退化成表级读锁,而使用表级读锁需要等到全表 scan 完,才能释放锁,所以会发现持锁时间过长的现象,影响其他业务写入数据。
解决方法:给使用的 MySQL 用户授予 RELOAD 权限即可。所需的权限列表详见文档:https://github.com/ververica/flink-cdc-connectors/wiki/mysql-cdc-connector#setup-mysql-server。如果出于某些原因无法授予 RELOAD 权限,也可以显式配上 'debezium.snapshot.locking.mode' = 'none'来避免所有锁的获取,但要注意只有当快照期间表的 schema 不会变更才安全。
原因:MySQL binlog 数据同步的原理是,CDC source 会伪装成 MySQL 集群的一个 slave(使用指定的 server id 作为唯一 id),然后从 MySQL 拉取 binlog 数据。如果一个 MySQL 集群中有多个 slave 有同样的 id,就会导致拉取数据错乱的问题。
解决方法:默认会随机生成一个 server id,容易有碰撞的风险。所以建议使用动态参数(table hint)在 query 中覆盖 server id。如下所示:
SELECT *
FROM bill_info /*+ OPTIONS('server-id'='123456') */ ;
原因:Queue Resource Limit for AM 超过了限制资源限制。默认的最大内存是 30G (集群内存) * 0.1 = 3G,而每个 JM 申请 2G 内存,当提交第二个任务时,资源就不够了。
解决方法:调大 AM 的 resource limit,在 capacity-scheduler.xml 配置 yarn.scheduler.capacity.maximum-am-resource-percent,代表AM的占总资源的百分比,默认为0.1,改成0.3(根据服务器的性能灵活配置)。
原因:386.9 MB of 1 GB physical memory used; 2.1 GB of 2.1 GB virtual memory use。默认物理内存是 1GB,动态申请到了 1GB,其中使用了386.9 MB。物理内存 x 2.1=虚拟内存,1GBx2.1≈2.1GB ,2.1GB 虚拟内存已经耗尽,当虚拟内存不够时候,AM 的 container 就会自杀。
解决方法:两个解决方案,或调整 yarn.nodemanager.vmem-pmem-ratio 值大点,或 yarn.nodemanager.vmem-check-enabled=false,关闭虚拟内存检查。参考:https://blog.csdn.net/lzxlfly/article/details/89175452。
06 总结
福利来了 ![]()
Apache Flink 极客挑战赛
本文分享自微信公众号 - Flink 中文社区(gh_5efd76d10a8d)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
深入了解ActiveMQ!
认识MQ(Message Queue) 什么是消息队列 消息队列 首先我们先从以下几个维度来认识一下消息队列: 消息队列:一般我们会简称它为MQ(MessageQueue) 消息(Message):传输的数据。 队列(Queue):队列是一种先进先出的数据结构。 消息队列从字面的含义来看就是一个存放消息的容器。 消息队列可以简单理解为:把要传输的数据放在队列中。 把数据放到消息队列叫做生产者。 从消息队列里边取数据叫做消费者。 为什么需要消息队列 使用消息队列主要是基于以下三个主要场景: 解耦 异步 削峰/限流 下面我们分场景来描述下使用消息队列带来的好处 解耦 假设我们有一个用户系统A,用户系统A可以产生一个userId。 然后,现在有系统B和系统C都需要这个userId去做相关的操作。 解耦前架构 伪码大致如下: javapublicclassSystemA{//系统B和系统C的依赖SystemBsystemB=newSystemB();SystemCsystemC=newSystemC();//系统A独有的数据userIdprivateStringuserId="activeMq...
- 下一篇
木兰编程语言重现:优先级,一个过不去的坎
注:项目目标见码云代码库 上周就复现了一个语法,支持了这样的乘法6/2(1+2),结果为 1。 实现中,用到了针对语法规则的优先级设置。虽然 rply 有文档说明,但死磕过后仍然不明所以然。将调试过程记录在此,最后有问题请教各位。 演示 先举个例子(熟悉者请跳过直接看“正题”部分)。很早就复现了四则运算,比如: 10 + 3 * 6 / 5 => 13 相关的优先级(precedence)设置是开头针对词(token)的部分,从低到高排列: 分析器母机 = ParserGenerator( 规则, precedence=[ ... ('left', [不等于, 等于]), ... ('left', [加, 减]), ('left', [星号, 除]), ... ('right', [乘方]) 注:上面的“除”代表的是原本支持的/除法符号。 比方说,现在想扩展一个语法,支持÷这个除法符号,除了添加一个词“除法”——分词器母机.add(除法, '÷')、增加一个语法规则——@分析器母机.production(语法.二元表达式.成分(语法.表达式, 除法...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- 设置Eclipse缩进为4个空格,增强代码规范