Debezium的基本使用(以MySQL为例)
- GreatSQL社区原创内容未经授权不得随意使用,转载请联系小编并注明来源。
- GreatSQL是MySQL的国产分支版本,使用上与MySQL一致。
一、Debezium介绍
摘自官网:
Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred.
简单理解就是Debezium可以捕获数据库中所有行级的数据变化并包装成事件流顺序输出。
二、基本使用
下面以MySQL为例介绍Debezium的基本使用。
1. MySQL的准备工作
- 准备一个MySQL用户并且拥有相应权限,像这样:
CREATE USER 'dbz'@'%' IDENTIFIED BY 'dbzpwd'; GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dbz' IDENTIFIED BY 'dbzpwd';
- 检查MySQL是否开启
log-bin
SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM information_schema.global_variables WHERE variable_name='log_bin'; -- If the following error occurs: The 'INFORMATION_SCHEMA.GLOBAL_VARIABLES' feature is disabled... -- please execute the given SQL again after execute this SQL: set global show_compatibility_56=on;
如果是OFF
则需要修改MySQL配置文件,类似下面这样:
server-id = 223344 #必须有 log_bin = mysql-bin #log_bin的值是binlog文件序列的基本名称 binlog_format = ROW #必须是ROW binlog_row_image = FULL #必须是FULL expire_logs_days = 10 #依据实际情况而定
- 准备数据库&表
create database inventory; create table inventory.a (id bigint primary key auto_increment, name varchar(32)); insert into inventory.a values (null, 'n1'),(null, 'n2'),(null, 'n3');
2. 编写程序
2.1. 工程依赖(Maven)
pom.xml
<dependency> <groupId>io.debezium</groupId> <artifactId>debezium-api</artifactId> <version>${version.debezium}</version> </dependency> <dependency> <groupId>io.debezium</groupId> <artifactId>debezium-embedded</artifactId> <version>${version.debezium}</version> </dependency> <dependency> <groupId>io.debezium</groupId> <artifactId>debezium-connector-mysql</artifactId> <version>${version.debezium}</version> </dependency>
目前Debezium最新稳定版本为:1.9.5.Final
2.2. 准备数据库&表
create database inventory; create table inventory.a (id bigint primary key, name varchar(32)); insert into inventory.a values (1, 'n1'),(2, 'n2'),(3, 'n3');
2.3. 代码编写
package com.greatdb.dbzdemo; import java.io.IOException; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import io.debezium.engine.ChangeEvent; import io.debezium.engine.DebeziumEngine; import io.debezium.engine.format.Json; /** * @author wang.jianwen * @version 1.0 * @date 2022/07/29 */ public class DebeziumTest { private static DebeziumEngine<ChangeEvent<String, String>> engine; public static void main(String[] args) throws Exception { final Properties props = new Properties(); props.setProperty("name", "dbz-engine"); props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector"); //offset config begin - 使用文件来存储已处理的binlog偏移量 props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore"); props.setProperty("offset.storage.file.filename", "/tmp/dbz/storage/mysql_offsets.dat"); props.setProperty("offset.flush.interval.ms", "0"); //offset config end props.setProperty("database.server.name", "mysql-connector"); props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory"); props.setProperty("database.history.file.filename", "/tmp/dbz/storage/mysql_dbhistory.txt"); props.setProperty("database.server.id", "122112"); //需要与MySQL的server-id不同 props.setProperty("database.hostname", "tmg"); props.setProperty("database.port", "3306"); props.setProperty("database.user", "mysqluser"); props.setProperty("database.password", "mysqlpw"); props.setProperty("database.include.list", "inventory");//要捕获的数据库名 props.setProperty("table.include.list", "inventory.a");//要捕获的数据表 props.setProperty("snapshot.mode", "initial");//全量+增量 // 使用上述配置创建Debezium引擎,输出样式为Json字符串格式 engine = DebeziumEngine.create(Json.class) .using(props) .notifying(record -> { System.out.println(record);//输出到控制台 }) .using((success, message, error) -> { if (error != null) { // 报错回调 System.out.println("------------error, message:" + message + "exception:" + error); } closeEngine(engine); }) .build(); ExecutorService executor = Executors.newSingleThreadExecutor(); executor.execute(engine); addShutdownHook(engine); awaitTermination(executor); System.out.println("------------main finished."); } private static void closeEngine(DebeziumEngine<ChangeEvent<String, String>> engine) { try { engine.close(); } catch (IOException ignored) { } } private static void addShutdownHook(DebeziumEngine<ChangeEvent<String, String>> engine) { Runtime.getRuntime().addShutdownHook(new Thread(() -> closeEngine(engine))); } private static void awaitTermination(ExecutorService executor) { if (executor != null) { try { executor.shutdown(); while (!executor.awaitTermination(5, TimeUnit.SECONDS)) { } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } }
3. 测试
程序跑起来后,可以看到控制台输出:
...(省略) EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":1}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":1,"name":"n1"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005186,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005191,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=1,name=n1},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005186,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005191}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}] EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":2}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"n2"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005195,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=2,name=n2},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005195,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}] EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":3}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":3,"name":"n3"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005196,"snapshot":"last","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=3,name=n3},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005196,snapshot=last,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}] ...(省略)
可以看到全量的数据已经输出,关键的数据如下:
..."payload":{"before":null,"after":{"id":1,"name":"n1"}..."op":"r"... ..."payload":{"before":null,"after":{"id":2,"name":"n2"}..."op":"r"... ..."payload":{"before":null,"after":{"id":3,"name":"n3"}..."op":"r"...
- 接下来新增一条数据:
insert into inventory.a values (4, 'n4');
控制台输出:
..."payload":{"before":null,"after":{"id":4,"name":"n4"}..."op":"c"...
- 修改一条数据:
update inventory.a set name = 'n4-upd' where id = 4;
控制台输出:
..."payload":{"before":{"id":4,"name":"n4"},"after":{"id":4,"name":"n4-upd"}..."op":"u"...
- 删除一条数据:
delete from inventory.a where id = 1;
控制台输出:
..."payload":{"before":{"id":1,"name":"n1"},"after":null..."op":"d"...
三、总结
本文以MySQL为例介绍了Debezium在代码中基本使用流程,对MySQL的数据进行常见的增删改操作,Debezium将捕获这些数据行的变化,并记录了数据行变化前后的数据,并对外提供事件流,外部可以获取并对事件进行相应处理。
参考:https://debezium.io/documentation/reference/1.8/index.html

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
EMQX 4.x 版本更新:Kafka 与 RocketMQ 集成安全增强
近日,EMQX 开源版 v4.3.17、v4.3.18、v4.4.6、v4.4.7,与企业版 v4.3.12、v4.3.13、v4.4.6、v4.4.7 八个维护版本正式发布。 此次发布包含了多个功能更新:规则引擎 RocketMQ 支持 ACL 检查、Kafka 支持 SASL/SCRAM 与 SASL/GSSAPI 认证以适配更多部署方式,提升规则引擎 TDengine 写入性能以及 MQTT 共享订阅性能,同时在 CLI 中提供了配置文件检查命令,方便用户修改 EMQX 配置。此外还修复了多项已知 BUG。 欢迎下载使用:https://www.emqx.com/zh/try 规则引擎新功能 RocketMQ 支持携带用户信息实现 ACL 检查 包含版本 企业版 v4.3.12 企业版 v4.4.6 RocketMQ 在 4.4.0 版本开始支持 ACL,通过创建多个用户并为其赋予不同的 Topic 和消费组权限,以达到用户之间的权限隔离。开启 ACL 访问控制会导致没有配置认证信息的客户端连接中断。 本次发布 EMQX 新增了 RocketMQ ACL 支持,在资源创建页面填入...
- 下一篇
老梗新玩「GitHub 热点速览 v.22.34」
不知道你是否和我有一样的烦恼,最近的流行梗当自己要用拿来造词时,就陷入了不知道咋“换壳”的尴尬地步。sao-gen-gen 大大减少了你老梗新用的脑力成本,骚话张口就来是怎么回事呢?下面就让小编带大家一起了解下这些会玩的开源项目吧。 同样会玩的还有 ravynos,它要做个 BSD 版本的 macOS,什么都一样,就是生态更开放。最会玩的还是当属 次世代赛博编程语言 helang,仿佛翻开了某个不常见版本的圣经。 以下内容摘录自微博@HelloGitHub 的 GitHub Trending 及 Hacker News 热帖(简称 HN 热帖),选项标准:新发布 | 实用 | 有趣,根据项目 release 时间分类,发布时间不超过 14 day 的项目会标注 New,无该标志则说明项目 release 超过半月。由于本文篇幅有限,还有部分项目未能在本文展示,望周知 🌝 本文目录 本周特推 1.1 骚话生成器:sao-gen-gen 1.2 文本图像生成器:stable-diffusion GitHub Trending 周榜 2.1 笔记应用:joplin 2.2 内存数据库:dr...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7安装Docker,走上虚拟化容器引擎之路
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- CentOS关闭SELinux安全模块
- CentOS7,CentOS8安装Elasticsearch6.8.6
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- Linux系统CentOS6、CentOS7手动修改IP地址
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS8安装Docker,最新的服务器搭配容器使用
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2配置默认Tomcat设置,开启更多高级功能