Skip to content

Commit

Permalink
[Feature][Connector-V2] JDBC source support string type as partition …
Browse files Browse the repository at this point in the history
…key (#4947)
  • Loading branch information
Hisoka-X authored Jul 16, 2023
1 parent 7279267 commit d1d2677
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ public interface JdbcDialect extends Serializable {
*/
JdbcDialectTypeMapper getJdbcDialectTypeMapper();

default String hashModForField(String fieldName, int mod) {
return "ABS(MD5(" + quoteIdentifier(fieldName) + ") % " + mod + ")";
}

/** Quotes the identifier for table name or field name */
default String quoteIdentifier(String identifier) {
return identifier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ public JdbcRowConverter getRowConverter() {
return new OracleJdbcRowConverter();
}

@Override
public String hashModForField(String fieldName, int mod) {
return "MOD(ORA_HASH(" + quoteIdentifier(fieldName) + ")," + mod + ")";
}

@Override
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
return new OracleTypeMapper();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
return new SqlserverTypeMapper();
}

@Override
public String hashModForField(String fieldName, int mod) {
return "ABS(HASHBYTES('MD5', " + quoteIdentifier(fieldName) + ") % " + mod + ")";
}

@Override
public Optional<String> getUpsertStatement(
String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
if (partitionParameter != null) {
this.query =
JdbcSourceFactory.obtainPartitionSql(
partitionParameter.getPartitionColumnName(),
jdbcSourceConfig.getQuery());
jdbcDialect, partitionParameter, jdbcSourceConfig.getQuery());
}

this.inputFormat =
Expand Down Expand Up @@ -187,9 +186,10 @@ private SeaTunnelRowType initTableField(Connection conn) {
private PartitionParameter createPartitionParameter(Connection connection) {
if (jdbcSourceConfig.getPartitionColumn().isPresent()) {
String partitionColumn = jdbcSourceConfig.getPartitionColumn().get();
JdbcSourceFactory.validationPartitionColumn(partitionColumn, typeInfo);
SeaTunnelDataType<?> dataType =
JdbcSourceFactory.validationPartitionColumn(partitionColumn, typeInfo);
return JdbcSourceFactory.createPartitionParameter(
jdbcSourceConfig, partitionColumn, connection);
jdbcSourceConfig, partitionColumn, dataType, connection);
} else {
LOG.info(
"The partition_column parameter is not configured, and the source parallelism is set to 1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,25 @@ TableSource<T, SplitT, StateT> createSource(TableFactoryContext context) {
connectionProvider,
partitionParameter.isPresent()
? obtainPartitionSql(
partitionParameter.get().getPartitionColumnName(),
querySql)
dialect, partitionParameter.get(), querySql)
: querySql);
}

static String obtainPartitionSql(String partitionColumn, String nativeSql) {
static String obtainPartitionSql(
JdbcDialect dialect, PartitionParameter partitionParameter, String nativeSql) {
if (isStringType(partitionParameter.getDataType())) {
return String.format(
"SELECT * FROM (%s) tt where %s = ?",
nativeSql,
dialect.hashModForField(
partitionParameter.getPartitionColumnName(),
partitionParameter.getPartitionNumber()));
}
return String.format(
"SELECT * FROM (%s) tt where %s >= ? AND %s <= ?",
nativeSql, partitionColumn, partitionColumn);
nativeSql,
partitionParameter.getPartitionColumnName(),
partitionParameter.getPartitionColumnName());
}

public static Optional<PartitionParameter> createPartitionParameter(
Expand All @@ -129,26 +139,36 @@ public static Optional<PartitionParameter> createPartitionParameter(
Optional<String> partitionColumnOptional = getPartitionColumn(config, tableSchema);
if (partitionColumnOptional.isPresent()) {
String partitionColumn = partitionColumnOptional.get();
validationPartitionColumn(partitionColumn, tableSchema.toPhysicalRowDataType());
SeaTunnelDataType<?> dataType =
validationPartitionColumn(partitionColumn, tableSchema.toPhysicalRowDataType());
return Optional.of(
createPartitionParameter(
config, partitionColumn, connectionProvider.getConnection()));
config, partitionColumn, dataType, connectionProvider.getConnection()));
}
log.info(
"The partition_column parameter is not configured, and the source parallelism is set to 1");
return Optional.empty();
}

static PartitionParameter createPartitionParameter(
JdbcSourceConfig config, String columnName, Connection connection) {
JdbcSourceConfig config,
String columnName,
SeaTunnelDataType<?> dataType,
Connection connection) {
BigDecimal max = null;
BigDecimal min = null;

if (dataType.equals(BasicType.STRING_TYPE)) {
return new PartitionParameter(
columnName, dataType, null, null, config.getPartitionNumber().orElse(null));
}

if (config.getPartitionLowerBound().isPresent()
&& config.getPartitionUpperBound().isPresent()) {
max = config.getPartitionUpperBound().get();
min = config.getPartitionLowerBound().get();
return new PartitionParameter(
columnName, min, max, config.getPartitionNumber().orElse(null));
columnName, dataType, min, max, config.getPartitionNumber().orElse(null));
}
try (ResultSet rs =
connection
Expand All @@ -171,7 +191,7 @@ static PartitionParameter createPartitionParameter(
throw new PrepareFailException("jdbc", PluginType.SOURCE, e.toString());
}
return new PartitionParameter(
columnName, min, max, config.getPartitionNumber().orElse(null));
columnName, dataType, min, max, config.getPartitionNumber().orElse(null));
}

