Add support for add_files and add_files_from_table procedures in Iceberg #22751

Merged · 4 commits · Oct 4, 2024
62 changes: 62 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.md
@@ -168,6 +168,9 @@ implementation is used:
* - `iceberg.register-table-procedure.enabled`
- Enable to allow user to call [`register_table` procedure](iceberg-register-table).
- `false`
* - `iceberg.add_files-procedure.enabled`
- Enable to allow user to call the [`add_files` and `add_files_from_table` procedures](iceberg-add-files).
- `false`
* - `iceberg.query-partition-filter-required`
- Set to `true` to force a query to use a partition filter for schemas
specified with `iceberg.query-partition-filter-required-schemas`. Equivalent
@@ -562,6 +565,65 @@ The default value is `fail`, which causes the migrate procedure to throw an
exception if subdirectories are found. Set the value to `true` to migrate
nested directories, or `false` to ignore them.

(iceberg-add-files)=
#### Add files

The connector can add files from tables or locations if
`iceberg.add_files-procedure.enabled` is set to `true` for the catalog.
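
The flag is a catalog configuration property. A minimal sketch, assuming a
catalog defined in a hypothetical `etc/catalog/example.properties` file:

```text
connector.name=iceberg
iceberg.add_files-procedure.enabled=true
```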

Use the `system.add_files_from_table` procedure to add existing files from a Hive
table, or `system.add_files` to add existing files from a specified location.
The data files must be in the Parquet, ORC, or Avro file format.

:::{warning}
The procedure does not check if files are already present in the target table.
:::
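
Running the procedure twice over the same files therefore registers them twice.
As a sketch of how to look for double-registered files afterwards, you can query
the `$files` metadata table (assuming the `testdb.iceberg_customer_orders` table
used in the examples below):

```sql
SELECT file_path, count(*) AS registrations
FROM testdb."iceberg_customer_orders$files"
GROUP BY file_path
HAVING count(*) > 1
```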

The procedure must be called for a specific catalog `example`, with the source
Hive schema and table names supplied through the required `schema_name` and
`table_name` parameters:

```sql
ALTER TABLE testdb.iceberg_customer_orders EXECUTE add_files_from_table(
schema_name => 'testdb',
table_name => 'hive_customer_orders')
```

Provide a `partition_filter` argument to add files only from the specified
partitions. The following example adds files from the partition where `region`
is `ASIA` and `country` is `JAPAN`:

```sql
ALTER TABLE testdb.iceberg_customer_orders EXECUTE add_files_from_table(
schema_name => 'testdb',
table_name => 'hive_customer_orders',
partition_filter => map(ARRAY['region', 'country'], ARRAY['ASIA', 'JAPAN']))
```

In addition, you can provide a `recursive_directory` argument to add files from
a Hive table that contains subdirectories:

```sql
ALTER TABLE testdb.iceberg_customer_orders EXECUTE add_files_from_table(
schema_name => 'testdb',
table_name => 'hive_customer_orders',
recursive_directory => 'true')
```

The default value of `recursive_directory` is `fail`, which causes the procedure
to throw an exception if subdirectories are found. Set the value to `true` to add
files from nested directories, or `false` to ignore them.

The `add_files` procedure supports adding files from a specified location.
The procedure does not validate file schemas for compatibility with
the target Iceberg table. The `location` property is supported for partitioned tables.

```sql
ALTER TABLE testdb.iceberg_customer_orders EXECUTE add_files(
location => 's3://my-bucket/a/path',
format => 'ORC')
```
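
The `add_files` procedure also accepts the `recursive_directory` argument
described above. A sketch, reusing the location from the previous example:

```sql
ALTER TABLE testdb.iceberg_customer_orders EXECUTE add_files(
    location => 's3://my-bucket/a/path',
    format => 'ORC',
    recursive_directory => 'true')
```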

(iceberg-data-management)=
### Data management

Expand Down
@@ -67,6 +67,7 @@ public class IcebergConfig
private boolean collectExtendedStatisticsOnWrite = true;
private boolean projectionPushdownEnabled = true;
private boolean registerTableProcedureEnabled;
private boolean addFilesProcedureEnabled;
private Optional<String> hiveCatalogName = Optional.empty();
private int formatVersion = FORMAT_VERSION_SUPPORT_MAX;
private Duration expireSnapshotsMinRetention = new Duration(7, DAYS);
@@ -255,6 +256,19 @@ public IcebergConfig setRegisterTableProcedureEnabled(boolean registerTableProce
return this;
}

public boolean isAddFilesProcedureEnabled()
{
return addFilesProcedureEnabled;
}

@Config("iceberg.add_files-procedure.enabled")
@ConfigDescription("Allow users to call the add_files procedure")
public IcebergConfig setAddFilesProcedureEnabled(boolean addFilesProcedureEnabled)
{
this.addFilesProcedureEnabled = addFilesProcedureEnabled;
return this;
}

public Optional<String> getHiveCatalogName()
{
return hiveCatalogName;
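A round-trip check along these lines would cover the new property — a minimal
sketch, not part of this diff, assuming JUnit and AssertJ; the full
`ConfigAssertions`-based mapping test conventionally lives in `TestIcebergConfig`:

```java
@Test
void testAddFilesProcedureEnabled()
{
    // The procedure is disabled by default; the fluent setter flips the flag
    IcebergConfig config = new IcebergConfig();
    assertThat(config.isAddFilesProcedureEnabled()).isFalse();
    assertThat(config.setAddFilesProcedureEnabled(true).isAddFilesProcedureEnabled()).isTrue();
}
```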

Large diffs are not rendered by default.

@@ -15,11 +15,15 @@

import com.google.inject.Inject;
import io.airlift.json.JsonCodec;
import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
import io.trino.plugin.hive.metastore.RawHiveMetastoreFactory;
import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.spi.type.TypeManager;

import java.util.Optional;

import static java.util.Objects.requireNonNull;

public class IcebergMetadataFactory
@@ -30,6 +34,8 @@ public class IcebergMetadataFactory
private final TrinoCatalogFactory catalogFactory;
private final IcebergFileSystemFactory fileSystemFactory;
private final TableStatisticsWriter tableStatisticsWriter;
private final Optional<HiveMetastoreFactory> metastoreFactory;
private final boolean addFilesProcedureEnabled;

@Inject
public IcebergMetadataFactory(
@@ -38,14 +44,18 @@ public IcebergMetadataFactory(
JsonCodec<CommitTaskData> commitTaskCodec,
TrinoCatalogFactory catalogFactory,
IcebergFileSystemFactory fileSystemFactory,
TableStatisticsWriter tableStatisticsWriter)
TableStatisticsWriter tableStatisticsWriter,
@RawHiveMetastoreFactory Optional<HiveMetastoreFactory> metastoreFactory,
IcebergConfig config)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.trinoCatalogHandle = requireNonNull(trinoCatalogHandle, "trinoCatalogHandle is null");
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.tableStatisticsWriter = requireNonNull(tableStatisticsWriter, "tableStatisticsWriter is null");
this.metastoreFactory = requireNonNull(metastoreFactory, "metastoreFactory is null");
this.addFilesProcedureEnabled = config.isAddFilesProcedureEnabled();
}

public IcebergMetadata create(ConnectorIdentity identity)
@@ -56,6 +66,8 @@ public IcebergMetadata create(ConnectorIdentity identity)
commitTaskCodec,
catalogFactory.create(identity),
fileSystemFactory,
tableStatisticsWriter);
tableStatisticsWriter,
metastoreFactory,
addFilesProcedureEnabled);
}
}
@@ -24,6 +24,8 @@
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.plugin.hive.FileFormatDataSourceStats;
import io.trino.plugin.hive.SortingFileWriterConfig;
import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
import io.trino.plugin.hive.metastore.RawHiveMetastoreFactory;
import io.trino.plugin.hive.metastore.thrift.TranslateHiveViews;
import io.trino.plugin.hive.orc.OrcReaderConfig;
import io.trino.plugin.hive.orc.OrcWriterConfig;
@@ -34,6 +36,8 @@
import io.trino.plugin.iceberg.functions.IcebergFunctionProvider;
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProcessorProviderFactory;
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProvider;
import io.trino.plugin.iceberg.procedure.AddFilesTableFromTableProcedure;
import io.trino.plugin.iceberg.procedure.AddFilesTableProcedure;
import io.trino.plugin.iceberg.procedure.DropExtendedStatsTableProcedure;
import io.trino.plugin.iceberg.procedure.ExpireSnapshotsTableProcedure;
import io.trino.plugin.iceberg.procedure.OptimizeTableProcedure;
@@ -93,6 +97,7 @@ public void configure(Binder binder)

binder.bind(TableStatisticsWriter.class).in(Scopes.SINGLETON);
binder.bind(IcebergMetadataFactory.class).in(Scopes.SINGLETON);
newOptionalBinder(binder, Key.get(HiveMetastoreFactory.class, RawHiveMetastoreFactory.class));

jsonCodecBinder(binder).bindJsonCodec(CommitTaskData.class);

@@ -112,6 +117,8 @@ public void configure(Binder binder)
tableProcedures.addBinding().toProvider(DropExtendedStatsTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(ExpireSnapshotsTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(RemoveOrphanFilesTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(AddFilesTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(AddFilesTableFromTableProcedure.class).in(Scopes.SINGLETON);

newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(TableChangesFunctionProvider.class).in(Scopes.SINGLETON);
binder.bind(FunctionProvider.class).to(IcebergFunctionProvider.class).in(Scopes.SINGLETON);
@@ -149,6 +149,8 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
case DROP_EXTENDED_STATS:
case EXPIRE_SNAPSHOTS:
case REMOVE_ORPHAN_FILES:
case ADD_FILES:
case ADD_FILES_FROM_TABLE:
// handled via ConnectorMetadata.executeTableExecute
}
throw new IllegalArgumentException("Unknown procedure: " + executeHandle.procedureId());
@@ -0,0 +1,79 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.procedure;

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import com.google.inject.Provider;
import io.trino.plugin.iceberg.procedure.MigrationUtils.RecursiveDirectory;
import io.trino.spi.connector.TableProcedureMetadata;
import io.trino.spi.session.PropertyMetadata;
import io.trino.spi.type.MapType;
import io.trino.spi.type.TypeManager;

import java.util.Map;

import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.ADD_FILES_FROM_TABLE;
import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly;
import static io.trino.spi.session.PropertyMetadata.enumProperty;
import static io.trino.spi.session.PropertyMetadata.stringProperty;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;

public class AddFilesTableFromTableProcedure
implements Provider<TableProcedureMetadata>
{
private final TypeManager typeManager;

@Inject
public AddFilesTableFromTableProcedure(TypeManager typeManager)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
}

@Override
public TableProcedureMetadata get()
{
return new TableProcedureMetadata(
ADD_FILES_FROM_TABLE.name(),
coordinatorOnly(),
ImmutableList.<PropertyMetadata<?>>builder()
.add(stringProperty(
"schema_name",
"Source schema name",
null,
false))
.add(stringProperty(
"table_name",
"Source table name",
null,
false))
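// partition_filter uses the generic PropertyMetadata constructor: the decoder
// casts the raw SQL map value, and Object::toString only renders it for display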
.add(new PropertyMetadata<>(
"partition_filter",
"Partition filter",
new MapType(VARCHAR, VARCHAR, typeManager.getTypeOperators()),
Map.class,
null,
false,
object -> (Map<?, ?>) object,
Object::toString))
.add(enumProperty(
"recursive_directory",
"Recursive directory",
RecursiveDirectory.class,
RecursiveDirectory.FAIL,
false))
.build());
}
}
@@ -0,0 +1,62 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.procedure;

import com.google.common.collect.ImmutableList;
import com.google.inject.Provider;
import io.trino.plugin.hive.HiveStorageFormat;
import io.trino.plugin.iceberg.procedure.MigrationUtils.RecursiveDirectory;
import io.trino.spi.connector.TableProcedureMetadata;
import io.trino.spi.session.PropertyMetadata;

import static io.trino.plugin.base.util.Procedures.checkProcedureArgument;
import static io.trino.plugin.hive.HiveStorageFormat.AVRO;
import static io.trino.plugin.hive.HiveStorageFormat.ORC;
import static io.trino.plugin.hive.HiveStorageFormat.PARQUET;
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.ADD_FILES;
import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly;
import static io.trino.spi.session.PropertyMetadata.enumProperty;
import static io.trino.spi.session.PropertyMetadata.stringProperty;

public class AddFilesTableProcedure
implements Provider<TableProcedureMetadata>
{
@Override
public TableProcedureMetadata get()
{
return new TableProcedureMetadata(
ADD_FILES.name(),
coordinatorOnly(),
ImmutableList.<PropertyMetadata<?>>builder()
.add(stringProperty(
"location",
"location",
null,
false))
.add(enumProperty(
"format",
"File format",
HiveStorageFormat.class,
null,
value -> checkProcedureArgument(value == ORC || value == PARQUET || value == AVRO, "The procedure does not support storage format: %s", value),
false))
.add(enumProperty(
"recursive_directory",
"Recursive directory",
RecursiveDirectory.class,
RecursiveDirectory.FAIL,
false))
.build());
}
}
@@ -0,0 +1,34 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.procedure;

import io.trino.plugin.iceberg.procedure.MigrationUtils.RecursiveDirectory;
import jakarta.annotation.Nullable;

import java.util.Map;

import static java.util.Objects.requireNonNull;

public record IcebergAddFilesFromTableHandle(
io.trino.metastore.Table table,
@Nullable Map<String, String> partitionFilter,
RecursiveDirectory recursiveDirectory)
implements IcebergProcedureHandle
{
public IcebergAddFilesFromTableHandle
{
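// partitionFilter is @Nullable, so it is deliberately not null-checked:
// a null map means no partition filter was supplied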
requireNonNull(table, "table is null");
requireNonNull(recursiveDirectory, "recursiveDirectory is null");
}
}