From 94ccc2b3548f416e2a297ea06c15781f34f460e0 Mon Sep 17 00:00:00 2001 From: Yubin Li Date: Thu, 14 Mar 2024 14:41:08 +0800 Subject: [PATCH] support alter catalog --- .../src/main/codegen/data/Parser.tdd | 1 + .../src/main/codegen/includes/parserImpls.ftl | 22 +++++ .../flink/sql/parser/ddl/SqlAlterCatalog.java | 86 +++++++++++++++++++ .../flink/table/catalog/CatalogManager.java | 43 ++++++++++ .../operations/ddl/AlterCatalogOperation.java | 80 +++++++++++++++++ .../converters/SqlAlterCatalogConverter.java | 64 ++++++++++++++ .../converters/SqlNodeConverters.java | 1 + 7 files changed, 297 insertions(+) create mode 100644 flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterCatalog.java create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogOperation.java create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterCatalogConverter.java diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index d88a9c0f4542bc..72b14fb94f2137 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -33,6 +33,7 @@ "org.apache.flink.sql.parser.ddl.SqlAddJar" "org.apache.flink.sql.parser.ddl.SqlAddPartitions" "org.apache.flink.sql.parser.ddl.SqlAddPartitions.AlterTableAddPartitionContext" + "org.apache.flink.sql.parser.ddl.SqlAlterCatalog" "org.apache.flink.sql.parser.ddl.SqlAlterDatabase" "org.apache.flink.sql.parser.ddl.SqlAlterFunction" "org.apache.flink.sql.parser.ddl.SqlAlterTable" diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index dbb1be45b3394c..556efdc3700022 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -141,6 +141,28 @@ SqlDrop SqlDropCatalog(Span s, boolean replace) : } } +/** +* Parses a alter catalog statement. +* ALTER CATALOG catalog_name SET (key1=val1, key2=val2, ...); +*/ +SqlAlterCatalog SqlAlterCatalog() : +{ + SqlParserPos startPos; + SqlIdentifier catalogName; + SqlNodeList propertyList = SqlNodeList.EMPTY; +} +{ + { startPos = getPos(); } + catalogName = SimpleIdentifier() + + propertyList = TableProperties() + { + return new SqlAlterCatalog(startPos.plus(getPos()), + catalogName, + propertyList); + } +} + /** * Parse a "Show DATABASES" metadata query command. */ diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterCatalog.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterCatalog.java new file mode 100644 index 00000000000000..baa59581e68ce6 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterCatalog.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.flink.sql.parser.SqlUnparseUtils; + +import org.apache.calcite.sql.*; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** ALTER CATALOG DDL sql call. */ +public class SqlAlterCatalog extends SqlCreate { + + public static final SqlSpecialOperator OPERATOR = + new SqlSpecialOperator("ALTER CATALOG", SqlKind.OTHER_DDL); + + private final SqlIdentifier catalogName; + + private final SqlNodeList propertyList; + + public SqlAlterCatalog( + SqlParserPos position, SqlIdentifier catalogName, SqlNodeList propertyList) { + super(OPERATOR, position, false, false); + this.catalogName = requireNonNull(catalogName, "catalogName cannot be null"); + this.propertyList = requireNonNull(propertyList, "propertyList cannot be null"); + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public List getOperandList() { + return ImmutableNullableList.of(catalogName, propertyList); + } + + public SqlIdentifier getCatalogName() { + return catalogName; + } + + public SqlNodeList getPropertyList() { + return propertyList; + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("ALTER CATALOG"); + catalogName.unparse(writer, leftPrec, rightPrec); + + if (this.propertyList.size() > 0) { + writer.keyword("SET"); + SqlWriter.Frame withFrame = writer.startList("(", ")"); + for (SqlNode property : propertyList) { + SqlUnparseUtils.printIndent(writer); + property.unparse(writer, leftPrec, rightPrec); + } + writer.newlineAndIndent(); + writer.endList(withFrame); + } + } + + public String catalogName() { + return catalogName.getSimple(); + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java index 543105d027a299..73472b0a66041c 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java @@ -321,6 +321,49 @@ public void createCatalog(String catalogName, CatalogDescriptor catalogDescripto catalogStoreHolder.catalogStore().storeCatalog(catalogName, catalogDescriptor); } + /** + * Alters a catalog under the given name. The catalog name must be unique. + * + * @param catalogName the given catalog name under which to alter the given catalog + * @param catalogDescriptor catalog descriptor for altering catalog + * @throws CatalogException If the catalog neither exists in the catalog store nor in the + * initialized catalogs, or if an error occurs while creating the catalog or storing the + * {@link CatalogDescriptor} + */ + public void alterCatalog(String catalogName, CatalogDescriptor catalogDescriptor) + throws CatalogException { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(catalogName), + "Catalog name cannot be null or empty."); + checkNotNull(catalogDescriptor, "Catalog descriptor cannot be null"); + Catalog existingCatalog = getCatalogOrError(catalogName), catalog; + if (catalogStoreHolder.catalogStore().contains(catalogName) + || catalogs.containsKey(catalogName)) { + catalog = initCatalog(catalogName, catalogDescriptor); + if (existingCatalog.getClass() != catalog.getClass()) { + throw new CatalogException( + String.format( + "Catalog types don't match. Existing catalog is '%s' and new catalog is '%s'.", + existingCatalog.getClass().getName(), + catalog.getClass().getName())); + } + if (catalogs.containsKey(catalogName)) { + catalogs.get(catalogName).close(); + } + if (catalogStoreHolder.catalogStore().contains(catalogName)) { + catalogStoreHolder.catalogStore().removeCatalog(catalogName, false); + } + } else { + throw new CatalogException( + format( + "Catalog %s neither exists in the catalog store nor in the initialized catalogs.", + catalogName)); + } + catalog.open(); + catalogs.put(catalogName, catalog); + catalogStoreHolder.catalogStore().storeCatalog(catalogName, catalogDescriptor); + } + private Catalog initCatalog(String catalogName, CatalogDescriptor catalogDescriptor) { return FactoryUtil.createCatalog( catalogName, diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogOperation.java new file mode 100644 index 00000000000000..03f8c510c5856b --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogOperation.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.ddl; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.internal.TableResultImpl; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.CatalogDescriptor; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.OperationUtils; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Operation to describe a ALTER CATALOG statement. */ +@Internal +public class AlterCatalogOperation implements AlterOperation { + private final String catalogName; + private final Map properties; + + public AlterCatalogOperation(String catalogName, Map properties) { + this.catalogName = checkNotNull(catalogName); + this.properties = Collections.unmodifiableMap(checkNotNull(properties)); + } + + public String getCatalogName() { + return catalogName; + } + + public Map getProperties() { + return properties; + } + + @Override + public String asSummaryString() { + Map params = new LinkedHashMap<>(); + params.put("catalogName", catalogName); + params.put("properties", properties); + + return OperationUtils.formatWithChildren( + "ALTER CATALOG", params, Collections.emptyList(), Operation::asSummaryString); + } + + @Override + public TableResultInternal execute(Context ctx) { + try { + ctx.getCatalogManager() + .alterCatalog( + catalogName, + CatalogDescriptor.of(catalogName, Configuration.fromMap(properties))); + + return TableResultImpl.TABLE_RESULT_OK; + } catch (CatalogException e) { + throw new ValidationException( + String.format("Could not execute %s", asSummaryString()), e); + } + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterCatalogConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterCatalogConverter.java new file mode 100644 index 00000000000000..b2d1a2130013af --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterCatalogConverter.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.ddl.SqlAlterCatalog; +import org.apache.flink.sql.parser.ddl.SqlTableOption; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogDescriptor; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.ddl.AlterCatalogOperation; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** A converter for {@link SqlAlterCatalog}. */ +public class SqlAlterCatalogConverter implements SqlNodeConverter { + + @Override + public Operation convertSqlNode(SqlAlterCatalog sqlAlterCatalog, ConvertContext context) { + String catalogName = sqlAlterCatalog.catalogName(); + Optional catalog = context.getCatalogManager().getCatalog(catalogName); + final Map properties; + if (catalog.isPresent()) { + Optional catalogDescriptor = + context.getCatalogManager().getCatalogDescriptor(catalogName); + properties = + new HashMap<>( + catalogDescriptor.isPresent() + ? catalogDescriptor.get().getConfiguration().toMap() + : Collections.emptyMap()); + } else { + throw new ValidationException(String.format("Catalog %s not exists", catalogName)); + } + // set with properties + sqlAlterCatalog + .getPropertyList() + .getList() + .forEach( + p -> + properties.put( + ((SqlTableOption) p).getKeyString(), + ((SqlTableOption) p).getValueString())); + return new AlterCatalogOperation(catalogName, properties); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java index 78356d16c7d18d..e263ddb8788d45 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java @@ -39,6 +39,7 @@ public class SqlNodeConverters { static { // register all the converters here register(new SqlCreateCatalogConverter()); + register(new SqlAlterCatalogConverter()); register(new SqlCreateViewConverter()); register(new SqlAlterViewRenameConverter()); register(new SqlAlterViewPropertiesConverter());