Skip to content

Commit

Permalink
[FLINK-24939][table] Introduce "SHOW CREATE CATALOG" Syntax
Browse files Browse the repository at this point in the history
  • Loading branch information
liyubin117 committed Mar 13, 2024
1 parent 7d0111d commit b2d161b
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -686,3 +686,19 @@ show tables from db1 like 'p_r%';
+------------+
1 row in set
!ok

# ==========================================================================
# test show create catalog
# ==========================================================================

show create catalog catalog1;
+--------------------------------------------------------------------+
| result |
+--------------------------------------------------------------------+
| CREATE CATALOG `catalog1` WITH (
'type' = 'generic_in_memory'
)
|
+--------------------------------------------------------------------+
1 row in set
!ok
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
"org.apache.flink.sql.parser.dql.SqlShowCreate"
"org.apache.flink.sql.parser.dql.SqlShowCreateTable"
"org.apache.flink.sql.parser.dql.SqlShowCreateView"
"org.apache.flink.sql.parser.dql.SqlShowCreateCatalog"
"org.apache.flink.sql.parser.dql.SqlShowViews"
"org.apache.flink.sql.parser.dql.SqlRichDescribeTable"
"org.apache.flink.sql.parser.dql.SqlUnloadModule"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,8 @@ SqlShowViews SqlShowViews() :
}

/**
* SHOW TABLES FROM [catalog.] database sql call.
* Parses a show tables statement.
* SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE pattern ];
*/
SqlShowTables SqlShowTables() :
{
Expand Down Expand Up @@ -653,7 +654,7 @@ SqlShowColumns SqlShowColumns() :
}

