Skip to content

Commit

Permalink
support alter catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
liyubin117 committed Mar 15, 2024
1 parent 664c0ee commit 08e0df8
Show file tree
Hide file tree
Showing 7 changed files with 302 additions and 0 deletions.
2 changes: 2 additions & 0 deletions flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"org.apache.flink.sql.parser.ddl.SqlAddJar"
"org.apache.flink.sql.parser.ddl.SqlAddPartitions"
"org.apache.flink.sql.parser.ddl.SqlAddPartitions.AlterTableAddPartitionContext"
"org.apache.flink.sql.parser.ddl.SqlAlterCatalog"
"org.apache.flink.sql.parser.ddl.SqlAlterDatabase"
"org.apache.flink.sql.parser.ddl.SqlAlterFunction"
"org.apache.flink.sql.parser.ddl.SqlAlterTable"
Expand Down Expand Up @@ -565,6 +566,7 @@
"SqlUseCatalog()"
"SqlShowDatabases()"
"SqlUseDatabase()"
"SqlAlterCatalog()"
"SqlAlterDatabase()"
"SqlDescribeDatabase()"
"SqlAlterFunction()"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,28 @@ SqlDrop SqlDropCatalog(Span s, boolean replace) :
}
}

/**
* Parses an ALTER CATALOG statement.
* ALTER CATALOG catalog_name SET (key1=val1, key2=val2, ...);
*/
SqlAlterCatalog SqlAlterCatalog() :
{
// Position of the ALTER keyword; extended to cover the whole statement below.
SqlParserPos startPos;
SqlIdentifier catalogName;
// Defaults to the empty list; replaced by the parsed SET properties.
SqlNodeList propertyList = SqlNodeList.EMPTY;
}
{
<ALTER> <CATALOG> { startPos = getPos(); }
catalogName = SimpleIdentifier()
<SET>
propertyList = TableProperties()
{
// Span runs from ALTER through the closing parenthesis of the property list.
return new SqlAlterCatalog(startPos.plus(getPos()),
catalogName,
propertyList);
}
}

/**
* Parse a "Show DATABASES" metadata query command.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.sql.parser.ddl;

import org.apache.flink.sql.parser.SqlUnparseUtils;

import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.ImmutableNullableList;

import java.util.List;

import static java.util.Objects.requireNonNull;

/**
 * {@link SqlCall} representing the {@code ALTER CATALOG catalog_name SET (key=val, ...)} DDL
 * statement.
 */
public class SqlAlterCatalog extends SqlCall {

    public static final SqlSpecialOperator OPERATOR =
            new SqlSpecialOperator("ALTER CATALOG", SqlKind.OTHER_DDL);

    /** Name of the catalog being altered. */
    private final SqlIdentifier catalogName;

    /** Key/value options supplied in the SET clause. */
    private final SqlNodeList propertyList;

    public SqlAlterCatalog(
            SqlParserPos position, SqlIdentifier catalogName, SqlNodeList propertyList) {
        super(position);
        this.catalogName = requireNonNull(catalogName, "catalogName cannot be null");
        this.propertyList = requireNonNull(propertyList, "propertyList cannot be null");
    }

    public SqlIdentifier getCatalogName() {
        return catalogName;
    }

    public SqlNodeList getPropertyList() {
        return propertyList;
    }

    /** Returns the catalog name as a simple (unqualified) string. */
    public String catalogName() {
        return catalogName.getSimple();
    }

    @Override
    public SqlOperator getOperator() {
        return OPERATOR;
    }

    @Override
    public List<SqlNode> getOperandList() {
        // Operands in parse order: catalog identifier, then the SET property list.
        return ImmutableNullableList.of(catalogName, propertyList);
    }

    @Override
    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
        writer.keyword("ALTER CATALOG");
        catalogName.unparse(writer, leftPrec, rightPrec);
        writer.keyword("SET");
        // Emit "(k1 = v1, k2 = v2, ...)" with each option on its own indented line.
        final SqlWriter.Frame propertiesFrame = writer.startList("(", ")");
        for (SqlNode option : propertyList.getList()) {
            SqlUnparseUtils.printIndent(writer);
            option.unparse(writer, leftPrec, rightPrec);
        }
        writer.newlineAndIndent();
        writer.endList(propertiesFrame);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,49 @@ public void createCatalog(String catalogName, CatalogDescriptor catalogDescripto
catalogStoreHolder.catalogStore().storeCatalog(catalogName, catalogDescriptor);
}

/**
* Alters a catalog under the given name. The catalog name must be unique.
*
* @param catalogName the given catalog name under which to alter the given catalog
* @param catalogDescriptor catalog descriptor for altering catalog
* @throws CatalogException If the catalog neither exists in the catalog store nor in the
* initialized catalogs, or if an error occurs while creating the catalog or storing the
* {@link CatalogDescriptor}
*/
public void alterCatalog(String catalogName, CatalogDescriptor catalogDescriptor)
throws CatalogException {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(catalogName),
"Catalog name cannot be null or empty.");
checkNotNull(catalogDescriptor, "Catalog descriptor cannot be null");
if (catalogStoreHolder.catalogStore().contains(catalogName)
|| catalogs.containsKey(catalogName)) {
Catalog existingCatalog = getCatalogOrError(catalogName);
Catalog catalog = initCatalog(catalogName, catalogDescriptor);
if (existingCatalog.getClass() != catalog.getClass()) {
throw new CatalogException(
String.format(
"Catalog types don't match. Existing catalog is '%s' and new catalog is '%s'.",
existingCatalog.getClass().getName(),
catalog.getClass().getName()));
}
if (catalogs.containsKey(catalogName)) {
catalogs.get(catalogName).close();
}
if (catalogStoreHolder.catalogStore().contains(catalogName)) {
catalogStoreHolder.catalogStore().removeCatalog(catalogName, false);
}
catalog.open();
catalogs.put(catalogName, catalog);
catalogStoreHolder.catalogStore().storeCatalog(catalogName, catalogDescriptor);
} else {
throw new CatalogException(
format(
"Catalog %s neither exists in the catalog store nor in the initialized catalogs.",
catalogName));
}
}

