diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml index 88a2d59e3f1..81222695a38 100644 --- a/.github/workflows/backend.yml +++ b/.github/workflows/backend.yml @@ -553,6 +553,8 @@ jobs: java-version: ${{ matrix.java }} distribution: 'temurin' cache: 'maven' + - name: free disk space + run: tools/github/free_disk_space.sh - name: run seatunnel zeta integration test if: needs.changes.outputs.api == 'true' run: | @@ -609,6 +611,8 @@ jobs: java-version: ${{ matrix.java }} distribution: 'temurin' cache: 'maven' + - name: free disk space + run: tools/github/free_disk_space.sh - name: run transform-v2 integration test (part-1) if: needs.changes.outputs.api == 'true' run: | @@ -633,6 +637,8 @@ jobs: java-version: ${{ matrix.java }} distribution: 'temurin' cache: 'maven' + - name: free disk space + run: tools/github/free_disk_space.sh - name: run transform-v2 integration test (part-2) if: needs.changes.outputs.api == 'true' run: | @@ -657,6 +663,8 @@ jobs: java-version: ${{ matrix.java }} distribution: 'temurin' cache: 'maven' + - name: free disk space + run: tools/github/free_disk_space.sh - name: run connector-v2 integration test (part-1) if: needs.changes.outputs.api == 'true' run: | @@ -684,6 +692,8 @@ jobs: java-version: ${{ matrix.java }} distribution: 'temurin' cache: 'maven' + - name: free disk space + run: tools/github/free_disk_space.sh - name: run connector-v2 integration test (part-2) if: needs.changes.outputs.api == 'true' run: | @@ -711,6 +721,8 @@ jobs: java-version: ${{ matrix.java }} distribution: 'temurin' cache: 'maven' + - name: free disk space + run: tools/github/free_disk_space.sh - name: run connector-v2 integration test (part-3) if: needs.changes.outputs.api == 'true' run: | @@ -738,6 +750,8 @@ jobs: java-version: ${{ matrix.java }} distribution: 'temurin' cache: 'maven' + - name: free disk space + run: tools/github/free_disk_space.sh - name: run connector-v2 integration test (part-4) if: needs.changes.outputs.api == 'true' run: | @@ -765,6 +779,8 @@ jobs: java-version: ${{ matrix.java }} distribution: 'temurin' cache: 'maven' + - name: free disk space + run: tools/github/free_disk_space.sh - name: run connector-v2 integration test (part-5) if: needs.changes.outputs.api == 'true' run: | @@ -792,6 +808,8 @@ jobs: java-version: ${{ matrix.java }} distribution: 'temurin' cache: 'maven' + - name: free disk space + run: tools/github/free_disk_space.sh - name: run connector-v2 integration test (part-6) if: needs.changes.outputs.api == 'true' run: | @@ -819,6 +837,8 @@ jobs: java-version: ${{ matrix.java }} distribution: 'temurin' cache: 'maven' + - name: free disk space + run: tools/github/free_disk_space.sh - name: run connector-v2 integration test (part-7) if: needs.changes.outputs.api == 'true' run: | @@ -898,6 +918,8 @@ jobs: java-version: ${{ matrix.java }} distribution: 'temurin' cache: 'maven' + - name: free disk space + run: tools/github/free_disk_space.sh - name: run jdbc connectors integration test (part-3) if: needs.changes.outputs.api == 'true' run: | @@ -922,6 +944,8 @@ jobs: java-version: ${{ matrix.java }} distribution: 'temurin' cache: 'maven' + - name: free disk space + run: tools/github/free_disk_space.sh - name: run jdbc connectors integration test (part-4) if: needs.changes.outputs.api == 'true' run: | @@ -946,6 +970,8 @@ jobs: java-version: ${{ matrix.java }} distribution: 'temurin' cache: 'maven' + - name: free disk space + run: tools/github/free_disk_space.sh - name: run jdbc connectors integration test (part-5) if: needs.changes.outputs.api == 'true' run: | @@ -996,6 +1022,8 @@ jobs: java-version: ${{ matrix.java }} distribution: 'temurin' cache: 'maven' + - name: free disk space + run: tools/github/free_disk_space.sh - name: run jdbc connectors integration test (part-7) if: needs.changes.outputs.api == 'true' run: | @@ -1020,6 +1048,8 @@ jobs: java-version: ${{ matrix.java }} distribution: 'temurin' cache: 'maven' + - name: free disk space + run: tools/github/free_disk_space.sh - name: run kudu connector integration test run: | ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-kudu-e2e -am -Pci @@ -1043,6 +1073,8 @@ jobs: java-version: ${{ matrix.java }} distribution: 'temurin' cache: 'maven' + - name: free disk space + run: tools/github/free_disk_space.sh - name: run amazonsqs connector integration test run: | ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-amazonsqs-e2e -am -Pci @@ -1066,6 +1098,8 @@ jobs: java-version: ${{ matrix.java }} distribution: 'temurin' cache: 'maven' + - name: free disk space + run: tools/github/free_disk_space.sh - name: run kafka connector integration test run: | ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-kafka-e2e -am -Pci @@ -1089,6 +1123,8 @@ jobs: java-version: ${{ matrix.java }} distribution: 'temurin' cache: 'maven' + - name: free disk space + run: tools/github/free_disk_space.sh - name: run rocket connector integration test run: | ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-rocketmq-e2e -am -Pci @@ -1139,6 +1175,8 @@ jobs: java-version: ${{ matrix.java }} distribution: 'temurin' cache: 'maven' + - name: free disk space + run: tools/github/free_disk_space.sh - name: run oracle cdc connector integration test run: | ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-cdc-oracle-e2e -am -Pci diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java index 10436da09b8..5664e48b4e6 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java @@ -18,9 +18,16 @@ package org.apache.seatunnel.api.table.factory; import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.common.utils.SeaTunnelException; + +import org.apache.commons.lang3.StringUtils; import lombok.Getter; +import java.util.ArrayList; +import java.util.List; + @Getter public abstract class TableFactoryContext { @@ -31,4 +38,25 @@ public TableFactoryContext(ReadonlyConfig options, ClassLoader classLoader) { this.options = options; this.classLoader = classLoader; } + + protected static void checkCatalogTableIllegal(List catalogTables) { + for (CatalogTable catalogTable : catalogTables) { + List alreadyChecked = new ArrayList<>(); + for (String fieldName : catalogTable.getTableSchema().getFieldNames()) { + if (StringUtils.isBlank(fieldName)) { + throw new SeaTunnelException( + String.format( + "Table %s field name cannot be empty", + catalogTable.getTablePath().getFullName())); + } + if (alreadyChecked.contains(fieldName)) { + throw new SeaTunnelException( + String.format( + "Table %s field %s duplicate", + catalogTable.getTablePath().getFullName(), fieldName)); + } + alreadyChecked.add(fieldName); + } + } + } } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java index 9565bad6a03..3e0eb24cd59 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java @@ -21,18 +21,24 @@ import org.apache.seatunnel.api.sink.TablePlaceholder; import org.apache.seatunnel.api.table.catalog.CatalogTable; +import com.google.common.annotations.VisibleForTesting; import lombok.Getter; import java.util.Collection; +import java.util.Collections; @Getter public class TableSinkFactoryContext extends TableFactoryContext { private final CatalogTable catalogTable; - protected TableSinkFactoryContext( + @VisibleForTesting + public TableSinkFactoryContext( CatalogTable catalogTable, ReadonlyConfig options, ClassLoader classLoader) { super(options, classLoader); + if (catalogTable != null) { + checkCatalogTableIllegal(Collections.singletonList(catalogTable)); + } this.catalogTable = catalogTable; } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactoryContext.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactoryContext.java index bf8176c7a8d..8e274a8e5e5 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactoryContext.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactoryContext.java @@ -32,6 +32,7 @@ public class TableTransformFactoryContext extends TableFactoryContext { public TableTransformFactoryContext( List catalogTables, ReadonlyConfig options, ClassLoader classLoader) { super(options, classLoader); + checkCatalogTableIllegal(catalogTables); this.catalogTables = catalogTables; } } diff --git a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableTest.java b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableTest.java index d3c7692b606..0ed70456052 100644 --- a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableTest.java +++ b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableTest.java @@ -18,7 +18,11 @@ package org.apache.seatunnel.api.table.catalog; import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; +import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext; +import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; +import org.apache.seatunnel.common.utils.SeaTunnelException; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -89,4 +93,62 @@ public void testReadCatalogTableWithUnsupportedType() { }); Assertions.assertEquals(result, exception.getParamsValueAs("tableUnsupportedTypes")); } + + @Test + public void testCatalogTableWithIllegalFieldNames() { + CatalogTable catalogTable = + CatalogTable.of( + TableIdentifier.of("catalog", "database", "table"), + TableSchema.builder() + .column( + PhysicalColumn.of( + " ", BasicType.STRING_TYPE, 1L, true, null, "")) + .build(), + Collections.emptyMap(), + Collections.emptyList(), + "comment"); + SeaTunnelException exception = + Assertions.assertThrows( + SeaTunnelException.class, + () -> + new TableTransformFactoryContext( + Collections.singletonList(catalogTable), null, null)); + SeaTunnelException exception2 = + Assertions.assertThrows( + SeaTunnelException.class, + () -> new TableSinkFactoryContext(catalogTable, null, null)); + Assertions.assertEquals( + "Table database.table field name cannot be empty", exception.getMessage()); + Assertions.assertEquals( + "Table database.table field name cannot be empty", exception2.getMessage()); + + CatalogTable catalogTable2 = + CatalogTable.of( + TableIdentifier.of("catalog", "database", "table"), + TableSchema.builder() + .column( + PhysicalColumn.of( + "name1", BasicType.STRING_TYPE, 1L, true, null, "")) + .column( + PhysicalColumn.of( + "name1", BasicType.STRING_TYPE, 1L, true, null, "")) + .build(), + Collections.emptyMap(), + Collections.emptyList(), + "comment"); + SeaTunnelException exception3 = + Assertions.assertThrows( + SeaTunnelException.class, + () -> + new TableTransformFactoryContext( + Collections.singletonList(catalogTable2), null, null)); + SeaTunnelException exception4 = + Assertions.assertThrows( + SeaTunnelException.class, + () -> new TableSinkFactoryContext(catalogTable2, null, null)); + Assertions.assertEquals( + "Table database.table field name1 duplicate", exception3.getMessage()); + Assertions.assertEquals( + "Table database.table field name1 duplicate", exception4.getMessage()); + } }