From d1d2677658469664842c4767da94db110771aa25 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Sun, 16 Jul 2023 15:09:35 +0800 Subject: [PATCH] [Feature][Connector-V2] JDBC source support string type as partition key (#4947) --- .../jdbc/internal/dialect/JdbcDialect.java | 4 ++ .../dialect/oracle/OracleDialect.java | 5 ++ .../dialect/sqlserver/SqlServerDialect.java | 5 ++ .../seatunnel/jdbc/source/JdbcSource.java | 8 +-- .../jdbc/source/JdbcSourceFactory.java | 51 ++++++++++++++----- .../source/JdbcSourceSplitEnumerator.java | 25 +++++---- .../jdbc/source/PartitionParameter.java | 3 ++ 7 files changed, 76 insertions(+), 25 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java index f36067b3c2b..e8967fce08f 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java @@ -59,6 +59,10 @@ public interface JdbcDialect extends Serializable { */ JdbcDialectTypeMapper getJdbcDialectTypeMapper(); + default String hashModForField(String fieldName, int mod) { + return "ABS(MD5(" + quoteIdentifier(fieldName) + ") % " + mod + ")"; + } + /** Quotes the identifier for table name or field name */ default String quoteIdentifier(String identifier) { return identifier; diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java index 1ca10739e1d..7edd935e780 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java @@ -44,6 +44,11 @@ public JdbcRowConverter getRowConverter() { return new OracleJdbcRowConverter(); } + @Override + public String hashModForField(String fieldName, int mod) { + return "MOD(ORA_HASH(" + quoteIdentifier(fieldName) + ")," + mod + ")"; + } + @Override public JdbcDialectTypeMapper getJdbcDialectTypeMapper() { return new OracleTypeMapper(); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java index 697d2d2dc17..2121369e22a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java @@ -42,6 +42,11 @@ public JdbcDialectTypeMapper getJdbcDialectTypeMapper() { return new SqlserverTypeMapper(); } + @Override + public String hashModForField(String fieldName, int mod) { + return "ABS(HASHBYTES('MD5', " + quoteIdentifier(fieldName) + ") % " + mod + ")"; + } + @Override public Optional getUpsertStatement( String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java index 732892b21d6..aa001f78e2a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java @@ -113,8 +113,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException { if (partitionParameter != null) { this.query = JdbcSourceFactory.obtainPartitionSql( - partitionParameter.getPartitionColumnName(), - jdbcSourceConfig.getQuery()); + jdbcDialect, partitionParameter, jdbcSourceConfig.getQuery()); } this.inputFormat = @@ -187,9 +186,10 @@ private SeaTunnelRowType initTableField(Connection conn) { private PartitionParameter createPartitionParameter(Connection connection) { if (jdbcSourceConfig.getPartitionColumn().isPresent()) { String partitionColumn = jdbcSourceConfig.getPartitionColumn().get(); - JdbcSourceFactory.validationPartitionColumn(partitionColumn, typeInfo); + SeaTunnelDataType dataType = + JdbcSourceFactory.validationPartitionColumn(partitionColumn, typeInfo); return JdbcSourceFactory.createPartitionParameter( - jdbcSourceConfig, partitionColumn, connection); + jdbcSourceConfig, partitionColumn, dataType, connection); } else { LOG.info( "The partition_column parameter is not configured, and the source parallelism is set to 1"); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java index 43aa1c03d63..8c21a842339 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java @@ -111,15 +111,25 @@ TableSource createSource(TableFactoryContext context) { connectionProvider, partitionParameter.isPresent() ? obtainPartitionSql( - partitionParameter.get().getPartitionColumnName(), - querySql) + dialect, partitionParameter.get(), querySql) : querySql); } - static String obtainPartitionSql(String partitionColumn, String nativeSql) { + static String obtainPartitionSql( + JdbcDialect dialect, PartitionParameter partitionParameter, String nativeSql) { + if (isStringType(partitionParameter.getDataType())) { + return String.format( + "SELECT * FROM (%s) tt where %s = ?", + nativeSql, + dialect.hashModForField( + partitionParameter.getPartitionColumnName(), + partitionParameter.getPartitionNumber())); + } return String.format( "SELECT * FROM (%s) tt where %s >= ? AND %s <= ?", - nativeSql, partitionColumn, partitionColumn); + nativeSql, + partitionParameter.getPartitionColumnName(), + partitionParameter.getPartitionColumnName()); } public static Optional createPartitionParameter( @@ -129,10 +139,11 @@ public static Optional createPartitionParameter( Optional partitionColumnOptional = getPartitionColumn(config, tableSchema); if (partitionColumnOptional.isPresent()) { String partitionColumn = partitionColumnOptional.get(); - validationPartitionColumn(partitionColumn, tableSchema.toPhysicalRowDataType()); + SeaTunnelDataType dataType = + validationPartitionColumn(partitionColumn, tableSchema.toPhysicalRowDataType()); return Optional.of( createPartitionParameter( - config, partitionColumn, connectionProvider.getConnection())); + config, partitionColumn, dataType, connectionProvider.getConnection())); } log.info( "The partition_column parameter is not configured, and the source parallelism is set to 1"); @@ -140,15 +151,24 @@ public static Optional createPartitionParameter( } static PartitionParameter createPartitionParameter( - JdbcSourceConfig config, String columnName, Connection connection) { + JdbcSourceConfig config, + String columnName, + SeaTunnelDataType dataType, + Connection connection) { BigDecimal max = null; BigDecimal min = null; + + if (dataType.equals(BasicType.STRING_TYPE)) { + return new PartitionParameter( + columnName, dataType, null, null, config.getPartitionNumber().orElse(null)); + } + if (config.getPartitionLowerBound().isPresent() && config.getPartitionUpperBound().isPresent()) { max = config.getPartitionUpperBound().get(); min = config.getPartitionLowerBound().get(); return new PartitionParameter( - columnName, min, max, config.getPartitionNumber().orElse(null)); + columnName, dataType, min, max, config.getPartitionNumber().orElse(null)); } try (ResultSet rs = connection @@ -171,7 +191,7 @@ static PartitionParameter createPartitionParameter( throw new PrepareFailException("jdbc", PluginType.SOURCE, e.toString()); } return new PartitionParameter( - columnName, min, max, config.getPartitionNumber().orElse(null)); + columnName, dataType, min, max, config.getPartitionNumber().orElse(null)); } private static Optional getPartitionColumn( @@ -185,7 +205,8 @@ private static Optional getPartitionColumn( return Optional.empty(); } - static void validationPartitionColumn(String partitionColumn, SeaTunnelRowType rowType) { + static SeaTunnelDataType validationPartitionColumn( + String partitionColumn, SeaTunnelRowType rowType) { Map> fieldTypes = new HashMap<>(); for (int i = 0; i < rowType.getFieldNames().length; i++) { fieldTypes.put(rowType.getFieldName(i), rowType.getFieldType(i)); @@ -198,10 +219,12 @@ static void validationPartitionColumn(String partitionColumn, SeaTunnelRowType r partitionColumn)); } SeaTunnelDataType partitionColumnType = fieldTypes.get(partitionColumn); - if (!isNumericType(partitionColumnType)) { + if (!isNumericType(partitionColumnType) && !isStringType(partitionColumnType)) { throw new JdbcConnectorException( CommonErrorCode.ILLEGAL_ARGUMENT, - String.format("%s is not numeric type", partitionColumn)); + String.format("%s is not numeric/string type", partitionColumn)); + } else { + return partitionColumnType; } } @@ -220,6 +243,10 @@ private static boolean isNumericType(SeaTunnelDataType type) { return type.equals(BasicType.INT_TYPE) || type.equals(BasicType.LONG_TYPE) || scale == 0; } + private static boolean isStringType(SeaTunnelDataType type) { + return type.equals(BasicType.STRING_TYPE); + } + @Override public OptionRule optionRule() { return OptionRule.builder() diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java index 185f87075b4..bd9abb2d6fb 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.source; import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.common.exception.CommonErrorCode; import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig; import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; @@ -106,15 +107,21 @@ private Set discoverySplits() { partitionParameter.getPartitionNumber() != null ? partitionParameter.getPartitionNumber() : enumeratorContext.currentParallelism(); - JdbcNumericBetweenParametersProvider jdbcNumericBetweenParametersProvider = - new JdbcNumericBetweenParametersProvider( - partitionParameter.getMinValue(), - partitionParameter.getMaxValue()) - .ofBatchNum(partitionNumber); - Serializable[][] parameterValues = - jdbcNumericBetweenParametersProvider.getParameterValues(); - for (int i = 0; i < parameterValues.length; i++) { - allSplit.add(new JdbcSourceSplit(parameterValues[i], i)); + if (partitionParameter.getDataType().equals(BasicType.STRING_TYPE)) { + for (int i = 0; i < partitionNumber; i++) { + allSplit.add(new JdbcSourceSplit(new Object[] {i}, i)); + } + } else { + JdbcNumericBetweenParametersProvider jdbcNumericBetweenParametersProvider = + new JdbcNumericBetweenParametersProvider( + partitionParameter.getMinValue(), + partitionParameter.getMaxValue()) + .ofBatchNum(partitionNumber); + Serializable[][] parameterValues = + jdbcNumericBetweenParametersProvider.getParameterValues(); + for (int i = 0; i < parameterValues.length; i++) { + allSplit.add(new JdbcSourceSplit(parameterValues[i], i)); + } } } else { allSplit.add(new JdbcSourceSplit(null, 0)); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/PartitionParameter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/PartitionParameter.java index 61677079ec2..88605e87609 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/PartitionParameter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/PartitionParameter.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.source; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; + import lombok.AllArgsConstructor; import lombok.Data; @@ -28,6 +30,7 @@ public class PartitionParameter implements Serializable { String partitionColumnName; + SeaTunnelDataType dataType; BigDecimal minValue; BigDecimal maxValue; Integer partitionNumber;