/**
* Parse a "Show Create Table" query and "Show Create View" query commands.
* Parse a "Show Create Table" query and "Show Create View" and "Show Create Catalog" query commands.
*/
SqlShowCreate SqlShowCreate() :
{
Expand All @@ -676,6 +677,13 @@ SqlShowCreate SqlShowCreate() :
{
return new SqlShowCreateView(pos, sqlIdentifier);
}
|
<CATALOG>
{ pos = getPos(); }
sqlIdentifier = CompoundIdentifier()
{
return new SqlShowCreateCatalog(pos, sqlIdentifier);
}
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.sql.parser.dql;

import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;

import java.util.Collections;
import java.util.List;
import java.util.Objects;

/** SHOW CREATE CATALOG sql call. */
public class SqlShowCreateCatalog extends SqlShowCreate {

public static final SqlSpecialOperator OPERATOR =
new SqlSpecialOperator("SHOW CREATE CATALOG", SqlKind.OTHER_DDL);

protected final SqlIdentifier catalogName;

public SqlShowCreateCatalog(SqlParserPos pos, SqlIdentifier catalogName) {
super(pos, catalogName);
this.catalogName = catalogName;
}

public String[] getCatalogName() {
return Objects.isNull(this.catalogName)
? new String[] {}
: catalogName.names.toArray(new String[0]);
}

@Override
public SqlOperator getOperator() {
return OPERATOR;
}

@Override
public List<SqlNode> getOperandList() {
return Collections.singletonList(sqlIdentifier);
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("SHOW CREATE CATALOG");
sqlIdentifier.unparse(writer, leftPrec, rightPrec);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ void testCreateCatalog() {
+ ")");
}

@Test
void testShowCreateCatalog() {
sql("show create catalog c1").ok("SHOW CREATE CATALOG `C1`");
}

@Test
void testDropCatalog() {
sql("drop catalog c1").ok("DROP CATALOG `C1`");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDescriptor;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
Expand All @@ -33,10 +34,13 @@

import org.apache.commons.lang3.StringUtils;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.flink.table.utils.EncodingUtils.escapeIdentifier;

/** SHOW CREATE statement Util. */
@Internal
public class ShowCreateUtil {
Expand Down Expand Up @@ -72,7 +76,7 @@ public static String buildShowCreateTableRow(
sb.append("PARTITIONED BY (")
.append(partitionedInfoFormatted)
.append(")\n"));
extractFormattedOptions(table, printIndent)
extractFormattedOptions(table.getOptions(), printIndent)
.ifPresent(v -> sb.append("WITH (\n").append(v).append("\n)\n"));
return sb.toString();
}
Expand Down Expand Up @@ -111,6 +115,18 @@ public static String buildShowCreateViewRow(
return stringBuilder.toString();
}

public static String buildShowCreateCatalogRow(CatalogDescriptor catalogDescriptor) {
final String printIndent = " ";
return String.format(
"CREATE CATALOG %s WITH (%s%s%s)%s",
escapeIdentifier(catalogDescriptor.getCatalogName()),
System.lineSeparator(),
extractFormattedOptions(catalogDescriptor.getConfiguration().toMap(), printIndent)
.orElse(""),
System.lineSeparator(),
System.lineSeparator());
}

static String buildCreateFormattedPrefix(
String tableType, boolean isTemporary, ObjectIdentifier identifier) {
return String.format(
Expand Down Expand Up @@ -213,21 +229,20 @@ static Optional<String> extractFormattedPartitionedInfo(ResolvedCatalogTable cat
.collect(Collectors.joining(", ")));
}

static Optional<String> extractFormattedOptions(
ResolvedCatalogBaseTable<?> table, String printIndent) {
if (Objects.isNull(table.getOptions()) || table.getOptions().isEmpty()) {
static Optional<String> extractFormattedOptions(Map<String, String> conf, String printIndent) {
if (Objects.isNull(conf) || conf.isEmpty()) {
return Optional.empty();
}
return Optional.of(
table.getOptions().entrySet().stream()
conf.entrySet().stream()
.map(
entry ->
String.format(
"%s'%s' = '%s'",
printIndent,
EncodingUtils.escapeSingleQuotes(entry.getKey()),
EncodingUtils.escapeSingleQuotes(entry.getValue())))
.collect(Collectors.joining(",\n")));
.collect(Collectors.joining("," + System.lineSeparator())));
}

static String extractFormattedColumnNames(ResolvedCatalogBaseTable<?> baseTable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ public List<CatalogModificationListener> getCatalogModificationListeners() {
return catalogModificationListeners;
}

public CatalogStore getCatalogStore() {
return catalogStoreHolder.catalogStore();
}

public static Builder newBuilder() {
return new Builder();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.ShowCreateUtil;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.CatalogDescriptor;

import static org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult;
import static org.apache.flink.util.Preconditions.checkNotNull;

/** Operation to describe a SHOW CREATE CATALOG statement. */
@Internal
public class ShowCreateCatalogOperation implements ShowOperation {

private final String catalogName;

public ShowCreateCatalogOperation(String catalogName) {
this.catalogName = checkNotNull(catalogName, "Catalog name must not be null");
}

public String getCatalogName() {
return catalogName;
}

@Override
public String asSummaryString() {
return String.format("SHOW CREATE CATALOG %s", catalogName);
}

@Override
public TableResultInternal execute(Context ctx) {
CatalogDescriptor catalogDescriptor =
ctx.getCatalogManager()
.getCatalogStore()
.getCatalog(catalogName)
.orElseThrow(
() ->
new ValidationException(
String.format(
"Cannot obtain metadata from Catalog %s.",
catalogName)));
String resultRow = ShowCreateUtil.buildShowCreateCatalogRow(catalogDescriptor);

return buildStringArrayResult("result", new String[] {resultRow});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.flink.sql.parser.dql.SqlRichExplain;
import org.apache.flink.sql.parser.dql.SqlShowCatalogs;
import org.apache.flink.sql.parser.dql.SqlShowColumns;
import org.apache.flink.sql.parser.dql.SqlShowCreateCatalog;
import org.apache.flink.sql.parser.dql.SqlShowCreateTable;
import org.apache.flink.sql.parser.dql.SqlShowCreateView;
import org.apache.flink.sql.parser.dql.SqlShowCurrentCatalog;
Expand Down Expand Up @@ -126,6 +127,7 @@
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.ShowCatalogsOperation;
import org.apache.flink.table.operations.ShowColumnsOperation;
import org.apache.flink.table.operations.ShowCreateCatalogOperation;
import org.apache.flink.table.operations.ShowCreateTableOperation;
import org.apache.flink.table.operations.ShowCreateViewOperation;
import org.apache.flink.table.operations.ShowCurrentCatalogOperation;
Expand Down Expand Up @@ -333,6 +335,9 @@ private static Optional<Operation> convertValidatedSqlNode(
return Optional.of(converter.convertShowCreateTable((SqlShowCreateTable) validated));
} else if (validated instanceof SqlShowCreateView) {
return Optional.of(converter.convertShowCreateView((SqlShowCreateView) validated));
} else if (validated instanceof SqlShowCreateCatalog) {
return Optional.of(
converter.convertShowCreateCatalog((SqlShowCreateCatalog) validated));
} else if (validated instanceof SqlRichExplain) {
return Optional.of(converter.convertRichExplain((SqlRichExplain) validated));
} else if (validated instanceof SqlRichDescribeTable) {
Expand Down Expand Up @@ -948,6 +953,19 @@ private Operation convertShowCreateTable(SqlShowCreateTable sqlShowCreateTable)
return new ShowCreateTableOperation(identifier);
}

/** Convert SHOW CREATE CATALOG statement. */
private Operation convertShowCreateCatalog(SqlShowCreateCatalog sqlShowCreateCatalog) {
String[] parsedCatalogName = sqlShowCreateCatalog.getCatalogName();
if (parsedCatalogName.length == 1) {
return new ShowCreateCatalogOperation(parsedCatalogName[0]);
} else {
throw new ValidationException(
String.format(
"show create catalog identifier [ %s ] format error",
String.join(".", parsedCatalogName)));
}
}

/** Convert SHOW CREATE VIEW statement. */
private Operation convertShowCreateView(SqlShowCreateView sqlShowCreateView) {
UnresolvedIdentifier unresolvedIdentifier =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.operations.LoadModuleOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ShowCreateCatalogOperation;
import org.apache.flink.table.operations.ShowDatabasesOperation;
import org.apache.flink.table.operations.ShowFunctionsOperation;
import org.apache.flink.table.operations.ShowModulesOperation;
Expand Down Expand Up @@ -187,6 +188,13 @@ void testShowTables() {
assertThat(showTablesOperation.getPreposition()).isNull();
}

@Test
void testShowCreateCatalog() {
Operation operation = parse("show create catalog cat1");
assertThat(operation).isInstanceOf(ShowCreateCatalogOperation.class);
assertThat(operation.asSummaryString()).isEqualTo("SHOW CREATE CATALOG cat1");
}

@Test
void testShowFullModules() {
final String sql = "SHOW FULL MODULES";
Expand Down

0 comments on commit b2d161b

Please sign in to comment.