Apache SeaTunnel: A Tutorial on Developing JDBC-Based Connectors

Date: 2025-02-19

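Overview

This article helps developers quickly get familiar with the SeaTunnel 2.3.8 framework and start developing JDBC connectors.

Prerequisites

Before you start, read the following framework documents carefully. They live in the docs directory of the SeaTunnel source tree and will help you get up to speed and follow the project's development conventions. Skip them if you have already read them.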

  • docs/zh/concept/config.md
  • docs/zh/concept/connector-v2-features.md
  • docs/zh/concept/JobEnvConfig.md
  • docs/zh/connector-v2/sink/Jdbc.md
  • docs/zh/connector-v2/sink-common-options.md
  • docs/zh/connector-v2/source-common-options.md
  • Must-read for connector development (https://www.modb.pro/db/447922)

Basic SeaTunnel Development Workflow

Clone the project

 git clone https://github.com/apache/seatunnel.git 

Build

  1. Select the Maven Profiles.

  2. Build with mvn and install the artifacts into the local repository:

    mvn clean install -Dmaven.test.skip=true

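Run the example project

seatunnel-examples is the module for running SeaTunnel in a local environment. Run SeaTunnelEngineLocalExample with the sample configuration file fake_to_console.conf to check that your SeaTunnel environment works.

Package for release

After selecting the Profiles, you can build the release package directly through the Dist module.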

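JDBC Connector Development

Package layout

Tips

For JDBC connector development you generally only need to care about two parts: the catalog package, and the dialect package under internal.

These two parts already capture the differences between databases. Most of the remaining code is shared, and modifying it is not recommended because a change may affect every class that references it.

Classes in catalog

MySqlCatalogFactory uses factoryIdentifier() to identify the database type, optionRule() to define the validation rules for the connector options, and createCatalog() as the factory method that creates the catalog instance.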

    @AutoService(Factory.class)
    public class MySqlCatalogFactory implements CatalogFactory {

        @Override
        public String factoryIdentifier() {
            return DatabaseIdentifier.MYSQL;
        }

        @Override
        public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
            String urlWithDatabase = options.get(JdbcCatalogOptions.BASE_URL);
            Preconditions.checkArgument(
                    StringUtils.isNoneBlank(urlWithDatabase),
                    "Miss config <base-url>! Please check your config.");
            JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(urlWithDatabase);
            return new MySqlCatalog(
                    catalogName,
                    options.get(JdbcCatalogOptions.USERNAME),
                    options.get(JdbcCatalogOptions.PASSWORD),
                    urlInfo);
        }

        @Override
        public OptionRule optionRule() {
            return JdbcCatalogOptions.BASE_RULE.build();
        }
    }
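
To make the factory pattern easier to try out, here is a minimal sketch that mirrors the identifier and creation methods for an imaginary database. It uses simplified stand-in types so that it runs without SeaTunnel on the classpath. SimpleCatalog, SimpleCatalogFactory, DemoDbCatalogFactory, the "DemoDB" identifier, and the "base-url" map key are all assumptions made for illustration; a real factory implements CatalogFactory and reads its options from ReadonlyConfig.

```java
import java.util.Map;

// Simplified stand-ins so the sketch runs without SeaTunnel on the classpath.
final class CatalogFactorySketch {

    /** Hypothetical, reduced catalog: only what the demo needs. */
    interface SimpleCatalog {
        String name();
        String baseUrl();
    }

    /** Hypothetical, reduced factory contract: identifier plus creation. */
    interface SimpleCatalogFactory {
        String factoryIdentifier();
        SimpleCatalog createCatalog(String catalogName, Map<String, String> options);
    }

    /** Factory for an imaginary database called "DemoDB". */
    static final class DemoDbCatalogFactory implements SimpleCatalogFactory {
        @Override
        public String factoryIdentifier() {
            return "DemoDB";
        }

        @Override
        public SimpleCatalog createCatalog(String catalogName, Map<String, String> options) {
            final String baseUrl = options.get("base-url");
            // Fail fast when the mandatory option is missing or blank.
            if (baseUrl == null || baseUrl.trim().isEmpty()) {
                throw new IllegalArgumentException(
                        "Miss config <base-url>! Please check your config.");
            }
            return new SimpleCatalog() {
                @Override public String name() { return catalogName; }
                @Override public String baseUrl() { return baseUrl; }
            };
        }
    }

    public static void main(String[] args) {
        SimpleCatalogFactory factory = new DemoDbCatalogFactory();
        SimpleCatalog catalog = factory.createCatalog(
                "demo", Map.of("base-url", "jdbc:demodb://localhost:1234/test"));
        System.out.println(factory.factoryIdentifier() + " -> " + catalog.baseUrl());
    }
}
```

The point of the sketch is the division of labour: the identifier selects the factory, and the factory validates its options before constructing anything.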

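MySqlCatalog contains the queries for database metadata, such as SELECT_DATABASE_EXISTS (database lookup) and SELECT_TABLE_EXISTS (table lookup). It also defines the statements that create and drop databases and tables, for example the table DDL, and its getTable() method can return an external table directly as a CatalogTable.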

    @Slf4j
    public class MySqlCatalog extends AbstractJdbcCatalog {

        private static final String SELECT_COLUMNS_SQL_TEMPLATE =
                "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME ='%s' ORDER BY ORDINAL_POSITION ASC";

        private static final String SELECT_DATABASE_EXISTS =
                "SELECT SCHEMA_NAME FROM information_schema.schemata WHERE SCHEMA_NAME = '%s'";

        private static final String SELECT_TABLE_EXISTS =
                "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE table_schema = '%s' AND table_name = '%s'";

        private MySqlVersion version;
        private MySqlTypeConverter typeConverter;

        public MySqlCatalog(
                String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) {
            super(catalogName, username, pwd, urlInfo, null);
            this.version = resolveVersion();
            this.typeConverter = new MySqlTypeConverter(version);
        }

        @Override
        protected String getDatabaseWithConditionSql(String databaseName) {
            return String.format(SELECT_DATABASE_EXISTS, databaseName);
        }

        @Override
        protected String getTableWithConditionSql(TablePath tablePath) {
            return String.format(
                    SELECT_TABLE_EXISTS, tablePath.getDatabaseName(), tablePath.getTableName());
        }

        @Override
        protected String getListDatabaseSql() {
            return "SHOW DATABASES;";
        }

        @Override
        protected String getListTableSql(String databaseName) {
            return "SHOW TABLES;";
        }

        @Override
        protected String getTableName(ResultSet rs) throws SQLException {
            return rs.getString(1);
        }

        @Override
        protected String getTableName(TablePath tablePath) {
            return tablePath.getTableName();
        }

        @Override
        protected String getSelectColumnsSql(TablePath tablePath) {
            return String.format(
                    SELECT_COLUMNS_SQL_TEMPLATE,
                    tablePath.getDatabaseName(),
                    tablePath.getTableName());
        }

        @Override
        protected TableIdentifier getTableIdentifier(TablePath tablePath) {
            return TableIdentifier.of(
                    catalogName, tablePath.getDatabaseName(), tablePath.getTableName());
        }

        @Override
        protected List<ConstraintKey> getConstraintKeys(DatabaseMetaData metaData, TablePath tablePath)
                throws SQLException {
            List<ConstraintKey> indexList =
                    super.getConstraintKeys(
                            metaData,
                            tablePath.getDatabaseName(),
                            tablePath.getSchemaName(),
                            tablePath.getTableName());
            for (Iterator<ConstraintKey> it = indexList.iterator(); it.hasNext(); ) {
                ConstraintKey index = it.next();
                if (ConstraintKey.ConstraintType.UNIQUE_KEY.equals(index.getConstraintType())
                        && "PRIMARY".equals(index.getConstraintName())) {
                    it.remove();
                }
            }
            return indexList;
        }

        @Override
        protected Column buildColumn(ResultSet resultSet) throws SQLException {
            String columnName = resultSet.getString("COLUMN_NAME");
            // e.g. tinyint(1) unsigned
            String columnType = resultSet.getString("COLUMN_TYPE");
            // e.g. tinyint
            String dataType = resultSet.getString("DATA_TYPE").toUpperCase();
            String comment = resultSet.getString("COLUMN_COMMENT");
            Object defaultValue = resultSet.getObject("COLUMN_DEFAULT");
            String isNullableStr = resultSet.getString("IS_NULLABLE");
            boolean isNullable = isNullableStr.equals("YES");
            // e.g. `decimal(10, 2)` is 10
            long numberPrecision = resultSet.getInt("NUMERIC_PRECISION");
            // e.g. `decimal(10, 2)` is 2
            int numberScale = resultSet.getInt("NUMERIC_SCALE");
            // e.g. `varchar(10)` is 40
            long charOctetLength = resultSet.getLong("CHARACTER_OCTET_LENGTH");
            // e.g. `timestamp(3)` is 3
            int timePrecision =
                    MySqlVersion.V_5_5.equals(version) ? 0 : resultSet.getInt("DATETIME_PRECISION");

            Preconditions.checkArgument(!(numberPrecision > 0 && charOctetLength > 0));
            Preconditions.checkArgument(!(numberScale > 0 && timePrecision > 0));

            MysqlType mysqlType = MysqlType.getByName(columnType);
            boolean unsigned = columnType.toLowerCase(Locale.ROOT).contains("unsigned");

            BasicTypeDefine<MysqlType> typeDefine =
                    BasicTypeDefine.<MysqlType>builder()
                            .name(columnName)
                            .columnType(columnType)
                            .dataType(dataType)
                            .nativeType(mysqlType)
                            .unsigned(unsigned)
                            .length(Math.max(charOctetLength, numberPrecision))
                            .precision(numberPrecision)
                            .scale(Math.max(numberScale, timePrecision))
                            .nullable(isNullable)
                            .defaultValue(defaultValue)
                            .comment(comment)
                            .build();
            return typeConverter.convert(typeDefine);
        }

        @Override
        protected String getCreateTableSql(
                TablePath tablePath, CatalogTable table, boolean createIndex) {
            return MysqlCreateTableSqlBuilder.builder(tablePath, table, typeConverter, createIndex)
                    .build(table.getCatalogName());
        }

        @Override
        protected String getDropTableSql(TablePath tablePath) {
            return String.format(
                    "DROP TABLE `%s`.`%s`;", tablePath.getDatabaseName(), tablePath.getTableName());
        }

        @Override
        protected String getCreateDatabaseSql(String databaseName) {
            return String.format("CREATE DATABASE `%s`;", databaseName);
        }

        @Override
        protected String getDropDatabaseSql(String databaseName) {
            return String.format("DROP DATABASE `%s`;", databaseName);
        }

        @Override
        public CatalogTable getTable(String sqlQuery) throws SQLException {
            Connection defaultConnection = getConnection(defaultUrl);
            return CatalogUtils.getCatalogTable(
                    defaultConnection, sqlQuery, new MySqlTypeMapper(typeConverter));
        }

        @Override
        protected String getTruncateTableSql(TablePath tablePath) throws CatalogException {
            return String.format(
                    "TRUNCATE TABLE `%s`.`%s`;",
                    tablePath.getDatabaseName(), tablePath.getTableName());
        }

        public String getExistDataSql(TablePath tablePath) {
            return String.format(
                    "SELECT * FROM `%s`.`%s` LIMIT 1;",
                    tablePath.getDatabaseName(), tablePath.getTableName());
        }

        private MySqlVersion resolveVersion() {
            try (Statement statement = getConnection(defaultUrl).createStatement();
                    ResultSet resultSet = statement.executeQuery("SELECT VERSION()")) {
                resultSet.next();
                return MySqlVersion.parse(resultSet.getString(1));
            } catch (Exception e) {
                log.info(
                        "Failed to get mysql version, fallback to default version: {}",
                        MySqlVersion.V_5_7,
                        e);
                return MySqlVersion.V_5_7;
            }
        }
    }
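
Most of the catalog's database-specific work comes down to filling SQL templates with a database and table name. The sketch below isolates that step so the generated statements can be inspected without a database connection. The class and method names (MetadataSqlSketch, tableExistsSql, dropTableSql) are hypothetical; the two templates are copied from the MySqlCatalog listing.

```java
// Standalone illustration of the template-filling step; no JDBC connection needed.
final class MetadataSqlSketch {

    // Same template text as SELECT_TABLE_EXISTS in MySqlCatalog.
    static final String SELECT_TABLE_EXISTS =
            "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables"
                    + " WHERE table_schema = '%s' AND table_name = '%s'";

    /** Builds the query that checks whether a table exists. */
    static String tableExistsSql(String database, String table) {
        return String.format(SELECT_TABLE_EXISTS, database, table);
    }

    /** Builds the DROP TABLE statement, quoting identifiers with backticks. */
    static String dropTableSql(String database, String table) {
        return String.format("DROP TABLE `%s`.`%s`;", database, table);
    }

    public static void main(String[] args) {
        System.out.println(tableExistsSql("test", "users"));
        System.out.println(dropTableSql("test", "users"));
    }
}
```

When adding a catalog for another database, these templates are typically what changes: the system tables queried and the identifier quoting differ, while the surrounding flow in AbstractJdbcCatalog stays the same.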

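MysqlCreateTableSqlBuilder and MysqlDataTypeConvertor build the table DDL statement and convert types, respectively. They are fairly simple, so they are not covered in detail here.

Classes in dialect

MySqlDialectFactory creates MysqlDialect instances using the factory pattern. Its acceptsURL() method inspects the JDBC URL to identify which database it belongs to.

The overloaded create() method mainly covers the case where one database is compatible with several dialects. In the code below, the compatible mode selects StarRocksDialect instead of MysqlDialect.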

    @AutoService(JdbcDialectFactory.class)
    public class MySqlDialectFactory implements JdbcDialectFactory {

        @Override
        public boolean acceptsURL(String url) {
            return url.startsWith("jdbc:mysql:");
        }

        @Override
        public JdbcDialect create() {
            return new MysqlDialect();
        }

        @Override
        public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) {
            if (DatabaseIdentifier.STARROCKS.equalsIgnoreCase(compatibleMode)) {
                return new StarRocksDialect(fieldIde);
            }
            return new MysqlDialect(fieldIde);
        }
    }
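
The URL-based lookup can be pictured as a scan over the registered factories that stops at the first one accepting the URL. The following is a rough sketch of that idea, not SeaTunnel's actual loader. SimpleDialectFactory, the two factory classes, and resolve() are hypothetical, and dialects are represented by plain strings to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-ins: a dialect is represented by its name only.
final class DialectLookupSketch {

    interface SimpleDialectFactory {
        boolean acceptsURL(String url);
        String create();
        // By default the compatible mode is ignored.
        default String create(String compatibleMode) { return create(); }
    }

    static final class MySqlLikeFactory implements SimpleDialectFactory {
        @Override public boolean acceptsURL(String url) { return url.startsWith("jdbc:mysql:"); }
        @Override public String create() { return "MySQL"; }
        @Override public String create(String compatibleMode) {
            // A MySQL-protocol URL may really point at StarRocks.
            return "starrocks".equalsIgnoreCase(compatibleMode) ? "StarRocks" : create();
        }
    }

    static final class PostgresLikeFactory implements SimpleDialectFactory {
        @Override public boolean acceptsURL(String url) { return url.startsWith("jdbc:postgresql:"); }
        @Override public String create() { return "Postgres"; }
    }

    static final List<SimpleDialectFactory> FACTORIES =
            Arrays.asList(new MySqlLikeFactory(), new PostgresLikeFactory());

    /** Returns the dialect of the first factory that accepts the URL, if any. */
    static Optional<String> resolve(String url, String compatibleMode) {
        return FACTORIES.stream()
                .filter(f -> f.acceptsURL(url))
                .findFirst()
                .map(f -> compatibleMode == null ? f.create() : f.create(compatibleMode));
    }

    public static void main(String[] args) {
        System.out.println(resolve("jdbc:mysql://localhost:3306/test", null));
        System.out.println(resolve("jdbc:mysql://localhost:9030/test", "starrocks"));
        System.out.println(resolve("jdbc:unknown://localhost/test", null));
    }
}
```

A consequence of this design is that two factories must not accept the same URL prefix unless something like the compatible mode tells them apart.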

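MysqlDialect's getRowConverter() and getTypeConverter() return the row converter and the type converter.

The row converter converts between JDBC data objects and SeaTunnelRow objects (JDBC data object ⇔ SeaTunnelRow).

The type converter converts between JDBC data types and SeaTunnel data types (JDBC type ⇔ SeaTunnel type). Together, the two move data between the engine's internal objects and JDBC result sets in both directions.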
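
To give a feel for what "converting in both directions" means at the value level, here is a small sketch that maps the java.sql temporal classes to their java.time counterparts and back. It is an illustration only: RowConversionSketch and its methods are hypothetical, rows are plain Object arrays rather than SeaTunnelRow, and the real row converter works against ResultSet and PreparedStatement with many more types.

```java
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;

// Hypothetical value-level conversion; rows are plain Object[] for simplicity.
final class RowConversionSketch {

    /** JDBC-side value -> engine-side value. */
    static Object toInternal(Object jdbcValue) {
        if (jdbcValue instanceof Timestamp) {
            return ((Timestamp) jdbcValue).toLocalDateTime();
        }
        if (jdbcValue instanceof Date) {
            return ((Date) jdbcValue).toLocalDate();
        }
        if (jdbcValue instanceof Time) {
            return ((Time) jdbcValue).toLocalTime();
        }
        return jdbcValue; // numbers, strings, etc. pass through unchanged
    }

    /** Engine-side value -> JDBC-side value. */
    static Object toExternal(Object internalValue) {
        if (internalValue instanceof LocalDateTime) {
            return Timestamp.valueOf((LocalDateTime) internalValue);
        }
        if (internalValue instanceof LocalDate) {
            return Date.valueOf((LocalDate) internalValue);
        }
        if (internalValue instanceof LocalTime) {
            return Time.valueOf((LocalTime) internalValue);
        }
        return internalValue;
    }

    /** Converts every field of a JDBC-side row. */
    static Object[] toInternalRow(Object[] jdbcRow) {
        Object[] row = new Object[jdbcRow.length];
        for (int i = 0; i < jdbcRow.length; i++) {
            row[i] = toInternal(jdbcRow[i]);
        }
        return row;
    }

    public static void main(String[] args) {
        Object[] row = toInternalRow(
                new Object[] {1L, "alice", Timestamp.valueOf("2025-02-19 10:00:00")});
        System.out.println(row[2].getClass().getSimpleName() + " " + row[2]);
    }
}
```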

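MysqlDialect also defines features that differ from one database to another, such as the upsert implementation and the identifier quote character.

MysqlDialect can also use defaultParameter() to supply default parameters for the JDBC URL.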
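
One plausible way to apply such defaults is to add each one only when neither the URL nor the user's own connection properties already set it. The sketch below shows that merge rule under those assumptions; DefaultParameterSketch and mergeDefaults() are hypothetical names, and this is not presented as how SeaTunnel itself applies defaultParameter().

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical merge rule: explicit settings always win over dialect defaults.
final class DefaultParameterSketch {

    /**
     * Returns the user's properties plus every default that is set neither
     * in the URL query string nor in the user's properties.
     */
    static Map<String, String> mergeDefaults(
            String url, Map<String, String> userProps, Map<String, String> defaults) {
        Map<String, String> merged = new LinkedHashMap<>(userProps);
        defaults.forEach((key, value) -> {
            boolean setInUrl = url.contains(key + "=");
            if (!setInUrl && !merged.containsKey(key)) {
                merged.put(key, value);
            }
        });
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> defaults = Map.of("rewriteBatchedStatements", "true");
        System.out.println(
                mergeDefaults("jdbc:mysql://localhost:3306/test", Map.of(), defaults));
        System.out.println(
                mergeDefaults(
                        "jdbc:mysql://localhost:3306/test?rewriteBatchedStatements=false",
                        Map.of(),
                        defaults));
    }
}
```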

    public class MysqlDialect implements JdbcDialect {

        private static final List NOT_SUPPORTED_DEFAULT_VALUES =
                Arrays.asList(MysqlType.BLOB, MysqlType.TEXT, MysqlType.JSON, MysqlType.GEOMETRY);

        public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();

        public MysqlDialect() {}

        public MysqlDialect(String fieldIde) {
            this.fieldIde = fieldIde;
        }

        @Override
        public String dialectName() {
            return DatabaseIdentifier.MYSQL;
        }

        @Override
        public JdbcRowConverter getRowConverter() {
            return new MysqlJdbcRowConverter();
        }

        @Override
        public TypeConverter<BasicTypeDefine> getTypeConverter() {
            TypeConverter typeConverter = MySqlTypeConverter.DEFAULT_INSTANCE;
            return typeConverter;
        }

        @Override
        public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
            return new MySqlTypeMapper();
        }

        @Override
        public String quoteIdentifier(String identifier) {
            return "`" + getFieldIde(identifier, fieldIde) + "`";
        }

        @Override
        public String quoteDatabaseIdentifier(String identifier) {
            return "`" + identifier + "`";
        }

        @Override
        public String tableIdentifier(TablePath tablePath) {
            return tableIdentifier(tablePath.getDatabaseName(), tablePath.getTableName());
        }

        @Override
        public Optional<String> getUpsertStatement(
                String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) {
            String updateClause =
                    Arrays.stream(fieldNames)
                            .map(
                                    fieldName ->
                                            quoteIdentifier(fieldName)
                                                    + "=VALUES("
                                                    + quoteIdentifier(fieldName)
                                                    + ")")
                            .collect(Collectors.joining(", "));
            String upsertSQL =
                    getInsertIntoStatement(database, tableName, fieldNames)
                            + " ON DUPLICATE KEY UPDATE "
                            + updateClause;
            return Optional.of(upsertSQL);
        }

        @Override
        public PreparedStatement creatPreparedStatement(
                Connection connection, String queryTemplate, int fetchSize) throws SQLException {
            PreparedStatement statement =
                    connection.prepareStatement(
                            queryTemplate, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            statement.setFetchSize(Integer.MIN_VALUE);
            return statement;
        }

        @Override
        public String extractTableName(TablePath tablePath) {
            return tablePath.getTableName();
        }

        @Override
        public Map<String, String> defaultParameter() {
            HashMap<String, String> map = new HashMap<>();
            map.put("rewriteBatchedStatements", "true");
            return map;
        }

        @Override
        public TablePath parse(String tablePath) {
            return TablePath.of(tablePath, false);
        }

        @Override
        public Object[] sampleDataFromColumn(
                Connection connection,
                JdbcSourceTable table,
                String columnName,
                int samplingRate,
                int fetchSize)
                throws Exception {
            String sampleQuery;
            if (StringUtils.isNotBlank(table.getQuery())) {
                sampleQuery =
                        String.format(
                                "SELECT %s FROM (%s) AS T",
                                quoteIdentifier(columnName), table.getQuery());
            } else {
                sampleQuery =
                        String.format(
                                "SELECT %s FROM %s",
                                quoteIdentifier(columnName),
                                tableIdentifier(table.getTablePath()));
            }

            try (Statement stmt =
                    connection.createStatement(
                            ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
                stmt.setFetchSize(Integer.MIN_VALUE);
                try (ResultSet rs = stmt.executeQuery(sampleQuery)) {
                    int count = 0;
                    List<Object> results = new ArrayList<>();
                    while (rs.next()) {
                        count++;
                        if (count % samplingRate == 0) {
                            results.add(rs.getObject(1));
                        }
                        if (Thread.currentThread().isInterrupted()) {
                            throw new InterruptedException("Thread interrupted");
                        }
                    }
                    Object[] resultsArray = results.toArray();
                    Arrays.sort(resultsArray);
                    return resultsArray;
                }
            }
        }

        @Override
        public Long approximateRowCntStatement(Connection connection, JdbcSourceTable table)
                throws SQLException {
            // 1. If no query is configured, use TABLE STATUS.
            // 2. If a query is configured but does not contain a WHERE clause and tablePath is
            // configured , use TABLE STATUS.
            // 3. If a query is configured with a WHERE clause, or a query statement is configured but
            // tablePath is TablePath.DEFAULT, use COUNT(*).
            boolean useTableStats =
                    StringUtils.isBlank(table.getQuery())
                            || (!table.getQuery().toLowerCase().contains("where")
                                    && table.getTablePath() != null
                                    && !TablePath.DEFAULT
                                            .getFullName()
                                            .equals(table.getTablePath().getFullName()));

            if (useTableStats) {
                // The statement used to get approximate row count which is less
                // accurate than COUNT(*), but is more efficient for large table.
                TablePath tablePath = table.getTablePath();
                String useDatabaseStatement =
                        String.format(
                                "USE %s;", quoteDatabaseIdentifier(tablePath.getDatabaseName()));
                String rowCountQuery =
                        String.format("SHOW TABLE STATUS LIKE '%s';", tablePath.getTableName());

                try (Statement stmt = connection.createStatement()) {
                    log.info("Split Chunk, approximateRowCntStatement: {}", useDatabaseStatement);
                    stmt.execute(useDatabaseStatement);
                    log.info("Split Chunk, approximateRowCntStatement: {}", rowCountQuery);
                    try (ResultSet rs = stmt.executeQuery(rowCountQuery)) {
                        if (!rs.next() || rs.getMetaData().getColumnCount() < 5) {
                            throw new SQLException(
                                    String.format(
                                            "No result returned after running query [%s]",
                                            rowCountQuery));
                        }
                        return rs.getLong(5);
                    }
                }
            }

            return SQLUtils.countForSubquery(connection, table.getQuery());
        }

        @Override
        public boolean supportDefaultValue(BasicTypeDefine typeBasicTypeDefine) {
            MysqlType nativeType = (MysqlType) typeBasicTypeDefine.getNativeType();
            return !(NOT_SUPPORTED_DEFAULT_VALUES.contains(nativeType));
        }

        @Override
        public boolean needsQuotesWithDefaultValue(BasicTypeDefine columnDefine) {
            MysqlType mysqlType = MysqlType.getByName(columnDefine.getColumnType());
            switch (mysqlType) {
                case CHAR:
                case VARCHAR:
                case TEXT:
                case TINYTEXT:
                case MEDIUMTEXT:
                case LONGTEXT:
                case ENUM:
                case SET:
                case BLOB:
                case TINYBLOB:
                case MEDIUMBLOB:
                case LONGBLOB:
                case DATE:
                case DATETIME:
                case TIMESTAMP:
                case TIME:
                case YEAR:
                    return true;
                default:
                    return false;
            }
        }
    }
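
The upsert logic in getUpsertStatement() is easiest to understand by looking at the string it produces. The sketch below rebuilds that string in isolation. UpsertSketch and its methods are hypothetical, and the INSERT part is an assumption: it uses plain "?" placeholders, whereas the real getInsertIntoStatement() may format columns and placeholders differently.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.stream.Collectors;

// Rebuilds a MySQL-style upsert string; the INSERT format is an assumption.
final class UpsertSketch {

    /** Quotes an identifier with backticks, as the MySQL dialect does. */
    static String quote(String identifier) {
        return "`" + identifier + "`";
    }

    /** Assumed INSERT shape with positional "?" placeholders. */
    static String insertInto(String database, String table, String[] fields) {
        String columns =
                Arrays.stream(fields).map(UpsertSketch::quote).collect(Collectors.joining(", "));
        String placeholders = String.join(", ", Collections.nCopies(fields.length, "?"));
        return "INSERT INTO " + quote(database) + "." + quote(table)
                + " (" + columns + ") VALUES (" + placeholders + ")";
    }

    /** Appends ON DUPLICATE KEY UPDATE col=VALUES(col) for every field. */
    static String upsert(String database, String table, String[] fields) {
        String updateClause =
                Arrays.stream(fields)
                        .map(f -> quote(f) + "=VALUES(" + quote(f) + ")")
                        .collect(Collectors.joining(", "));
        return insertInto(database, table, fields) + " ON DUPLICATE KEY UPDATE " + updateClause;
    }

    public static void main(String[] args) {
        System.out.println(upsert("test", "users", new String[] {"id", "name"}));
    }
}
```

Databases without ON DUPLICATE KEY UPDATE need a different statement here (for example a MERGE-style statement), which is why this method lives in the dialect rather than in shared code.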

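MySqlTypeConverter has only two conversion methods, convert and reconvert: convert maps a JDBC type to a SeaTunnel type, and reconvert maps a SeaTunnel type back to a JDBC type. Note that if a SeaTunnel job encounters a type with no conversion rule defined in this class, it throws the runtime exception UNSUPPORTED_DATA_TYPE. To support more data types, this class is the place to start.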

    public class MySqlTypeConverter implements TypeConverter<BasicTypeDefine<MysqlType>> {

        // ============================data types=====================
        static final String MYSQL_NULL = "NULL";
        static final String MYSQL_BIT = "BIT";
        static final String MYSQL_BIT_UNSIGNED = "BIT UNSIGNED";

        // -------------------------number----------------------------
        static final String MYSQL_TINYINT = "TINYINT";
        static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
        static final String MYSQL_SMALLINT = "SMALLINT";
        static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
        static final String MYSQL_MEDIUMINT = "MEDIUMINT";
        static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
        static final String MYSQL_INT = "INT";
        static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
        static final String MYSQL_INTEGER = "INTEGER";
        static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
        static final String MYSQL_BIGINT = "BIGINT";
        static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
        static final String MYSQL_DECIMAL = "DECIMAL";
        static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
        static final String MYSQL_FLOAT = "FLOAT";
        static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
        static final String MYSQL_DOUBLE = "DOUBLE";
        static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";

        // -------------------------string----------------------------
        public static final String MYSQL_CHAR = "CHAR";
        public static final String MYSQL_VARCHAR = "VARCHAR";
        static final String MYSQL_TINYTEXT = "TINYTEXT";
        static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
        static final String MYSQL_TEXT = "TEXT";
        static final String MYSQL_LONGTEXT = "LONGTEXT";
        static final String MYSQL_JSON = "JSON";
        static final String MYSQL_ENUM = "ENUM";

        // ------------------------------time-------------------------
        static final String MYSQL_DATE = "DATE";
        public static final String MYSQL_DATETIME = "DATETIME";
        public static final String MYSQL_TIME = "TIME";
        public static final String MYSQL_TIMESTAMP = "TIMESTAMP";
        static final String MYSQL_YEAR = "YEAR";
        static final String MYSQL_YEAR_UNSIGNED = "YEAR UNSIGNED";

        // ------------------------------blob-------------------------
        static final String MYSQL_TINYBLOB = "TINYBLOB";
        static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
        static final String MYSQL_BLOB = "BLOB";
        static final String MYSQL_LONGBLOB = "LONGBLOB";
        static final String MYSQL_BINARY = "BINARY";
        static final String MYSQL_VARBINARY = "VARBINARY";
        static final String MYSQL_GEOMETRY = "GEOMETRY";

        public static final int DEFAULT_PRECISION = 38;
        public static final int MAX_PRECISION = 65;
        public static final int DEFAULT_SCALE = 18;
        public static final int MAX_SCALE = 30;
        public static final int MAX_TIME_SCALE = 6;
        public static final int MAX_TIMESTAMP_SCALE = 6;
        public static final long POWER_2_8 = (long) Math.pow(2, 8);
        public static final long POWER_2_16 = (long) Math.pow(2, 16);
        public static final long POWER_2_24 = (long) Math.pow(2, 24);
        public static final long POWER_2_32 = (long) Math.pow(2, 32);
        public static final long MAX_VARBINARY_LENGTH = POWER_2_16 - 4;

        public static final MySqlTypeConverter DEFAULT_INSTANCE =
                new MySqlTypeConverter(MySqlVersion.V_5_7);

        private final MySqlVersion version;

        public MySqlTypeConverter(MySqlVersion version) {
            this.version = version;
        }

        public MySqlTypeConverter() {
            this(MySqlVersion.V_5_7);
        }

        @Override
        public String identifier() {
            return DatabaseIdentifier.MYSQL;
        }

        @Override
        public Column convert(BasicTypeDefine typeDefine) {
            PhysicalColumn.PhysicalColumnBuilder builder =
                    PhysicalColumn.builder()
                            .name(typeDefine.getName())
                            .sourceType(typeDefine.getColumnType())
                            .nullable(typeDefine.isNullable())
                            .defaultValue(typeDefine.getDefaultValue())
                            .comment(typeDefine.getComment());

            String mysqlDataType = typeDefine.getDataType().toUpperCase();
            if (mysqlDataType.endsWith("ZEROFILL")) {
                mysqlDataType =
                        mysqlDataType.substring(0, mysqlDataType.length() - "ZEROFILL".length()).trim();
            }
            if (typeDefine.isUnsigned() && !(mysqlDataType.endsWith(" UNSIGNED"))) {
                mysqlDataType = mysqlDataType + " UNSIGNED";
            }
            switch (mysqlDataType) {
                case MYSQL_NULL:
                    builder.dataType(BasicType.VOID_TYPE);
                    break;
                case MYSQL_BIT:
                case MYSQL_BIT_UNSIGNED:
                    if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) {
                        builder.dataType(BasicType.BOOLEAN_TYPE);
                    } else if (typeDefine.getLength() == 1) {
                        builder.dataType(BasicType.BOOLEAN_TYPE);
                    } else {
                        builder.dataType(PrimitiveByteArrayType.INSTANCE);
                        // BIT(M) -> BYTE(M/8)
                        long byteLength = typeDefine.getLength() / 8;
                        byteLength += typeDefine.getLength() % 8 > 0 ? 1 : 0;
                        builder.columnLength(byteLength);
                    }
                    break;
                case MYSQL_TINYINT:
                    if (typeDefine.getColumnType().equalsIgnoreCase("tinyint(1)")) {
                        builder.dataType(BasicType.BOOLEAN_TYPE);
                    } else {
                        builder.dataType(BasicType.BYTE_TYPE);
                    }
                    break;
                case MYSQL_TINYINT_UNSIGNED:
                case MYSQL_SMALLINT:
                    builder.dataType(BasicType.SHORT_TYPE);
                    break;
                case MYSQL_SMALLINT_UNSIGNED:
                case MYSQL_MEDIUMINT:
                case MYSQL_MEDIUMINT_UNSIGNED:
                case MYSQL_INT:
                case MYSQL_INTEGER:
                case MYSQL_YEAR:
                case MYSQL_YEAR_UNSIGNED:
                    builder.dataType(BasicType.INT_TYPE);
                    break;
                case MYSQL_INT_UNSIGNED:
                case MYSQL_INTEGER_UNSIGNED:
                case MYSQL_BIGINT:
                    builder.dataType(BasicType.LONG_TYPE);
                    break;
                case MYSQL_BIGINT_UNSIGNED:
                    DecimalType intDecimalType = new DecimalType(20, 0);
                    builder.dataType(intDecimalType);
                    builder.columnLength(Long.valueOf(intDecimalType.getPrecision()));
                    builder.scale(intDecimalType.getScale());
                    break;
                case MYSQL_FLOAT:
                    builder.dataType(BasicType.FLOAT_TYPE);
                    break;
                case MYSQL_FLOAT_UNSIGNED:
                    log.warn("{} will probably cause value overflow.", MYSQL_FLOAT_UNSIGNED);
                    builder.dataType(BasicType.FLOAT_TYPE);
                    break;
                case MYSQL_DOUBLE:
                    builder.dataType(BasicType.DOUBLE_TYPE);
                    break;
                case MYSQL_DOUBLE_UNSIGNED:
                    log.warn("{} will probably cause value overflow.", MYSQL_DOUBLE_UNSIGNED);
                    builder.dataType(BasicType.DOUBLE_TYPE);
                    break;
                case MYSQL_DECIMAL:
                    Preconditions.checkArgument(typeDefine.getPrecision() > 0);

                    DecimalType decimalType;
                    if (typeDefine.getPrecision() > DEFAULT_PRECISION) {
                        log.warn("{} will probably cause value overflow.", MYSQL_DECIMAL);
                        decimalType = new DecimalType(DEFAULT_PRECISION, DEFAULT_SCALE);
                    } else {
                        decimalType =
                                new DecimalType(
                                        typeDefine.getPrecision().intValue(),
                                        typeDefine.getScale() == null
                                                ? 0
                                                : typeDefine.getScale().intValue());
                    }
                    builder.dataType(decimalType);
                    builder.columnLength(Long.valueOf(decimalType.getPrecision()));
                    builder.scale(decimalType.getScale());
                    break;
                case MYSQL_DECIMAL_UNSIGNED:
                    Preconditions.checkArgument(typeDefine.getPrecision() > 0);

                    log.warn("{} will probably cause value overflow.", MYSQL_DECIMAL_UNSIGNED);
                    DecimalType decimalUnsignedType =
                            new DecimalType(
                                    typeDefine.getPrecision().intValue() + 1,
                                    typeDefine.getScale() == null
                                            ? 0
                                            : typeDefine.getScale().intValue());
                    builder.dataType(decimalUnsignedType);
                    builder.columnLength(Long.valueOf(decimalUnsignedType.getPrecision()));
                    builder.scale(decimalUnsignedType.getScale());
                    break;
                case MYSQL_ENUM:
                    builder.dataType(BasicType.STRING_TYPE);
                    if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) {
                        builder.columnLength(100L);
                    } else {
                        builder.columnLength(typeDefine.getLength());
                    }
                    break;
                case MYSQL_CHAR:
                case MYSQL_VARCHAR:
                    if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) {
                        builder.columnLength(TypeDefineUtils.charTo4ByteLength(1L));
                    } else {
                        builder.columnLength(typeDefine.getLength());
                    }
                    builder.dataType(BasicType.STRING_TYPE);
                    break;
                case MYSQL_TINYTEXT:
                    builder.dataType(BasicType.STRING_TYPE);
                    builder.columnLength(POWER_2_8 - 1);
                    break;
                case MYSQL_TEXT:
                    builder.dataType(BasicType.STRING_TYPE);
                    builder.columnLength(POWER_2_16 - 1);
                    break;
                case MYSQL_MEDIUMTEXT:
                    builder.dataType(BasicType.STRING_TYPE);
                    builder.columnLength(POWER_2_24 - 1);
                    break;
                case MYSQL_LONGTEXT:
                    builder.dataType(BasicType.STRING_TYPE);
                    builder.columnLength(POWER_2_32 - 1);
                    break;
                case MYSQL_JSON:
                    builder.dataType(BasicType.STRING_TYPE);
                    break;
                case MYSQL_BINARY:
                case MYSQL_VARBINARY:
                    if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) {
                        builder.columnLength(1L);
                    } else {
                        builder.columnLength(typeDefine.getLength());
                    }
                    builder.dataType(PrimitiveByteArrayType.INSTANCE);
                    break;
                case MYSQL_TINYBLOB:
                    builder.dataType(PrimitiveByteArrayType.INSTANCE);
                    builder.columnLength(POWER_2_8 - 1);
                    break;
                case MYSQL_BLOB:
                    builder.dataType(PrimitiveByteArrayType.INSTANCE);
                    builder.columnLength(POWER_2_16 - 1);
                    break;
                case MYSQL_MEDIUMBLOB:
                    builder.dataType(PrimitiveByteArrayType.INSTANCE);
                    builder.columnLength(POWER_2_24 - 1);
                    break;
                case MYSQL_LONGBLOB:
                    builder.dataType(PrimitiveByteArrayType.INSTANCE);
                    builder.columnLength(POWER_2_32 - 1);
                    break;
                case MYSQL_GEOMETRY:
                    builder.dataType(PrimitiveByteArrayType.INSTANCE);
                    break;
                case MYSQL_DATE:
                    builder.dataType(LocalTimeType.LOCAL_DATE_TYPE);
                    break;
                case MYSQL_TIME:
                    builder.dataType(LocalTimeType.LOCAL_TIME_TYPE);
                    builder.scale(typeDefine.getScale());
                    break;
                case MYSQL_DATETIME:
                case MYSQL_TIMESTAMP:
                    builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
                    builder.scale(typeDefine.getScale());
                    break;
                default:
                    throw CommonError.convertToSeaTunnelTypeError(
                            DatabaseIdentifier.MYSQL, mysqlDataType, typeDefine.getName());
            }
            return builder.build();
        }

        @Override
        public BasicTypeDefine<MysqlType> reconvert(Column column) {
            BasicTypeDefine.BasicTypeDefineBuilder builder =
                    BasicTypeDefine.<MysqlType>builder()
                            .name(column.getName())
                            .nullable(column.isNullable())
                            .comment(column.getComment())
                            .defaultValue(column.getDefaultValue());
            switch (column.getDataType().getSqlType()) {
                case NULL:
                    builder.nativeType(MysqlType.NULL);
                    builder.columnType(MYSQL_NULL);
                    builder.dataType(MYSQL_NULL);
                    break;
                case BOOLEAN:
                    builder.nativeType(MysqlType.BOOLEAN);
                    builder.columnType(String.format("%s(%s)", MYSQL_TINYINT, 1));
                    builder.dataType(MYSQL_TINYINT);
                    builder.length(1L);
                    break;
                case TINYINT:
                    builder.nativeType(MysqlType.TINYINT);
                    builder.columnType(MYSQL_TINYINT);
                    builder.dataType(MYSQL_TINYINT);
                    break;
                case SMALLINT:
                    builder.nativeType(MysqlType.SMALLINT);
                    builder.columnType(MYSQL_SMALLINT);
                    builder.dataType(MYSQL_SMALLINT);
                    break;
                case INT:
                    builder.nativeType(MysqlType.INT);
                    builder.columnType(MYSQL_INT);
                    builder.dataType(MYSQL_INT);
                    break;
                case BIGINT:
                    builder.nativeType(MysqlType.BIGINT);
                    builder.columnType(MYSQL_BIGINT);
                    builder.dataType(MYSQL_BIGINT);
                    break;
                case FLOAT:
                    builder.nativeType(MysqlType.FLOAT);
                    builder.columnType(MYSQL_FLOAT);
                    builder.dataType(MYSQL_FLOAT);
                    break;
                case DOUBLE:
                    builder.nativeType(MysqlType.DOUBLE);
                    builder.columnType(MYSQL_DOUBLE);
                    builder.dataType(MYSQL_DOUBLE);
                    break;
                case DECIMAL:
                    DecimalType decimalType = (DecimalType) column.getDataType();
                    long precision = decimalType.getPrecision();
                    int scale = decimalType.getScale();
                    if (precision <= 0) {
                        precision = DEFAULT_PRECISION;
                        scale = DEFAULT_SCALE;
                        log.warn(
                                "The decimal column {} type decimal({},{}) is out of range, "
                                        + "which is precision less than 0, "
                                        + "it will be converted to decimal({},{})",
                                column.getName(),
                                decimalType.getPrecision(),
                                decimalType.getScale(),
                                precision,
                                scale);
                    } else if (precision > MAX_PRECISION) {
                        scale = (int) Math.max(0, scale - (precision - MAX_PRECISION));
                        precision = MAX_PRECISION;
                        log.warn(
                                "The decimal column {} type decimal({},{}) is out of range, "
                                        + "which exceeds the maximum precision of {}, "
                                        + "it will be converted to decimal({},{})",
                                column.getName(),
                                decimalType.getPrecision(),
                                decimalType.getScale(),
                                MAX_PRECISION,
                                precision,
                                scale);
                    }
                    if (scale < 0) {
                        scale = 0;
                        log.warn(
                                "The decimal column {} type decimal({},{}) is out of range, "
                                        + "which is scale less than 0, "
                                        + "it will be converted to decimal({},{})",
                                column.getName(),
                                decimalType.getPrecision(),
                                decimalType.getScale(),
                                precision,
                                scale);
                    } else if (scale > MAX_SCALE) {
                        scale = MAX_SCALE;
                        log.warn(
                                "The decimal column {} type decimal({},{}) is out of range, "
                                        + "which exceeds the maximum scale of {}, "
                                        + "it will be converted to decimal({},{})",
                                column.getName(),
                                decimalType.getPrecision(),
                                decimalType.getScale(),
                                MAX_SCALE,
                                precision,
                                scale);
                    }

                    builder.nativeType(MysqlType.DECIMAL);
                    builder.columnType(String.format("%s(%s,%s)", MYSQL_DECIMAL, precision, scale));
                    builder.dataType(MYSQL_DECIMAL);
                    builder.precision(precision);
                    builder.scale(scale);
                    break;
                case BYTES:
                    if (column.getColumnLength() == null || column.getColumnLength() <= 0) {
                        builder.nativeType(MysqlType.VARBINARY);
                        builder.columnType(
                                String.format("%s(%s)", MYSQL_VARBINARY, MAX_VARBINARY_LENGTH / 2));
                        builder.dataType(MYSQL_VARBINARY);
                    } else if (column.getColumnLength() < MAX_VARBINARY_LENGTH) {
                        builder.nativeType(MysqlType.VARBINARY);
                        builder.columnType(
                                String.format("%s(%s)", MYSQL_VARBINARY, column.getColumnLength()));
                        builder.dataType(MYSQL_VARBINARY);
                    } else if (column.getColumnLength() < POWER_2_24) {
                        builder.nativeType(MysqlType.MEDIUMBLOB);
                        builder.columnType(MYSQL_MEDIUMBLOB);
                        builder.dataType(MYSQL_MEDIUMBLOB);
                    } else {
                        builder.nativeType(MysqlType.LONGBLOB);
                        builder.columnType(MYSQL_LONGBLOB);
                        builder.dataType(MYSQL_LONGBLOB);
                    }
                    break;
                case STRING:
                    if (column.getColumnLength() == null || column.getColumnLength() <= 0) {
                        builder.nativeType(MysqlType.LONGTEXT);
                        builder.columnType(MYSQL_LONGTEXT);
                        builder.dataType(MYSQL_LONGTEXT);
                    } else if (column.getColumnLength() < POWER_2_8) {
                        builder.nativeType(MysqlType.VARCHAR);
                        builder.columnType(
                                String.format("%s(%s)", MYSQL_VARCHAR, column.getColumnLength()));
                        builder.dataType(MYSQL_VARCHAR);
                    } else if (column.getColumnLength() < POWER_2_16) {
                        builder.nativeType(MysqlType.TEXT);
                        builder.columnType(MYSQL_TEXT);
                        builder.dataType(MYSQL_TEXT);
                    } else if (column.getColumnLength() < POWER_2_24) {
                        builder.nativeType(MysqlType.MEDIUMTEXT);
                        builder.columnType(MYSQL_MEDIUMTEXT);
                        builder.dataType(MYSQL_MEDIUMTEXT);
                    } else {
                        builder.nativeType(MysqlType.LONGTEXT);
                        builder.columnType(MYSQL_LONGTEXT);
                        builder.dataType(MYSQL_LONGTEXT);
                    }
                    break;
                case DATE:
                    builder.nativeType(MysqlType.DATE);
                    builder.columnType(MYSQL_DATE);
                    builder.dataType(MYSQL_DATE);
                    break;
                case TIME:
                    builder.nativeType(MysqlType.TIME);
                    builder.dataType(MYSQL_TIME);
                    if (version.isAtOrBefore(MySqlVersion.V_5_5)) {
                        builder.columnType(MYSQL_TIME);
                    } else if (column.getScale() != null && column.getScale() > 0) {
                        int timeScale = column.getScale();
                        if (timeScale > MAX_TIME_SCALE) {
                            timeScale = MAX_TIME_SCALE;
                            log.warn(
                                    "The time column {} type time({}) is out of range, "
                                            + "which exceeds the maximum scale of {}, "
                                            + "it will be converted to time({})",
                                    column.getName(),
                                    column.getScale(),
                                    MAX_SCALE,
                                    timeScale);
                        }
                        builder.columnType(String.format("%s(%s)", MYSQL_TIME, timeScale));
                        builder.scale(timeScale);
                    } else {
                        builder.columnType(MYSQL_TIME);
                    }
                    break;
                case TIMESTAMP:
                    builder.nativeType(MysqlType.DATETIME);
                    builder.dataType(MYSQL_DATETIME);
                    if (version.isAtOrBefore(MySqlVersion.V_5_5)) {
                        builder.columnType(MYSQL_DATETIME);
                    } else if (column.getScale() != null && column.getScale() > 0) {
                        int timestampScale = column.getScale();
                        if (timestampScale > MAX_TIMESTAMP_SCALE) {
                            timestampScale = MAX_TIMESTAMP_SCALE;
                            log.warn(
                                    "The timestamp column {} type timestamp({}) is out of range, "
                                            + "which exceeds the maximum scale of {}, "
                                            + "it will be converted to timestamp({})",
                                    column.getName(),
                                    column.getScale(),
                                    MAX_TIMESTAMP_SCALE,
                                    timestampScale);
                        }
                        builder.columnType(String.format("%s(%s)", MYSQL_DATETIME, timestampScale));
                        builder.scale(timestampScale);
                    } else {
                        builder.columnType(MYSQL_DATETIME);
                    }
                    break;
                default:
                    throw CommonError.convertToConnectorTypeError(
                            DatabaseIdentifier.MYSQL,
                            column.getDataType().getSqlType().name(),
                            column.getName());
            }

            return builder.build();
        }
    }
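
Two rules from the converter are worth isolating because they are easy to get wrong when writing a converter for another database: the length thresholds that pick a MySQL string type in reconvert, and the BIT(M) to byte-length rounding in convert. The sketch below reproduces just those rules, plus a reduced type switch that fails on unknown types. TypeRuleSketch and its method names are hypothetical, and the switch covers only a handful of types for illustration.

```java
// Reproduces a few rules from MySqlTypeConverter with plain strings and longs.
final class TypeRuleSketch {

    static final long POWER_2_8 = 1L << 8;
    static final long POWER_2_16 = 1L << 16;
    static final long POWER_2_24 = 1L << 24;

    /** Picks the MySQL column type for a string column of the given length. */
    static String stringColumnType(Long columnLength) {
        if (columnLength == null || columnLength <= 0) {
            return "LONGTEXT"; // unknown length: take the largest type
        } else if (columnLength < POWER_2_8) {
            return "VARCHAR(" + columnLength + ")";
        } else if (columnLength < POWER_2_16) {
            return "TEXT";
        } else if (columnLength < POWER_2_24) {
            return "MEDIUMTEXT";
        }
        return "LONGTEXT";
    }

    /** BIT(M) needs ceil(M / 8) bytes. */
    static long bitToByteLength(long bits) {
        return bits / 8 + (bits % 8 > 0 ? 1 : 0);
    }

    /** Reduced source-to-engine mapping; unknown types are rejected. */
    static String toEngineType(String mysqlDataType) {
        switch (mysqlDataType.toUpperCase()) {
            case "INT":
            case "MEDIUMINT":
                return "INT";
            case "BIGINT":
                return "BIGINT";
            case "VARCHAR":
            case "TEXT":
                return "STRING";
            default:
                // Stand-in for the unsupported-type error raised by the real converter.
                throw new IllegalArgumentException("Unsupported MySQL type: " + mysqlDataType);
        }
    }

    public static void main(String[] args) {
        System.out.println(stringColumnType(255L));
        System.out.println(stringColumnType(256L));
        System.out.println(bitToByteLength(9));
        System.out.println(toEngineType("varchar"));
    }
}
```

Supporting a new type usually means adding one case to each direction; forgetting the reverse mapping only shows up when the connector is used as a sink that creates tables.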

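MySqlTypeMapper is the older counterpart of TypeConverter; it also delegates to MySqlTypeConverter.

MySqlVersion represents the MySQL server version, which the catalog and the type converter use for version-specific behavior.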

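Summary

With the material above, interested readers can quickly get started developing new JDBC connectors for SeaTunnel 2.3.8 and build efficient data processing and integration.

The SeaTunnel community aims to build a world-class open-source data integration tool. If you are interested in the SeaTunnel source code, you are welcome to join our contributor seed group (add us on WeChat: davidzollo), which was created for new contributors. Come exchange ideas, improve together, and help the community grow.

Publication of this article is supported by 白鲸开源科技 (WhaleOps).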

Original link: https://my.oschina.net/SeaTunnel/blog/17658141