Skip to content

Commit

Permalink
Add multipart namespace support to TrinoRestCatalog
Browse files Browse the repository at this point in the history
  • Loading branch information
sftwrdvlpr authored and ebyhr committed Oct 20, 2024
1 parent ff794d2 commit 01c9501
Show file tree
Hide file tree
Showing 6 changed files with 303 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.iceberg;

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.base.Splitter.MapSplitter;
import com.google.common.base.Suppliers;
Expand Down Expand Up @@ -240,6 +241,7 @@
import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_ROW_ID_NAME;
import static io.trino.plugin.iceberg.IcebergColumnHandle.fileModifiedTimeColumnHandle;
import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnHandle;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_COMMIT_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
Expand Down Expand Up @@ -916,6 +918,13 @@ public void createSchema(ConnectorSession session, String schemaName, Map<String
public void dropSchema(ConnectorSession session, String schemaName, boolean cascade)
{
if (cascade) {
List<String> nestedNamespaces = getChildNamespaces(session, schemaName);
if (!nestedNamespaces.isEmpty()) {
throw new TrinoException(
ICEBERG_CATALOG_ERROR,
format("Cannot drop non-empty schema: %s, contains %s nested schema(s)", schemaName, Joiner.on(", ").join(nestedNamespaces)));
}

for (SchemaTableName materializedView : listMaterializedViews(session, Optional.of(schemaName))) {
dropMaterializedView(session, materializedView);
}
Expand Down Expand Up @@ -1169,6 +1178,19 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
return newWritableTableHandle(table.getSchemaTableName(), icebergTable, retryMode);
}

private List<String> getChildNamespaces(ConnectorSession session, String parentNamespace)
{
Optional<String> namespaceSeparator = catalog.getNamespaceSeparator();

if (namespaceSeparator.isEmpty()) {
return ImmutableList.of();
}

return catalog.listNamespaces(session).stream()
.filter(namespace -> namespace.startsWith(parentNamespace + namespaceSeparator.get()))
.collect(toImmutableList());
}

private IcebergWritableTableHandle newWritableTableHandle(SchemaTableName name, Table table, RetryMode retryMode)
{
return new IcebergWritableTableHandle(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public interface TrinoCatalog

void dropNamespace(ConnectorSession session, String namespace);

default Optional<String> getNamespaceSeparator()
{
return Optional.empty();
}

Map<String, Object> loadNamespaceMetadata(ConnectorSession session, String namespace);

Optional<TrinoPrincipal> getNamespacePrincipal(ConnectorSession session, String namespace);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.iceberg.catalog.rest;

import com.google.common.base.Splitter;
import com.google.common.cache.Cache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -79,6 +80,7 @@
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
Expand All @@ -102,6 +104,7 @@ public class TrinoRestCatalog
private static final Logger log = Logger.get(TrinoRestCatalog.class);

private static final int PER_QUERY_CACHE_SIZE = 1000;
private static final String NAMESPACE_SEPARATOR = ".";

private final RESTSessionCatalog restSessionCatalog;
private final CatalogName catalogName;
Expand Down Expand Up @@ -136,6 +139,12 @@ public TrinoRestCatalog(
this.useUniqueTableLocation = useUniqueTableLocation;
}

@Override
public Optional<String> getNamespaceSeparator()
{
return Optional.of(NAMESPACE_SEPARATOR);
}

@Override
public boolean namespaceExists(ConnectorSession session, String namespace)
{
Expand All @@ -144,17 +153,24 @@ public boolean namespaceExists(ConnectorSession session, String namespace)

@Override
public List<String> listNamespaces(ConnectorSession session)
{
return collectNamespaces(session, parentNamespace);
}

private List<String> collectNamespaces(ConnectorSession session, Namespace parentNamespace)
{
return restSessionCatalog.listNamespaces(convert(session), parentNamespace).stream()
.map(this::toSchemaName)
.flatMap(childNamespace -> Stream.concat(
Stream.of(childNamespace.toString()),
collectNamespaces(session, childNamespace).stream()))
.collect(toImmutableList());
}

@Override
public void dropNamespace(ConnectorSession session, String namespace)
{
try {
restSessionCatalog.dropNamespace(convert(session), Namespace.of(namespace));
restSessionCatalog.dropNamespace(convert(session), toNamespace(namespace));
}
catch (NoSuchNamespaceException e) {
throw new SchemaNotFoundException(namespace);
Expand Down Expand Up @@ -188,7 +204,7 @@ public void createNamespace(ConnectorSession session, String namespace, Map<Stri
{
restSessionCatalog.createNamespace(
convert(session),
Namespace.of(namespace),
toNamespace(namespace),
Maps.transformValues(properties, property -> {
if (property instanceof String stringProperty) {
return stringProperty;
Expand Down Expand Up @@ -439,7 +455,7 @@ public void createView(ConnectorSession session, SchemaTableName schemaViewName,
ViewBuilder viewBuilder = restSessionCatalog.buildView(convert(session), toIdentifier(schemaViewName));
viewBuilder = viewBuilder.withSchema(schema)
.withQuery("trino", definition.getOriginalSql())
.withDefaultNamespace(Namespace.of(schemaViewName.getSchemaName()))
.withDefaultNamespace(toNamespace(schemaViewName.getSchemaName()))
.withDefaultCatalog(definition.getCatalog().orElse(null))
.withProperties(properties.buildOrThrow())
.withLocation(defaultTableLocation(session, schemaViewName));
Expand Down Expand Up @@ -663,10 +679,10 @@ private void invalidateTableCache(SchemaTableName schemaTableName)

private Namespace toNamespace(String schemaName)
{
if (parentNamespace.isEmpty()) {
return Namespace.of(schemaName);
if (!parentNamespace.isEmpty()) {
schemaName = parentNamespace + NAMESPACE_SEPARATOR + schemaName;
}
return Namespace.of(parentNamespace + "." + schemaName);
return Namespace.of(Splitter.on(NAMESPACE_SEPARATOR).omitEmptyStrings().trimResults().splitToList(schemaName).toArray(new String[0]));
}

private String toSchemaName(Namespace namespace)
Expand All @@ -675,15 +691,12 @@ private String toSchemaName(Namespace namespace)
return namespace.toString();
}
return Arrays.stream(namespace.levels(), this.parentNamespace.length(), namespace.length())
.collect(joining("."));
.collect(joining(NAMESPACE_SEPARATOR));
}

private TableIdentifier toIdentifier(SchemaTableName schemaTableName)
{
if (parentNamespace.isEmpty()) {
return TableIdentifier.of(schemaTableName.getSchemaName(), schemaTableName.getTableName());
}
return TableIdentifier.of(Namespace.of(parentNamespace + "." + schemaTableName.getSchemaName()), schemaTableName.getTableName());
return TableIdentifier.of(toNamespace(schemaTableName.getSchemaName()), schemaTableName.getTableName());
}

private List<Namespace> listNamespaces(ConnectorSession session, Optional<String> namespace)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void accept(QueryRunner queryRunner)
String schemaProperties = this.schemaProperties.entrySet().stream()
.map(entry -> entry.getKey() + " = " + entry.getValue())
.collect(Collectors.joining(", ", " WITH ( ", " )"));
queryRunner.execute("CREATE SCHEMA IF NOT EXISTS " + schemaName + (this.schemaProperties.size() > 0 ? schemaProperties : ""));
queryRunner.execute("CREATE SCHEMA IF NOT EXISTS \"" + schemaName + "\"" + (this.schemaProperties.size() > 0 ? schemaProperties : ""));
copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, clonedTpchTables);
}

Expand Down
Loading

0 comments on commit 01c9501

Please sign in to comment.