diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateCatalog.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateCatalog.java index d285af7105143c..d95427b9fdc1c7 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateCatalog.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateCatalog.java @@ -48,7 +48,7 @@ public class SqlCreateCatalog extends SqlCreate { private final SqlNodeList propertyList; - private final @Nullable SqlNode comment; + @Nullable private final SqlNode comment; public SqlCreateCatalog( SqlParserPos position, diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index 41372cab5532a5..8f75d9f0ad63da 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -26,12 +26,15 @@ import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.sql.parser.SqlParserFixture; import org.apache.calcite.sql.parser.SqlParserTest; +import org.apache.commons.lang3.StringUtils; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; import org.hamcrest.TypeSafeDiagnosingMatcher; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -121,41 +124,29 @@ void testUseCatalog() { sql("use catalog a").ok("USE CATALOG `A`"); } - @Test - void testCreateCatalog() { - sql("create catalog c1\n" + @ParameterizedTest + @CsvSource({"true,true", "true,false", "false,true", "false,false"}) + void testCreateCatalog(boolean ifNotExists, boolean comment) { + String ifNotExistsClause = ifNotExists ? "if not exists " : ""; + String commentClause = comment ? "\ncomment 'HELLO' " : " "; + + sql("create catalog " + + ifNotExistsClause + + "c1" + + commentClause + " WITH (\n" + " 'key1'='value1',\n" + " 'key2'='value2'\n" + " )\n") .ok( - "CREATE CATALOG `C1` " + "CREATE CATALOG " + + StringUtils.upperCase(ifNotExistsClause) + + "`C1`" + + StringUtils.upperCase(commentClause) + "WITH (\n" + " 'key1' = 'value1',\n" + " 'key2' = 'value2'\n" + ")"); - sql("create catalog c1 comment 'hello'\n" - + " WITH (\n" - + " 'key1'='value1',\n" - + " 'key2'='value2'\n" - + " )\n") - .ok( - "CREATE CATALOG `C1`\n" - + "COMMENT 'hello' WITH (\n" - + " 'key1' = 'value1',\n" - + " 'key2' = 'value2'\n" - + ")"); - sql("create catalog if not exists c1 comment 'hello'\n" - + " WITH (\n" - + " 'key1'='value1',\n" - + " 'key2'='value2'\n" - + " )\n") - .ok( - "CREATE CATALOG IF NOT EXISTS `C1`\n" - + "COMMENT 'hello' WITH (\n" - + " 'key1' = 'value1',\n" - + " 'key2' = 'value2'\n" - + ")"); } @Test diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java index 1d02db67096e4c..f97a625a476858 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java @@ -308,21 +308,18 @@ public void createCatalog( "Catalog name cannot be null or empty."); checkNotNull(catalogDescriptor, "Catalog descriptor cannot be null"); - if (catalogStoreHolder.catalogStore().contains(catalogName)) { + boolean catalogExistsInStore = catalogStoreHolder.catalogStore().contains(catalogName); + boolean catalogExistsInMemory = catalogs.containsKey(catalogName); + + if (catalogExistsInStore || catalogExistsInMemory) { if (!ignoreIfExists) { - throw new CatalogException( - format("Catalog %s already exists in catalog store.", catalogName)); + throw new CatalogException(format("Catalog %s already exists.", catalogName)); } } else { + // Store the catalog in the catalog store catalogStoreHolder.catalogStore().storeCatalog(catalogName, catalogDescriptor); - } - if (catalogs.containsKey(catalogName)) { - if (!ignoreIfExists) { - throw new CatalogException( - format("Catalog %s already exists in initialized catalogs.", catalogName)); - } - } else { + // Initialize and store the catalog in memory Catalog catalog = initCatalog(catalogName, catalogDescriptor); catalog.open(); catalogs.put(catalogName, catalog); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java index 325e7f1ef14fdc..cdacafadefc58c 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java @@ -41,7 +41,7 @@ public class CreateCatalogOperation implements CreateOperation { private final String catalogName; private final Map properties; - private final @Nullable String comment; + @Nullable private final String comment; private final boolean ignoreIfExists; public CreateCatalogOperation( @@ -63,10 +63,6 @@ public Map getProperties() { return properties; } - public @Nullable String getComment() { - return comment; - } - public boolean isIgnoreIfExists() { return ignoreIfExists; } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java index f43f781363c72a..632387a0ebcf98 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java @@ -376,6 +376,17 @@ void testCatalogStore() throws Exception { CatalogDescriptor.of( "cat_comment", configuration.clone(), "second comment for catalog"), true); + assertThatThrownBy( + () -> + catalogManager.createCatalog( + "cat_comment", + CatalogDescriptor.of( + "cat_comment", + configuration.clone(), + "third comment for catalog"), + false)) + .isInstanceOf(CatalogException.class) + .hasMessage("Catalog cat_comment already exists."); assertTrue(catalogManager.getCatalog("cat1").isPresent()); assertTrue(catalogManager.getCatalog("cat2").isPresent()); @@ -398,14 +409,14 @@ void testCatalogStore() throws Exception { catalogManager.createCatalog( "cat1", CatalogDescriptor.of("cat1", configuration))) .isInstanceOf(CatalogException.class) - .hasMessageContaining("Catalog cat1 already exists in catalog store."); + .hasMessageContaining("Catalog cat1 already exists."); assertThatThrownBy( () -> catalogManager.createCatalog( "cat4", CatalogDescriptor.of("cat4", configuration))) .isInstanceOf(CatalogException.class) - .hasMessageContaining("Catalog cat4 already exists in initialized catalogs."); + .hasMessageContaining("Catalog cat4 already exists."); catalogManager.createDatabase( "exist_cat", diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDescriptor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDescriptor.java index d80fa8b768995d..f984203672de47 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDescriptor.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDescriptor.java @@ -59,10 +59,6 @@ public Optional getComment() { return Optional.ofNullable(comment); } - public CatalogDescriptor setComment(String comment) { - return new CatalogDescriptor(catalogName, configuration, comment); - } - private CatalogDescriptor( String catalogName, Configuration configuration, @Nullable String comment) { this.catalogName = catalogName; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateCatalogConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateCatalogConverter.java index ecbb11ce5e46ca..ea65f8f4ed8801 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateCatalogConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateCatalogConverter.java @@ -23,11 +23,12 @@ import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.CreateCatalogOperation; +import org.apache.calcite.sql.SqlCharStringLiteral; +import org.apache.calcite.util.NlsString; + import java.util.HashMap; import java.util.Map; -import static org.apache.flink.table.planner.utils.OperationConverterUtils.getCatalogComment; - /** A converter for {@link SqlCreateCatalog}. */ public class SqlCreateCatalogConverter implements SqlNodeConverter { @@ -46,7 +47,10 @@ public Operation convertSqlNode(SqlCreateCatalog node, ConvertContext context) { return new CreateCatalogOperation( node.catalogName(), properties, - getCatalogComment(node.getComment()), + node.getComment() + .map(SqlCharStringLiteral.class::cast) + .map(c -> c.getValueAs(NlsString.class).getValue()) + .orElse(null), node.isIfNotExists()); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java index 0f75934f2b415f..1b1d224cf856bd 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java @@ -27,10 +27,8 @@ import org.apache.calcite.sql.SqlCharStringLiteral; import org.apache.calcite.sql.SqlIdentifier; -import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlNumericLiteral; -import org.apache.calcite.util.NlsString; import javax.annotation.Nullable; @@ -87,13 +85,6 @@ public static List buildModifyColumnChange( } } - public static @Nullable String getCatalogComment(Optional catalogComment) { - return catalogComment - .map(SqlCharStringLiteral.class::cast) - .map(c -> c.getValueAs(NlsString.class).getValue()) - .orElse(null); - } - public static @Nullable String getComment(SqlTableColumn column) { return column.getComment() .map(SqlCharStringLiteral.class::cast)