您现在的位置是:首页 > 文章详情

深入浅出Apache SeaTunnel SQL Server Sink Connector

日期:2023-10-23点击:93

在大数据时代,数据的迁移和流动已经变得日益重要。为了使数据能够更加高效地从一个源流向另一个目标,我们需要可靠、高效和易于配置的工具。今天,我们将介绍 JDBC SQL Server Sink Connector,这是一个专为 SQL Server 设计的连接器,能够确保数据的精准、高效传输。

file

不仅如此,它还支持多种流处理引擎,例如 Spark、Flink 和 SeatTunnel Zeta。无论您是初学者还是有经验的开发者,本文都将为您提供关于如何最大限度地利用此连接器的深入见解。

支持 SQL Server 版本

  • 服务器:2008(或更高版本,仅供信息参考)

支持的引擎

Spark<br/> Flink<br/> Seatunnel Zeta<br/>

主要特点

使用 Xa 事务 来确保 精准一次性。因此,仅支持支持 Xa 事务 的数据库的 精准一次性。您可以设置 is_exactly_once=true 来启用它。

描述

通过 JDBC 写入数据。支持批处理模式和流处理模式,支持并发写入,支持精准一次性语义(使用 XA 事务保证)。

支持的数据源信息

数据源 支持的版本 驱动 URL Maven
SQL Server 支持版本 >= 2008 com.microsoft.sqlserver.jdbc.SQLServerDriver jdbc:sqlserver://localhost:1433 下载

数据库依赖

请下载与 'Maven' 对应的支持列表,并将其复制到 '$SEATNUNNEL_HOME/plugins/jdbc/lib/' 工作目录<br/> 例如 SQL Server 数据源:cp mssql-jdbc-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/

数据类型映射

SQL Server 数据类型 Seatunnel 数据类型
BIT BOOLEAN
TINYINT<br/>SMALLINT SHORT
INTEGER INT
BIGINT LONG
DECIMAL<br />NUMERIC<br />MONEY<br />SMALLMONEY DECIMAL((指定列的指定列大小)+1,<br/>(获取指定列的小数点右边的数字的数量。)))
REAL FLOAT
FLOAT DOUBLE
CHAR<br />NCHAR<br />VARCHAR<br />NTEXT<br />NVARCHAR<br />TEXT STRING
DATE LOCAL_DATE
TIME LOCAL_TIME
DATETIME<br />DATETIME2<br />SMALLDATETIME<br />DATETIMEOFFSET LOCAL_DATE_TIME
TIMESTAMP<br />BINARY<br />VARBINARY<br />IMAGE<br />UNKNOWN 尚不支持

Sink 选项

名称 类型 必需 默认值 描述
url 字符串 - JDBC 连接的 URL。例如:jdbc:sqlserver://localhost:1433;databaseName=mydatabase
driver 字符串 - 用于连接到远程数据源的 JDBC 类名,如果使用 SQL Server,则值为 com.microsoft.sqlserver.jdbc.SQLServerDriver
user 字符串 - 连接实例的用户名
password 字符串 - 连接实例的密码
query 字符串 - 使用此 SQL 将上游输入数据写入数据库。例如 INSERT ...query 具有更高的优先级
database 字符串 - 使用此 databasetable-name 自动生成 SQL 并接收上游输入数据写入数据库。此选项与 query 互斥,优先级更高。
table 字符串 - 使用数据库和此表名自动生成 SQL 并接收上游输入数据写入数据库。此选项与 query 互斥,优先级更高。
primary_keys 数组 - 此选项用于支持自动生成 SQL 时的 insertdeleteupdate 等操作。
support_upsert_by_query_primary_key_exist 布尔 false 选择是否使用 INSERT SQL、UPDATE SQL 来处理基于查询主键是否存在的更新事件(INSERT、UPDATE_AFTER)。只有在数据库不支持 upsert 语法时才使用此配置。注意:此方法性能较低。
connection_check_timeout_sec 整数 30 等待用于验证连接的数据库操作完成的秒数。
max_retries 整数 0 重试提交失败(executeBatch)的次数。
batch_size 整数 1000 用于批量写入的记录数量达到 batch_size 或时间达到 checkpoint.interval 时,数据将刷新到数据库。
is_exactly_once 布尔 false 是否启用精准一次性语义,将使用 XA 事务。如果开启,需要设置 xa_data_source_class_name
generate_sink_sql 布尔 false 基于要写入的数据库表生成 SQL 语句。
xa_data_source_class_name 字符串 - 数据库驱动程序的 XA 数据源类名,例如,SQL Server 为 com.microsoft.sqlserver.jdbc.SQLServerXADataSource,其他数据源请参考附录。
max_commit_attempts 整数 3 事务提交失败的重试次数。
transaction_timeout_sec 整数 -1 事务打开后的超时时间,默认值为 -1(永不超时)。请注意,设置超时可能会影响精准一次性语义。
auto_commit 布尔 true 默认启用自动事务提交。
common-options - Sink 插件通用参数,请参考 Sink Common Options 以获取详细信息。

