Apache SeaTunnel基于JDBC连接器开发教程
说明
以下内容旨在帮助开发人员,快速了解熟悉SeaTunnel2.3.8程序框架,并能够进行JDBC连接器开发内容。
必要知识
在进行开发前,你一定要仔细阅读如下框架文档,它们存在于SeaTunnel源码中的Docs
目录,这些文档能够帮你快速上手和规范开发,如果你已经阅读过则无需关注。
- docs/zh/concept/config.md
- docs/zh/concept/connector-v2-features.md
- docs/zh/concept/JobEnvConfig.md
- docs/zh/connector-v2/sink/Jdbc.md
- docs/zh/connector-v2/sink-common-options.md
- docs/zh/connector-v2/source-common-options.md
- 连接器开发必看(https://www.modb.pro/db/447922)
SeaTunnel基础开发流程
拉取项目
git clone https://github.com/apache/seatunnel.git
编译构建
-
选择Profiles
-
mvn打包安装本地包
mvn clean install -Dmaven.test.skip=true
运行工程样例
seatunnel-examples
是SeaTunnel本地环境运行模块,运行SeaTunnelEngineLocalExample
的样例配置文件fake_to_console.conf
,可查看SeaTunnel运行环境是否成功。
打包发布
选择Profiles后,通过Dist模块可直接构建发布包。
JDBC连接器开发
包目录介绍
使用技巧
做JDBC连接器开发我们一般仅需要关注两部分,第一个是catalog
包目录,第二个是internal
中的dialect
。
这两部分已经提供了区别不同数据库的差异性描述,其余大部分代码是公共使用,不建议轻易修改,否则可能会影响所有引用类。
catalog中的类介绍
MySqlCatalogFactory
中使用factoryIdentifier()
来标识数据库类型,optionRule()
用于定制连接器参数效验规则,createCatalog()
是工厂类用来创建实例的方法。
@AutoService(Factory.class) public class MySqlCatalogFactory implements CatalogFactory { [@Override](https://my.oschina.net/u/1162528) public String factoryIdentifier() { return DatabaseIdentifier.MYSQL; } [@Override](https://my.oschina.net/u/1162528) public Catalog createCatalog(String catalogName, ReadonlyConfig options) { String urlWithDatabase = options.get(JdbcCatalogOptions.BASE_URL); Preconditions.checkArgument( StringUtils.isNoneBlank(urlWithDatabase), "Miss config <base-url>! Please check your config."); JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(urlWithDatabase); return new MySqlCatalog( catalogName, options.get(JdbcCatalogOptions.USERNAME), options.get(JdbcCatalogOptions.PASSWORD), urlInfo); } [@Override](https://my.oschina.net/u/1162528) public OptionRule optionRule() { return JdbcCatalogOptions.BASE_RULE.build(); } }
MySqlCatalog 中包含了对于数据库元数据的查询,例如SELECT\_DATABASE\_EXISTS
(库信息查询)、SELECT\_TABLE\_EXISTS
(表信息查询),还定义了一些库表生成语句,例如获取表DDL语句,并且使用getTable()
方法还能直接获取到外部表。
[@Slf4j](https://my.oschina.net/slf4j) public class MySqlCatalog extends AbstractJdbcCatalog { private static final String SELECT_COLUMNS_SQL_TEMPLATE = "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME ='%s' ORDER BY ORDINAL_POSITION ASC"; private static final String SELECT_DATABASE_EXISTS = "SELECT SCHEMA_NAME FROM information_schema.schemata WHERE SCHEMA_NAME = '%s'"; private static final String SELECT_TABLE_EXISTS = "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE table_schema = '%s' AND table_name = '%s'"; private MySqlVersion version; private MySqlTypeConverter typeConverter; public MySqlCatalog( String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) { super(catalogName, username, pwd, urlInfo, null); this.version = resolveVersion(); this.typeConverter = new MySqlTypeConverter(version); } @Override protected String getDatabaseWithConditionSql(String databaseName) { return String.format(SELECT_DATABASE_EXISTS, databaseName); } @Override protected String getTableWithConditionSql(TablePath tablePath) { return String.format( SELECT_TABLE_EXISTS, tablePath.getDatabaseName(), tablePath.getTableName()); } @Override protected String getListDatabaseSql() { return "SHOW DATABASES;"; } @Override protected String getListTableSql(String databaseName) { return "SHOW TABLES;"; } @Override protected String getTableName(ResultSet rs) throws SQLException { return rs.getString(1); } @Override protected String getTableName(TablePath tablePath) { return tablePath.getTableName(); } @Override protected String getSelectColumnsSql(TablePath tablePath) { return String.format( SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getDatabaseName(), tablePath.getTableName()); } @Override protected TableIdentifier getTableIdentifier(TablePath tablePath) { return TableIdentifier.of( catalogName, tablePath.getDatabaseName(), tablePath.getTableName()); } @Override protected List<ConstraintKey> getConstraintKeys(DatabaseMetaData metaData, TablePath tablePath) throws SQLException { List<ConstraintKey> indexList = super.getConstraintKeys( metaData, tablePath.getDatabaseName(), tablePath.getSchemaName(), tablePath.getTableName()); for (Iterator<ConstraintKey> it = indexList.iterator(); it.hasNext(); ) { ConstraintKey index = it.next(); if (ConstraintKey.ConstraintType.UNIQUE_KEY.equals(index.getConstraintType()) && "PRIMARY".equals(index.getConstraintName())) { it.remove(); } } return indexList; } @Override protected Column buildColumn(ResultSet resultSet) throws SQLException { String columnName = resultSet.getString("COLUMN_NAME"); // e.g. tinyint(1) unsigned String columnType = resultSet.getString("COLUMN_TYPE"); // e.g. tinyint String dataType = resultSet.getString("DATA_TYPE").toUpperCase(); String comment = resultSet.getString("COLUMN_COMMENT"); Object defaultValue = resultSet.getObject("COLUMN_DEFAULT"); String isNullableStr = resultSet.getString("IS_NULLABLE"); boolean isNullable = isNullableStr.equals("YES"); // e.g. `decimal(10, 2)` is 10 long numberPrecision = resultSet.getInt("NUMERIC_PRECISION"); // e.g. `decimal(10, 2)` is 2 int numberScale = resultSet.getInt("NUMERIC_SCALE"); // e.g. `varchar(10)` is 40 long charOctetLength = resultSet.getLong("CHARACTER_OCTET_LENGTH"); // e.g. `timestamp(3)` is 3 int timePrecision = MySqlVersion.V_5_5.equals(version) ? 0 : resultSet.getInt("DATETIME_PRECISION"); Preconditions.checkArgument(!(numberPrecision > 0 && charOctetLength > 0)); Preconditions.checkArgument(!(numberScale > 0 && timePrecision > 0)); MysqlType mysqlType = MysqlType.getByName(columnType); boolean unsigned = columnType.toLowerCase(Locale.ROOT).contains("unsigned"); BasicTypeDefine<MysqlType> typeDefine = BasicTypeDefine.<MysqlType>builder() .name(columnName) .columnType(columnType) .dataType(dataType) .nativeType(mysqlType) .unsigned(unsigned) .length(Math.max(charOctetLength, numberPrecision)) .precision(numberPrecision) .scale(Math.max(numberScale, timePrecision)) .nullable(isNullable) .defaultValue(defaultValue) .comment(comment) .build(); return typeConverter.convert(typeDefine); } @Override protected String getCreateTableSql( TablePath tablePath, CatalogTable table, boolean createIndex) { return MysqlCreateTableSqlBuilder.builder(tablePath, table, typeConverter, createIndex) .build(table.getCatalogName()); } @Override protected String getDropTableSql(TablePath tablePath) { return String.format( "DROP TABLE `%s`.`%s`;", tablePath.getDatabaseName(), tablePath.getTableName()); } @Override protected String getCreateDatabaseSql(String databaseName) { return String.format("CREATE DATABASE `%s`;", databaseName); } @Override protected String getDropDatabaseSql(String databaseName) { return String.format("DROP DATABASE `%s`;", databaseName); } @Override public CatalogTable getTable(String sqlQuery) throws SQLException { Connection defaultConnection = getConnection(defaultUrl); return CatalogUtils.getCatalogTable( defaultConnection, sqlQuery, new MySqlTypeMapper(typeConverter)); } @Override protected String getTruncateTableSql(TablePath tablePath) throws CatalogException { return String.format( "TRUNCATE TABLE `%s`.`%s`;", tablePath.getDatabaseName(), tablePath.getTableName()); } public String getExistDataSql(TablePath tablePath) { return String.format( "SELECT * FROM `%s`.`%s` LIMIT 1;", tablePath.getDatabaseName(), tablePath.getTableName()); } private MySqlVersion resolveVersion() { try (Statement statement = getConnection(defaultUrl).createStatement(); ResultSet resultSet = statement.executeQuery("SELECT VERSION()")) { resultSet.next(); return MySqlVersion.parse(resultSet.getString(1)); } catch (Exception e) { log.info( "Failed to get mysql version, fallback to default version: {}", MySqlVersion.V_5_7, e); return MySqlVersion.V_5_7; } } }
MysqlCreateTableSqlBuilder
和MysqlDataTypeConvertor
是用于获取表DDL语句和类型转换器方法,较为简单这里就不一一赘述。
dialect中的类介绍
MySqlDialectFactory使用工厂方式创建MySqlDialect
实例,使用acceptsURL()
方法判断不同JDBC URL,从而识别不同数据库类型。
create()
的重载方法,主要是用于数据库端兼容多个数据库方言的场景。
@AutoService(JdbcDialectFactory.class) public class MySqlDialectFactory implements JdbcDialectFactory { @Override public boolean acceptsURL(String url) { return url.startsWith("jdbc:mysql:"); } @Override public JdbcDialect create() { return new MysqlDialect(); } @Override public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) { if (DatabaseIdentifier.STARROCKS.equalsIgnoreCase(compatibleMode)) { return new StarRocksDialect(fieldIde); } return new MysqlDialect(fieldIde); } }
MySqlDialect 的getRowConverter()
和getTypeConverter()
用于获取行对象转换器和类型转换器。
行对象转换器主要提供JDBC数据对象 ⇔ \Leftrightarrow ⇔ SeaTunnelRow对象互转方法。
类型转换器主要提供JDBC数据类型 ⇔ \Leftrightarrow ⇔ SeaTunnel数据类型互转的方法,这两者共同作用于引擎侧内部对象和JDBC数据集的转入转出。
MySqlDialect中还定义了一些不通数据库操作的特性功能,例如Upsert
功能实现和转义符号等。
MySqlDialect还能使用defaultParameter()
方法为JDBC URL串定制一些默认的参数。
public class MysqlDialect implements JdbcDialect { private static final List NOT_SUPPORTED_DEFAULT_VALUES = Arrays.asList(MysqlType.BLOB, MysqlType.TEXT, MysqlType.JSON, MysqlType.GEOMETRY); public String fieldIde = FieldIdeEnum.ORIGINAL.getValue(); public MysqlDialect() { } public MysqlDialect(String fieldIde) { this.fieldIde = fieldIde; } @Override public String dialectName() { return DatabaseIdentifier.MYSQL; } @Override public JdbcRowConverter getRowConverter() { return new MysqlJdbcRowConverter(); } @Override public TypeConverter<BasicTypeDefine> getTypeConverter() { TypeConverter typeConverter = MySqlTypeConverter.DEFAULT_INSTANCE; return typeConverter; } @Override public JdbcDialectTypeMapper getJdbcDialectTypeMapper() { return new MySqlTypeMapper(); } @Override public String quoteIdentifier(String identifier) { return "`" + getFieldIde(identifier, fieldIde) + "`"; } @Override public String quoteDatabaseIdentifier(String identifier) { return "`" + identifier + "`"; } @Override public String tableIdentifier(TablePath tablePath) { return tableIdentifier(tablePath.getDatabaseName(), tablePath.getTableName()); } @Override public Optional<String> getUpsertStatement( String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) { String updateClause = Arrays.stream(fieldNames) .map( fieldName -> quoteIdentifier(fieldName) + "=VALUES(" + quoteIdentifier(fieldName) + ")") .collect(Collectors.joining(", ")); String upsertSQL = getInsertIntoStatement(database, tableName, fieldNames) + " ON DUPLICATE KEY UPDATE " + updateClause; return Optional.of(upsertSQL); } @Override public PreparedStatement creatPreparedStatement( Connection connection, String queryTemplate, int fetchSize) throws SQLException { PreparedStatement statement = connection.prepareStatement( queryTemplate, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); statement.setFetchSize(Integer.MIN_VALUE); return statement; } @Override public String extractTableName(TablePath tablePath) { return tablePath.getTableName(); } @Override public Map<String, String> defaultParameter() { HashMap<String, String> map = new HashMap<>(); map.put("rewriteBatchedStatements", "true"); return map; } @Override public TablePath parse(String tablePath) { return TablePath.of(tablePath, false); } @Override public Object[] sampleDataFromColumn( Connection connection, JdbcSourceTable table, String columnName, int samplingRate, int fetchSize) throws Exception { String sampleQuery; if (StringUtils.isNotBlank(table.getQuery())) { sampleQuery = String.format( "SELECT %s FROM (%s) AS T", quoteIdentifier(columnName), table.getQuery()); } else { sampleQuery = String.format( "SELECT %s FROM %s", quoteIdentifier(columnName), tableIdentifier(table.getTablePath())); } try (Statement stmt = connection.createStatement( ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) { stmt.setFetchSize(Integer.MIN_VALUE); try (ResultSet rs = stmt.executeQuery(sampleQuery)) { int count = 0; List<Object> results = new ArrayList<>(); while (rs.next()) { count++; if (count % samplingRate == 0) { results.add(rs.getObject(1)); } if (Thread.currentThread().isInterrupted()) { throw new InterruptedException("Thread interrupted"); } } Object[] resultsArray = results.toArray(); Arrays.sort(resultsArray); return resultsArray; } } } @Override public Long approximateRowCntStatement(Connection connection, JdbcSourceTable table) throws SQLException { // 1. If no query is configured, use TABLE STATUS. // 2. If a query is configured but does not contain a WHERE clause and tablePath is // configured , use TABLE STATUS. // 3. If a query is configured with a WHERE clause, or a query statement is configured but // tablePath is TablePath.DEFAULT, use COUNT(*). boolean useTableStats = StringUtils.isBlank(table.getQuery()) || (!table.getQuery().toLowerCase().contains("where") && table.getTablePath() != null && !TablePath.DEFAULT .getFullName() .equals(table.getTablePath().getFullName())); if (useTableStats) { // The statement used to get approximate row count which is less // accurate than COUNT(*), but is more efficient for large table. TablePath tablePath = table.getTablePath(); String useDatabaseStatement = String.format("USE %s;", quoteDatabaseIdentifier(tablePath.getDatabaseName())); String rowCountQuery = String.format("SHOW TABLE STATUS LIKE '%s';", tablePath.getTableName()); try (Statement stmt = connection.createStatement()) { log.info("Split Chunk, approximateRowCntStatement: {}", useDatabaseStatement); stmt.execute(useDatabaseStatement); log.info("Split Chunk, approximateRowCntStatement: {}", rowCountQuery); try (ResultSet rs = stmt.executeQuery(rowCountQuery)) { if (!rs.next() || rs.getMetaData().getColumnCount() < 5) { throw new SQLException( String.format( "No result returned after running query [%s]", rowCountQuery)); } return rs.getLong(5); } } } return SQLUtils.countForSubquery(connection, table.getQuery()); } @Override public boolean supportDefaultValue(BasicTypeDefine typeBasicTypeDefine) { MysqlType nativeType = (MysqlType) typeBasicTypeDefine.getNativeType(); return !(NOT_SUPPORTED_DEFAULT_VALUES.contains(nativeType)); } @Override public boolean needsQuotesWithDefaultValue(BasicTypeDefine columnDefine) { MysqlType mysqlType = MysqlType.getByName(columnDefine.getColumnType()); switch (mysqlType) { case CHAR: case VARCHAR: case TEXT: case TINYTEXT: case MEDIUMTEXT: case LONGTEXT: case ENUM: case SET: case BLOB: case TINYBLOB: case MEDIUMBLOB: case LONGBLOB: case DATE: case DATETIME: case TIMESTAMP: case TIME: case YEAR: return true; default: return false; } } }
MySqlTypeConverter类只存在convert
和reconvert
,分别对应JDBC类型转换为SEATUNNEL 类型以及SEATUNNEL类型转换为JDBC类型,同时值得注意的是,在运行SEATUNNEL任务时,若存在该类中没有被定义的类型转换规则,则会抛出运行时异常UNSUPPORTED_DATA_TYPE
,所以铁子们要想支持兼容更多的数据类型,应该从此类下手。
public class MySqlTypeConverter implements TypeConverter<BasicTypeDefine<MysqlType>> { // ============================data types===================== static final String MYSQL_NULL = "NULL"; static final String MYSQL_BIT = "BIT"; static final String MYSQL_BIT_UNSIGNED = "BIT UNSIGNED"; // -------------------------number---------------------------- static final String MYSQL_TINYINT = "TINYINT"; static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED"; static final String MYSQL_SMALLINT = "SMALLINT"; static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED"; static final String MYSQL_MEDIUMINT = "MEDIUMINT"; static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED"; static final String MYSQL_INT = "INT"; static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED"; static final String MYSQL_INTEGER = "INTEGER"; static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED"; static final String MYSQL_BIGINT = "BIGINT"; static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED"; static final String MYSQL_DECIMAL = "DECIMAL"; static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED"; static final String MYSQL_FLOAT = "FLOAT"; static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED"; static final String MYSQL_DOUBLE = "DOUBLE"; static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED"; // -------------------------string---------------------------- public static final String MYSQL_CHAR = "CHAR"; public static final String MYSQL_VARCHAR = "VARCHAR"; static final String MYSQL_TINYTEXT = "TINYTEXT"; static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT"; static final String MYSQL_TEXT = "TEXT"; static final String MYSQL_LONGTEXT = "LONGTEXT"; static final String MYSQL_JSON = "JSON"; static final String MYSQL_ENUM = "ENUM"; // ------------------------------time------------------------- static final String MYSQL_DATE = "DATE"; public static final String MYSQL_DATETIME = "DATETIME"; public static final String MYSQL_TIME = "TIME"; public static final String MYSQL_TIMESTAMP = "TIMESTAMP"; static final String MYSQL_YEAR = "YEAR"; static final String MYSQL_YEAR_UNSIGNED = "YEAR UNSIGNED"; // ------------------------------blob------------------------- static final String MYSQL_TINYBLOB = "TINYBLOB"; static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB"; static final String MYSQL_BLOB = "BLOB"; static final String MYSQL_LONGBLOB = "LONGBLOB"; static final String MYSQL_BINARY = "BINARY"; static final String MYSQL_VARBINARY = "VARBINARY"; static final String MYSQL_GEOMETRY = "GEOMETRY"; public static final int DEFAULT_PRECISION = 38; public static final int MAX_PRECISION = 65; public static final int DEFAULT_SCALE = 18; public static final int MAX_SCALE = 30; public static final int MAX_TIME_SCALE = 6; public static final int MAX_TIMESTAMP_SCALE = 6; public static final long POWER_2_8 = (long) Math.pow(2, 8); public static final long POWER_2_16 = (long) Math.pow(2, 16); public static final long POWER_2_24 = (long) Math.pow(2, 24); public static final long POWER_2_32 = (long) Math.pow(2, 32); public static final long MAX_VARBINARY_LENGTH = POWER_2_16 - 4; public static final MySqlTypeConverter DEFAULT_INSTANCE = new MySqlTypeConverter(MySqlVersion.V_5_7); private final MySqlVersion version; public MySqlTypeConverter(MySqlVersion version) { this.version = version; } public MySqlTypeConverter() { this(MySqlVersion.V_5_7); } @Override public String identifier() { return DatabaseIdentifier.MYSQL; } @Override public Column convert(BasicTypeDefine typeDefine) { PhysicalColumn.PhysicalColumnBuilder builder = PhysicalColumn.builder() .name(typeDefine.getName()) .sourceType(typeDefine.getColumnType()) .nullable(typeDefine.isNullable()) .defaultValue(typeDefine.getDefaultValue()) .comment(typeDefine.getComment()); String mysqlDataType = typeDefine.getDataType().toUpperCase(); if (mysqlDataType.endsWith("ZEROFILL")) { mysqlDataType = mysqlDataType.substring(0, mysqlDataType.length() - "ZEROFILL".length()).trim(); } if (typeDefine.isUnsigned() && !(mysqlDataType.endsWith(" UNSIGNED"))) { mysqlDataType = mysqlDataType + " UNSIGNED"; } switch (mysqlDataType) { case MYSQL_NULL: builder.dataType(BasicType.VOID_TYPE); break; case MYSQL_BIT: case MYSQL_BIT_UNSIGNED: if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) { builder.dataType(BasicType.BOOLEAN_TYPE); } else if (typeDefine.getLength() == 1) { builder.dataType(BasicType.BOOLEAN_TYPE); } else { builder.dataType(PrimitiveByteArrayType.INSTANCE); // BIT(M) -> BYTE(M/8) long byteLength = typeDefine.getLength() / 8; byteLength += typeDefine.getLength() % 8 > 0 ? 1 : 0; builder.columnLength(byteLength); } break; case MYSQL_TINYINT: if (typeDefine.getColumnType().equalsIgnoreCase("tinyint(1)")) { builder.dataType(BasicType.BOOLEAN_TYPE); } else { builder.dataType(BasicType.BYTE_TYPE); } break; case MYSQL_TINYINT_UNSIGNED: case MYSQL_SMALLINT: builder.dataType(BasicType.SHORT_TYPE); break; case MYSQL_SMALLINT_UNSIGNED: case MYSQL_MEDIUMINT: case MYSQL_MEDIUMINT_UNSIGNED: case MYSQL_INT: case MYSQL_INTEGER: case MYSQL_YEAR: case MYSQL_YEAR_UNSIGNED: builder.dataType(BasicType.INT_TYPE); break; case MYSQL_INT_UNSIGNED: case MYSQL_INTEGER_UNSIGNED: case MYSQL_BIGINT: builder.dataType(BasicType.LONG_TYPE); break; case MYSQL_BIGINT_UNSIGNED: DecimalType intDecimalType = new DecimalType(20, 0); builder.dataType(intDecimalType); builder.columnLength(Long.valueOf(intDecimalType.getPrecision())); builder.scale(intDecimalType.getScale()); break; case MYSQL_FLOAT: builder.dataType(BasicType.FLOAT_TYPE); break; case MYSQL_FLOAT_UNSIGNED: log.warn("{} will probably cause value overflow.", MYSQL_FLOAT_UNSIGNED); builder.dataType(BasicType.FLOAT_TYPE); break; case MYSQL_DOUBLE: builder.dataType(BasicType.DOUBLE_TYPE); break; case MYSQL_DOUBLE_UNSIGNED: log.warn("{} will probably cause value overflow.", MYSQL_DOUBLE_UNSIGNED); builder.dataType(BasicType.DOUBLE_TYPE); break; case MYSQL_DECIMAL: Preconditions.checkArgument(typeDefine.getPrecision() > 0); DecimalType decimalType; if (typeDefine.getPrecision() > DEFAULT_PRECISION) { log.warn("{} will probably cause value overflow.", MYSQL_DECIMAL); decimalType = new DecimalType(DEFAULT_PRECISION, DEFAULT_SCALE); } else { decimalType = new DecimalType( typeDefine.getPrecision().intValue(), typeDefine.getScale() == null ? 0 : typeDefine.getScale().intValue()); } builder.dataType(decimalType); builder.columnLength(Long.valueOf(decimalType.getPrecision())); builder.scale(decimalType.getScale()); break; case MYSQL_DECIMAL_UNSIGNED: Preconditions.checkArgument(typeDefine.getPrecision() > 0); log.warn("{} will probably cause value overflow.", MYSQL_DECIMAL_UNSIGNED); DecimalType decimalUnsignedType = new DecimalType( typeDefine.getPrecision().intValue() + 1, typeDefine.getScale() == null ? 0 : typeDefine.getScale().intValue()); builder.dataType(decimalUnsignedType); builder.columnLength(Long.valueOf(decimalUnsignedType.getPrecision())); builder.scale(decimalUnsignedType.getScale()); break; case MYSQL_ENUM: builder.dataType(BasicType.STRING_TYPE); if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) { builder.columnLength(100L); } else { builder.columnLength(typeDefine.getLength()); } break; case MYSQL_CHAR: case MYSQL_VARCHAR: if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) { builder.columnLength(TypeDefineUtils.charTo4ByteLength(1L)); } else { builder.columnLength(typeDefine.getLength()); } builder.dataType(BasicType.STRING_TYPE); break; case MYSQL_TINYTEXT: builder.dataType(BasicType.STRING_TYPE); builder.columnLength(POWER_2_8 - 1); break; case MYSQL_TEXT: builder.dataType(BasicType.STRING_TYPE); builder.columnLength(POWER_2_16 - 1); break; case MYSQL_MEDIUMTEXT: builder.dataType(BasicType.STRING_TYPE); builder.columnLength(POWER_2_24 - 1); break; case MYSQL_LONGTEXT: builder.dataType(BasicType.STRING_TYPE); builder.columnLength(POWER_2_32 - 1); break; case MYSQL_JSON: builder.dataType(BasicType.STRING_TYPE); break; case MYSQL_BINARY: case MYSQL_VARBINARY: if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) { builder.columnLength(1L); } else { builder.columnLength(typeDefine.getLength()); } builder.dataType(PrimitiveByteArrayType.INSTANCE); break; case MYSQL_TINYBLOB: builder.dataType(PrimitiveByteArrayType.INSTANCE); builder.columnLength(POWER_2_8 - 1); break; case MYSQL_BLOB: builder.dataType(PrimitiveByteArrayType.INSTANCE); builder.columnLength(POWER_2_16 - 1); break; case MYSQL_MEDIUMBLOB: builder.dataType(PrimitiveByteArrayType.INSTANCE); builder.columnLength(POWER_2_24 - 1); break; case MYSQL_LONGBLOB: builder.dataType(PrimitiveByteArrayType.INSTANCE); builder.columnLength(POWER_2_32 - 1); break; case MYSQL_GEOMETRY: builder.dataType(PrimitiveByteArrayType.INSTANCE); break; case MYSQL_DATE: builder.dataType(LocalTimeType.LOCAL_DATE_TYPE); break; case MYSQL_TIME: builder.dataType(LocalTimeType.LOCAL_TIME_TYPE); builder.scale(typeDefine.getScale()); break; case MYSQL_DATETIME: case MYSQL_TIMESTAMP: builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE); builder.scale(typeDefine.getScale()); break; default: throw CommonError.convertToSeaTunnelTypeError( DatabaseIdentifier.MYSQL, mysqlDataType, typeDefine.getName()); } return builder.build(); } @Override public BasicTypeDefine<MysqlType> reconvert(Column column) { BasicTypeDefine.BasicTypeDefineBuilder builder = BasicTypeDefine.<MysqlType>builder() .name(column.getName()) .nullable(column.isNullable()) .comment(column.getComment()) .defaultValue(column.getDefaultValue()); switch (column.getDataType().getSqlType()) { case NULL: builder.nativeType(MysqlType.NULL); builder.columnType(MYSQL_NULL); builder.dataType(MYSQL_NULL); break; case BOOLEAN: builder.nativeType(MysqlType.BOOLEAN); builder.columnType(String.format("%s(%s)", MYSQL_TINYINT, 1)); builder.dataType(MYSQL_TINYINT); builder.length(1L); break; case TINYINT: builder.nativeType(MysqlType.TINYINT); builder.columnType(MYSQL_TINYINT); builder.dataType(MYSQL_TINYINT); break; case SMALLINT: builder.nativeType(MysqlType.SMALLINT); builder.columnType(MYSQL_SMALLINT); builder.dataType(MYSQL_SMALLINT); break; case INT: builder.nativeType(MysqlType.INT); builder.columnType(MYSQL_INT); builder.dataType(MYSQL_INT); break; case BIGINT: builder.nativeType(MysqlType.BIGINT); builder.columnType(MYSQL_BIGINT); builder.dataType(MYSQL_BIGINT); break; case FLOAT: builder.nativeType(MysqlType.FLOAT); builder.columnType(MYSQL_FLOAT); builder.dataType(MYSQL_FLOAT); break; case DOUBLE: builder.nativeType(MysqlType.DOUBLE); builder.columnType(MYSQL_DOUBLE); builder.dataType(MYSQL_DOUBLE); break; case DECIMAL: DecimalType decimalType = (DecimalType) column.getDataType(); long precision = decimalType.getPrecision(); int scale = decimalType.getScale(); if (precision <= 0) { precision = DEFAULT_PRECISION; scale = DEFAULT_SCALE; log.warn( "The decimal column {} type decimal({},{}) is out of range, " + "which is precision less than 0, " + "it will be converted to decimal({},{})", column.getName(), decimalType.getPrecision(), decimalType.getScale(), precision, scale); } else if (precision > MAX_PRECISION) { scale = (int) Math.max(0, scale - (precision - MAX_PRECISION)); precision = MAX_PRECISION; log.warn( "The decimal column {} type decimal({},{}) is out of range, " + "which exceeds the maximum precision of {}, " + "it will be converted to decimal({},{})", column.getName(), decimalType.getPrecision(), decimalType.getScale(), MAX_PRECISION, precision, scale); } if (scale < 0) { scale = 0; log.warn( "The decimal column {} type decimal({},{}) is out of range, " + "which is scale less than 0, " + "it will be converted to decimal({},{})", column.getName(), decimalType.getPrecision(), decimalType.getScale(), precision, scale); } else if (scale > MAX_SCALE) { scale = MAX_SCALE; log.warn( "The decimal column {} type decimal({},{}) is out of range, " + "which exceeds the maximum scale of {}, " + "it will be converted to decimal({},{})", column.getName(), decimalType.getPrecision(), decimalType.getScale(), MAX_SCALE, precision, scale); } builder.nativeType(MysqlType.DECIMAL); builder.columnType(String.format("%s(%s,%s)", MYSQL_DECIMAL, precision, scale)); builder.dataType(MYSQL_DECIMAL); builder.precision(precision); builder.scale(scale); break; case BYTES: if (column.getColumnLength() == null || column.getColumnLength() <= 0) { builder.nativeType(MysqlType.VARBINARY); builder.columnType( String.format("%s(%s)", MYSQL_VARBINARY, MAX_VARBINARY_LENGTH / 2)); builder.dataType(MYSQL_VARBINARY); } else if (column.getColumnLength() < MAX_VARBINARY_LENGTH) { builder.nativeType(MysqlType.VARBINARY); builder.columnType( String.format("%s(%s)", MYSQL_VARBINARY, column.getColumnLength())); builder.dataType(MYSQL_VARBINARY); } else if (column.getColumnLength() < POWER_2_24) { builder.nativeType(MysqlType.MEDIUMBLOB); builder.columnType(MYSQL_MEDIUMBLOB); builder.dataType(MYSQL_MEDIUMBLOB); } else { builder.nativeType(MysqlType.LONGBLOB); builder.columnType(MYSQL_LONGBLOB); builder.dataType(MYSQL_LONGBLOB); } break; case STRING: if (column.getColumnLength() == null || column.getColumnLength() <= 0) { builder.nativeType(MysqlType.LONGTEXT); builder.columnType(MYSQL_LONGTEXT); builder.dataType(MYSQL_LONGTEXT); } else if (column.getColumnLength() < POWER_2_8) { builder.nativeType(MysqlType.VARCHAR); builder.columnType( String.format("%s(%s)", MYSQL_VARCHAR, column.getColumnLength())); builder.dataType(MYSQL_VARCHAR); } else if (column.getColumnLength() < POWER_2_16) { builder.nativeType(MysqlType.TEXT); builder.columnType(MYSQL_TEXT); builder.dataType(MYSQL_TEXT); } else if (column.getColumnLength() < POWER_2_24) { builder.nativeType(MysqlType.MEDIUMTEXT); builder.columnType(MYSQL_MEDIUMTEXT); builder.dataType(MYSQL_MEDIUMTEXT); } else { builder.nativeType(MysqlType.LONGTEXT); builder.columnType(MYSQL_LONGTEXT); builder.dataType(MYSQL_LONGTEXT); } break; case DATE: builder.nativeType(MysqlType.DATE); builder.columnType(MYSQL_DATE); builder.dataType(MYSQL_DATE); break; case TIME: builder.nativeType(MysqlType.TIME); builder.dataType(MYSQL_TIME); if (version.isAtOrBefore(MySqlVersion.V_5_5)) { builder.columnType(MYSQL_TIME); } else if (column.getScale() != null && column.getScale() > 0) { int timeScale = column.getScale(); if (timeScale > MAX_TIME_SCALE) { timeScale = MAX_TIME_SCALE; log.warn( "The time column {} type time({}) is out of range, " + "which exceeds the maximum scale of {}, " + "it will be converted to time({})", column.getName(), column.getScale(), MAX_SCALE, timeScale); } builder.columnType(String.format("%s(%s)", MYSQL_TIME, timeScale)); builder.scale(timeScale); } else { builder.columnType(MYSQL_TIME); } break; case TIMESTAMP: builder.nativeType(MysqlType.DATETIME); builder.dataType(MYSQL_DATETIME); if (version.isAtOrBefore(MySqlVersion.V_5_5)) { builder.columnType(MYSQL_DATETIME); } else if (column.getScale() != null && column.getScale() > 0) { int timestampScale = column.getScale(); if (timestampScale > MAX_TIMESTAMP_SCALE) { timestampScale = MAX_TIMESTAMP_SCALE; log.warn( "The timestamp column {} type timestamp({}) is out of range, " + "which exceeds the maximum scale of {}, " + "it will be converted to timestamp({})", column.getName(), column.getScale(), MAX_TIMESTAMP_SCALE, timestampScale); } builder.columnType(String.format("%s(%s)", MYSQL_DATETIME, timestampScale)); builder.scale(timestampScale); } else { builder.columnType(MYSQL_DATETIME); } break; default: throw CommonError.convertToConnectorTypeError( DatabaseIdentifier.MYSQL, column.getDataType().getSqlType().name(), column.getName()); } return builder.build(); } }
MySqlTypeMapper
是老版本的TypeConverter
,该类也是引用的MySqlTypeConverter
。
MySqlVersion
是版本控制。
总结
通过上述内容,感兴趣的伙伴可以快速上手SeaTunnel 2.3.8的JDBC新连接器开发,实现高效的数据处理和集成。
SeaTunnel 社区致力于打造一个世界尖端的开源数据集成工具,如果对 SeaTunnel 源码感兴趣的伙伴也欢迎加入我们的贡献者种子群(欢迎添加我们的微信:davidzollo),专为社区新贡献者而建立,欢迎大家一起交流,一起进步,一起为社区壮大贡献自己的力量! 本文完!
本文由 白鲸开源科技 提供发布支持!

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
解读TaurusDB二级分区,如何提高查询性能和管理效率
摘要:TaurusDB全面兼容MySQL分区表的语法和功能,支持更加丰富的分区方式和组合策略。 本文分享自华为云社区 《【华为云MySQL技术专栏】TaurusDB二级分区,提升企业数据库管理效能》 ,作者:GaussDB 数据库。 1.背景介绍 随着业务的扩展,表的数据量也会相应增加。当表数据量达到一定规模时,数据库查询性能会成为系统瓶颈。 为了解决这一问题,社区MySQL提出了分区表的概念。分区表通过一个或者多个分区键,按照分区规则,将一个逻辑上的表分割成多个小的物理表。在查询时,MySQL能够根据查询条件,选择对应的一个或者几个分区进行扫描,从而提高查询性能和管理效率。 当前,社区MySQL支持一级和二级分区,其中一级分区类型包括RANGE(基于范围)、LIST(基于枚举)、HASH(基于用户定义的散列函数)、KEY(基于MySQL提供的散列函数);二级分区为组合分区,包括RANGE-HASH、RANGE-KEY、LIST-HASH、LIST-KEY。 但是,不容忽视的是,当一级分区基于RANGE或LIST类型时,二级分区仅能选择HASH或KEY类型,这限制了其在复杂场...
- 下一篇
修复 Sequence 主键 Bug | zorm 1.7.8 发布
Go 轻量 ORM, 零依赖,零侵入分布式事务,支持达梦 (dm), 金仓 (kingbase), 神通 (shentong), 南通 (gbase),TDengine,mysql,postgresql,oracle,mssql,sqlite,db2,clickhouse... 源码:https://gitee.com/chunanyong/zorm 官网:https://zorm.cn 基于原生 sql 语句,学习成本更低,性能更优 代码生成器 代码精简,主体 3000 行,零依赖 5000 行,注释详细,方便定制修改 支持事务传播,这是 zorm 诞生的主要原因 支持 mysql,postgresql,oracle,mssql,sqlite,db2,dm (达梦),kingbase (金仓),shentong (神通),gbase (南通),TDengine,clickhouse 支持多库和读写分离 不支持联合主键,变通认为无主键,业务控制实现 (艰难取舍) 集成 seata-go,hptx,dbpack 支持全局托管,不修改业务代码,零侵入分布式事务 支持 clickhouse,...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- 设置Eclipse缩进为4个空格,增强代码规范
- CentOS关闭SELinux安全模块
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Hadoop3单机部署,实现最简伪集群
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- Docker安装Oracle12C,快速搭建Oracle学习环境