Skip to content

Commit

Permalink
Add multipart namespaces support to TrinoNessieCatalog
Browse files Browse the repository at this point in the history
  • Loading branch information
sftwrdvlpr committed Sep 28, 2024
1 parent 5741265 commit 8f940d0
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.types.Type.PrimitiveType;
import org.apache.iceberg.types.TypeUtil;
Expand Down Expand Up @@ -203,6 +204,7 @@ public final class IcebergUtil
// - v0.gz.metadata.json
// - v0.metadata.json.gz
private static final Pattern HADOOP_GENERATED_METADATA_FILE_NAME_PATTERN = Pattern.compile("v(?<version>\\d+)(?<compression>\\.[a-zA-Z0-9]+)?" + Pattern.quote(METADATA_FILE_EXTENSION) + "(?<compression2>\\.[a-zA-Z0-9]+)?");
private static final String DOT_SEPARATOR = ".";

private IcebergUtil() {}

Expand Down Expand Up @@ -1041,4 +1043,9 @@ public static long getModificationTime(String path, TrinoFileSystem fileSystem)
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed to get file modification time: " + path, e);
}
}

public static Namespace toNamespace(String namespace)
{
return Namespace.of(Splitter.on(DOT_SEPARATOR).omitEmptyStrings().trimResults().splitToList(namespace).toArray(new String[0]));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.iceberg.catalog.nessie;

import io.trino.plugin.iceberg.IcebergUtil;
import io.trino.spi.connector.SchemaTableName;
import org.apache.iceberg.catalog.TableIdentifier;

Expand All @@ -22,6 +23,6 @@ private IcebergNessieUtil() {}

static TableIdentifier toIdentifier(SchemaTableName schemaTableName)
{
return TableIdentifier.of(schemaTableName.getSchemaName(), schemaTableName.getTableName());
return TableIdentifier.of(IcebergUtil.toNamespace(schemaTableName.getSchemaName()), schemaTableName.getTableName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.nessie.NessieIcebergClient;
import org.projectnessie.error.NessieReferenceNotFoundException;
import org.projectnessie.model.IcebergTable;

import java.util.Iterator;
Expand All @@ -64,8 +65,10 @@
import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName;
import static io.trino.plugin.iceberg.IcebergUtil.toNamespace;
import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
import static io.trino.plugin.iceberg.catalog.nessie.IcebergNessieUtil.toIdentifier;
import static io.trino.spi.StandardErrorCode.NOT_FOUND;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.connector.SchemaTableName.schemaTableName;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -100,7 +103,7 @@ public TrinoNessieCatalog(
public boolean namespaceExists(ConnectorSession session, String namespace)
{
try {
return nessieClient.loadNamespaceMetadata(Namespace.of(namespace)) != null;
return nessieClient.loadNamespaceMetadata(toNamespace(namespace)) != null;
}
catch (Exception e) {
return false;
Expand All @@ -110,22 +113,32 @@ public boolean namespaceExists(ConnectorSession session, String namespace)
@Override
public List<String> listNamespaces(ConnectorSession session)
{
return nessieClient.listNamespaces(Namespace.empty()).stream()
.map(Namespace::toString)
.collect(toImmutableList());
try {
return nessieClient.getApi()
.getMultipleNamespaces()
.reference(nessieClient.getReference())
.get()
.getNamespaces()
.stream()
.map(org.projectnessie.model.Namespace::toString)
.collect(toImmutableList());
}
catch (NessieReferenceNotFoundException e) {
throw new TrinoException(NOT_FOUND, e.getMessage());
}
}

@Override
public void dropNamespace(ConnectorSession session, String namespace)
{
nessieClient.dropNamespace(Namespace.of(namespace));
nessieClient.dropNamespace(toNamespace(namespace));
}

@Override
public Map<String, Object> loadNamespaceMetadata(ConnectorSession session, String namespace)
{
try {
return ImmutableMap.copyOf(nessieClient.loadNamespaceMetadata(Namespace.of(namespace)));
return ImmutableMap.copyOf(nessieClient.loadNamespaceMetadata(toNamespace(namespace)));
}
catch (NoSuchNamespaceException e) {
throw new SchemaNotFoundException(namespace);
Expand All @@ -141,7 +154,7 @@ public Optional<TrinoPrincipal> getNamespacePrincipal(ConnectorSession session,
@Override
public void createNamespace(ConnectorSession session, String namespace, Map<String, Object> properties, TrinoPrincipal owner)
{
nessieClient.createNamespace(Namespace.of(namespace), Maps.transformValues(properties, property -> {
nessieClient.createNamespace(toNamespace(namespace), Maps.transformValues(properties, property -> {
if (property instanceof String stringProperty) {
return stringProperty;
}
Expand All @@ -165,7 +178,7 @@ public void renameNamespace(ConnectorSession session, String source, String targ
public List<TableInfo> listTables(ConnectorSession session, Optional<String> namespace)
{
// views and materialized views are currently not supported, so everything is a table
return nessieClient.listTables(namespace.isEmpty() ? Namespace.empty() : Namespace.of(namespace.get()))
return nessieClient.listTables(namespace.isEmpty() ? Namespace.empty() : toNamespace(namespace.get()))
.stream()
.map(id -> new TableInfo(schemaTableName(id.namespace().toString(), id.name()), TableInfo.ExtendedRelationType.TABLE))
.collect(toImmutableList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.iceberg.catalog.nessie;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.hdfs.HdfsFileSystemFactory;
Expand Down Expand Up @@ -198,4 +199,31 @@ public void testNonLowercaseNamespace()
catalog.dropNamespace(SESSION, namespace);
}
}

@Test
public void testMultipartNamespace()
{
String dotSeparator = ".";
TrinoCatalog catalog = createTrinoCatalog(false);

ImmutableList<String> namespacesList = ImmutableList.of(
"level-1" + randomNameSuffix(),
"level-1" + randomNameSuffix() + dotSeparator + "level-2" + randomNameSuffix(),
"level-1" + randomNameSuffix() + dotSeparator + "level-2" + randomNameSuffix() + dotSeparator + "level-3" + randomNameSuffix());

for (String namespace : namespacesList) {
catalog.createNamespace(SESSION, namespace, Map.of(), new TrinoPrincipal(PrincipalType.USER, SESSION.getUser()));
}

try {
assertThat(catalog.listNamespaces(SESSION)).as("catalog.listNamespaces")
.hasSize(namespacesList.size())
.containsAll(namespacesList);
}
finally {
for (String namespace : namespacesList) {
catalog.dropNamespace(SESSION, namespace);
}
}
}
}

0 comments on commit 8f940d0

Please sign in to comment.