巾帼力量助力 Flink 引擎 CDC 源模式演进支持 | Apache SeaTunnel 开源之夏成果
今年的开源之夏活动已渐近尾声,历经半年多的潜心开发,Apache SeaTunnel 项目的开发者们收获满满。今天,让我们聚焦于在 Apache SeaTunnel 所支持的 Flink 引擎上实现 CDC 源模式的项目。从项目的初始构思,到一步步的开发实践,再到完成后的感悟,全方位领略这一成果的诞生历程。
接下来,让我们通过采访,走进这位来自北京科技大学的开发者的开源世界,看看她是如何兼顾繁重的学习任务,圆满完成这次开发任务的吧!
个人介绍
- 项目导师:Lucifer Tyrant
- 姓名:董嘉欣
- 学校 + 专业:北京科技大学 大数据管理与应用
- GitHub ID:147227543
- 个人感兴趣或擅长的研究领域:大数据平台开发,曾在快手,美团的数据平台部做数据平台开发工作
- 兴趣爱好:读一些技术文档,尝试业界新技术栈,看小说
项目名称
Flink引擎CDC Source Schema Evolution支持
项目背景
在实时数据同步场景中,源表的schema变更,如新增列、修改列类型等是常见需求。目前Apache SeaTunnel已经在自研引擎上支持了CDC schema evolution,但在Flink引擎上还没有实现这一特性,这导致用户在使用Flink引擎进行CDC同步时,一旦遇到schema变更就需要重启任务,非常影响数据同步的连续性和稳定性。
实现思路
我的实现灵感主要来自于Flink CDC项目的设计思路。在研究Flink CDC的schema evolution实现后结合Apache SeaTunnel的架构特点,设计了一套适配Flink引擎的schema演化方案。
具体实现时序图如下:
核心架构设计包含以下几个关键组件:
-
SchemaCoordinator
-
职责:这是整个方案的核心协调中心,负责全局schema变更的状态管理和同步协调
-
实现细节:
-
维护了schemaChangeStates映射表,记录每个表的schema变更状态
-
通过schemaVersions跟踪每个表的schema版本号
-
使用ReentrantLock锁机制保证多个并发schema变更请求的线程安全
-
维护pendingRequests队列,管理等待schema变更完成的CompletableFuture
-
-
-
SchemaOperator
-
职责:插入在CDC Source和Sink之间的专用算子,负责拦截和处理schema变更事件
-
实现细节:
-
在processElement()方法中检测SchemaChangeEvent
-
调用processSchemaChangeEvent()处理schema变更流程
-
维护currentSchemaChangeFuture用于支持schema变更的取消和回滚
-
通过lastProcessedEventTime防止重复处理旧的schema变更事件
-
-
遇到的关键问题及解决过程:
在开发过程中,我遇到了一个比较棘手的问题:在processElement方法中处理schema变更事件时,整个流程会卡住,不再继续处理后续数据,只会不断地执行checkpoint流程。
通过仔细分析日志,我发现了问题的根源:
2025-08-17 12:33:36,597 INFO FlinkSinkWriter - FlinkSinkWriter handled FlushEvent for table: .schema_test.products
2025-08-17 12:33:36,597 INFO SchemaOperator - FlushEvent sent to downstream for table: .schema_test.products
2025-08-17 12:33:36,597 INFO SchemaCoordinator - Processing schema change for table: .schema_test.products
2025-08-17 12:33:36,598 WARN SchemaCoordinator - No schema change state found for table: .schema_test.products
从这些日志可以看出,先发送了FlushEvent到下游,FlinkSinkWriter处理完FlushEvent后尝试通知SchemaCoordinator,但此时SchemaCoordinator还没有初始化schema change state(因为请求协调器的代码还没执行),导致通知失败。SchemaOperator中的schemaChangeFuture.get()方法会一直等待,直到60秒超时。
之后通过观察日志状态,我调整了执行顺序,将原本 “先发送FlushEvent,后请求SchemaCoordinator” 的逻辑,改为 “先请求SchemaCoordinator创建状态,后发送FlushEvent“ **,就比如这里:
CompletableFuture<SchemaResponse> schemaChangeFuture =
schemaCoordinator.requestSchemaChange(
tableId, jobId, schemaChangeEvent.getChangeAfter(), 1);
currentSchemaChangeFuture.set(schemaChangeFuture);
sendFlushEventToDownstream(schemaChangeEvent); // 在请求协调器之后才发送
这样确保SchemaCoordinator先创建好schema change state,之后请求的时候就不会返回空,然后算子将FlushEvent被发送到下游,下游处理完FlushEvent后,因为此时state已经存在,就可以成功通知SchemaCoordinator,SchemaCoordinator收到通知后,完成schema change的CompletableFuture,之后processSchemaChangeEvent方法的等待结束,继续执行后续流程。
项目成果
-
解决的问题:
- 实现了Flink引擎上的实时schema演化能力,用户在使用Flink引擎进行CDC同步时,源表发生schema变更后无需重启任务
- 提供了完整的schema变更协调机制,确保多算子之间的schema变更同步
-
为用户带来的好处:
- 业务连续性提升:schema变更不再需要停机,大大提高了数据同步的可用性
- 运维成本降低:减少了人工干预,避免了频繁的任务重启
- 数据一致性保障:通过FlushEvent机制确保schema变更前后的数据一致性
- 引擎选择灵活性:用户可以根据自己的需要选择Flink引擎或SeaTunnel引擎,都能获得schema evolution能力
-
技术贡献:
- 新增了SchemaCoordinator全局协调器
- 新增了FlushEvent事件类型和处理机制
- 在Flink translation层实现了完整的schema evolution适配
-
改进方向:
- 多并行度支持:设计并实现多并行度场景下的flush协调机制,可能需要引入并行度感知的计数器和更细粒度的状态管理
- 状态持久化:考虑将SchemaCoordinator改造为Flink的Operator或利用Flink的BroadcastState,使其状态能够参与checkpoint
同时,为了更好地了解同学们在参与开源之夏项目中的开发心得和感受,Apache SeaTunnel 对同学们进行了简短的采访,以下为采访实录:
Q:在众多项目中,为什么选择参与 Apache SeaTunnel 的项目?
A:我选择参与 Apache SeaTunnel 项目,主要有这样几点考虑:第一是它的技术方向和我已有的经验非常契合。之前在一家初创公司实习时,我们就是用SeaTunnel做数据集成,支持数据仓库的搭建。我自己也常用Flink开发数据处理管道、搭建实时血缘系统,对数据集成和实时同步这个领域很感兴趣。SeaTunnel作为新一代数据集成平台,技术栈新、架构清晰,我觉得很适合深入学习并做出贡献。
而且,Apache SeaTunnel 社区氛围特别好,社区非常活跃,大家响应也很及时,对像我这样初次参与开源的同学来说非常友好,CDC schema evolution 这个功能解决的是真实场景中的痛点,能看到自己写的代码真正帮助到用户,会很有成就感。
Q:Apache SeaTunnel 的项目与你的学业有什么交集吗?
A: 有挺多交集的。比如我们大数据处理课程中讲到的 Flink、StarRocks 等框架,在 SeaTunnel 里都有深入的应用。大二时为了处理 Spark 相关的微批次任务,我还用过 StreamPark,所以对数据集成这一块也比较熟悉。参与 SeaTunnel 项目,正好能把课堂上学到的理论知识在实际项目中落地,加深理解。
Q:参与这个项目给你的学业和未来个人规划带来了哪些影响?
这个项目让我收获很大。比如,为了理解CDC的实现,我深入阅读了Flink CDC的源码,对Flink的运行机制、分布式协调、异步编程等有了更扎实的理解。
同时,在导师的指导下,我也学会了如何在大型开源项目中协作:包括代码规范、PR流程、测试覆盖等工程实践,为我未来的开源参与打下了很好的基础。更重要的是,通过这个项目,我明确了自己对数据基础架构方向的兴趣,未来也希望在这个领域继续深耕。
Q: 参与这个项目的过程中您遇到的最大的挑战是什么?是如何克服的?
A: 最大的挑战是遇到一个比较棘手的技术问题:在实现过程中,processElement 方法会卡住,只做checkpoint却不继续处理数据。此外,在架构设计上,如何将新功能优雅地集成到现有系统中,也比我预想的要复杂。
为解决这些问题,除了自己反复调试、查阅资料,也积极向导师和社区伙伴请教。大家的建议给了我很多启发,也帮助我逐步理清了思路。
Q: 您参与开源有多长时间了?喜欢开源吗?开源给你带来了哪些改变?
A: 这是我第一次正式参与开源。虽然之前在公司实习时也写过一些内部功能的代码,但向社区提交PR还是第一次。我非常喜欢开源。最吸引我的是那种开源的氛围——大家为了一个共同的目标,公开讨论、协作贡献,每个人都能在过程中学习和成长。这种开放、共享的精神,让我觉得特别有意义。
Q: 您之前是否了解过或使用过 Apache SeaTunnel 或其他数据集成产品?
之前实习时就用过SeaTunnel,主要是做不同数据源之间的同步,比如从 Kafka 到 Hive,或者从 Kafka 到 StarRocks。Flink CDC 我也接触过,主要用在 CDC Source 的流式集成场景。相比之下,SeaTunnel 支持三种执行引擎,既能做流处理也能做批处理,覆盖的场景更全面。如果未来再选择数据集成工具,我会优先考虑 SeaTunnel——一是功能全面,二是配置起来也比较方便。
Q: Apache SeaTunnel社区贡献给您的第一印象是怎样的?您希望在这里有何收获?
A: 第一印象特别好:社区氛围友好,Mentor 响应及时,代码Review 也非常认真细致。我希望在这里能继续认识更多志同道合的朋友,为社区做出有价值的贡献,同时也精进自己的技术水平。
Q: 您后续还会持续在 Apache SeaTunnel社区活跃吗?
A: 会的。关于我目前实现的功能,我还想再做些优化,更好地保证SeaTunnel on Flink 的精准一次语义。未来也希望能参与更多有意思的议题。
关注公众号
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
基于Portal的NJet模拟环境
背景介绍 作为开发人员,经常碰到的问题是代码开发、测试都是好的,但在客户现场/生产环境,总是出各种各样的问题。这种问题,有可能是实现的逻辑,但更多的情况是现场人员的配置问题导致。因此开发人员需要一个和现场完全一致的环境,进行现场的故障复现,配置验证。 NJet作为应用引擎,应用场景复杂,配置灵活,当然也面临此类问题。为了快速模拟 NJet 配置,验证整条业务链路是否通畅,同时检验系统在高压、大配置数据场景下的承载能力,并确认证书、限流等关键机制是否生效,NJet的模拟环境应运而生。我们期望通过该环境: 自动构建、完全模拟客户的网络环境拓扑 可视化验证客户的配置有效性 可进行批量/单独的功能测试 能够进行域名解析,证书校验,客户端地址等模拟,辅助功能验证 同时,出于简化GUI管理的目的,该模拟应用利用NJet存在的Portal能力,避免了相关的认证授权的开发工作。 NJet模拟应用的安装 随NJet4.0发布,NJet发布了模拟应用的安装包,因此需首先安装Portal安装包,在Portal部署完成后,通过GUI安装模拟应用 参考https://njet.org.cn/cases/njet...
-
下一篇
兼容MongoDB,成本降低10倍,文档数据库EloqDoc发布云服务
继 EloqCloud for EloqKV 成功发布之后,今天我们很高兴地发布EloqCloud for EloqDoc——这是我们为云时代构建的全新 MongoDB 兼容的高性能文档数据库服务。 EloqCloud for EloqDoc 带来了突破性的架构,重新定义了文档工作负载在成本、可扩展性和性能之间的完美平衡。 💡 EloqCloud for EloqDoc 主要特性 1、MongoDB 兼容 API 无需代码改动,轻松迁移。 2、分层存储架构 在内存、NVMe 和对象存储之间无缝管理热数据和冷数据。 3、计算与存储解耦 独立扩展存储与计算,以最低成本处理海量数据集。 4、可预测的高性能 每个实例(即使是免费 tier)都配备专属资源,提供比 MongoDB Atlas FreeTier高达10倍的吞吐量。 💡 首个可用于生产环境的文档数据Free Tier EloqCloud for EloqDoc 推出了一个具备专属资源的生产级FreeTier: 1、25 GB免费存储空间 比 MongoDB Atlas、Supabase 或 Neon 大 50 倍。 2、自动缩容...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- CentOS7设置SWAP分区,小内存服务器的救世主
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- CentOS关闭SELinux安全模块
- Dcoker安装(在线仓库),最新的服务器搭配容器使用
- CentOS8编译安装MySQL8.0.19
- Windows10,CentOS7,CentOS8安装Nodejs环境
- SpringBoot2全家桶,快速入门学习开发网站教程




微信收款码
支付宝收款码