提示

如果未设置 partition_column,则将以单一并发运行;如果设置了 partition_column,则将根据任务的并发度执行并行操作。

任务示例

简单:

这是一个读取 Sql Server 数据并将其直接插入另一个表中的示例

env { # 您可以在此处设置引擎配置 execution.parallelism = 10 } source { # 这是一个示例源插件,仅用于测试和演示源插件的功能 Jdbc { driver = com.microsoft.sqlserver.jdbc.SQLServerDriver url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test" user = SA password = "Y.sa123456" query = "select * from column_type_test.dbo.full_types_jdbc" # 并行分片读取字段 partition_column = "id" # 片段数量 partition_num = 10 } } transform { # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息, # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql } sink { Jdbc { driver = com.microsoft.sqlserver.jdbc.SQLServerDriver url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test" user = SA password = "Y.sa123456" query = "insert into full_types_jdbc_sink( id, val_char, val_varchar, val_text, val_nchar, val_nvarchar, val_ntext, val_decimal, val_numeric, val_float, val_real, val_smallmoney, val_money, val_bit, val_tinyint, val_smallint, val_int, val_bigint, val_date, val_time, val_datetime2, val_datetime, val_smalldatetime ) values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )" } # 如果您想要获取有关如何配置 Seatunnel 和查看接收插件的完整列表的更多信息, # 请转到 https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc } 

CDC(Change Data Capture)事件

我们还支持 CDC 变更数据,此时需要配置数据库、表和主键。

Jdbc { source_table_name = "customers" driver = com.microsoft.sqlserver.jdbc.SQLServerDriver url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test" user = SA password = "Y.sa123456" generate_sink_sql = true database = "column_type_test" table = "dbo.full_types_sink" batch_size = 100 primary_keys = ["id"] } 

精确一次性 Sink

事务性写入可能会更慢,但对数据更准确

 Jdbc { driver = com.microsoft.sqlserver.jdbc.SQLServerDriver url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test" user = SA password = "Y.sa123456" query = "insert into full_types_jdbc_sink( id, val_char, val_varchar, val_text, val_nchar, val_nvarchar, val_ntext, val_decimal, val_numeric, val_float, val_real, val_smallmoney, val_money, val_bit, val_tinyint, val_smallint, val_int, val_bigint, val_date, val_time, val_datetime2, val_datetime, val_smalldatetime ) values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )" is_exactly_once = "true" xa_data_source_class_name = "com.microsoft.sqlserver.jdbc.SQLServerXADataSource" } # 如果您想要获取有关如何配置 Seatunnel 和查看接收插件的完整列表的更多信息, # 请转到 https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc 

本文由 白鲸开源科技 提供发布支持!

原文链接:https://my.oschina.net/dailidong/blog/10122542
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章