Skip to content

Commit

Permalink
Fix Jdbc sink target table name error (apache#6269)
Browse files Browse the repository at this point in the history
  • Loading branch information
EricJoy2048 authored Jan 24, 2024
1 parent b83c40a commit 2f62235
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ public Optional<SaveModeHandler> getSaveModeHandler() {
if (StringUtils.isBlank(jdbcSinkConfig.getTable())) {
return Optional.empty();
}
// use query to write data can not support savemode
if (StringUtils.isNotBlank(jdbcSinkConfig.getSimpleSql())) {
return Optional.empty();
}
Optional<Catalog> catalogOptional =
JdbcCatalogUtils.findCatalog(jdbcSinkConfig.getJdbcConnectionConfig(), dialect);
if (catalogOptional.isPresent()) {
Expand All @@ -185,10 +189,10 @@ public Optional<SaveModeHandler> getSaveModeHandler() {
: fieldIdeEnumEnum.getValue();
TablePath tablePath =
TablePath.of(
jdbcSinkConfig.getDatabase()
+ "."
+ CatalogUtils.quoteTableIdentifier(
jdbcSinkConfig.getTable(), fieldIde));
catalogTable.getTableId().getDatabaseName(),
catalogTable.getTableId().getSchemaName(),
CatalogUtils.quoteTableIdentifier(
catalogTable.getTableId().getTableName(), fieldIde));
catalogTable.getOptions().put("fieldIde", fieldIde);
return Optional.of(
new DefaultSaveModeHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,91 +99,89 @@ public TableSink createSink(TableSinkFactoryContext context) {
ReadonlyConfig catalogOptions = getCatalogOptions(context);
Optional<String> optionalTable = config.getOptional(TABLE);
Optional<String> optionalDatabase = config.getOptional(DATABASE);
Optional<String> queryOptional = config.getOptional(QUERY);
if (!optionalTable.isPresent() && !queryOptional.isPresent()) {
if (!optionalTable.isPresent()) {
optionalTable = Optional.of(REPLACE_TABLE_NAME_KEY);
// get source table relevant information
TableIdentifier tableId = catalogTable.getTableId();
String sourceDatabaseName = tableId.getDatabaseName();
String sourceSchemaName = tableId.getSchemaName();
String sourceTableName = tableId.getTableName();
// get sink table relevant information
String sinkDatabaseName = optionalDatabase.orElse(REPLACE_DATABASE_NAME_KEY);
String sinkTableNameBefore = optionalTable.get();
String[] sinkTableSplitArray = sinkTableNameBefore.split("\\.");
String sinkTableName = sinkTableSplitArray[sinkTableSplitArray.length - 1];
String sinkSchemaName;
if (sinkTableSplitArray.length > 1) {
sinkSchemaName = sinkTableSplitArray[sinkTableSplitArray.length - 2];
} else {
sinkSchemaName = null;
}
if (StringUtils.isNotBlank(catalogOptions.get(JdbcCatalogOptions.SCHEMA))) {
sinkSchemaName = catalogOptions.get(JdbcCatalogOptions.SCHEMA);
}
// to add tablePrefix and tableSuffix
String tempTableName;
String prefix = catalogOptions.get(JdbcCatalogOptions.TABLE_PREFIX);
String suffix = catalogOptions.get(JdbcCatalogOptions.TABLE_SUFFIX);
if (StringUtils.isNotEmpty(prefix) || StringUtils.isNotEmpty(suffix)) {
tempTableName =
StringUtils.isNotEmpty(prefix) ? prefix + sinkTableName : sinkTableName;
tempTableName =
StringUtils.isNotEmpty(suffix) ? tempTableName + suffix : tempTableName;
}
// get source table relevant information
TableIdentifier tableId = catalogTable.getTableId();
String sourceDatabaseName = tableId.getDatabaseName();
String sourceSchemaName = tableId.getSchemaName();
String sourceTableName = tableId.getTableName();
// get sink table relevant information
String sinkDatabaseName = optionalDatabase.orElse(REPLACE_DATABASE_NAME_KEY);
String sinkTableNameBefore = optionalTable.get();
String[] sinkTableSplitArray = sinkTableNameBefore.split("\\.");
String sinkTableName = sinkTableSplitArray[sinkTableSplitArray.length - 1];
String sinkSchemaName;
if (sinkTableSplitArray.length > 1) {
sinkSchemaName = sinkTableSplitArray[sinkTableSplitArray.length - 2];
} else {
sinkSchemaName = null;
}
if (StringUtils.isNotBlank(catalogOptions.get(JdbcCatalogOptions.SCHEMA))) {
sinkSchemaName = catalogOptions.get(JdbcCatalogOptions.SCHEMA);
}
// to add tablePrefix and tableSuffix
String tempTableName;
String prefix = catalogOptions.get(JdbcCatalogOptions.TABLE_PREFIX);
String suffix = catalogOptions.get(JdbcCatalogOptions.TABLE_SUFFIX);
if (StringUtils.isNotEmpty(prefix) || StringUtils.isNotEmpty(suffix)) {
tempTableName = StringUtils.isNotEmpty(prefix) ? prefix + sinkTableName : sinkTableName;
tempTableName = StringUtils.isNotEmpty(suffix) ? tempTableName + suffix : tempTableName;

} else {
tempTableName = sinkTableName;
}
// to replace
String finalDatabaseName = sinkDatabaseName;
if (StringUtils.isNotEmpty(sourceDatabaseName)) {
finalDatabaseName =
sinkDatabaseName.replace(REPLACE_DATABASE_NAME_KEY, sourceDatabaseName);
}
} else {
tempTableName = sinkTableName;
}
// to replace
String finalDatabaseName = sinkDatabaseName;
if (StringUtils.isNotEmpty(sourceDatabaseName)) {
finalDatabaseName =
sinkDatabaseName.replace(REPLACE_DATABASE_NAME_KEY, sourceDatabaseName);
}

String finalSchemaName;
if (sinkSchemaName != null) {
if (sourceSchemaName == null) {
finalSchemaName = sinkSchemaName;
} else {
finalSchemaName =
sinkSchemaName.replace(REPLACE_SCHEMA_NAME_KEY, sourceSchemaName);
}
String finalSchemaName;
if (sinkSchemaName != null) {
if (sourceSchemaName == null) {
finalSchemaName = sinkSchemaName;
} else {
finalSchemaName = null;
}
String finalTableName = sinkTableName;
if (StringUtils.isNotEmpty(sourceTableName)) {
finalTableName = tempTableName.replace(REPLACE_TABLE_NAME_KEY, sourceTableName);
finalSchemaName = sinkSchemaName.replace(REPLACE_SCHEMA_NAME_KEY, sourceSchemaName);
}
} else {
finalSchemaName = null;
}
String finalTableName = sinkTableName;
if (StringUtils.isNotEmpty(sourceTableName)) {
finalTableName = tempTableName.replace(REPLACE_TABLE_NAME_KEY, sourceTableName);
}

// rebuild TableIdentifier and catalogTable
TableIdentifier newTableId =
TableIdentifier.of(
tableId.getCatalogName(),
finalDatabaseName,
finalSchemaName,
finalTableName);
catalogTable =
CatalogTable.of(
newTableId,
catalogTable.getTableSchema(),
catalogTable.getOptions(),
catalogTable.getPartitionKeys(),
catalogTable.getComment(),
catalogTable.getCatalogName());
Map<String, String> map = config.toMap();
if (catalogTable.getTableId().getSchemaName() != null) {
map.put(
TABLE.key(),
catalogTable.getTableId().getSchemaName()
+ "."
+ catalogTable.getTableId().getTableName());
} else {
map.put(TABLE.key(), catalogTable.getTableId().getTableName());
}
map.put(DATABASE.key(), catalogTable.getTableId().getDatabaseName());
PrimaryKey primaryKey = catalogTable.getTableSchema().getPrimaryKey();
// rebuild TableIdentifier and catalogTable
TableIdentifier newTableId =
TableIdentifier.of(
tableId.getCatalogName(),
finalDatabaseName,
finalSchemaName,
finalTableName);
catalogTable =
CatalogTable.of(
newTableId,
catalogTable.getTableSchema(),
catalogTable.getOptions(),
catalogTable.getPartitionKeys(),
catalogTable.getComment(),
catalogTable.getCatalogName());
Map<String, String> map = config.toMap();
if (catalogTable.getTableId().getSchemaName() != null) {
map.put(
TABLE.key(),
catalogTable.getTableId().getSchemaName()
+ "."
+ catalogTable.getTableId().getTableName());
} else {
map.put(TABLE.key(), catalogTable.getTableId().getTableName());
}
map.put(DATABASE.key(), catalogTable.getTableId().getDatabaseName());
PrimaryKey primaryKey = catalogTable.getTableSchema().getPrimaryKey();
if (!config.getOptional(PRIMARY_KEYS).isPresent()) {
if (primaryKey != null && !CollectionUtils.isEmpty(primaryKey.getColumnNames())) {
map.put(PRIMARY_KEYS.key(), String.join(",", primaryKey.getColumnNames()));
} else {
Expand All @@ -202,12 +200,15 @@ public TableSink createSink(TableSinkFactoryContext context) {
.collect(Collectors.joining(",")));
}
}
config = ReadonlyConfig.fromMap(new HashMap<>(map));
}
config = ReadonlyConfig.fromMap(new HashMap<>(map));
// always execute
final ReadonlyConfig options = config;
JdbcSinkConfig sinkConfig = JdbcSinkConfig.of(config);
FieldIdeEnum fieldIdeEnum = config.get(JdbcOptions.FIELD_IDE);
catalogTable
.getOptions()
.put("fieldIde", fieldIdeEnum == null ? null : fieldIdeEnum.getValue());
JdbcDialect dialect =
JdbcDialectLoader.load(
sinkConfig.getJdbcConnectionConfig().getUrl(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
@Slf4j
@DisabledOnContainer(
value = {},
type = {EngineType.SPARK},
type = {EngineType.SPARK, EngineType.FLINK},
disabledReason = "Currently SPARK do not support cdc")
public class SqlServerCDCIT extends TestSuiteBase implements TestResource {

Expand Down

0 comments on commit 2f62235

Please sign in to comment.