Skip to content

Commit

Permalink
[Improve][Connector-V2] Clean key name in catalog table (apache#6942)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored Jun 14, 2024
1 parent 670bba0 commit a399ef4
Show file tree
Hide file tree
Showing 6 changed files with 1,929 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import com.mysql.cj.MysqlType;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -150,8 +152,9 @@ public String build(String catalogName) {

private String buildColumnsIdentifySql(String catalogName) {
List<String> columnSqls = new ArrayList<>();
Map<String, String> columnTypeMap = new HashMap<>();
for (Column column : columns) {
columnSqls.add("\t" + buildColumnIdentifySql(column, catalogName));
columnSqls.add("\t" + buildColumnIdentifySql(column, catalogName, columnTypeMap));
}
if (primaryKey != null) {
columnSqls.add("\t" + buildPrimaryKeySql());
Expand All @@ -161,28 +164,34 @@ private String buildColumnsIdentifySql(String catalogName) {
if (StringUtils.isBlank(constraintKey.getConstraintName())) {
continue;
}
// columnSqls.add("\t" + buildConstraintKeySql(constraintKey));
String constraintKeyStr = buildConstraintKeySql(constraintKey, columnTypeMap);
if (StringUtils.isNotBlank(constraintKeyStr)) {
columnSqls.add("\t" + constraintKeyStr);
}
}
}
return String.join(", \n", columnSqls);
}

private String buildColumnIdentifySql(Column column, String catalogName) {
private String buildColumnIdentifySql(
Column column, String catalogName, Map<String, String> columnTypeMap) {
final List<String> columnSqls = new ArrayList<>();
columnSqls.add(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "`"));
boolean isSupportDef = true;

String type;
if ((SqlType.TIME.equals(column.getDataType().getSqlType())
|| SqlType.TIMESTAMP.equals(column.getDataType().getSqlType()))
&& column.getScale() != null) {
BasicTypeDefine<MysqlType> typeDefine = typeConverter.reconvert(column);
columnSqls.add(typeDefine.getColumnType());
} else if (StringUtils.equals(catalogName, DatabaseIdentifier.MYSQL)) {
columnSqls.add(column.getSourceType());
type = typeDefine.getColumnType();
} else if (StringUtils.equals(catalogName, DatabaseIdentifier.MYSQL)
&& StringUtils.isNotBlank(column.getSourceType())) {
type = column.getSourceType();
} else {
BasicTypeDefine<MysqlType> typeDefine = typeConverter.reconvert(column);
columnSqls.add(typeDefine.getColumnType());
type = typeDefine.getColumnType();
}
columnSqls.add(type);
columnTypeMap.put(column.getName(), type);
// nullable
if (column.isNullable()) {
columnSqls.add("NULL");
Expand All @@ -206,19 +215,32 @@ private String buildPrimaryKeySql() {
return String.format("PRIMARY KEY (%s)", CatalogUtils.quoteIdentifier(key, fieldIde));
}

private String buildConstraintKeySql(ConstraintKey constraintKey) {
private String buildConstraintKeySql(
ConstraintKey constraintKey, Map<String, String> columnTypeMap) {
ConstraintKey.ConstraintType constraintType = constraintKey.getConstraintType();
String indexColumns =
constraintKey.getColumnNames().stream()
.map(
constraintKeyColumn -> {
String columnName = constraintKeyColumn.getColumnName();
boolean withLength = false;
if (columnTypeMap.containsKey(columnName)) {
String columnType = columnTypeMap.get(columnName);
if (columnType.endsWith("BLOB")
|| columnType.endsWith("TEXT")) {
withLength = true;
}
}
if (constraintKeyColumn.getSortType() == null) {
return String.format(
"`%s`", constraintKeyColumn.getColumnName());
"`%s`%s",
CatalogUtils.getFieldIde(columnName, fieldIde),
withLength ? "(255)" : "");
}
return String.format(
"`%s` %s",
constraintKeyColumn.getColumnName(),
"`%s`%s %s",
CatalogUtils.getFieldIde(columnName, fieldIde),
withLength ? "(255)" : "",
constraintKeyColumn.getSortType().name());
})
.collect(Collectors.joining(", "));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public static Optional<PrimaryKey> getPrimaryKey(DatabaseMetaData metaData, Tabl
while (rs.next()) {
String columnName = rs.getString("COLUMN_NAME");
// all the PK_NAME should be the same
pkName = rs.getString("PK_NAME");
pkName = cleanKeyName(rs.getString("PK_NAME"));
int keySeq = rs.getInt("KEY_SEQ");
// KEY_SEQ is 1-based index
primaryKeyColumns.add(Pair.of(keySeq, columnName));
Expand Down Expand Up @@ -152,7 +152,7 @@ public static List<ConstraintKey> getConstraintKeys(
if (columnName == null) {
continue;
}
String indexName = resultSet.getString("INDEX_NAME");
String indexName = cleanKeyName(resultSet.getString("INDEX_NAME"));
boolean noUnique = resultSet.getBoolean("NON_UNIQUE");

ConstraintKey constraintKey =
Expand All @@ -179,6 +179,15 @@ public static List<ConstraintKey> getConstraintKeys(
return new ArrayList<>(constraintKeyMap.values());
}

private static String cleanKeyName(String keyName) {
if (keyName != null) {
// only keep the characters that are valid in an index name
keyName = keyName.replaceAll("[^a-zA-Z0-9_]", "");
keyName = keyName.replaceAll("^_+", "");
}
return keyName;
}

public static TableSchema getTableSchema(
DatabaseMetaData metadata, TablePath tablePath, JdbcDialectTypeMapper typeMapper)
throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MysqlCreateTableSqlBuilder;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter;
Expand All @@ -37,6 +38,7 @@

import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;

public class MysqlCreateTableSqlBuilderTest {
Expand All @@ -58,6 +60,14 @@ public void testBuild() {
.column(
PhysicalColumn.of(
"age", BasicType.INT_TYPE, (Long) null, true, null, "age"))
.column(
PhysicalColumn.of(
"blob_v",
PrimitiveByteArrayType.INSTANCE,
Long.MAX_VALUE,
true,
null,
"blob_v"))
.column(
PhysicalColumn.of(
"createTime",
Expand All @@ -76,12 +86,19 @@ public void testBuild() {
"lastUpdateTime"))
.primaryKey(PrimaryKey.of("id", Lists.newArrayList("id")))
.constraintKey(
ConstraintKey.of(
ConstraintKey.ConstraintType.INDEX_KEY,
"name",
Lists.newArrayList(
ConstraintKey.ConstraintKeyColumn.of(
"name", null))))
Arrays.asList(
ConstraintKey.of(
ConstraintKey.ConstraintType.INDEX_KEY,
"name",
Lists.newArrayList(
ConstraintKey.ConstraintKeyColumn.of(
"name", null))),
ConstraintKey.of(
ConstraintKey.ConstraintType.INDEX_KEY,
"blob_v",
Lists.newArrayList(
ConstraintKey.ConstraintKeyColumn.of(
"blob_v", null)))))
.build();
CatalogTable catalogTable =
CatalogTable.of(
Expand All @@ -98,12 +115,15 @@ public void testBuild() {
// create table sql is change; The old unit tests are no longer applicable
String expect =
"CREATE TABLE `test_table` (\n"
+ "\t`id` null NOT NULL COMMENT 'id', \n"
+ "\t`name` null NOT NULL COMMENT 'name', \n"
+ "\t`age` null NULL COMMENT 'age', \n"
+ "\t`createTime` null NULL COMMENT 'createTime', \n"
+ "\t`lastUpdateTime` null NULL COMMENT 'lastUpdateTime', \n"
+ "\tPRIMARY KEY (`id`)\n"
+ "\t`id` BIGINT NOT NULL COMMENT 'id', \n"
+ "\t`name` VARCHAR(128) NOT NULL COMMENT 'name', \n"
+ "\t`age` INT NULL COMMENT 'age', \n"
+ "\t`blob_v` LONGBLOB NULL COMMENT 'blob_v', \n"
+ "\t`createTime` DATETIME NULL COMMENT 'createTime', \n"
+ "\t`lastUpdateTime` DATETIME NULL COMMENT 'lastUpdateTime', \n"
+ "\tPRIMARY KEY (`id`), \n"
+ "\tKEY `name` (`name`), \n"
+ "\tKEY `blob_v` (`blob_v`(255))\n"
+ ") COMMENT = 'User table';";
CONSOLE.println(expect);
Assertions.assertEquals(expect, createTableSql);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils;

import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TablePath;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.sql.SQLException;
import java.util.List;
import java.util.Optional;

public class CatalogUtilsTest {

@Test
void testPrimaryKeysNameWithOutSpecialChar() throws SQLException {
Optional<PrimaryKey> primaryKey =
CatalogUtils.getPrimaryKey(new TestDatabaseMetaData(), TablePath.of("test.test"));
Assertions.assertEquals("testfdawe_", primaryKey.get().getPrimaryKey());
}

@Test
void testConstraintKeysNameWithOutSpecialChar() throws SQLException {
List<ConstraintKey> constraintKeys =
CatalogUtils.getConstraintKeys(
new TestDatabaseMetaData(), TablePath.of("test.test"));
Assertions.assertEquals("testfdawe_", constraintKeys.get(0).getConstraintName());
}
}
Loading

0 comments on commit a399ef4

Please sign in to comment.