Skip to content

Commit

Permalink
Add multipart namespace support to TrinoRestCatalog
Browse files Browse the repository at this point in the history
  • Loading branch information
sftwrdvlpr committed Oct 9, 2024
1 parent 5741265 commit 3b2657a
Show file tree
Hide file tree
Showing 9 changed files with 435 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.iceberg;

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.base.Splitter.MapSplitter;
import com.google.common.base.Suppliers;
Expand Down Expand Up @@ -222,6 +223,7 @@
import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_ROW_ID_NAME;
import static io.trino.plugin.iceberg.IcebergColumnHandle.fileModifiedTimeColumnHandle;
import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnHandle;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_COMMIT_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
Expand Down Expand Up @@ -880,6 +882,14 @@ public void createSchema(ConnectorSession session, String schemaName, Map<String
public void dropSchema(ConnectorSession session, String schemaName, boolean cascade)
{
if (cascade) {
List<String> nestedNamespaces = getNestedNamespaces(session, schemaName);

if (!nestedNamespaces.isEmpty()) {
throw new TrinoException(
ICEBERG_CATALOG_ERROR,
format("Cannot drop non-empty schema: %s, contains %s nested schema(s)", schemaName, Joiner.on(", ").join(nestedNamespaces)));
}

for (SchemaTableName materializedView : listMaterializedViews(session, Optional.of(schemaName))) {
dropMaterializedView(session, materializedView);
}
Expand Down Expand Up @@ -1130,6 +1140,21 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
return newWritableTableHandle(table.getSchemaTableName(), icebergTable, retryMode);
}

private List<String> getNestedNamespaces(ConnectorSession session, String parentNamespace)
{
Optional<String> namespaceSeparator = catalog.getNamespaceSeparator();

if (namespaceSeparator.isEmpty()) {
return ImmutableList.of();
}

return catalog.listNamespaces(session)
.stream()
.filter(namespace -> !namespace.equals(parentNamespace))
.filter(namespace -> namespace.startsWith(parentNamespace + namespaceSeparator.get()))
.collect(toImmutableList());
}

private IcebergWritableTableHandle newWritableTableHandle(SchemaTableName name, Table table, RetryMode retryMode)
{
return new IcebergWritableTableHandle(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public interface TrinoCatalog

void dropNamespace(ConnectorSession session, String namespace);

default Optional<String> getNamespaceSeparator()
{
return Optional.empty();
}

Map<String, Object> loadNamespaceMetadata(ConnectorSession session, String namespace);

Optional<TrinoPrincipal> getNamespacePrincipal(ConnectorSession session, String namespace);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;

import java.net.URI;
import java.util.Optional;
Expand All @@ -40,6 +41,7 @@ public enum SessionType
private Security security = Security.NONE;
private SessionType sessionType = SessionType.NONE;
private boolean vendedCredentialsEnabled;
private Optional<String> namespaceSeparator = Optional.empty();

@NotNull
public URI getBaseUri()
Expand Down Expand Up @@ -123,4 +125,17 @@ public IcebergRestCatalogConfig setVendedCredentialsEnabled(boolean vendedCreden
this.vendedCredentialsEnabled = vendedCredentialsEnabled;
return this;
}

public Optional<@Pattern(regexp = "^\\.$", message = "The value must be a single period '.'") String> getNamespaceSeparator()
{
return this.namespaceSeparator;
}

@Config("iceberg.rest-catalog.namespace-separator")
@ConfigDescription("Namespace separator character")
public IcebergRestCatalogConfig setNamespaceSeparator(String separator)
{
this.namespaceSeparator = Optional.ofNullable(separator);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class TrinoIcebergRestCatalogFactory
private final SecurityProperties securityProperties;
private final boolean uniqueTableLocation;
private final TypeManager typeManager;
private final Optional<String> namespaceSeparator;

@GuardedBy("this")
private RESTSessionCatalog icebergCatalog;
Expand Down Expand Up @@ -81,6 +82,7 @@ public TrinoIcebergRestCatalogFactory(
requireNonNull(icebergConfig, "icebergConfig is null");
this.uniqueTableLocation = icebergConfig.isUniqueTableLocation();
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.namespaceSeparator = restConfig.getNamespaceSeparator();
}

@Override
Expand Down Expand Up @@ -124,6 +126,7 @@ public synchronized TrinoCatalog create(ConnectorIdentity identity)
credentials,
trinoVersion,
typeManager,
uniqueTableLocation);
uniqueTableLocation,
namespaceSeparator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.iceberg.catalog.rest;

import com.google.common.base.Splitter;
import com.google.common.cache.Cache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -78,6 +79,7 @@
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
Expand Down Expand Up @@ -108,6 +110,7 @@ public class TrinoRestCatalog
private final Map<String, String> credentials;
private final String trinoVersion;
private final boolean useUniqueTableLocation;
private final Optional<String> namespaceSeparator;

private final Cache<SchemaTableName, Table> tableCache = EvictableCacheBuilder.newBuilder()
.maximumSize(PER_QUERY_CACHE_SIZE)
Expand All @@ -120,7 +123,8 @@ public TrinoRestCatalog(
Map<String, String> credentials,
String trinoVersion,
TypeManager typeManager,
boolean useUniqueTableLocation)
boolean useUniqueTableLocation,
Optional<String> namespaceSeparator)
{
this.restSessionCatalog = requireNonNull(restSessionCatalog, "restSessionCatalog is null");
this.catalogName = requireNonNull(catalogName, "catalogName is null");
Expand All @@ -129,27 +133,41 @@ public TrinoRestCatalog(
this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.useUniqueTableLocation = useUniqueTableLocation;
this.namespaceSeparator = namespaceSeparator;
}

@Override
public Optional<String> getNamespaceSeparator()
{
return namespaceSeparator;
}

@Override
public boolean namespaceExists(ConnectorSession session, String namespace)
{
return restSessionCatalog.namespaceExists(convert(session), Namespace.of(namespace));
return restSessionCatalog.namespaceExists(convert(session), toNamespace(namespace));
}

@Override
public List<String> listNamespaces(ConnectorSession session)
{
return restSessionCatalog.listNamespaces(convert(session)).stream()
.map(Namespace::toString)
return collectNamespaces(session, Namespace.empty());
}

private List<String> collectNamespaces(ConnectorSession session, Namespace parentNamespace)
{
return restSessionCatalog.listNamespaces(convert(session), parentNamespace).stream()
.flatMap(childNamespace -> Stream.concat(
Stream.of(childNamespace.toString()),
collectNamespaces(session, childNamespace).stream()))
.collect(toImmutableList());
}

@Override
public void dropNamespace(ConnectorSession session, String namespace)
{
try {
restSessionCatalog.dropNamespace(convert(session), Namespace.of(namespace));
restSessionCatalog.dropNamespace(convert(session), toNamespace(namespace));
}
catch (NoSuchNamespaceException e) {
throw new SchemaNotFoundException(namespace);
Expand All @@ -164,7 +182,7 @@ public Map<String, Object> loadNamespaceMetadata(ConnectorSession session, Strin
{
try {
// Return immutable metadata as direct modifications will not be reflected on the namespace
return ImmutableMap.copyOf(restSessionCatalog.loadNamespaceMetadata(convert(session), Namespace.of(namespace)));
return ImmutableMap.copyOf(restSessionCatalog.loadNamespaceMetadata(convert(session), toNamespace(namespace)));
}
catch (NoSuchNamespaceException e) {
throw new SchemaNotFoundException(namespace);
Expand All @@ -183,7 +201,7 @@ public void createNamespace(ConnectorSession session, String namespace, Map<Stri
{
restSessionCatalog.createNamespace(
convert(session),
Namespace.of(namespace),
toNamespace(namespace),
Maps.transformValues(properties, property -> {
if (property instanceof String stringProperty) {
return stringProperty;
Expand Down Expand Up @@ -431,7 +449,7 @@ public void createView(ConnectorSession session, SchemaTableName schemaViewName,
ViewBuilder viewBuilder = restSessionCatalog.buildView(convert(session), toIdentifier(schemaViewName));
viewBuilder = viewBuilder.withSchema(schema)
.withQuery("trino", definition.getOriginalSql())
.withDefaultNamespace(Namespace.of(schemaViewName.getSchemaName()))
.withDefaultNamespace(toNamespace(schemaViewName.getSchemaName()))
.withDefaultCatalog(definition.getCatalog().orElse(null))
.withProperties(properties.buildOrThrow())
.withLocation(defaultTableLocation(session, schemaViewName));
Expand Down Expand Up @@ -653,19 +671,28 @@ private void invalidateTableCache(SchemaTableName schemaTableName)
tableCache.invalidate(schemaTableName);
}

private static TableIdentifier toIdentifier(SchemaTableName schemaTableName)
private TableIdentifier toIdentifier(SchemaTableName schemaTableName)
{
return TableIdentifier.of(toNamespace(schemaTableName.getSchemaName()), schemaTableName.getTableName());
}

private Namespace toNamespace(String namespace)
{
return TableIdentifier.of(schemaTableName.getSchemaName(), schemaTableName.getTableName());
if (this.namespaceSeparator.isEmpty()) {
return Namespace.of(namespace);
}

return Namespace.of(Splitter.on(this.namespaceSeparator.get()).omitEmptyStrings().trimResults().splitToList(namespace).toArray(new String[0]));
}

private List<Namespace> listNamespaces(ConnectorSession session, Optional<String> namespace)
{
if (namespace.isEmpty()) {
return listNamespaces(session).stream()
.map(Namespace::of)
.map(this::toNamespace)
.collect(toImmutableList());
}

return ImmutableList.of(Namespace.of(namespace.get()));
return ImmutableList.of(toNamespace(namespace.get()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public void testDefaults()
.setWarehouse(null)
.setSessionType(IcebergRestCatalogConfig.SessionType.NONE)
.setSecurity(IcebergRestCatalogConfig.Security.NONE)
.setVendedCredentialsEnabled(false));
.setVendedCredentialsEnabled(false)
.setNamespaceSeparator(null));
}

@Test
Expand All @@ -46,6 +47,7 @@ public void testExplicitPropertyMappings()
.put("iceberg.rest-catalog.security", "OAUTH2")
.put("iceberg.rest-catalog.session", "USER")
.put("iceberg.rest-catalog.vended-credentials-enabled", "true")
.put("iceberg.rest-catalog.namespace-separator", ".")
.buildOrThrow();

IcebergRestCatalogConfig expected = new IcebergRestCatalogConfig()
Expand All @@ -54,7 +56,8 @@ public void testExplicitPropertyMappings()
.setWarehouse("test_warehouse_identifier")
.setSessionType(IcebergRestCatalogConfig.SessionType.USER)
.setSecurity(IcebergRestCatalogConfig.Security.OAUTH2)
.setVendedCredentialsEnabled(true);
.setVendedCredentialsEnabled(true)
.setNamespaceSeparator(".");

assertFullMapping(properties, expected);
}
Expand Down
Loading

0 comments on commit 3b2657a

Please sign in to comment.