From b2d161b77608c91bcb0d59b9155a74f21bc6ef22 Mon Sep 17 00:00:00 2001 From: Yubin Li Date: Thu, 7 Dec 2023 17:35:07 +0800 Subject: [PATCH] [FLINK-24939][table] Introduce "SHOW CREATE CATALOG" Syntax --- .../src/test/resources/sql/catalog_database.q | 16 +++++ .../src/main/codegen/data/Parser.tdd | 1 + .../src/main/codegen/includes/parserImpls.ftl | 12 +++- .../sql/parser/dql/SqlShowCreateCatalog.java | 67 +++++++++++++++++++ .../sql/parser/FlinkSqlParserImplTest.java | 5 ++ .../table/api/internal/ShowCreateUtil.java | 27 ++++++-- .../flink/table/catalog/CatalogManager.java | 4 ++ .../ShowCreateCatalogOperation.java | 65 ++++++++++++++++++ .../SqlNodeToOperationConversion.java | 18 +++++ .../SqlOtherOperationConverterTest.java | 8 +++ 10 files changed, 215 insertions(+), 8 deletions(-) create mode 100644 flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateCatalog.java create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateCatalogOperation.java diff --git a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q index 11958807220800..140022ff01ce6c 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q @@ -686,3 +686,19 @@ show tables from db1 like 'p_r%'; +------------+ 1 row in set !ok + +# ========================================================================== +# test show create catalog +# ========================================================================== + +show create catalog catalog1; ++--------------------------------------------------------------------+ +| result | ++--------------------------------------------------------------------+ +| CREATE CATALOG `catalog1` WITH ( + 'type' = 'generic_in_memory' +) + | ++--------------------------------------------------------------------+ +1 row in set +!ok diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index 8c39f41b4631c5..d88a9c0f4542bc 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -111,6 +111,7 @@ "org.apache.flink.sql.parser.dql.SqlShowCreate" "org.apache.flink.sql.parser.dql.SqlShowCreateTable" "org.apache.flink.sql.parser.dql.SqlShowCreateView" + "org.apache.flink.sql.parser.dql.SqlShowCreateCatalog" "org.apache.flink.sql.parser.dql.SqlShowViews" "org.apache.flink.sql.parser.dql.SqlRichDescribeTable" "org.apache.flink.sql.parser.dql.SqlUnloadModule" diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index 58d1a2c6c390be..dbb1be45b3394c 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -583,7 +583,8 @@ SqlShowViews SqlShowViews() : } /** -* SHOW TABLES FROM [catalog.] database sql call. +* Parses a show tables statement. +* SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE pattern ]; */ SqlShowTables SqlShowTables() : { @@ -653,7 +654,7 @@ SqlShowColumns SqlShowColumns() : } /** -* Parse a "Show Create Table" query and "Show Create View" query commands. +* Parse a "Show Create Table" query and "Show Create View" and "Show Create Catalog" query commands. */ SqlShowCreate SqlShowCreate() : { @@ -676,6 +677,13 @@ SqlShowCreate SqlShowCreate() : { return new SqlShowCreateView(pos, sqlIdentifier); } + | + + { pos = getPos(); } + sqlIdentifier = CompoundIdentifier() + { + return new SqlShowCreateCatalog(pos, sqlIdentifier); + } ) } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateCatalog.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateCatalog.java new file mode 100644 index 00000000000000..5f3f7f561f3553 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateCatalog.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.dql; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** SHOW CREATE CATALOG sql call. */ +public class SqlShowCreateCatalog extends SqlShowCreate { + + public static final SqlSpecialOperator OPERATOR = + new SqlSpecialOperator("SHOW CREATE CATALOG", SqlKind.OTHER_DDL); + + protected final SqlIdentifier catalogName; + + public SqlShowCreateCatalog(SqlParserPos pos, SqlIdentifier catalogName) { + super(pos, catalogName); + this.catalogName = catalogName; + } + + public String[] getCatalogName() { + return Objects.isNull(this.catalogName) + ? new String[] {} + : catalogName.names.toArray(new String[0]); + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public List getOperandList() { + return Collections.singletonList(sqlIdentifier); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("SHOW CREATE CATALOG"); + sqlIdentifier.unparse(writer, leftPrec, rightPrec); + } +} diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index d12b9b0d3faed0..48b3b6b2e127d1 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -127,6 +127,11 @@ void testCreateCatalog() { + ")"); } + @Test + void testShowCreateCatalog() { + sql("show create catalog c1").ok("SHOW CREATE CATALOG `C1`"); + } + @Test void testDropCatalog() { sql("drop catalog c1").ok("DROP CATALOG `C1`"); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java index 2a9eb90bbe75f0..c11aca26046ed9 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.TableException; import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDescriptor; import org.apache.flink.table.catalog.CatalogView; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ObjectIdentifier; @@ -33,10 +34,13 @@ import org.apache.commons.lang3.StringUtils; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; +import static org.apache.flink.table.utils.EncodingUtils.escapeIdentifier; + /** SHOW CREATE statement Util. */ @Internal public class ShowCreateUtil { @@ -72,7 +76,7 @@ public static String buildShowCreateTableRow( sb.append("PARTITIONED BY (") .append(partitionedInfoFormatted) .append(")\n")); - extractFormattedOptions(table, printIndent) + extractFormattedOptions(table.getOptions(), printIndent) .ifPresent(v -> sb.append("WITH (\n").append(v).append("\n)\n")); return sb.toString(); } @@ -111,6 +115,18 @@ public static String buildShowCreateViewRow( return stringBuilder.toString(); } + public static String buildShowCreateCatalogRow(CatalogDescriptor catalogDescriptor) { + final String printIndent = " "; + return String.format( + "CREATE CATALOG %s WITH (%s%s%s)%s", + escapeIdentifier(catalogDescriptor.getCatalogName()), + System.lineSeparator(), + extractFormattedOptions(catalogDescriptor.getConfiguration().toMap(), printIndent) + .orElse(""), + System.lineSeparator(), + System.lineSeparator()); + } + static String buildCreateFormattedPrefix( String tableType, boolean isTemporary, ObjectIdentifier identifier) { return String.format( @@ -213,13 +229,12 @@ static Optional extractFormattedPartitionedInfo(ResolvedCatalogTable cat .collect(Collectors.joining(", "))); } - static Optional extractFormattedOptions( - ResolvedCatalogBaseTable table, String printIndent) { - if (Objects.isNull(table.getOptions()) || table.getOptions().isEmpty()) { + static Optional extractFormattedOptions(Map conf, String printIndent) { + if (Objects.isNull(conf) || conf.isEmpty()) { return Optional.empty(); } return Optional.of( - table.getOptions().entrySet().stream() + conf.entrySet().stream() .map( entry -> String.format( @@ -227,7 +242,7 @@ static Optional extractFormattedOptions( printIndent, EncodingUtils.escapeSingleQuotes(entry.getKey()), EncodingUtils.escapeSingleQuotes(entry.getValue()))) - .collect(Collectors.joining(",\n"))); + .collect(Collectors.joining("," + System.lineSeparator()))); } static String extractFormattedColumnNames(ResolvedCatalogBaseTable baseTable) { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java index de6fdb8e8db21d..02136d7dd2a4c1 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java @@ -138,6 +138,10 @@ public List getCatalogModificationListeners() { return catalogModificationListeners; } + public CatalogStore getCatalogStore() { + return catalogStoreHolder.catalogStore(); + } + public static Builder newBuilder() { return new Builder(); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateCatalogOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateCatalogOperation.java new file mode 100644 index 00000000000000..d754e1cd2ab280 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateCatalogOperation.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.internal.ShowCreateUtil; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.CatalogDescriptor; + +import static org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Operation to describe a SHOW CREATE CATALOG statement. */ +@Internal +public class ShowCreateCatalogOperation implements ShowOperation { + + private final String catalogName; + + public ShowCreateCatalogOperation(String catalogName) { + this.catalogName = checkNotNull(catalogName, "Catalog name must not be null"); + } + + public String getCatalogName() { + return catalogName; + } + + @Override + public String asSummaryString() { + return String.format("SHOW CREATE CATALOG %s", catalogName); + } + + @Override + public TableResultInternal execute(Context ctx) { + CatalogDescriptor catalogDescriptor = + ctx.getCatalogManager() + .getCatalogStore() + .getCatalog(catalogName) + .orElseThrow( + () -> + new ValidationException( + String.format( + "Cannot obtain metadata from Catalog %s.", + catalogName))); + String resultRow = ShowCreateUtil.buildShowCreateCatalogRow(catalogDescriptor); + + return buildStringArrayResult("result", new String[] {resultRow}); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java index 63bae2ecd4411b..be295e4b3bf695 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java @@ -65,6 +65,7 @@ import org.apache.flink.sql.parser.dql.SqlRichExplain; import org.apache.flink.sql.parser.dql.SqlShowCatalogs; import org.apache.flink.sql.parser.dql.SqlShowColumns; +import org.apache.flink.sql.parser.dql.SqlShowCreateCatalog; import org.apache.flink.sql.parser.dql.SqlShowCreateTable; import org.apache.flink.sql.parser.dql.SqlShowCreateView; import org.apache.flink.sql.parser.dql.SqlShowCurrentCatalog; @@ -126,6 +127,7 @@ import org.apache.flink.table.operations.QueryOperation; import org.apache.flink.table.operations.ShowCatalogsOperation; import org.apache.flink.table.operations.ShowColumnsOperation; +import org.apache.flink.table.operations.ShowCreateCatalogOperation; import org.apache.flink.table.operations.ShowCreateTableOperation; import org.apache.flink.table.operations.ShowCreateViewOperation; import org.apache.flink.table.operations.ShowCurrentCatalogOperation; @@ -333,6 +335,9 @@ private static Optional convertValidatedSqlNode( return Optional.of(converter.convertShowCreateTable((SqlShowCreateTable) validated)); } else if (validated instanceof SqlShowCreateView) { return Optional.of(converter.convertShowCreateView((SqlShowCreateView) validated)); + } else if (validated instanceof SqlShowCreateCatalog) { + return Optional.of( + converter.convertShowCreateCatalog((SqlShowCreateCatalog) validated)); } else if (validated instanceof SqlRichExplain) { return Optional.of(converter.convertRichExplain((SqlRichExplain) validated)); } else if (validated instanceof SqlRichDescribeTable) { @@ -948,6 +953,19 @@ private Operation convertShowCreateTable(SqlShowCreateTable sqlShowCreateTable) return new ShowCreateTableOperation(identifier); } + /** Convert SHOW CREATE CATALOG statement. */ + private Operation convertShowCreateCatalog(SqlShowCreateCatalog sqlShowCreateCatalog) { + String[] parsedCatalogName = sqlShowCreateCatalog.getCatalogName(); + if (parsedCatalogName.length == 1) { + return new ShowCreateCatalogOperation(parsedCatalogName[0]); + } else { + throw new ValidationException( + String.format( + "show create catalog identifier [ %s ] format error", + String.join(".", parsedCatalogName))); + } + } + /** Convert SHOW CREATE VIEW statement. */ private Operation convertShowCreateView(SqlShowCreateView sqlShowCreateView) { UnresolvedIdentifier unresolvedIdentifier = diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java index 48fc6fe61047c1..4acae04ad9c3eb 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java @@ -22,6 +22,7 @@ import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.operations.LoadModuleOperation; import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.ShowCreateCatalogOperation; import org.apache.flink.table.operations.ShowDatabasesOperation; import org.apache.flink.table.operations.ShowFunctionsOperation; import org.apache.flink.table.operations.ShowModulesOperation; @@ -187,6 +188,13 @@ void testShowTables() { assertThat(showTablesOperation.getPreposition()).isNull(); } + @Test + void testShowCreateCatalog() { + Operation operation = parse("show create catalog cat1"); + assertThat(operation).isInstanceOf(ShowCreateCatalogOperation.class); + assertThat(operation.asSummaryString()).isEqualTo("SHOW CREATE CATALOG cat1"); + } + @Test void testShowFullModules() { final String sql = "SHOW FULL MODULES";