private Catalog initCatalog(String catalogName, CatalogDescriptor catalogDescriptor) {
return FactoryUtil.createCatalog(
catalogName,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations.ddl;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableResultImpl;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.CatalogDescriptor;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.OperationUtils;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

import static org.apache.flink.util.Preconditions.checkNotNull;

/** Operation to describe an ALTER CATALOG statement. */
@Internal
public class AlterCatalogOperation implements AlterOperation {
    /** Name of the catalog to alter. */
    private final String catalogName;

    /** Immutable snapshot of the options supplied in the SET clause. */
    private final Map<String, String> properties;

    /**
     * Creates an operation that alters the named catalog with the given options.
     *
     * @param catalogName name of the catalog to alter, must not be null
     * @param properties options to apply; copied defensively, must not be null
     */
    public AlterCatalogOperation(String catalogName, Map<String, String> properties) {
        this.catalogName = checkNotNull(catalogName);
        // Defensive copy: unmodifiableMap alone is only a view, so later mutations of the
        // caller's map would leak into this operation. LinkedHashMap keeps insertion order
        // stable for asSummaryString().
        this.properties =
                Collections.unmodifiableMap(new LinkedHashMap<>(checkNotNull(properties)));
    }

    public String getCatalogName() {
        return catalogName;
    }

    public Map<String, String> getProperties() {
        return properties;
    }

    @Override
    public String asSummaryString() {
        Map<String, Object> params = new LinkedHashMap<>();
        params.put("catalogName", catalogName);
        params.put("properties", properties);

        return OperationUtils.formatWithChildren(
                "ALTER CATALOG", params, Collections.emptyList(), Operation::asSummaryString);
    }

    @Override
    public TableResultInternal execute(Context ctx) {
        try {
            // Delegate to the catalog manager; the descriptor carries the new configuration.
            ctx.getCatalogManager()
                    .alterCatalog(
                            catalogName,
                            CatalogDescriptor.of(catalogName, Configuration.fromMap(properties)));

            return TableResultImpl.TABLE_RESULT_OK;
        } catch (CatalogException e) {
            // Surface catalog-level failures as validation errors with the statement summary.
            throw new ValidationException(
                    String.format("Could not execute %s", asSummaryString()), e);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.operations.converters;

import org.apache.flink.sql.parser.ddl.SqlAlterCatalog;
import org.apache.flink.sql.parser.ddl.SqlTableOption;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogDescriptor;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.AlterCatalogOperation;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** A converter for {@link SqlAlterCatalog}. */
public class SqlAlterCatalogConverter implements SqlNodeConverter<SqlAlterCatalog> {

    /**
     * Converts an ALTER CATALOG node into an {@link AlterCatalogOperation}.
     *
     * <p>The resulting property map starts from the catalog's current options (when a {@link
     * CatalogDescriptor} is available) and overlays the options from the SET clause.
     *
     * @throws ValidationException if no catalog is registered under the statement's name
     */
    @Override
    public Operation convertSqlNode(SqlAlterCatalog sqlAlterCatalog, ConvertContext context) {
        String catalogName = sqlAlterCatalog.catalogName();
        // Guard: the catalog must already be registered before it can be altered.
        if (!context.getCatalogManager().getCatalog(catalogName).isPresent()) {
            throw new ValidationException(
                    String.format("Catalog %s does not exist.", catalogName));
        }
        // Seed with the current configuration; fall back to an empty map when no
        // descriptor is stored for this catalog.
        final Map<String, String> properties =
                context.getCatalogManager()
                        .getCatalogDescriptor(catalogName)
                        .map(descriptor -> new HashMap<>(descriptor.getConfiguration().toMap()))
                        .orElseGet(HashMap::new);
        // Overlay the SET clause options; they take precedence over existing values.
        sqlAlterCatalog
                .getPropertyList()
                .getList()
                .forEach(
                        p ->
                                properties.put(
                                        ((SqlTableOption) p).getKeyString(),
                                        ((SqlTableOption) p).getValueString()));
        return new AlterCatalogOperation(catalogName, properties);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class SqlNodeConverters {
static {
// register all the converters here
register(new SqlCreateCatalogConverter());
register(new SqlAlterCatalogConverter());
register(new SqlCreateViewConverter());
register(new SqlAlterViewRenameConverter());
register(new SqlAlterViewPropertiesConverter());
Expand Down

0 comments on commit 08e0df8

Please sign in to comment.