使用轻量级 CDC debezium-server-databend 构建实时数据同步
作者:韩山杰
Databend Cloud 研发工程师
Debezium Server Databend 是一个基于 Debezium Engine 自研的轻量级 CDC 项目,用于实时捕获数据库更改并将其作为事件流传递最终将数据写入目标数据库 Databend。它提供了一种简单的方式来监视和捕获关系型数据库的变化,并支持将这些变化转换为可消费事件。
使用 Debezium server databend 实现 CDC 无须依赖大型的 Data Infra 比如 Flink, Kafka, Spark 等,只需一个启动脚本即可开启实时数据同步。
这篇教程将展示如何基于 Debezium server databend 快速构建 MySQL 到 Databend 的实时数据同步。
假设我们有电子商务业务,商品的数据存储在 MySQL ,我们需要实时把它同步到 Databend 中。
接下来的内容将介绍如何使用 Debezium server databend CDC 来实现这个需求,系统的整体架构如下图所示:
准备阶段
准备一台已经安装了 Docker ,docker-compose 以及 Java 11 环境 的 Linux 或者 MacOS 。
准备教程所需要的组件
接下来的教程将以 docker-compose
的方式准备所需要的组件。
debezium-MySQL
docker-compose.yaml
version: '2.1' services: postgres: image: debezium/example-postgres:1.1 ports: - "5432:5432" environment: - POSTGRES_DB=postgres - POSTGRES_USER=postgres - POSTGRES_PASSWORD=postgres mysql: image: debezium/example-mysql:1.1 ports: - "3306:3306" environment: - MYSQL_ROOT_PASSWORD=123456 - MYSQL_USER=mysqluser - MYSQL_PASSWORD=mysqlpw
Debezium Server Databend
-
Clone 项目:
git clone ``https://github.com/databendcloud/debezium-server-databend.git
-
从项目根目录开始:
- 构建和打包 debezium server:
mvn -Passembly -Dmaven.test.skip package
- 构建完成后,解压服务器分发包:
unzip debezium-server-databend-dist/target/debezium-server-databend-dist*.zip -d databendDist
- 进入解压后的文件夹:
cd databendDist
- 创建
application.properties
文件并修改:nano conf/application.properties
,将下面的 application.properties 拷贝进去,根据用户实际情况修改相应的配置。 - 使用提供的脚本运行服务:
bash run.sh
- Debezium Server with Databend 将会启动
- 构建和打包 debezium server:
同时我们也提供了相应的 Docker image,可以在容器中一键启动:
version: '2.1' services: debezium: image: ghcr.io/databendcloud/debezium-server-databend:pr-2 ports: - "8080:8080" - "8083:8083" volumes: - $PWD/conf:/app/conf - $PWD/data:/app/data
NOTE: 在容器中启动注意所连接数据库的网络。
Debezium Server Databend Application Properties
本文章使用下面提供的配置,更多的参数说明以及配置可以参考文档。
debezium.sink.type=databend debezium.sink.databend.upsert=true debezium.sink.databend.upsert-keep-deletes=false debezium.sink.databend.database.databaseName=debezium debezium.sink.databend.database.url=jdbc:databend://tnf34b0rm--xxxxxx.default.databend.cn:443 debezium.sink.databend.database.username=cloudapp debezium.sink.databend.database.password=password debezium.sink.databend.database.primaryKey=id debezium.sink.databend.database.tableName=products debezium.sink.databend.database.param.ssl=true # enable event schemas debezium.format.value.schemas.enable=true debezium.format.key.schemas.enable=true debezium.format.value=json debezium.format.key=json # mysql source debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector debezium.source.offset.storage.file.filename=data/offsets.dat debezium.source.offset.flush.interval.ms=60000 debezium.source.database.hostname=127.0.0.1 debezium.source.database.port=3306 debezium.source.database.user=root debezium.source.database.password=123456 debezium.source.database.dbname=mydb debezium.source.database.server.name=from_mysql debezium.source.include.schema.changes=false debezium.source.table.include.list=mydb.products # debezium.source.database.ssl.mode=required # Run without Kafka, use local file to store checkpoints debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory debezium.source.database.history.file.filename=data/status.dat # do event flattening. unwrap message! debezium.transforms=unwrap debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState debezium.transforms.unwrap.delete.handling.mode=rewrite debezium.transforms.unwrap.drop.tombstones=true # ############ SET LOG LEVELS ############ quarkus.log.level=INFO # Ignore messages below warning level from Jetty, because it's a bit verbose quarkus.log.category."org.eclipse.jetty".level=WARN
准备数据
在 MySQL 数据库中准备数据
进入 MySQL 容器
docker-compose exec mysql mysql -uroot -p123456
创建数据库 mydb 和表 products
,并插入数据:
CREATE DATABASE mydb; USE mydb; CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512)); ALTER TABLE products AUTO_INCREMENT = 10; INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"), (default,"car battery","12V car battery"), (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"), (default,"hammer","12oz carpenter's hammer"), (default,"hammer","14oz carpenter's hammer"), (default,"hammer","16oz carpenter's hammer"), (default,"rocks","box of assorted rocks"), (default,"jacket","water resistent black wind breaker"), (default,"cloud","test for databend"), (default,"spare tire","24 inch spare tire");
在 Databend 中创建 Database
NOTE: 用户可以不必先在 Databend 中创建表,系统检测到后会自动为用户建表。
启动 Debezium Server Databend
bash run.sh
首次启动会进入 init snapshot 模式,通过配置的 Batch Size 全量将 MySQL 中的数据同步到 Databend,所以在 Databend 中可以看到 MySQL 中的数据已经同步过来了:
同步 Insert 数据
我们继续往 MySQL 中插入 5 条数据:
INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"), (default,"car battery","12V car battery"), (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"), (default,"hammer","12oz carpenter's hammer"), (default,"hammer","14oz carpenter's hammer");
Debezium server databend 日志:
同时在 Databend 中可以查到 5 条数据已经同步过来了:
同步 Update 数据
配置文件中 debezium.sink.databend.upsert=true
,所以我们也可以处理 Update/Delete 的事件。
在 MySQL 中更新 id=10 的数据:
update products set name="from debezium" where id=10;
在 Databend 中可以查到 id 为 10 的数据已经被更新:
同步 Delete 数据
在配置文件中,有以下的配置,既可开启处理 Delete 事件的能力:
debezium.sink.databend.upsert-keep-deletes=false debezium.transforms=unwrap debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState debezium.transforms.unwrap.delete.handling.mode=rewrite debezium.transforms.unwrap.drop.tombstones=true
Debezim Server 对 Delete 的处理比较复杂,在 DELETE 操作下会生成两条事件记录:
- 一个包含 "op": "d",其他的行数据以及字段;
- 一个tombstones记录,它具有与被删除行相同的键,但值为null。
这两条事件会同时发出,在 Debezium Server Databend 中我们选择对 Delete 数据实行软删除,这就要求我们在 target table 中拥有 __deleted
字段,当 Delete 事件过来的时候我们将该字段置为 TRUE 后插入到目标表。
这样设计的好处是,有些用户想要保留这些数据,但可能未来会想到将其删除,这样就为用户提供了可选的方案,未来想要删除这些数据的时候,只需要 delete from table where __deleted=true
即可。
关于 Debezium 对删除事件的说明以及处理方式,详情可参考文档。
在 MySQL 中删除 id=12 的数据:
delete from products where id=12;
在 Databend 中可以观察到 id=12 的值的 __deleted
字段已经被置为 true
。
环境清理
操作结束后,在 docker-compose.yml
文件所在的目录下执行如下命令停止所有容器:
docker-compose down
结论
以上就是基于轻量级 CDC debezium server databend 构建 MySQL 到 Databend 的 实时数据同步的全部过程,这种方式不需要依赖 Flink, Kafka 等大型组件,启动和管理非常方便。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Spring 容器原始 Bean 是如何创建的?
以下内容基于 Spring6.0.4。 这个话题其实非常庞大,我本来想从 getBean 方法讲起,但一想这样讲完估计很多小伙伴就懵了,所以我们还是一步一步来,今天我主要是想和小伙伴们讲讲 Spring 容器创建 Bean 最最核心的 createBeanInstance 方法,这个方法专门用来创建一个原始 Bean 实例。 松哥这里就以 Spring 源码中方法的执行顺序为例来和小伙伴们分享。 1. doCreateBean AbstractAutowireCapableBeanFactory#doCreateBean 就是 Bean 的创建方法,但是 Bean 的创建涉及到的步骤非常多,包括各种需要调用的前置后置处理器方法,今天我主要是想和大家聊聊单纯的创建 Bean 的过程,其他方法咱们后面文章继续。 在 doCreateBean 方法中,有如下一行方法调用: protected Object doCreateBean(String beanName, RootBeanDefinition mbd, @Nullable Object[] args) throws BeanCrea...
- 下一篇
云上 Index:看「简墨」如何为云原生打造全新索引
拓数派首款数据计算引擎 PieCloudDB 是一款全新的云原生虚拟数仓。为了提升用户使用体验,提高查询效率,在实现存算分离的同时,PieCloudDB 设计与打造了全新的存储引擎「简墨」等模块,并针对云场景和分析型场景设计了高效的「Data Skipping」索引。本文将详细介绍 PieCloudDB 的存储和索引的设计与打造过程,并将通过示例来演示 PieCloudDB 如何使用 Data Skipping 索引加速查询的效率。 作为一款云原生虚拟数仓,PieCloudDB 依赖于云计算所提供的基础设施服务,包括大规模分布式集群、虚拟机、容器等。通过利用这些服务,PieCloudDB 可以更好地适应动态的和不断变化的工作负载需求,并将实现高可用、易扩展、异地多活和弹性伸缩等特性。 索引是数据库系统提升查询效率的关键技术,其设计与存储息息相关。为了更好地适应云原生和分析型场景的要求,PieCloudDB 必须使用合理的存储架构及技术,打造一款全新的存储引擎,并实现高效的云上索引技术,满足用户查询需求。PieCloudDB 的存储作为将应用程序和用户数据连接起来的关键桥梁,是云原生虚拟...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS关闭SELinux安全模块
- CentOS7安装Docker,走上虚拟化容器引擎之路
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- Linux系统CentOS6、CentOS7手动修改IP地址
- Hadoop3单机部署,实现最简伪集群
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- CentOS7,8上快速安装Gitea,搭建Git服务器