diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 8566f700f0a02..f04de6ff40416 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.iceberg; +import com.google.common.base.Joiner; import com.google.common.base.Splitter; import com.google.common.base.Splitter.MapSplitter; import com.google.common.base.Suppliers; @@ -240,6 +241,7 @@ import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_ROW_ID_NAME; import static io.trino.plugin.iceberg.IcebergColumnHandle.fileModifiedTimeColumnHandle; import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnHandle; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_COMMIT_ERROR; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; @@ -916,6 +918,13 @@ public void createSchema(ConnectorSession session, String schemaName, Map nestedNamespaces = getChildNamespaces(session, schemaName); + if (!nestedNamespaces.isEmpty()) { + throw new TrinoException( + ICEBERG_CATALOG_ERROR, + format("Cannot drop non-empty schema: %s, contains %s nested schema(s)", schemaName, Joiner.on(", ").join(nestedNamespaces))); + } + for (SchemaTableName materializedView : listMaterializedViews(session, Optional.of(schemaName))) { dropMaterializedView(session, materializedView); } @@ -1169,6 +1178,19 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto return newWritableTableHandle(table.getSchemaTableName(), icebergTable, retryMode); } + private List getChildNamespaces(ConnectorSession session, String parentNamespace) + { + Optional namespaceSeparator = catalog.getNamespaceSeparator(); + + if (namespaceSeparator.isEmpty()) { + return ImmutableList.of(); + } + + return catalog.listNamespaces(session).stream() + .filter(namespace -> namespace.startsWith(parentNamespace + namespaceSeparator.get())) + .collect(toImmutableList()); + } + private IcebergWritableTableHandle newWritableTableHandle(SchemaTableName name, Table table, RetryMode retryMode) { return new IcebergWritableTableHandle( diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java index 96cd7822c2e2f..005760040b1bd 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java @@ -66,6 +66,11 @@ public interface TrinoCatalog void dropNamespace(ConnectorSession session, String namespace); + default Optional getNamespaceSeparator() + { + return Optional.empty(); + } + Map loadNamespaceMetadata(ConnectorSession session, String namespace); Optional getNamespacePrincipal(ConnectorSession session, String namespace); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java index 5f4ea2c28e86b..11fcf055ca477 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.iceberg.catalog.rest; +import com.google.common.base.Splitter; import com.google.common.cache.Cache; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -79,6 +80,7 @@ import java.util.function.Predicate; import java.util.function.Supplier; import java.util.function.UnaryOperator; +import java.util.stream.Stream; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -102,6 +104,7 @@ public class TrinoRestCatalog private static final Logger log = Logger.get(TrinoRestCatalog.class); private static final int PER_QUERY_CACHE_SIZE = 1000; + private static final String NAMESPACE_SEPARATOR = "."; private final RESTSessionCatalog restSessionCatalog; private final CatalogName catalogName; @@ -136,6 +139,12 @@ public TrinoRestCatalog( this.useUniqueTableLocation = useUniqueTableLocation; } + @Override + public Optional getNamespaceSeparator() + { + return Optional.of(NAMESPACE_SEPARATOR); + } + @Override public boolean namespaceExists(ConnectorSession session, String namespace) { @@ -144,9 +153,16 @@ public boolean namespaceExists(ConnectorSession session, String namespace) @Override public List listNamespaces(ConnectorSession session) + { + return collectNamespaces(session, parentNamespace); + } + + private List collectNamespaces(ConnectorSession session, Namespace parentNamespace) { return restSessionCatalog.listNamespaces(convert(session), parentNamespace).stream() - .map(this::toSchemaName) + .flatMap(childNamespace -> Stream.concat( + Stream.of(childNamespace.toString()), + collectNamespaces(session, childNamespace).stream())) .collect(toImmutableList()); } @@ -154,7 +170,7 @@ public List listNamespaces(ConnectorSession session) public void dropNamespace(ConnectorSession session, String namespace) { try { - restSessionCatalog.dropNamespace(convert(session), Namespace.of(namespace)); + restSessionCatalog.dropNamespace(convert(session), toNamespace(namespace)); } catch (NoSuchNamespaceException e) { throw new SchemaNotFoundException(namespace); @@ -188,7 +204,7 @@ public void createNamespace(ConnectorSession session, String namespace, Map { if (property instanceof String stringProperty) { return stringProperty; @@ -439,7 +455,7 @@ public void createView(ConnectorSession session, SchemaTableName schemaViewName, ViewBuilder viewBuilder = restSessionCatalog.buildView(convert(session), toIdentifier(schemaViewName)); viewBuilder = viewBuilder.withSchema(schema) .withQuery("trino", definition.getOriginalSql()) - .withDefaultNamespace(Namespace.of(schemaViewName.getSchemaName())) + .withDefaultNamespace(toNamespace(schemaViewName.getSchemaName())) .withDefaultCatalog(definition.getCatalog().orElse(null)) .withProperties(properties.buildOrThrow()) .withLocation(defaultTableLocation(session, schemaViewName)); @@ -663,10 +679,10 @@ private void invalidateTableCache(SchemaTableName schemaTableName) private Namespace toNamespace(String schemaName) { - if (parentNamespace.isEmpty()) { - return Namespace.of(schemaName); + if (!parentNamespace.isEmpty()) { + schemaName = parentNamespace + NAMESPACE_SEPARATOR + schemaName; } - return Namespace.of(parentNamespace + "." + schemaName); + return Namespace.of(Splitter.on(NAMESPACE_SEPARATOR).omitEmptyStrings().trimResults().splitToList(schemaName).toArray(new String[0])); } private String toSchemaName(Namespace namespace) @@ -675,15 +691,12 @@ private String toSchemaName(Namespace namespace) return namespace.toString(); } return Arrays.stream(namespace.levels(), this.parentNamespace.length(), namespace.length()) - .collect(joining(".")); + .collect(joining(NAMESPACE_SEPARATOR)); } private TableIdentifier toIdentifier(SchemaTableName schemaTableName) { - if (parentNamespace.isEmpty()) { - return TableIdentifier.of(schemaTableName.getSchemaName(), schemaTableName.getTableName()); - } - return TableIdentifier.of(Namespace.of(parentNamespace + "." + schemaTableName.getSchemaName()), schemaTableName.getTableName()); + return TableIdentifier.of(toNamespace(schemaTableName.getSchemaName()), schemaTableName.getTableName()); } private List listNamespaces(ConnectorSession session, Optional namespace) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/SchemaInitializer.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/SchemaInitializer.java index c3c005ec0111d..0000e931da1cd 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/SchemaInitializer.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/SchemaInitializer.java @@ -51,7 +51,7 @@ public void accept(QueryRunner queryRunner) String schemaProperties = this.schemaProperties.entrySet().stream() .map(entry -> entry.getKey() + " = " + entry.getValue()) .collect(Collectors.joining(", ", " WITH ( ", " )")); - queryRunner.execute("CREATE SCHEMA IF NOT EXISTS " + schemaName + (this.schemaProperties.size() > 0 ? schemaProperties : "")); + queryRunner.execute("CREATE SCHEMA IF NOT EXISTS \"" + schemaName + "\"" + (this.schemaProperties.size() > 0 ? schemaProperties : "")); copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, clonedTpchTables); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogNestedNamespaceConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogNestedNamespaceConnectorSmokeTest.java new file mode 100644 index 0000000000000..cccfeec7bb1f2 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogNestedNamespaceConnectorSmokeTest.java @@ -0,0 +1,249 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import io.airlift.http.server.testing.TestingHttpServer; +import io.trino.filesystem.Location; +import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest; +import io.trino.plugin.iceberg.IcebergConfig; +import io.trino.plugin.iceberg.IcebergQueryRunner; +import io.trino.plugin.iceberg.SchemaInitializer; +import io.trino.testing.QueryRunner; +import io.trino.testing.TestingConnectorBehavior; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.jdbc.JdbcCatalog; +import org.apache.iceberg.rest.DelegatingRestSessionCatalog; +import org.assertj.core.util.Files; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Path; +import java.util.Optional; + +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static io.trino.plugin.iceberg.IcebergTestUtils.checkOrcFileSorting; +import static io.trino.plugin.iceberg.IcebergTestUtils.checkParquetFileSorting; +import static io.trino.plugin.iceberg.catalog.rest.RestCatalogTestUtils.backendCatalog; +import static io.trino.testing.TestingNames.randomNameSuffix; +import static java.lang.String.format; +import static org.apache.iceberg.FileFormat.PARQUET; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +final class TestIcebergRestCatalogNestedNamespaceConnectorSmokeTest + extends BaseIcebergConnectorSmokeTest +{ + private File warehouseLocation; + private JdbcCatalog backend; + + public TestIcebergRestCatalogNestedNamespaceConnectorSmokeTest() + { + super(new IcebergConfig().getFileFormat().toIceberg()); + } + + @Override + protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) + { + return switch (connectorBehavior) { + case SUPPORTS_CREATE_MATERIALIZED_VIEW -> false; + default -> super.hasBehavior(connectorBehavior); + }; + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + warehouseLocation = Files.newTemporaryFolder(); + closeAfterClass(() -> deleteRecursively(warehouseLocation.toPath(), ALLOW_INSECURE)); + + backend = closeAfterClass((JdbcCatalog) backendCatalog(warehouseLocation)); + + DelegatingRestSessionCatalog delegatingCatalog = DelegatingRestSessionCatalog.builder() + .delegate(backend) + .build(); + + TestingHttpServer testServer = delegatingCatalog.testServer(); + testServer.start(); + closeAfterClass(testServer::stop); + + return IcebergQueryRunner.builder("level_1.level_2") + .setBaseDataDir(Optional.of(warehouseLocation.toPath())) + .addIcebergProperty("iceberg.file-format", format.name()) + .addIcebergProperty("iceberg.catalog.type", "rest") + .addIcebergProperty("iceberg.rest-catalog.uri", testServer.getBaseUrl().toString()) + .addIcebergProperty("iceberg.register-table-procedure.enabled", "true") + .addIcebergProperty("iceberg.writer-sort-buffer-size", "1MB") + .setSchemaInitializer(SchemaInitializer.builder() + .withSchemaName("level_1.level_2") + .withClonedTpchTables(REQUIRED_TPCH_TABLES) + .build()) + .build(); + } + + @Test + void testDropNestedSchemaCascade() + { + String rootSchemaName = "test_root" + randomNameSuffix(); + assertUpdate("CREATE SCHEMA " + rootSchemaName); + assertUpdate("CREATE SCHEMA \"" + rootSchemaName + ".test_nested\""); + + assertQueryFails("DROP SCHEMA " + rootSchemaName + " CASCADE", "Cannot drop non-empty schema: .*"); + + assertUpdate("DROP SCHEMA \"" + rootSchemaName + ".test_nested\" CASCADE"); + assertUpdate("DROP SCHEMA " + rootSchemaName); + } + + @Test + @Override // Override because the schema name requires double quotes + public void testShowCreateTable() + { + String schemaName = getSession().getSchema().orElseThrow(); + assertThat((String) computeScalar("SHOW CREATE TABLE region")) + .matches("" + + "CREATE TABLE iceberg.\"" + schemaName + "\".region \\(\n" + + " regionkey bigint,\n" + + " name varchar,\n" + + " comment varchar\n" + + "\\)\n" + + "WITH \\(\n" + + " format = '" + format.name() + "',\n" + + " format_version = 2,\n" + + format(" location = '.*/" + schemaName + "/region.*'\n") + + "\\)"); + } + + @Test + @Override // Override because the schema name requires double quotes + public void testView() + { + String viewName = "test_view_" + randomNameSuffix(); + assertUpdate("CREATE VIEW " + viewName + " AS SELECT * FROM nation"); + + assertThat(query("SELECT * FROM " + viewName)) + .skippingTypesCheck() + .matches("SELECT * FROM nation"); + + assertThat((String) computeScalar("SHOW CREATE VIEW " + viewName)) + .isEqualTo(""" + CREATE VIEW iceberg."level_1.level_2".%s SECURITY DEFINER AS + SELECT * + FROM + nation""".formatted(viewName)); + + assertUpdate("DROP VIEW " + viewName); + } + + @Test + @Override + public void testRenameSchema() + { + assertThatThrownBy(super::testRenameSchema) + .hasMessageContaining("renameNamespace is not supported for Iceberg REST catalog"); + } + + @Test + @Override + public void testMaterializedView() + { + assertThatThrownBy(super::testMaterializedView) + .hasMessageContaining("createMaterializedView is not supported for Iceberg REST catalog"); + } + + @Test + @Override + public void testDropTableWithMissingMetadataFile() + { + assertThatThrownBy(super::testDropTableWithMissingMetadataFile) + .hasMessageMatching("Failed to load table: (.*)"); + } + + @Test + @Override + public void testDropTableWithMissingSnapshotFile() + { + assertThatThrownBy(super::testDropTableWithMissingSnapshotFile) + .hasMessageMatching("Server error: NotFoundException: Failed to open input stream for file: (.*)"); + } + + @Test + @Override + public void testDropTableWithMissingManifestListFile() + { + assertThatThrownBy(super::testDropTableWithMissingManifestListFile) + .hasMessageContaining("Table location should not exist"); + } + + @Test + @Override + public void testDropTableWithNonExistentTableLocation() + { + assertThatThrownBy(super::testDropTableWithNonExistentTableLocation) + .hasMessageMatching("Failed to load table: (.*)"); + } + + @Override + protected boolean isFileSorted(Location path, String sortColumnName) + { + if (format == PARQUET) { + return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName); + } + return checkOrcFileSorting(fileSystem, path, sortColumnName); + } + + @Override + protected void deleteDirectory(String location) + { + try { + deleteRecursively(Path.of(location), ALLOW_INSECURE); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + protected void dropTableFromMetastore(String tableName) + { + backend.dropTable(toIdentifier(tableName), false); + } + + @Override + protected String getMetadataLocation(String tableName) + { + BaseTable table = (BaseTable) backend.loadTable(toIdentifier(tableName)); + return table.operations().current().metadataFileLocation(); + } + + @Override + protected String schemaPath() + { + return format("%s/%s", warehouseLocation, getSession().getSchema()); + } + + @Override + protected boolean locationExists(String location) + { + return java.nio.file.Files.exists(Path.of(location)); + } + + private TableIdentifier toIdentifier(String tableName) + { + return TableIdentifier.of(getSession().getSchema().orElseThrow(), tableName); + } +} diff --git a/plugin/trino-iceberg/src/test/java/org/apache/iceberg/rest/DelegatingRestSessionCatalog.java b/plugin/trino-iceberg/src/test/java/org/apache/iceberg/rest/DelegatingRestSessionCatalog.java index 94dd598432531..54a65a2da1da1 100644 --- a/plugin/trino-iceberg/src/test/java/org/apache/iceberg/rest/DelegatingRestSessionCatalog.java +++ b/plugin/trino-iceberg/src/test/java/org/apache/iceberg/rest/DelegatingRestSessionCatalog.java @@ -66,7 +66,7 @@ public TestingHttpServer testServer() HttpServerInfo httpServerInfo = new HttpServerInfo(config, nodeInfo); RestCatalogServlet servlet = new RestCatalogServlet(adapter); - return new TestingHttpServer(httpServerInfo, nodeInfo, config, servlet); + return new TestingHttpServer(httpServerInfo, nodeInfo, config, servlet, false, true, false); } public static Builder builder()