private static Optional<String> getPartitionColumn(
Expand All @@ -185,7 +205,8 @@ private static Optional<String> getPartitionColumn(
return Optional.empty();
}

static void validationPartitionColumn(String partitionColumn, SeaTunnelRowType rowType) {
static SeaTunnelDataType<?> validationPartitionColumn(
String partitionColumn, SeaTunnelRowType rowType) {
Map<String, SeaTunnelDataType<?>> fieldTypes = new HashMap<>();
for (int i = 0; i < rowType.getFieldNames().length; i++) {
fieldTypes.put(rowType.getFieldName(i), rowType.getFieldType(i));
Expand All @@ -198,10 +219,12 @@ static void validationPartitionColumn(String partitionColumn, SeaTunnelRowType r
partitionColumn));
}
SeaTunnelDataType<?> partitionColumnType = fieldTypes.get(partitionColumn);
if (!isNumericType(partitionColumnType)) {
if (!isNumericType(partitionColumnType) && !isStringType(partitionColumnType)) {
throw new JdbcConnectorException(
CommonErrorCode.ILLEGAL_ARGUMENT,
String.format("%s is not numeric type", partitionColumn));
String.format("%s is not numeric/string type", partitionColumn));
} else {
return partitionColumnType;
}
}

Expand All @@ -220,6 +243,10 @@ private static boolean isNumericType(SeaTunnelDataType<?> type) {
return type.equals(BasicType.INT_TYPE) || type.equals(BasicType.LONG_TYPE) || scale == 0;
}

private static boolean isStringType(SeaTunnelDataType<?> type) {
return type.equals(BasicType.STRING_TYPE);
}

@Override
public OptionRule optionRule() {
return OptionRule.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.source;

import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
Expand Down Expand Up @@ -106,15 +107,21 @@ private Set<JdbcSourceSplit> discoverySplits() {
partitionParameter.getPartitionNumber() != null
? partitionParameter.getPartitionNumber()
: enumeratorContext.currentParallelism();
JdbcNumericBetweenParametersProvider jdbcNumericBetweenParametersProvider =
new JdbcNumericBetweenParametersProvider(
partitionParameter.getMinValue(),
partitionParameter.getMaxValue())
.ofBatchNum(partitionNumber);
Serializable[][] parameterValues =
jdbcNumericBetweenParametersProvider.getParameterValues();
for (int i = 0; i < parameterValues.length; i++) {
allSplit.add(new JdbcSourceSplit(parameterValues[i], i));
if (partitionParameter.getDataType().equals(BasicType.STRING_TYPE)) {
for (int i = 0; i < partitionNumber; i++) {
allSplit.add(new JdbcSourceSplit(new Object[] {i}, i));
}
} else {
JdbcNumericBetweenParametersProvider jdbcNumericBetweenParametersProvider =
new JdbcNumericBetweenParametersProvider(
partitionParameter.getMinValue(),
partitionParameter.getMaxValue())
.ofBatchNum(partitionNumber);
Serializable[][] parameterValues =
jdbcNumericBetweenParametersProvider.getParameterValues();
for (int i = 0; i < parameterValues.length; i++) {
allSplit.add(new JdbcSourceSplit(parameterValues[i], i));
}
}
} else {
allSplit.add(new JdbcSourceSplit(null, 0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.connectors.seatunnel.jdbc.source;

import org.apache.seatunnel.api.table.type.SeaTunnelDataType;

import lombok.AllArgsConstructor;
import lombok.Data;

Expand All @@ -28,6 +30,7 @@
public class PartitionParameter implements Serializable {

String partitionColumnName;
SeaTunnelDataType<?> dataType;
BigDecimal minValue;
BigDecimal maxValue;
Integer partitionNumber;
Expand Down

0 comments on commit d1d2677

Please sign in to comment.