Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
liyubin117 committed Mar 14, 2024
1 parent 44f9144 commit a3f586e
Show file tree
Hide file tree
Showing 6 changed files with 299 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,28 @@ SqlDrop SqlDropCatalog(Span s, boolean replace) :
}
}

/**
* Parses a alter catalog statement.
* ALTER CATALOG catalog_name SET (key1=val1, key2=val2, ...);
*/
SqlAlterCatalog SqlAlterCatalog() :
{
SqlParserPos startPos;
SqlIdentifier catalogName;
SqlNodeList propertyList = SqlNodeList.EMPTY;
}
{
<ALTER> <CATALOG> { startPos = getPos(); }
catalogName = SimpleIdentifier()
<SET>
propertyList = TableProperties()
{
return new SqlAlterCatalog(startPos.plus(getPos()),
catalogName,
propertyList);
}
}

/**
* Parse a "Show DATABASES" metadata query command.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.sql.parser.ddl;

import org.apache.flink.sql.parser.SqlUnparseUtils;

import org.apache.calcite.sql.*;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.ImmutableNullableList;

import java.util.List;

import static java.util.Objects.requireNonNull;

/** ALTER CATALOG DDL sql call. */
public class SqlAlterCatalog extends SqlCreate {

public static final SqlSpecialOperator OPERATOR =
new SqlSpecialOperator("ALTER CATALOG", SqlKind.OTHER_DDL);

private final SqlIdentifier catalogName;

private final SqlNodeList propertyList;

public SqlAlterCatalog(
SqlParserPos position, SqlIdentifier catalogName, SqlNodeList propertyList) {
super(OPERATOR, position, false, false);
this.catalogName = requireNonNull(catalogName, "catalogName cannot be null");
this.propertyList = requireNonNull(propertyList, "propertyList cannot be null");
}

@Override
public SqlOperator getOperator() {
return OPERATOR;
}

@Override
public List<SqlNode> getOperandList() {
return ImmutableNullableList.of(catalogName, propertyList);
}

public SqlIdentifier getCatalogName() {
return catalogName;
}

public SqlNodeList getPropertyList() {
return propertyList;
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("ALTER CATALOG");
catalogName.unparse(writer, leftPrec, rightPrec);

if (this.propertyList.size() > 0) {
writer.keyword("SET");
SqlWriter.Frame withFrame = writer.startList("(", ")");
for (SqlNode property : propertyList) {
SqlUnparseUtils.printIndent(writer);
property.unparse(writer, leftPrec, rightPrec);
}
writer.newlineAndIndent();
writer.endList(withFrame);
}
}

public String catalogName() {
return catalogName.getSimple();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,51 @@ public void createCatalog(String catalogName, CatalogDescriptor catalogDescripto
catalogStoreHolder.catalogStore().storeCatalog(catalogName, catalogDescriptor);
}

/**
* Alters a catalog under the given name. The catalog name must be unique.
*
* @param catalogName the given catalog name under which to alter the given catalog
* @param catalogDescriptor catalog descriptor for altering catalog
* @throws CatalogException If the catalog neither exists in the catalog store nor in the
* initialized catalogs, or if an error occurs while creating the catalog or storing the
* {@link CatalogDescriptor}
*/
public void alterCatalog(String catalogName, CatalogDescriptor catalogDescriptor)
throws CatalogException {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(catalogName),
"Catalog name cannot be null or empty.");
checkNotNull(catalogDescriptor, "Catalog descriptor cannot be null");
Catalog existingCatalog = getCatalogOrError(catalogName), catalog;
if (catalogStoreHolder.catalogStore().contains(catalogName)
|| catalogs.containsKey(catalogName)) {
catalog = initCatalog(catalogName, catalogDescriptor);
if (existingCatalog.getClass() != catalog.getClass()) {
throw new CatalogException(
String.format(
"Catalog types don't match. Existing catalog is '%s' and new catalog is '%s'.",
existingCatalog.getClass().getName(),
catalog.getClass().getName()));
}
if (catalogs.containsKey(catalogName)) {
Catalog removed = catalogs.remove(catalogName);
removed.close();
}
if (catalogStoreHolder.catalogStore().contains(catalogName)) {
catalogStoreHolder.catalogStore().removeCatalog(catalogName, false);
}
} else {
throw new CatalogException(
format(
"Catalog %s neither exists in the catalog store nor in the initialized catalogs.",
catalogName));
}
catalog.open();
catalogs.put(catalogName, catalog);

catalogStoreHolder.catalogStore().storeCatalog(catalogName, catalogDescriptor);
}

private Catalog initCatalog(String catalogName, CatalogDescriptor catalogDescriptor) {
return FactoryUtil.createCatalog(
catalogName,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations.ddl;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableResultImpl;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.CatalogDescriptor;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.OperationUtils;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

import static org.apache.flink.util.Preconditions.checkNotNull;

/** Operation to describe a ALTER CATALOG statement. */
@Internal
public class AlterCatalogOperation implements AlterOperation {
private final String catalogName;
private final Map<String, String> properties;

public AlterCatalogOperation(String catalogName, Map<String, String> properties) {
this.catalogName = checkNotNull(catalogName);
this.properties = Collections.unmodifiableMap(checkNotNull(properties));
}

public String getCatalogName() {
return catalogName;
}

public Map<String, String> getProperties() {
return properties;
}

@Override
public String asSummaryString() {
Map<String, Object> params = new LinkedHashMap<>();
params.put("catalogName", catalogName);
params.put("properties", properties);

return OperationUtils.formatWithChildren(
"ALTER CATALOG", params, Collections.emptyList(), Operation::asSummaryString);
}

@Override
public TableResultInternal execute(Context ctx) {
try {
ctx.getCatalogManager()
.alterCatalog(
catalogName,
CatalogDescriptor.of(catalogName, Configuration.fromMap(properties)));

return TableResultImpl.TABLE_RESULT_OK;
} catch (CatalogException e) {
throw new ValidationException(
String.format("Could not execute %s", asSummaryString()), e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.operations.converters;

import org.apache.flink.sql.parser.ddl.SqlAlterCatalog;
import org.apache.flink.sql.parser.ddl.SqlCreateCatalog;
import org.apache.flink.sql.parser.ddl.SqlTableOption;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogDescriptor;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.AlterCatalogOperation;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** A converter for {@link SqlAlterCatalog}. */
public class SqlAlterCatalogConverter implements SqlNodeConverter<SqlAlterCatalog> {

@Override
public Operation convertSqlNode(SqlAlterCatalog sqlAlterCatalog, ConvertContext context) {
String catalogName = sqlAlterCatalog.catalogName();
Optional<Catalog> catalog = context.getCatalogManager().getCatalog(catalogName);
final Map<String, String> properties;
if (catalog.isPresent()) {
Optional<CatalogDescriptor> catalogDescriptor =
context.getCatalogManager().getCatalogDescriptor(catalogName);
properties =
new HashMap<>(
catalogDescriptor.isPresent()
? catalogDescriptor.get().getConfiguration().toMap()
: Collections.emptyMap());
} else {
throw new ValidationException(String.format("Catalog %s not exists", catalogName));
}
// set with properties
sqlAlterCatalog
.getPropertyList()
.getList()
.forEach(
p ->
properties.put(
((SqlTableOption) p).getKeyString(),
((SqlTableOption) p).getValueString()));
return new AlterCatalogOperation(catalogName, properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class SqlNodeConverters {
static {
// register all the converters here
register(new SqlCreateCatalogConverter());
register(new SqlAlterCatalogConverter());
register(new SqlCreateViewConverter());
register(new SqlAlterViewRenameConverter());
register(new SqlAlterViewPropertiesConverter());
Expand Down

0 comments on commit a3f586e

Please sign in to comment.