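Tested and Working | Free Real-Time MySQL→Oracle Sync with SeaTunnel, Step by Step
When it comes to tools for syncing MySQL to Oracle, the main option to consider besides the traditional OGG is SeaTunnel (ST). It is remarkably easy to use.
It has been running stably in our production environment for 4 months with 3 source databases, all handled without problems. I recommend giving it a try.
1 How to use SeaTunnel: start with the help output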
./bin/seatunnel.sh -h
Usage: seatunnel.sh [options]
  Options:
    --async                         Run the job asynchronously, when the job
                                    is submitted, the client will exit
                                    (default: false)
    -can, --cancel-job              Cancel job by JobId
    --check                         Whether check config (default: false)
    -cj, --close-job                Close client the task will also be closed
                                    (default: true)
    -cn, --cluster                  The name of cluster
    -c, --config                    Config file
    --decrypt                       Decrypt config file, When both --decrypt
                                    and --encrypt are specified, only
                                    --encrypt will take effect (default:
                                    false)
    -m, --master, -e, --deploy-mode SeaTunnel job submit master, support
                                    [local, cluster] (default: cluster)
    --encrypt                       Encrypt config file, when both --decrypt
                                    and --encrypt are specified, only
                                    --encrypt will take effect (default:
                                    false)
    --get_running_job_metrics       Gets metrics for running jobs (default:
                                    false)
    -h, --help                      Show the usage message
    -j, --job-id                    Get job status by JobId
    -l, --list                      list job status (default: false)
    --metrics                       Get job metrics by JobId
    -n, --name                      SeaTunnel job name (default: SeaTunnel)
    -r, --restore                   restore with savepoint by jobId
    -s, --savepoint                 savepoint job by jobId
    --set-job-id                    Set custom job id for job
    -i, --variable                  Variable substitution, such as -i
                                    city=beijing, or -i date=20190318.We use
                                    ',' as separator, when inside "", ',' are
                                    treated as normal characters instead of
                                    delimiters. For example, -i
                                    city="beijing,shanghai". If you want to
                                    use dynamic parameters, you can use the
                                    following format: -i date=$(date
                                    +"%Y%m%d"). (default: [])
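++Reference notes:++
1) Job submission parameters
2) Job management parameters
2 Installation: see the earlier article
[Data Sync] First Look at SeaTunnel: A 5,000-Character Walkthrough to Get You Running with Oracle-CDC
3 Day-to-day operations: job management
3.1 List jobs: RUNNING means the job is currently running; other statuses may appear as well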
./bin/seatunnel.sh -l
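When many jobs are listed, the running ones can be picked out with standard tools. This is a minimal sketch, assuming each job is printed on one row with the job ID in the first column and the status as a separate word on that row. The function name running_job_ids is hypothetical, so check the assumption against your own -l output first.

```shell
# Print the IDs of running jobs from `seatunnel.sh -l` output read on stdin.
# Assumption: one job per row, numeric job ID in column 1,
# and the status word RUNNING in one of the later columns.
running_job_ids() {
  awk '$1 ~ /^[0-9]+$/ {
    for (i = 2; i <= NF; i++) {
      if ($i == "RUNNING") { print $1; break }
    }
  }'
}

# Usage (not executed here):
#   ./bin/seatunnel.sh -l | running_job_ids
```

Rows whose first column is not numeric, such as a header line, are skipped.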
3.2 Pause a job (take a savepoint with -s)
./bin/seatunnel.sh -s 967714059992432641
3.3 Restart a paused job
./bin/seatunnel.sh -r 967714059992432641 -c $SEATUNNEL_HOME/config/mysql_virdb_config
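Because the restore was not started in the background, the client stayed in the foreground. After I exited with Ctrl+C, the job stopped as well (the help output shows that -cj/--close-job defaults to true, so closing the client also closes the task).
PS: When restoring, always pass the relevant parameters, such as the job name (-n) and whether to run in the background (--async).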
./bin/seatunnel.sh -r 967714059992432641 -c $SEATUNNEL_HOME/config/mysql_virdb_config --async -n job_mysql_virdb
Check the job status again; this is the result we expect.
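To avoid forgetting --async or the job name on a restore, the command can be wrapped in a small helper. This is a sketch that uses only the -r, -c, -n and --async flags from the help output; the function name restore_job and the DRY_RUN switch are my own additions, not part of SeaTunnel.

```shell
# Restore a job from its savepoint in the background, always with a job name.
# restore_job and DRY_RUN are hypothetical helpers, not SeaTunnel features.
restore_job() {
  if [ "$#" -ne 3 ]; then
    echo "usage: restore_job <job_id> <config_file> <job_name>" >&2
    return 2
  fi
  # Rebuild the positional parameters as the full command line.
  set -- "${SEATUNNEL_HOME:-.}/bin/seatunnel.sh" -r "$1" -c "$2" --async -n "$3"
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "$*"    # only show what would be run
  else
    "$@"         # run the restore
  fi
}

# Usage (prints the command without running it):
#   DRY_RUN=1 restore_job 967714059992432641 "$SEATUNNEL_HOME/config/mysql_virdb_config" job_mysql_virdb
```

Running it once with DRY_RUN=1 is a cheap way to review the exact command before touching a production job.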
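3.4 Cancel a job
This command cancels the specified job. The job is stopped and its status changes to CANCELED.
Batch cancellation is supported, so several jobs can be canceled at once.
All checkpoint information of a canceled job is deleted, so it cannot be restored with seatunnel.sh -r.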
./bin/seatunnel.sh -can 967714059992432641
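PS: The status after cancellation turned out to be similar to the status after exiting directly with Ctrl+C. Restarting the job after that would presumably lose some data.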
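4 SeaTunnel logging configuration
Configuration file: $SEATUNNEL_HOME/config/log4j2.properties
4.1 Write a separate log file for each job (takes effect after one of the jobs is restarted)
Change the following settings: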
rootLogger.appenderRef.file.ref = fileAppender
appender.file.layout.pattern = [%X{ST-JID}] %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%-30.30c{1.}] [%t] - %m%n
to:
rootLogger.appenderRef.file.ref = routingAppender
appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%-30.30c{1.}] [%t] - %m%n
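4.2 SeaTunnel can delete old log files on a schedule to avoid running out of disk space
Add the following to the $SEATUNNEL_HOME/config/seatunnel.yaml configuration file. The defaults are shown below (time is in minutes; 1440 means 1440 minutes, i.e. one day):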
seatunnel:
  engine:
    history-job-expire-minutes: 1440
    telemetry:
      logs:
        scheduled-deletion-enable: true
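++Notes:++
- history-job-expire-minutes: retention time for historical jobs and logs, in minutes. After this time, the system automatically removes expired job information and log files.
- scheduled-deletion-enable: enables scheduled cleanup; defaults to true. Once a job passes the expiry time set by history-job-expire-minutes, its log files are deleted automatically. If this is turned off, logs stay on disk permanently and must be managed by the user, otherwise disk usage may become a problem. Configure it according to your needs.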
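5 Viewing jobs in the Web UI
PS: At last there is a Web UI for monitoring jobs, which is great news. Here is how to configure and access it.
5.1 Configure the Web UI
Configuration file: $SEATUNNEL_HOME/config/seatunnel.yaml. The default configuration is as follows: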
seatunnel:
  engine:
    http:
      enable-http: true
      port: 8080
5.2 Access the Web UI
Open a browser and go to http://ip:8080
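Before opening a browser, it can be handy to confirm from the command line that the port is answering at all. This is a small sketch, assuming curl is installed; the helper names webui_url and check_webui are hypothetical, and the check only verifies that something responds over HTTP, not that the page is the SeaTunnel UI.

```shell
# Build the Web UI address from host and port (8080 is the default port shown above).
webui_url() {
  echo "http://$1:${2:-8080}"
}

# Return success if anything answers HTTP on that address within 5 seconds.
# check_webui is a hypothetical helper; it needs curl on the PATH.
check_webui() {
  curl -s -o /dev/null -m 5 "$(webui_url "$@")"
}

# Usage (not executed here):
#   check_webui 192.168.1.10 && echo "Web UI reachable"
```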
5.3 Compare the synced data
Compare the latest creation time, the latest update time, and the row count between source and target.
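Once both sides have been queried, the comparison itself can be scripted. This is a sketch under stated assumptions: each side's result has been reduced to one line of the form row_count|latest_create_time|latest_update_time. That line format, the function name compare_snapshot, and the column names in the comment are all hypothetical.

```shell
# Example query that could produce such a line on each side
# (create_time and update_time are assumed column names):
#   SELECT COUNT(*), MAX(create_time), MAX(update_time) FROM <table>
#
# Compare two "count|create_time|update_time" lines field by field.
compare_snapshot() {
  status=0
  for i in 1 2 3; do
    s="$(printf '%s\n' "$1" | cut -d'|' -f"$i")"
    d="$(printf '%s\n' "$2" | cut -d'|' -f"$i")"
    case "$i" in
      1) label="row_count" ;;
      2) label="latest_create_time" ;;
      3) label="latest_update_time" ;;
    esac
    if [ "$s" != "$d" ]; then
      echo "MISMATCH $label: source=$s target=$d"
      status=1
    fi
  done
  [ "$status" -eq 0 ] && echo "MATCH"
  return "$status"
}

# Usage:
#   compare_snapshot "$mysql_line" "$oracle_line"
```

Reporting each field separately shows whether a difference is in the row count or only in the timestamps.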
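6 Common errors
6.1 The database name and table name must be configured in the sink; otherwise the job fails with the following error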
2025-04-24 17:59:42,640 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Fatal Error,
2025-04-24 17:59:42,641 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Please submit bug report in https://github.com/apache/seatunnel/issues
2025-04-24 17:59:42,641 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Reason:SeaTunnel job executed failed
2025-04-24 17:59:42,642 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.api.table.factory.FactoryException: ErrorCode:[API-06], ErrorDescription:[Factory initialize failed] - Unable to create a sink for identifier 'jdbc'.
at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSink(FactoryUtil.java:250)
at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.createSinkAction(MultipleTableJobConfigParser.java:669)
at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSink(MultipleTableJobConfigParser.java:592)
at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:240)
at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.getLogicalDag(ClientJobExecutionEnvironment.java:123)
at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.execute(ClientJobExecutionEnvironment.java:191)
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:165)
... 2 more
Caused by: org.apache.seatunnel.api.configuration.util.OptionValidationException: ErrorCode:[API-02], ErrorDescription:[Option item validate failed] - There are unconfigured options, the options('database') are required because ['generate_sink_sql' == true] is true.
at org.apache.seatunnel.api.configuration.util.ConfigValidator.validate(ConfigValidator.java:200)
at org.apache.seatunnel.api.configuration.util.ConfigValidator.validate(ConfigValidator.java:107)
at org.apache.seatunnel.api.configuration.util.ConfigValidator.validate(ConfigValidator.java:47)
at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSink(FactoryUtil.java:239)
... 8 more
2025-04-24 17:59:42,642 ERROR [o.a.s.c.s.SeaTunnel ] [main] -
===============================================================================
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.api.table.factory.FactoryException: ErrorCode:[API-06], ErrorDescription:[Factory initialize failed] - Unable to create a sink for identifier 'jdbc'.
at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSink(FactoryUtil.java:250)
at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.createSinkAction(MultipleTableJobConfigParser.java:669)
at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSink(MultipleTableJobConfigParser.java:592)
at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:240)
at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.getLogicalDag(ClientJobExecutionEnvironment.java:123)
at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.execute(ClientJobExecutionEnvironment.java:191)
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:165)
... 2 more
Caused by: org.apache.seatunnel.api.configuration.util.OptionValidationException: ErrorCode:[API-02], ErrorDescription:[Option item validate failed] - There are unconfigured options, the options('database') are required because ['generate_sink_sql' == true] is true.
at org.apache.seatunnel.api.configuration.util.ConfigValidator.validate(ConfigValidator.java:200)
at org.apache.seatunnel.api.configuration.util.ConfigValidator.validate(ConfigValidator.java:107)
at org.apache.seatunnel.api.configuration.util.ConfigValidator.validate(ConfigValidator.java:47)
at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSink(FactoryUtil.java:239)
... 8 more
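The validation message says that the sink requires database once generate_sink_sql is true. A rough pre-submit check can catch this before the job is started. This is a text-level sketch, not a real parser for the job config: the function name check_sink_options is hypothetical, it assumes one "key = value" option per line, and it checks table as well only because this section's heading asks for both.

```shell
# Warn when generate_sink_sql is enabled but database/table are not set.
# Text-level check only; it does not understand the config's nesting.
check_sink_options() {
  conf="$1"
  # Nothing to check unless generate_sink_sql is switched on.
  if ! grep -Eq '^[[:space:]]*generate_sink_sql[[:space:]]*=[[:space:]]*true' "$conf"; then
    return 0
  fi
  missing=""
  for key in database table; do
    if ! grep -Eq "^[[:space:]]*${key}[[:space:]]*=" "$conf"; then
      missing="$missing $key"
    fi
  done
  if [ -n "$missing" ]; then
    echo "missing sink options:$missing"
    return 1
  fi
  return 0
}

# Usage:
#   check_sink_options "$SEATUNNEL_HOME/config/mysql_virdb_config" && ./bin/seatunnel.sh -c ...
```

Because it matches plain text, an option named database in another block of the same file would also satisfy the check, so treat a pass as a hint and not a guarantee.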
6.2 Error 2: the ID field cannot be found
[967787050499571713] 2025-04-24 22:00:16,190 ERROR [.s.e.s.c.CheckpointCoordinator] [hz.main.generic-operation.thread-14] - report error from task
org.apache.seatunnel.common.utils.SeaTunnelException: java.lang.IllegalArgumentException: can't find field [ID]
at org.apache.seatunnel.api.table.type.SeaTunnelRowType.indexOf(SeaTunnelRowType.java:87)
at org.apache.seatunnel.api.table.type.SeaTunnelRowType.indexOf(SeaTunnelRowType.java:77)
at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSink.createWriter(JdbcSink.java:133)
at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSink.createWriter(JdbcSink.java:66)
at org.apache.seatunnel.api.sink.multitablesink.MultiTableSink.createWriter(MultiTableSink.java:82)
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.restoreState(SinkFlowLifeCycle.java:342)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$restoreState$16(SeaTunnelTask.java:426)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.restoreState(SeaTunnelTask.java:423)
at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$runInternal$0(NotifyTaskRestoreOperation.java:107)
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.reportCheckpointErrorFromTask(CheckpointCoordinator.java:391) ~[seatunnel-starter.jar:2.3.10]
at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.reportCheckpointErrorFromTask(CheckpointManager.java:182) ~[seatunnel-starter.jar:2.3.10]
at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation.runInternal(CheckpointErrorReportOperation.java:48) ~[seatunnel-starter.jar:2.3.10]
at org.apache.seatunnel.engine.server.task.operation.TracingOperation.run(TracingOperation.java:42) ~[seatunnel-starter.jar:2.3.10]
at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) ~[seatunnel-starter.jar:2.3.10]
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) ~[seatunnel-starter.jar:2.3.10]
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) ~[seatunnel-starter.jar:2.3.10]
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) ~[seatunnel-starter.jar:2.3.10]
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175) ~[seatunnel-starter.jar:2.3.10]
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139) ~[seatunnel-starter.jar:2.3.10]
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) ~[seatunnel-starter.jar:2.3.10]
at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) ~[seatunnel-starter.jar:2.3.10]
6.3 If the object already exists, the following error is reported
Error Msg = ORA-00955: name is already used by an existing object
at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:530)
... 43 more
at org.apache.seatunnel.engine.server.CoordinatorService.lambda$submitJob$6(CoordinatorService.java:656)
at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
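6.4 The jobs shown in the Web UI do not always match what the server reports
Job ID 967961857958608897 had been paused earlier; after it was restarted, the Web UI showed 2 records for it.
This was probably caused by a data delay: when I checked again a few days later, the extra record was gone.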
7 References
- Source (MySQL-CDC) configuration: https://seatunnel.apache.org/zh-CN/docs/2.3.10/connector-v2/source/MySQL-CDC
- Sink (Oracle) configuration: https://seatunnel.apache.org/zh-CN/docs/2.3.10/connector-v2/sink/Oracle
- Separate log file for each job: https://seatunnel.apache.org/zh-CN/docs/2.3.10/seatunnel-engine/logging
