diff --git a/docs/src/main/sphinx/connector/iceberg.md b/docs/src/main/sphinx/connector/iceberg.md
index 55f0eaf959a4e..c6b30872dbc66 100644
--- a/docs/src/main/sphinx/connector/iceberg.md
+++ b/docs/src/main/sphinx/connector/iceberg.md
@@ -168,6 +168,9 @@ implementation is used:
 * - `iceberg.register-table-procedure.enabled`
   - Enable to allow user to call [`register_table` procedure](iceberg-register-table).
   - `false`
+* - `iceberg.add_files-procedure.enabled`
+  - Enable to allow user to call the `add_files` procedure.
+  - `false`
 * - `iceberg.query-partition-filter-required`
   - Set to `true` to force a query to use a partition filter for schemas
     specified with `iceberg.query-partition-filter-required-schemas`. Equivalent
@@ -562,6 +565,65 @@ The default value is `fail`, which causes the migrate procedure to throw an
 exception if subdirectories are found. Set the value to `true` to migrate nested
 directories, or `false` to ignore them.
 
+(iceberg-add-files)=
+#### Add files
+
+The connector can add existing files to a table from Hive tables or from
+specified locations if `iceberg.add_files-procedure.enabled` is set to `true`
+for the catalog.
+
+Use the `system.add_files_from_table` procedure to add existing files from a
+Hive table, or `system.add_files` to add existing files from a specified
+location. The data files must be in the Parquet, ORC, or Avro file format.
+
+:::{warning}
+The procedure does not check if files are already present in the target table.
+:::
+
+The procedure must be called with the relevant schema and table names supplied
+as the required parameters `schema_name` and `table_name`:
+
+```sql
+ALTER TABLE testdb.iceberg_customer_orders EXECUTE add_files_from_table(
+    schema_name => 'testdb',
+    table_name => 'hive_customer_orders')
+```
+
+You must provide a `partition_filter` argument to add files from specified
+partitions. The following example adds files from a partition where the
+`region` is `ASIA` and `country` is `JAPAN`:
+
+```sql
+ALTER TABLE testdb.iceberg_customer_orders EXECUTE add_files_from_table(
+    schema_name => 'testdb',
+    table_name => 'hive_customer_orders',
+    partition_filter => map(ARRAY['region', 'country'], ARRAY['ASIA', 'JAPAN']))
+```
+
+In addition, you can provide a `recursive_directory` argument to add files from
+a Hive table that contains subdirectories:
+
+```sql
+ALTER TABLE testdb.iceberg_customer_orders EXECUTE add_files_from_table(
+    schema_name => 'testdb',
+    table_name => 'hive_customer_orders',
+    recursive_directory => 'true')
+```
+
+The default value of `recursive_directory` is `fail`, which causes the procedure
+to throw an exception if subdirectories are found. Set the value to `true` to add
+files from nested directories, or `false` to ignore them.
+
+The `add_files` procedure supports adding files from a specified location. The
+procedure does not validate file schemas for compatibility with the target
+Iceberg table, and it does not support partitioned tables.
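Based on the `buildDataFilesFromLocation` helper introduced later in this change, a `location` that is not a directory is treated as a single data file. The following sketch illustrates that form; the file path is hypothetical and not part of the documented examples:

```sql
ALTER TABLE testdb.iceberg_customer_orders EXECUTE add_files(
    -- hypothetical single-file path; directory locations are also accepted
    location => 's3://my-bucket/a/path/part-00000.parquet',
    format => 'PARQUET')
```

Adding all files under a directory location uses the same syntax, as in the documented example that follows: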
+
+```sql
+ALTER TABLE testdb.iceberg_customer_orders EXECUTE add_files(
+    location => 's3://my-bucket/a/path',
+    format => 'ORC')
+```
+
 (iceberg-data-management)=
 ### Data management
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
index 666b80a48cd55..7b0d5fd0c1896 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
@@ -67,6 +67,7 @@ public class IcebergConfig
     private boolean collectExtendedStatisticsOnWrite = true;
     private boolean projectionPushdownEnabled = true;
     private boolean registerTableProcedureEnabled;
+    private boolean addFilesProcedureEnabled;
     private Optional<String> hiveCatalogName = Optional.empty();
     private int formatVersion = FORMAT_VERSION_SUPPORT_MAX;
     private Duration expireSnapshotsMinRetention = new Duration(7, DAYS);
@@ -255,6 +256,19 @@ public IcebergConfig setRegisterTableProcedureEnabled(boolean registerTableProce
         return this;
     }
 
+    public boolean isAddFilesProcedureEnabled()
+    {
+        return addFilesProcedureEnabled;
+    }
+
+    @Config("iceberg.add_files-procedure.enabled")
+    @ConfigDescription("Allow users to call the add_files procedure")
+    public IcebergConfig setAddFilesProcedureEnabled(boolean addFilesProcedureEnabled)
+    {
+        this.addFilesProcedureEnabled = addFilesProcedureEnabled;
+        return this;
+    }
+
     public Optional<String> getHiveCatalogName()
     {
         return hiveCatalogName;
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
index 31bce54dcb11a..a99dc048d2f69 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
@@ -33,21 +33,28 @@
 import io.trino.filesystem.FileIterator;
 import io.trino.filesystem.Location;
 import io.trino.filesystem.TrinoFileSystem;
+import io.trino.metastore.Column;
+import io.trino.metastore.HiveMetastore;
 import io.trino.metastore.TableInfo;
 import io.trino.plugin.base.classloader.ClassLoaderSafeSystemTable;
 import io.trino.plugin.base.filter.UtcConstraintExtractor;
 import io.trino.plugin.base.projection.ApplyProjectionUtil;
 import io.trino.plugin.base.projection.ApplyProjectionUtil.ProjectedColumnRepresentation;
+import io.trino.plugin.hive.HiveStorageFormat;
 import io.trino.plugin.hive.HiveWrittenPartitions;
+import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
 import io.trino.plugin.iceberg.aggregation.DataSketchStateSerializer;
 import io.trino.plugin.iceberg.aggregation.IcebergThetaSketchForStats;
 import io.trino.plugin.iceberg.catalog.TrinoCatalog;
+import io.trino.plugin.iceberg.procedure.IcebergAddFilesFromTableHandle;
+import io.trino.plugin.iceberg.procedure.IcebergAddFilesHandle;
 import io.trino.plugin.iceberg.procedure.IcebergDropExtendedStatsHandle;
 import io.trino.plugin.iceberg.procedure.IcebergExpireSnapshotsHandle;
 import io.trino.plugin.iceberg.procedure.IcebergOptimizeHandle;
 import io.trino.plugin.iceberg.procedure.IcebergRemoveOrphanFilesHandle;
 import io.trino.plugin.iceberg.procedure.IcebergTableExecuteHandle;
 import io.trino.plugin.iceberg.procedure.IcebergTableProcedureId;
+import io.trino.plugin.iceberg.procedure.MigrationUtils.RecursiveDirectory;
 import io.trino.plugin.iceberg.util.DataFileWithDeleteFiles;
 import io.trino.spi.ErrorCode;
 import
io.trino.spi.RefreshType; @@ -197,6 +204,7 @@ import java.util.function.UnaryOperator; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Stream; import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkArgument; @@ -214,6 +222,14 @@ import static io.trino.plugin.base.projection.ApplyProjectionUtil.extractSupportedProjectedColumns; import static io.trino.plugin.base.projection.ApplyProjectionUtil.replaceWithNewVariables; import static io.trino.plugin.base.util.Procedures.checkProcedureArgument; +import static io.trino.plugin.hive.HiveMetadata.TRANSACTIONAL; +import static io.trino.plugin.hive.HiveTimestampPrecision.DEFAULT_PRECISION; +import static io.trino.plugin.hive.ViewReaderUtil.isSomeKindOfAView; +import static io.trino.plugin.hive.util.HiveTypeUtil.getTypeSignature; +import static io.trino.plugin.hive.util.HiveUtil.isDeltaLakeTable; +import static io.trino.plugin.hive.util.HiveUtil.isHudiTable; +import static io.trino.plugin.hive.util.HiveUtil.isIcebergTable; +import static io.trino.plugin.iceberg.ColumnIdentity.createColumnIdentity; import static io.trino.plugin.iceberg.ExpressionConverter.isConvertableToIcebergExpression; import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression; import static io.trino.plugin.iceberg.IcebergAnalyzeProperties.getColumnNames; @@ -275,22 +291,30 @@ import static io.trino.plugin.iceberg.TableStatisticsWriter.StatsUpdateMode.INCREMENTAL_UPDATE; import static io.trino.plugin.iceberg.TableStatisticsWriter.StatsUpdateMode.REPLACE; import static io.trino.plugin.iceberg.TableType.DATA; +import static io.trino.plugin.iceberg.TypeConverter.toIcebergType; import static io.trino.plugin.iceberg.TypeConverter.toIcebergTypeForNewColumn; import static io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog.DEPENDS_ON_TABLES; import static io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog.DEPENDS_ON_TABLE_FUNCTIONS; import static io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog.TRINO_QUERY_START_TIME; +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.ADD_FILES; +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.ADD_FILES_FROM_TABLE; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.DROP_EXTENDED_STATS; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.EXPIRE_SNAPSHOTS; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.OPTIMIZE; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.REMOVE_ORPHAN_FILES; +import static io.trino.plugin.iceberg.procedure.MigrationUtils.addFiles; +import static io.trino.plugin.iceberg.procedure.MigrationUtils.addFilesFromTable; import static io.trino.spi.StandardErrorCode.COLUMN_ALREADY_EXISTS; +import static io.trino.spi.StandardErrorCode.COLUMN_NOT_FOUND; import static io.trino.spi.StandardErrorCode.INVALID_ANALYZE_PROPERTY; import static io.trino.spi.StandardErrorCode.INVALID_ARGUMENTS; import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static io.trino.spi.StandardErrorCode.PERMISSION_DENIED; import static io.trino.spi.StandardErrorCode.QUERY_REJECTED; import static io.trino.spi.StandardErrorCode.TABLE_ALREADY_EXISTS; import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND; +import static io.trino.spi.StandardErrorCode.TYPE_MISMATCH; import static 
io.trino.spi.connector.MaterializedViewFreshness.Freshness.FRESH; import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.STALE; import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.UNKNOWN; @@ -307,6 +331,7 @@ import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND; import static io.trino.spi.type.TinyintType.TINYINT; import static io.trino.spi.type.VarcharType.VARCHAR; +import static java.lang.Boolean.parseBoolean; import static java.lang.Math.floorDiv; import static java.lang.String.format; import static java.util.Locale.ENGLISH; @@ -351,6 +376,8 @@ public class IcebergMetadata private final TrinoCatalog catalog; private final IcebergFileSystemFactory fileSystemFactory; private final TableStatisticsWriter tableStatisticsWriter; + private final Optional metastoreFactory; + private final boolean addFilesProcedureEnabled; private final Map> tableStatisticsCache = new ConcurrentHashMap<>(); @@ -363,7 +390,9 @@ public IcebergMetadata( JsonCodec commitTaskCodec, TrinoCatalog catalog, IcebergFileSystemFactory fileSystemFactory, - TableStatisticsWriter tableStatisticsWriter) + TableStatisticsWriter tableStatisticsWriter, + Optional metastoreFactory, + boolean addFilesProcedureEnabled) { this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.trinoCatalogHandle = requireNonNull(trinoCatalogHandle, "trinoCatalogHandle is null"); @@ -371,6 +400,8 @@ public IcebergMetadata( this.catalog = requireNonNull(catalog, "catalog is null"); this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.tableStatisticsWriter = requireNonNull(tableStatisticsWriter, "tableStatisticsWriter is null"); + this.metastoreFactory = requireNonNull(metastoreFactory, "metastoreFactory is null"); + this.addFilesProcedureEnabled = addFilesProcedureEnabled; } @Override @@ -1372,6 +1403,8 @@ public Optional getTableHandleForExecute( case DROP_EXTENDED_STATS -> getTableHandleForDropExtendedStats(session, tableHandle); case EXPIRE_SNAPSHOTS -> getTableHandleForExpireSnapshots(session, tableHandle, executeProperties); case REMOVE_ORPHAN_FILES -> getTableHandleForRemoveOrphanFiles(session, tableHandle, executeProperties); + case ADD_FILES -> getTableHandleForAddFiles(session, accessControl, tableHandle, executeProperties); + case ADD_FILES_FROM_TABLE -> getTableHandleForAddFilesFromTable(session, accessControl, tableHandle, executeProperties); }; } @@ -1440,6 +1473,100 @@ private Optional getTableHandleForRemoveOrphanFiles icebergTable.io().properties())); } + private Optional getTableHandleForAddFiles(ConnectorSession session, ConnectorAccessControl accessControl, IcebergTableHandle tableHandle, Map executeProperties) + { + if (!addFilesProcedureEnabled) { + throw new TrinoException(PERMISSION_DENIED, "add_files procedure is disabled"); + } + + accessControl.checkCanInsertIntoTable(null, tableHandle.getSchemaTableName()); + + String location = (String) requireProcedureArgument(executeProperties, "location"); + HiveStorageFormat format = (HiveStorageFormat) requireProcedureArgument(executeProperties, "format"); + RecursiveDirectory recursiveDirectory = (RecursiveDirectory) executeProperties.getOrDefault("recursive_directory", "fail"); + + Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName()); + + return Optional.of(new IcebergTableExecuteHandle( + tableHandle.getSchemaTableName(), + ADD_FILES, + new IcebergAddFilesHandle(location, format, recursiveDirectory), + icebergTable.location(), + 
icebergTable.io().properties())); + } + + private Optional getTableHandleForAddFilesFromTable(ConnectorSession session, ConnectorAccessControl accessControl, IcebergTableHandle tableHandle, Map executeProperties) + { + accessControl.checkCanInsertIntoTable(null, tableHandle.getSchemaTableName()); + + String schemaName = (String) requireProcedureArgument(executeProperties, "schema_name"); + String tableName = (String) requireProcedureArgument(executeProperties, "table_name"); + @SuppressWarnings("unchecked") + Map partitionFilter = (Map) executeProperties.get("partition_filter"); + RecursiveDirectory recursiveDirectory = (RecursiveDirectory) executeProperties.getOrDefault("recursive_directory", "fail"); + + HiveMetastore metastore = metastoreFactory.orElseThrow(() -> new TrinoException(NOT_SUPPORTED, "This catalog does not support add_files_from_table procedure")) + .createMetastore(Optional.of(session.getIdentity())); + SchemaTableName sourceName = new SchemaTableName(schemaName, tableName); + io.trino.metastore.Table sourceTable = metastore.getTable(schemaName, tableName).orElseThrow(() -> new TableNotFoundException(sourceName)); + accessControl.checkCanSelectFromColumns(null, sourceName, Stream.concat(sourceTable.getDataColumns().stream(), sourceTable.getPartitionColumns().stream()) + .map(Column::getName) + .collect(toImmutableSet())); + + Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName()); + + checkProcedureArgument( + icebergTable.schemas().size() == sourceTable.getDataColumns().size(), + "Data column count mismatch: %d vs %d", icebergTable.schemas().size(), sourceTable.getDataColumns().size()); + + // TODO Add files from all partitions when partition filter is not provided + checkProcedureArgument( + sourceTable.getPartitionColumns().isEmpty() || partitionFilter != null, + "partition_filter argument must be provided for partitioned tables"); + + String transactionalProperty = sourceTable.getParameters().get(TRANSACTIONAL); + if (parseBoolean(transactionalProperty)) { + throw new TrinoException(NOT_SUPPORTED, "Adding files from transactional tables is unsupported"); + } + if (!"MANAGED_TABLE".equalsIgnoreCase(sourceTable.getTableType()) && !"EXTERNAL_TABLE".equalsIgnoreCase(sourceTable.getTableType())) { + throw new TrinoException(NOT_SUPPORTED, "The procedure doesn't support adding files from %s table type".formatted(sourceTable.getTableType())); + } + if (isSomeKindOfAView(sourceTable) || isIcebergTable(sourceTable) || isDeltaLakeTable(sourceTable) || isHudiTable(sourceTable)) { + throw new TrinoException(NOT_SUPPORTED, "Adding files from non-Hive tables is unsupported"); + } + if (sourceTable.getPartitionColumns().isEmpty() && partitionFilter != null && !partitionFilter.isEmpty()) { + throw new TrinoException(NOT_SUPPORTED, "Partition filter is not supported for non-partitioned tables"); + } + + Stream.of(sourceTable.getDataColumns(), sourceTable.getPartitionColumns()) + .flatMap(List::stream) + .forEach(sourceColumn -> { + Types.NestedField targetColumn = icebergTable.schema().caseInsensitiveFindField(sourceColumn.getName()); + if (targetColumn == null) { + throw new TrinoException(COLUMN_NOT_FOUND, "Column '%s' does not exist".formatted(sourceColumn.getName())); + } + ColumnIdentity columnIdentity = createColumnIdentity(targetColumn); + org.apache.iceberg.types.Type sourceColumnType = toIcebergType(typeManager.getType(getTypeSignature(sourceColumn.getType(), DEFAULT_PRECISION)), columnIdentity); + if (!targetColumn.type().equals(sourceColumnType)) { 
+ throw new TrinoException(TYPE_MISMATCH, "Target '%s' column is '%s' type, but got source '%s' type".formatted(targetColumn.name(), targetColumn.type(), sourceColumnType)); + } + }); + + return Optional.of(new IcebergTableExecuteHandle( + tableHandle.getSchemaTableName(), + ADD_FILES_FROM_TABLE, + new IcebergAddFilesFromTableHandle(sourceTable, partitionFilter, recursiveDirectory), + icebergTable.location(), + icebergTable.io().properties())); + } + + private static Object requireProcedureArgument(Map properties, String name) + { + Object value = properties.get(name); + checkProcedureArgument(value != null, "Required procedure argument '%s' is missing", name); + return value; + } + @Override public Optional getLayoutForTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { @@ -1450,6 +1577,8 @@ public Optional getLayoutForTableExecute(ConnectorSession case DROP_EXTENDED_STATS: case EXPIRE_SNAPSHOTS: case REMOVE_ORPHAN_FILES: + case ADD_FILES: + case ADD_FILES_FROM_TABLE: // handled via executeTableExecute } throw new IllegalArgumentException("Unknown procedure '" + executeHandle.procedureId() + "'"); @@ -1477,6 +1606,8 @@ public BeginTableExecuteResult> readerForManifest(Table table, ManifestFile manifest) { return switch (manifest.content()) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java index c9259a57ee892..88c66d1e49084 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java @@ -15,11 +15,15 @@ import com.google.inject.Inject; import io.airlift.json.JsonCodec; +import io.trino.plugin.hive.metastore.HiveMetastoreFactory; +import io.trino.plugin.hive.metastore.RawHiveMetastoreFactory; import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; import io.trino.spi.connector.CatalogHandle; import io.trino.spi.security.ConnectorIdentity; import io.trino.spi.type.TypeManager; +import java.util.Optional; + import static java.util.Objects.requireNonNull; public class IcebergMetadataFactory @@ -30,6 +34,8 @@ public class IcebergMetadataFactory private final TrinoCatalogFactory catalogFactory; private final IcebergFileSystemFactory fileSystemFactory; private final TableStatisticsWriter tableStatisticsWriter; + private final Optional metastoreFactory; + private final boolean addFilesProcedureEnabled; @Inject public IcebergMetadataFactory( @@ -38,7 +44,9 @@ public IcebergMetadataFactory( JsonCodec commitTaskCodec, TrinoCatalogFactory catalogFactory, IcebergFileSystemFactory fileSystemFactory, - TableStatisticsWriter tableStatisticsWriter) + TableStatisticsWriter tableStatisticsWriter, + @RawHiveMetastoreFactory Optional metastoreFactory, + IcebergConfig config) { this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.trinoCatalogHandle = requireNonNull(trinoCatalogHandle, "trinoCatalogHandle is null"); @@ -46,6 +54,8 @@ public IcebergMetadataFactory( this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null"); this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.tableStatisticsWriter = requireNonNull(tableStatisticsWriter, "tableStatisticsWriter is null"); + this.metastoreFactory = requireNonNull(metastoreFactory, "metastoreFactory is null"); + this.addFilesProcedureEnabled = 
config.isAddFilesProcedureEnabled(); } public IcebergMetadata create(ConnectorIdentity identity) @@ -56,6 +66,8 @@ public IcebergMetadata create(ConnectorIdentity identity) commitTaskCodec, catalogFactory.create(identity), fileSystemFactory, - tableStatisticsWriter); + tableStatisticsWriter, + metastoreFactory, + addFilesProcedureEnabled); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java index 3f106e44ed370..a0633621b9440 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java @@ -24,6 +24,8 @@ import io.trino.plugin.base.session.SessionPropertiesProvider; import io.trino.plugin.hive.FileFormatDataSourceStats; import io.trino.plugin.hive.SortingFileWriterConfig; +import io.trino.plugin.hive.metastore.HiveMetastoreFactory; +import io.trino.plugin.hive.metastore.RawHiveMetastoreFactory; import io.trino.plugin.hive.metastore.thrift.TranslateHiveViews; import io.trino.plugin.hive.orc.OrcReaderConfig; import io.trino.plugin.hive.orc.OrcWriterConfig; @@ -34,6 +36,8 @@ import io.trino.plugin.iceberg.functions.IcebergFunctionProvider; import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProcessorProviderFactory; import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProvider; +import io.trino.plugin.iceberg.procedure.AddFilesTableFromTableProcedure; +import io.trino.plugin.iceberg.procedure.AddFilesTableProcedure; import io.trino.plugin.iceberg.procedure.DropExtendedStatsTableProcedure; import io.trino.plugin.iceberg.procedure.ExpireSnapshotsTableProcedure; import io.trino.plugin.iceberg.procedure.OptimizeTableProcedure; @@ -93,6 +97,7 @@ public void configure(Binder binder) binder.bind(TableStatisticsWriter.class).in(Scopes.SINGLETON); binder.bind(IcebergMetadataFactory.class).in(Scopes.SINGLETON); + newOptionalBinder(binder, Key.get(HiveMetastoreFactory.class, RawHiveMetastoreFactory.class)); jsonCodecBinder(binder).bindJsonCodec(CommitTaskData.class); @@ -112,6 +117,8 @@ public void configure(Binder binder) tableProcedures.addBinding().toProvider(DropExtendedStatsTableProcedure.class).in(Scopes.SINGLETON); tableProcedures.addBinding().toProvider(ExpireSnapshotsTableProcedure.class).in(Scopes.SINGLETON); tableProcedures.addBinding().toProvider(RemoveOrphanFilesTableProcedure.class).in(Scopes.SINGLETON); + tableProcedures.addBinding().toProvider(AddFilesTableProcedure.class).in(Scopes.SINGLETON); + tableProcedures.addBinding().toProvider(AddFilesTableFromTableProcedure.class).in(Scopes.SINGLETON); newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(TableChangesFunctionProvider.class).in(Scopes.SINGLETON); binder.bind(FunctionProvider.class).to(IcebergFunctionProvider.class).in(Scopes.SINGLETON); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java index c78bc2b827e24..c1f6e4c6de7d6 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java @@ -149,6 +149,8 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa case DROP_EXTENDED_STATS: case EXPIRE_SNAPSHOTS: case REMOVE_ORPHAN_FILES: + case 
ADD_FILES: + case ADD_FILES_FROM_TABLE: // handled via ConnectorMetadata.executeTableExecute } throw new IllegalArgumentException("Unknown procedure: " + executeHandle.procedureId()); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/AddFilesTableFromTableProcedure.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/AddFilesTableFromTableProcedure.java new file mode 100644 index 0000000000000..b96f2dc42f00e --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/AddFilesTableFromTableProcedure.java @@ -0,0 +1,79 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.procedure; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Inject; +import com.google.inject.Provider; +import io.trino.plugin.iceberg.procedure.MigrationUtils.RecursiveDirectory; +import io.trino.spi.connector.TableProcedureMetadata; +import io.trino.spi.session.PropertyMetadata; +import io.trino.spi.type.MapType; +import io.trino.spi.type.TypeManager; + +import java.util.Map; + +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.ADD_FILES_FROM_TABLE; +import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly; +import static io.trino.spi.session.PropertyMetadata.enumProperty; +import static io.trino.spi.session.PropertyMetadata.stringProperty; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static java.util.Objects.requireNonNull; + +public class AddFilesTableFromTableProcedure + implements Provider +{ + private final TypeManager typeManager; + + @Inject + public AddFilesTableFromTableProcedure(TypeManager typeManager) + { + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + } + + @Override + public TableProcedureMetadata get() + { + return new TableProcedureMetadata( + ADD_FILES_FROM_TABLE.name(), + coordinatorOnly(), + ImmutableList.>builder() + .add(stringProperty( + "schema_name", + "Source schema name", + null, + false)) + .add(stringProperty( + "table_name", + "Source table name", + null, + false)) + .add(new PropertyMetadata<>( + "partition_filter", + "Partition filter", + new MapType(VARCHAR, VARCHAR, typeManager.getTypeOperators()), + Map.class, + null, + false, + object -> (Map) object, + Object::toString)) + .add(enumProperty( + "recursive_directory", + "Recursive directory", + RecursiveDirectory.class, + RecursiveDirectory.FAIL, + false)) + .build()); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/AddFilesTableProcedure.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/AddFilesTableProcedure.java new file mode 100644 index 0000000000000..3319c63e8dd76 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/AddFilesTableProcedure.java @@ -0,0 +1,62 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the 
License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.procedure; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Provider; +import io.trino.plugin.hive.HiveStorageFormat; +import io.trino.plugin.iceberg.procedure.MigrationUtils.RecursiveDirectory; +import io.trino.spi.connector.TableProcedureMetadata; +import io.trino.spi.session.PropertyMetadata; + +import static io.trino.plugin.base.util.Procedures.checkProcedureArgument; +import static io.trino.plugin.hive.HiveStorageFormat.AVRO; +import static io.trino.plugin.hive.HiveStorageFormat.ORC; +import static io.trino.plugin.hive.HiveStorageFormat.PARQUET; +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.ADD_FILES; +import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly; +import static io.trino.spi.session.PropertyMetadata.enumProperty; +import static io.trino.spi.session.PropertyMetadata.stringProperty; + +public class AddFilesTableProcedure + implements Provider +{ + @Override + public TableProcedureMetadata get() + { + return new TableProcedureMetadata( + ADD_FILES.name(), + coordinatorOnly(), + ImmutableList.>builder() + .add(stringProperty( + "location", + "location", + null, + false)) + .add(enumProperty( + "format", + "File format", + HiveStorageFormat.class, + null, + value -> checkProcedureArgument(value == ORC || value == PARQUET || value == AVRO, "The procedure does not support storage format: %s", value), + false)) + .add(enumProperty( + "recursive_directory", + "Recursive directory", + RecursiveDirectory.class, + RecursiveDirectory.FAIL, + false)) + .build()); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergAddFilesFromTableHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergAddFilesFromTableHandle.java new file mode 100644 index 0000000000000..1a3d313a28f12 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergAddFilesFromTableHandle.java @@ -0,0 +1,34 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.procedure; + +import io.trino.plugin.iceberg.procedure.MigrationUtils.RecursiveDirectory; +import jakarta.annotation.Nullable; + +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +public record IcebergAddFilesFromTableHandle( + io.trino.metastore.Table table, + @Nullable Map partitionFilter, + RecursiveDirectory recursiveDirectory) + implements IcebergProcedureHandle +{ + public IcebergAddFilesFromTableHandle + { + requireNonNull(table, "table is null"); + requireNonNull(recursiveDirectory, "recursiveDirectory is null"); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergAddFilesHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergAddFilesHandle.java new file mode 100644 index 0000000000000..6fd365a540893 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergAddFilesHandle.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.procedure; + +import io.trino.plugin.hive.HiveStorageFormat; +import io.trino.plugin.iceberg.procedure.MigrationUtils.RecursiveDirectory; + +import static java.util.Objects.requireNonNull; + +public record IcebergAddFilesHandle(String location, HiveStorageFormat format, RecursiveDirectory recursiveDirectory) + implements IcebergProcedureHandle +{ + public IcebergAddFilesHandle + { + requireNonNull(location, "location is null"); + requireNonNull(format, "format is null"); + requireNonNull(recursiveDirectory, "recursiveDirectory is null"); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergProcedureHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergProcedureHandle.java index 671a92c12d43b..1577b867363e0 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergProcedureHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergProcedureHandle.java @@ -24,5 +24,7 @@ @JsonSubTypes.Type(value = IcebergExpireSnapshotsHandle.class, name = "expire_snapshots"), @JsonSubTypes.Type(value = IcebergOptimizeHandle.class, name = "optimize"), @JsonSubTypes.Type(value = IcebergRemoveOrphanFilesHandle.class, name = "remove_orphan_files"), + @JsonSubTypes.Type(value = IcebergAddFilesHandle.class, name = "add_files"), + @JsonSubTypes.Type(value = IcebergAddFilesFromTableHandle.class, name = "add_files_from_table"), }) public interface IcebergProcedureHandle {} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableProcedureId.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableProcedureId.java index 8b1c68fb23ed5..31884f752469f 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableProcedureId.java +++ 
b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableProcedureId.java @@ -19,4 +19,6 @@ public enum IcebergTableProcedureId DROP_EXTENDED_STATS, EXPIRE_SNAPSHOTS, REMOVE_ORPHAN_FILES, + ADD_FILES, + ADD_FILES_FROM_TABLE, } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrateProcedure.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrateProcedure.java index caaead5fc3f48..8cc17bcb464b8 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrateProcedure.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrateProcedure.java @@ -19,35 +19,22 @@ import com.google.inject.Inject; import com.google.inject.Provider; import io.airlift.log.Logger; -import io.trino.filesystem.FileEntry; -import io.trino.filesystem.FileIterator; -import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; -import io.trino.filesystem.TrinoInputFile; import io.trino.metastore.Column; import io.trino.metastore.HiveMetastore; import io.trino.metastore.Partition; import io.trino.metastore.PrincipalPrivileges; import io.trino.metastore.Storage; -import io.trino.parquet.ParquetDataSource; -import io.trino.parquet.ParquetReaderOptions; -import io.trino.parquet.metadata.ParquetMetadata; -import io.trino.parquet.reader.MetadataReader; -import io.trino.plugin.hive.FileFormatDataSourceStats; import io.trino.plugin.hive.HiveStorageFormat; import io.trino.plugin.hive.metastore.HiveMetastoreFactory; import io.trino.plugin.hive.metastore.RawHiveMetastoreFactory; -import io.trino.plugin.hive.parquet.TrinoParquetDataSource; import io.trino.plugin.iceberg.IcebergConfig; import io.trino.plugin.iceberg.IcebergFileFormat; import io.trino.plugin.iceberg.IcebergSecurityConfig; -import io.trino.plugin.iceberg.PartitionData; import io.trino.plugin.iceberg.catalog.TrinoCatalog; import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; -import io.trino.plugin.iceberg.fileio.ForwardingInputFile; -import io.trino.plugin.iceberg.util.OrcMetrics; -import io.trino.plugin.iceberg.util.ParquetUtil; +import io.trino.plugin.iceberg.procedure.MigrationUtils.RecursiveDirectory; import io.trino.spi.TrinoException; import io.trino.spi.classloader.ThreadContextClassLoader; import io.trino.spi.connector.ConnectorSession; @@ -60,21 +47,16 @@ import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; -import org.apache.iceberg.Metrics; -import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; -import org.apache.iceberg.avro.Avro; import org.apache.iceberg.mapping.MappingUtil; import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; -import java.io.IOException; -import java.io.UncheckedIOException; import java.lang.invoke.MethodHandle; import java.util.ArrayList; import java.util.HashMap; @@ -82,9 +64,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Stream; -import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Streams.concat; import static 
io.airlift.slice.Slices.utf8Slice; @@ -100,6 +80,7 @@ import static io.trino.plugin.iceberg.IcebergSecurityConfig.IcebergSecurity.SYSTEM; import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields; import static io.trino.plugin.iceberg.TypeConverter.toIcebergTypeForNewColumn; +import static io.trino.plugin.iceberg.procedure.MigrationUtils.buildDataFiles; import static io.trino.spi.StandardErrorCode.INVALID_PROCEDURE_ARGUMENT; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.type.SmallintType.SMALLINT; @@ -125,7 +106,6 @@ public class MigrateProcedure public static final String PROVIDER_PROPERTY_KEY = "provider"; public static final String PROVIDER_PROPERTY_VALUE = "iceberg"; - private static final MetricsConfig METRICS_CONFIG = MetricsConfig.getDefault(); private final TrinoCatalogFactory catalogFactory; private final HiveMetastoreFactory metastoreFactory; @@ -134,14 +114,6 @@ public class MigrateProcedure private final int formatVersion; private final boolean isUsingSystemSecurity; - private enum RecursiveDirectory - { - TRUE, - FALSE, - FAIL, - /**/ - } - private static final MethodHandle MIGRATE; static { @@ -227,10 +199,11 @@ public void doMigrate(ConnectorSession session, String schemaName, String tableN Map properties = icebergTableProperties(location, hiveTable.getParameters(), nameMapping, toIcebergFileFormat(storageFormat)); PartitionSpec partitionSpec = parsePartitionFields(schema, getPartitionColumnNames(hiveTable)); try { + TrinoFileSystem fileSystem = fileSystemFactory.create(session); ImmutableList.Builder dataFilesBuilder = ImmutableList.builder(); if (hiveTable.getPartitionColumns().isEmpty()) { log.debug("Building data files from %s", location); - dataFilesBuilder.addAll(buildDataFiles(session, recursive, storageFormat, location, partitionSpec, new PartitionData(new Object[0]), schema)); + dataFilesBuilder.addAll(buildDataFiles(fileSystem, recursive, storageFormat, location, partitionSpec, Optional.empty(), schema)); } else { Map> partitions = listAllPartitions(metastore, hiveTable); @@ -240,7 +213,7 @@ public void doMigrate(ConnectorSession session, String schemaName, String tableN log.debug("Building data files from '%s' for partition %d of %d", storage.getLocation(), fileCount++, partitions.size()); HiveStorageFormat partitionStorageFormat = extractHiveStorageFormat(storage.getStorageFormat()); StructLike partitionData = DataFiles.data(partitionSpec, partition.getKey()); - dataFilesBuilder.addAll(buildDataFiles(session, recursive, partitionStorageFormat, storage.getLocation(), partitionSpec, partitionData, schema)); + dataFilesBuilder.addAll(buildDataFiles(fileSystem, recursive, partitionStorageFormat, storage.getLocation(), partitionSpec, Optional.of(partitionData), schema)); } } @@ -333,50 +306,6 @@ public Map> listAllPartitions(HiveMetastore metastor return metastore.getPartitionsByNames(table, partitions.get()); } - private List buildDataFiles( - ConnectorSession session, - RecursiveDirectory recursive, - HiveStorageFormat format, - String location, - PartitionSpec partitionSpec, - StructLike partition, - Schema schema) - throws IOException - { - // TODO: Introduce parallelism - TrinoFileSystem fileSystem = fileSystemFactory.create(session); - FileIterator files = fileSystem.listFiles(Location.of(location)); - ImmutableList.Builder dataFilesBuilder = ImmutableList.builder(); - while (files.hasNext()) { - FileEntry file = files.next(); - String fileLocation = file.location().toString(); - String relativePath = 
fileLocation.substring(location.length()); - if (relativePath.contains("/_") || relativePath.contains("/.")) { - continue; - } - if (recursive == RecursiveDirectory.FALSE && isRecursive(location, fileLocation)) { - continue; - } - if (recursive == RecursiveDirectory.FAIL && isRecursive(location, fileLocation)) { - throw new TrinoException(NOT_SUPPORTED, "Recursive directory must not exist when recursive_directory argument is 'fail': " + file.location()); - } - - Metrics metrics = loadMetrics(fileSystem.newInputFile(file.location()), format, schema); - DataFile dataFile = buildDataFile(file, partition, partitionSpec, format.name(), metrics); - dataFilesBuilder.add(dataFile); - } - List dataFiles = dataFilesBuilder.build(); - log.debug("Found %d files in '%s'", dataFiles.size(), location); - return dataFiles; - } - - private static boolean isRecursive(String baseLocation, String location) - { - verify(location.startsWith(baseLocation), "%s should start with %s", location, baseLocation); - String suffix = location.substring(baseLocation.length() + 1).replaceFirst("^/+", ""); - return suffix.contains("/"); - } - private static IcebergFileFormat toIcebergFileFormat(HiveStorageFormat storageFormat) { return switch (storageFormat) { @@ -387,27 +316,6 @@ private static IcebergFileFormat toIcebergFileFormat(HiveStorageFormat storageFo }; } - private static Metrics loadMetrics(TrinoInputFile file, HiveStorageFormat storageFormat, Schema schema) - { - return switch (storageFormat) { - case ORC -> OrcMetrics.fileMetrics(file, METRICS_CONFIG, schema); - case PARQUET -> parquetMetrics(file, METRICS_CONFIG, MappingUtil.create(schema)); - case AVRO -> new Metrics(Avro.rowCount(new ForwardingInputFile(file)), null, null, null, null); - default -> throw new TrinoException(NOT_SUPPORTED, "Unsupported storage format: " + storageFormat); - }; - } - - private static Metrics parquetMetrics(TrinoInputFile file, MetricsConfig metricsConfig, NameMapping nameMapping) - { - try (ParquetDataSource dataSource = new TrinoParquetDataSource(file, new ParquetReaderOptions(), new FileFormatDataSourceStats())) { - ParquetMetadata metadata = MetadataReader.readFooter(dataSource, Optional.empty()); - return ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsConfig, nameMapping); - } - catch (IOException e) { - throw new UncheckedIOException("Failed to read file footer: " + file.location(), e); - } - } - private static List toPartitionFields(io.trino.metastore.Table table) { ImmutableList.Builder fields = ImmutableList.builder(); @@ -421,15 +329,4 @@ private static List getPartitionColumnNames(io.trino.metastore.Table tab .map(Column::getName) .collect(toImmutableList()); } - - private static DataFile buildDataFile(FileEntry file, StructLike partition, PartitionSpec spec, String format, Metrics metrics) - { - return DataFiles.builder(spec) - .withPath(file.location().toString()) - .withFormat(format) - .withFileSizeInBytes(file.length()) - .withMetrics(metrics) - .withPartition(partition) - .build(); - } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrationUtils.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrationUtils.java new file mode 100644 index 0000000000000..45547b46fc569 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrationUtils.java @@ -0,0 +1,306 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.procedure; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; +import io.airlift.log.Logger; +import io.trino.filesystem.FileEntry; +import io.trino.filesystem.FileIterator; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoInputFile; +import io.trino.metastore.HiveMetastore; +import io.trino.metastore.Partition; +import io.trino.metastore.Storage; +import io.trino.parquet.ParquetDataSource; +import io.trino.parquet.ParquetReaderOptions; +import io.trino.parquet.metadata.ParquetMetadata; +import io.trino.parquet.reader.MetadataReader; +import io.trino.plugin.hive.FileFormatDataSourceStats; +import io.trino.plugin.hive.HiveStorageFormat; +import io.trino.plugin.hive.metastore.HiveMetastoreFactory; +import io.trino.plugin.hive.parquet.TrinoParquetDataSource; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.plugin.iceberg.fileio.ForwardingInputFile; +import io.trino.plugin.iceberg.util.OrcMetrics; +import io.trino.plugin.iceberg.util.ParquetUtil; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.SchemaTableName; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.mapping.MappingUtil; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.types.Types; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Stream; + +import static com.google.common.base.MoreObjects.firstNonNull; +import static com.google.common.base.Verify.verify; +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static io.trino.plugin.base.util.Procedures.checkProcedureArgument; +import static io.trino.plugin.hive.HiveMetadata.extractHiveStorageFormat; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_COMMIT_ERROR; +import static io.trino.plugin.iceberg.IcebergSessionProperties.isMergeManifestsOnWrite; +import static io.trino.spi.StandardErrorCode.CONSTRAINT_VIOLATION; +import static io.trino.spi.StandardErrorCode.NOT_FOUND; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING; +import static org.apache.iceberg.mapping.NameMappingParser.toJson; + +public final class MigrationUtils +{ + private static final Logger log = Logger.get(MigrationUtils.class); + private static final Joiner.MapJoiner PARTITION_JOINER = Joiner.on("/").withKeyValueSeparator("="); + + private static final MetricsConfig METRICS_CONFIG = 
MetricsConfig.getDefault(); + + public enum RecursiveDirectory + { + TRUE, + FALSE, + FAIL, + /**/ + } + + private MigrationUtils() {} + + public static List buildDataFiles( + TrinoFileSystem fileSystem, + RecursiveDirectory recursive, + HiveStorageFormat format, + String location, + PartitionSpec partitionSpec, + Optional partition, + Schema schema) + throws IOException + { + // TODO: Introduce parallelism + FileIterator files = fileSystem.listFiles(Location.of(location)); + ImmutableList.Builder dataFilesBuilder = ImmutableList.builder(); + while (files.hasNext()) { + FileEntry file = files.next(); + String fileLocation = file.location().toString(); + String relativePath = fileLocation.substring(location.length()); + if (relativePath.contains("/_") || relativePath.contains("/.")) { + continue; + } + if (recursive == RecursiveDirectory.FALSE && isRecursive(location, fileLocation)) { + continue; + } + if (recursive == RecursiveDirectory.FAIL && isRecursive(location, fileLocation)) { + throw new TrinoException(NOT_SUPPORTED, "Recursive directory must not exist when recursive_directory argument is 'fail': " + file.location()); + } + + Metrics metrics = loadMetrics(fileSystem.newInputFile(file.location(), file.length()), format, schema); + DataFile dataFile = buildDataFile(fileLocation, file.length(), partition, partitionSpec, format.name(), metrics); + dataFilesBuilder.add(dataFile); + } + List dataFiles = dataFilesBuilder.build(); + log.debug("Found %d files in '%s'", dataFiles.size(), location); + return dataFiles; + } + + private static boolean isRecursive(String baseLocation, String location) + { + verify(location.startsWith(baseLocation), "%s should start with %s", location, baseLocation); + String suffix = location.substring(baseLocation.length() + 1).replaceFirst("^/+", ""); + return suffix.contains("/"); + } + + public static Metrics loadMetrics(TrinoInputFile file, HiveStorageFormat storageFormat, Schema schema) + { + return switch (storageFormat) { + case ORC -> OrcMetrics.fileMetrics(file, METRICS_CONFIG, schema); + case PARQUET -> parquetMetrics(file, METRICS_CONFIG, MappingUtil.create(schema)); + case AVRO -> new Metrics(Avro.rowCount(new ForwardingInputFile(file)), null, null, null, null); + default -> throw new TrinoException(NOT_SUPPORTED, "Unsupported storage format: " + storageFormat); + }; + } + + private static Metrics parquetMetrics(TrinoInputFile file, MetricsConfig metricsConfig, NameMapping nameMapping) + { + try (ParquetDataSource dataSource = new TrinoParquetDataSource(file, new ParquetReaderOptions(), new FileFormatDataSourceStats())) { + ParquetMetadata metadata = MetadataReader.readFooter(dataSource, Optional.empty()); + return ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsConfig, nameMapping); + } + catch (IOException e) { + throw new UncheckedIOException("Failed to read file footer: " + file.location(), e); + } + } + + public static void addFiles( + ConnectorSession session, + TrinoFileSystem fileSystem, + TrinoCatalog catalog, + SchemaTableName targetName, + String location, + HiveStorageFormat format, + RecursiveDirectory recursiveDirectory) + { + Table table = catalog.loadTable(session, targetName); + PartitionSpec partitionSpec = table.spec(); + + checkProcedureArgument(partitionSpec.isUnpartitioned(), "The procedure does not support partitioned tables"); + + try { + List dataFiles = buildDataFilesFromLocation(fileSystem, recursiveDirectory, format, location, partitionSpec, Optional.empty(), table.schema()); + addFiles(session, table, 
dataFiles); + } + catch (Exception e) { + throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to add files: " + firstNonNull(e.getMessage(), e), e); + } + } + + private static List buildDataFilesFromLocation( + TrinoFileSystem fileSystem, + RecursiveDirectory recursive, + HiveStorageFormat format, + String location, + PartitionSpec partitionSpec, + Optional partition, + Schema schema) + throws IOException + { + if (fileSystem.directoryExists(Location.of(location)).orElse(false)) { + return MigrationUtils.buildDataFiles(fileSystem, recursive, format, location, partitionSpec, partition, schema); + } + + TrinoInputFile file = fileSystem.newInputFile(Location.of(location)); + if (file.exists()) { + Metrics metrics = loadMetrics(file, format, schema); + return ImmutableList.of(buildDataFile(file.location().toString(), file.length(), partition, partitionSpec, format.name(), metrics)); + } + + throw new TrinoException(NOT_FOUND, "Location not found: " + location); + } + + public static void addFilesFromTable( + ConnectorSession session, + TrinoFileSystem fileSystem, + HiveMetastoreFactory metastoreFactory, + Table targetTable, + io.trino.metastore.Table sourceTable, + Map partitionFilter, + RecursiveDirectory recursiveDirectory) + { + HiveMetastore metastore = metastoreFactory.createMetastore(Optional.of(session.getIdentity())); + + PartitionSpec partitionSpec = targetTable.spec(); + Schema schema = targetTable.schema(); + NameMapping nameMapping = MappingUtil.create(schema); + + HiveStorageFormat storageFormat = extractHiveStorageFormat(sourceTable.getStorage().getStorageFormat()); + String location = sourceTable.getStorage().getLocation(); + + try { + ImmutableList.Builder dataFilesBuilder = ImmutableList.builder(); + if (partitionSpec.isUnpartitioned()) { + log.debug("Building data files from %s", location); + dataFilesBuilder.addAll(buildDataFiles(fileSystem, recursiveDirectory, storageFormat, location, partitionSpec, Optional.empty(), schema)); + } + else { + List partitionNames = partitionFilter == null ? 
ImmutableList.of() : ImmutableList.of(PARTITION_JOINER.join(partitionFilter)); + Map> partitions = metastore.getPartitionsByNames(sourceTable, partitionNames); + for (Map.Entry> partition : partitions.entrySet()) { + Storage storage = partition.getValue().orElseThrow(() -> new IllegalArgumentException("Invalid partition: " + partition.getKey())).getStorage(); + log.debug("Building data files from partition: %s", partition); + HiveStorageFormat partitionStorageFormat = extractHiveStorageFormat(storage.getStorageFormat()); + StructLike partitionData = DataFiles.data(partitionSpec, partition.getKey()); + dataFilesBuilder.addAll(buildDataFiles(fileSystem, recursiveDirectory, partitionStorageFormat, storage.getLocation(), partitionSpec, Optional.of(partitionData), schema)); + } + } + + log.debug("Start new transaction"); + Transaction transaction = targetTable.newTransaction(); + if (!targetTable.properties().containsKey(DEFAULT_NAME_MAPPING)) { + log.debug("Update default name mapping property"); + transaction.updateProperties() + .set(DEFAULT_NAME_MAPPING, toJson(nameMapping)) + .commit(); + } + addFiles(session, targetTable, dataFilesBuilder.build()); + } + catch (Exception e) { + throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to add files: " + firstNonNull(e.getMessage(), e), e); + } + } + + public static DataFile buildDataFile(String path, long length, Optional partition, PartitionSpec spec, String format, Metrics metrics) + { + DataFiles.Builder dataFile = DataFiles.builder(spec) + .withPath(path) + .withFormat(format) + .withFileSizeInBytes(length) + .withMetrics(metrics); + partition.ifPresent(dataFile::withPartition); + return dataFile.build(); + } + + public static void addFiles(ConnectorSession session, Table table, List dataFiles) + { + Schema schema = table.schema(); + Set requiredFields = schema.columns().stream() + .filter(Types.NestedField::isRequired) + .map(Types.NestedField::fieldId) + .collect(toImmutableSet()); + + if (!requiredFields.isEmpty()) { + for (DataFile dataFile : dataFiles) { + Map nullValueCounts = dataFile.nullValueCounts(); + for (Integer field : requiredFields) { + if (nullValueCounts.get(field) > 0) { + throw new TrinoException(CONSTRAINT_VIOLATION, "NULL value not allowed for NOT NULL column: " + schema.findField(field).name()); + } + } + } + } + + try { + log.debug("Start new transaction"); + Transaction transaction = table.newTransaction(); + if (!table.properties().containsKey(DEFAULT_NAME_MAPPING)) { + log.debug("Update default name mapping property"); + transaction.updateProperties() + .set(DEFAULT_NAME_MAPPING, toJson(MappingUtil.create(schema))) + .commit(); + } + log.debug("Append data %d data files", dataFiles.size()); + AppendFiles appendFiles = isMergeManifestsOnWrite(session) ? 
transaction.newAppend() : transaction.newFastAppend(); + dataFiles.forEach(appendFiles::appendFile); + appendFiles.commit(); + transaction.commitTransaction(); + log.debug("Successfully added files to %s table", table.name()); + } + catch (Exception e) { + throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to add files: " + firstNonNull(e.getMessage(), e), e); + } + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java index 286622dda9b47..2c0aa1907562d 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java @@ -66,6 +66,7 @@ public void testDefaults() .setHideMaterializedViewStorageTable(true) .setMaterializedViewsStorageSchema(null) .setRegisterTableProcedureEnabled(false) + .setAddFilesProcedureEnabled(false) .setSortedWritingEnabled(true) .setQueryPartitionFilterRequired(false) .setQueryPartitionFilterRequiredSchemas(ImmutableSet.of()) @@ -100,6 +101,7 @@ public void testExplicitPropertyMappings() .put("iceberg.materialized-views.hide-storage-table", "false") .put("iceberg.materialized-views.storage-schema", "mv_storage_schema") .put("iceberg.register-table-procedure.enabled", "true") + .put("iceberg.add_files-procedure.enabled", "true") .put("iceberg.sorted-writing-enabled", "false") .put("iceberg.query-partition-filter-required", "true") .put("iceberg.query-partition-filter-required-schemas", "bronze,silver") @@ -131,6 +133,7 @@ public void testExplicitPropertyMappings() .setHideMaterializedViewStorageTable(false) .setMaterializedViewsStorageSchema("mv_storage_schema") .setRegisterTableProcedureEnabled(true) + .setAddFilesProcedureEnabled(true) .setSortedWritingEnabled(false) .setQueryPartitionFilterRequired(true) .setQueryPartitionFilterRequiredSchemas(ImmutableSet.of("bronze", "silver")) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java index 97d15fc681f2d..82b927808c63c 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java @@ -117,7 +117,9 @@ public void testNonLowercaseNamespace() (connectorIdentity, fileIoProperties) -> { throw new UnsupportedOperationException(); }, - new TableStatisticsWriter(new NodeVersion("test-version"))); + new TableStatisticsWriter(new NodeVersion("test-version")), + Optional.empty(), + false); assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)") .isFalse(); assertThat(icebergMetadata.schemaExists(SESSION, schema)).as("icebergMetadata.schemaExists(schema)") diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java index 492dbf3fa8540..a944c6dd00264 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java @@ -134,7 +134,9 @@ public void testNonLowercaseGlueDatabase() (connectorIdentity, fileIoProperties) -> { throw new 
UnsupportedOperationException(); }, - new TableStatisticsWriter(new NodeVersion("test-version"))); + new TableStatisticsWriter(new NodeVersion("test-version")), + Optional.empty(), + false); assertThat(icebergMetadata.schemaExists(SESSION, databaseName)).as("icebergMetadata.schemaExists(databaseName)") .isFalse(); assertThat(icebergMetadata.schemaExists(SESSION, trinoSchemaName)).as("icebergMetadata.schemaExists(trinoSchemaName)") diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java index 13a94790763b5..306d7522f48f5 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java @@ -44,6 +44,7 @@ import java.net.URI; import java.nio.file.Path; import java.util.Map; +import java.util.Optional; import static io.airlift.json.JsonCodec.jsonCodec; import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; @@ -185,7 +186,9 @@ public void testNonLowercaseNamespace() (connectorIdentity, fileIoProperties) -> { throw new UnsupportedOperationException(); }, - new TableStatisticsWriter(new NodeVersion("test-version"))); + new TableStatisticsWriter(new NodeVersion("test-version")), + Optional.empty(), + false); assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)") .isTrue(); assertThat(icebergMetadata.schemaExists(SESSION, schema)).as("icebergMetadata.schemaExists(schema)") diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java index ed54ed24fa217..e3118668800ca 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java @@ -111,7 +111,9 @@ public void testNonLowercaseNamespace() (connectorIdentity, fileIoProperties) -> { throw new UnsupportedOperationException(); }, - new TableStatisticsWriter(new NodeVersion("test-version"))); + new TableStatisticsWriter(new NodeVersion("test-version")), + Optional.empty(), + false); assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)") .isTrue(); assertThat(icebergMetadata.schemaExists(SESSION, schema)).as("icebergMetadata.schemaExists(schema)") diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/procedure/TestIcebergAddFilesProcedure.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/procedure/TestIcebergAddFilesProcedure.java new file mode 100644 index 0000000000000..ead6a7dde0c62 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/procedure/TestIcebergAddFilesProcedure.java @@ -0,0 +1,576 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.procedure; + +import com.google.common.collect.ImmutableMap; +import io.trino.filesystem.Location; +import io.trino.plugin.hive.TestingHivePlugin; +import io.trino.plugin.iceberg.IcebergQueryRunner; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.QueryRunner; +import org.junit.jupiter.api.Test; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; + +import static io.trino.testing.TestingNames.randomNameSuffix; + +final class TestIcebergAddFilesProcedure + extends AbstractTestQueryFramework +{ + private Path dataDirectory; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + dataDirectory = Files.createTempDirectory("_test_hidden"); + QueryRunner queryRunner = IcebergQueryRunner.builder() + .setMetastoreDirectory(dataDirectory.toFile()) + .addIcebergProperty("iceberg.add_files-procedure.enabled", "true") + .build(); + + queryRunner.installPlugin(new TestingHivePlugin(dataDirectory)); + queryRunner.createCatalog("hive", "hive", ImmutableMap.<String, String>builder() + .put("hive.security", "allow-all") + .buildOrThrow()); + + return queryRunner; + } + + @Test + void testAddFilesFromTable() + { + String hiveTableName = "test_add_files_" + randomNameSuffix(); + String icebergTableName = "test_add_files_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " AS SELECT 1 x", 1); + assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + " AS SELECT 2 x", 1); + + assertUpdate("ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "')"); + + assertQuery("SELECT * FROM hive.tpch." + hiveTableName, "VALUES 1"); + assertQuery("SELECT * FROM iceberg.tpch." + icebergTableName, "VALUES 1, 2"); + + assertUpdate("DROP TABLE hive.tpch." + hiveTableName); + assertUpdate("DROP TABLE iceberg.tpch." + icebergTableName); + } + + @Test + void testAddFilesNotNull() + { + String hiveTableName = "test_add_files_" + randomNameSuffix(); + String icebergTableName = "test_add_files_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " AS SELECT 1 x", 1); + assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + "(x int NOT NULL)"); + + assertUpdate("ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "')"); + assertQuery("SELECT * FROM hive.tpch." + hiveTableName, "VALUES 1"); + assertQuery("SELECT * FROM iceberg.tpch." + icebergTableName, "VALUES 1"); + + assertUpdate("DROP TABLE hive.tpch." + hiveTableName); + assertUpdate("DROP TABLE iceberg.tpch." + icebergTableName); + } + + @Test + void testAddFilesNotNullViolation() + { + String hiveTableName = "test_add_files_" + randomNameSuffix(); + String icebergTableName = "test_add_files_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " AS SELECT CAST(NULL AS int) x", 1); + assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + "(x int NOT NULL)"); + + String path = (String) computeScalar("SELECT \"$path\" FROM hive.tpch." 
+ hiveTableName); + assertQueryFails( + "ALTER TABLE " + icebergTableName + " EXECUTE add_files('" + path + "', 'ORC')", + ".*NULL value not allowed for NOT NULL column: x"); + assertQueryFails( + "ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "')", + ".*NULL value not allowed for NOT NULL column: x"); + + assertQueryReturnsEmptyResult("SELECT * FROM iceberg.tpch." + icebergTableName); + + assertUpdate("DROP TABLE hive.tpch." + hiveTableName); + assertUpdate("DROP TABLE iceberg.tpch." + icebergTableName); + } + + @Test + void testAddFilesDifferentFileFormat() + { + testAddFilesDifferentFileFormat("PARQUET", "ORC"); + testAddFilesDifferentFileFormat("PARQUET", "AVRO"); + testAddFilesDifferentFileFormat("ORC", "PARQUET"); + testAddFilesDifferentFileFormat("ORC", "AVRO"); + testAddFilesDifferentFileFormat("AVRO", "PARQUET"); + testAddFilesDifferentFileFormat("AVRO", "ORC"); + } + + private void testAddFilesDifferentFileFormat(String hiveFormat, String icebergFormat) + { + String hiveTableName = "test_add_files_" + randomNameSuffix(); + String icebergTableName = "test_add_files_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + " WITH (format = '" + icebergFormat + "') AS SELECT 1 x", 1); + assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " WITH (format = '" + hiveFormat + "') AS SELECT 2 x", 1); + + assertUpdate("ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "')"); + + assertQuery("SELECT * FROM " + icebergTableName, "VALUES 1, 2"); + + assertUpdate("DROP TABLE hive.tpch." + hiveTableName); + assertUpdate("DROP TABLE iceberg.tpch." + icebergTableName); + } + + @Test + void testAddFilesAcrossSchema() + { + String hiveSchemaName = "test_schema" + randomNameSuffix(); + String hiveTableName = "test_add_files_" + randomNameSuffix(); + String icebergTableName = "test_add_files_" + randomNameSuffix(); + + assertUpdate("CREATE SCHEMA hive." + hiveSchemaName); + + assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + " WITH (format = 'PARQUET') AS SELECT 1 x", 1); + assertUpdate("CREATE TABLE hive." + hiveSchemaName + "." + hiveTableName + " WITH (format = 'ORC') AS SELECT 2 x", 1); + + assertUpdate("ALTER TABLE tpch." + icebergTableName + " EXECUTE add_files_from_table('" + hiveSchemaName + "', '" + hiveTableName + "')"); + + assertQuery("SELECT * FROM iceberg.tpch." + icebergTableName, "VALUES 1, 2"); + + assertUpdate("DROP TABLE iceberg.tpch." + icebergTableName); + assertUpdate("DROP SCHEMA hive." + hiveSchemaName + " CASCADE "); + } + + @Test + void testAddFilesTypeMismatch() + { + String hiveTableName = "test_add_files_" + randomNameSuffix(); + String icebergTableName = "test_add_files_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + " WITH (format = 'ORC') AS SELECT '1' x", 1); + assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " WITH (format = 'ORC') AS SELECT 2 x", 1); + + assertQueryFails( + "ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "')", + "Target 'x' column is 'string' type, but got source 'int' type"); + + assertUpdate("DROP TABLE hive.tpch." + hiveTableName); + assertUpdate("DROP TABLE iceberg.tpch." 
+ icebergTableName); + } + + @Test + void testAddFilesDifferentDataColumnDefinitions() + { + for (String format : List.of("ORC", "PARQUET", "AVRO")) { + String hiveTableName = "test_add_files_" + randomNameSuffix(); + String icebergTableName = "test_add_files_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " WITH (format = '" + format + "') AS SELECT 1 x", 1); + assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + " WITH (format = '" + format + "') AS SELECT 2 y", 1); + + assertQueryFails("ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "')", "Column 'x' does not exist"); + + assertUpdate("DROP TABLE hive.tpch." + hiveTableName); + assertUpdate("DROP TABLE iceberg.tpch." + icebergTableName); + } + } + + @Test + void testAddFilesDifferentPartitionColumnDefinitions() + { + String hiveTableName = "test_add_files_" + randomNameSuffix(); + String icebergTableName = "test_add_files_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " WITH (partitioned_by = ARRAY['hive_part']) AS SELECT 1 x, 10 hive_part", 1); + assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + " WITH (partitioning = ARRAY['iceberg_part']) AS SELECT 2 x, 20 iceberg_part", 1); + + assertQueryFails( + "ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "', map(ARRAY['hive_part'], ARRAY['10']))", + "Column 'hive_part' does not exist"); + + assertUpdate("DROP TABLE hive.tpch." + hiveTableName); + assertUpdate("DROP TABLE iceberg.tpch." + icebergTableName); + } + + @Test + void testAddFilesPartitionFilter() + { + String hiveTableName = "test_add_files_partition_" + randomNameSuffix(); + String icebergTableName = "test_add_files_partition_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + "(id int, part varchar) WITH (partitioning = ARRAY['part'])"); + assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + "(id int, part varchar) WITH (partitioned_by = ARRAY['part'])"); + assertUpdate("INSERT INTO hive.tpch." + hiveTableName + " VALUES (1, 'test1'), (2, 'test2'), (3, 'test3'), (4, 'test4')", 4); + + assertUpdate("ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "', map(ARRAY['part'], ARRAY['test1']))"); + assertQuery("SELECT * FROM " + icebergTableName, "VALUES (1, 'test1')"); + + assertUpdate("ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "', map(ARRAY['part'], ARRAY['test2']))"); + assertQuery("SELECT * FROM " + icebergTableName, "VALUES (1, 'test1'), (2, 'test2')"); + + // no-partition filter on the partitioned table + assertQueryFails( + "ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "')", + "partition_filter argument must be provided for partitioned tables"); + assertQuery("SELECT * FROM " + icebergTableName, "VALUES (1, 'test1'), (2, 'test2')"); + + // empty partition filter on the partitioned table + assertQueryFails( + "ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "', map())", + ".* partition value count must match partition column count"); + assertQuery("SELECT * FROM " + icebergTableName, "VALUES (1, 'test1'), (2, 'test2')"); + + assertUpdate("DROP TABLE hive.tpch." + hiveTableName); + assertUpdate("DROP TABLE iceberg.tpch." 
+ icebergTableName); + } + + @Test + void testAddFilesNonPartitionTableWithPartitionFilter() + { + String hiveTableName = "test_add_files_non_partition_" + randomNameSuffix(); + String icebergTableName = "test_add_files_partition_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + "(id int, part varchar) WITH (partitioning = ARRAY['part'])"); + assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + "(id int)"); + + assertQueryFails( + "ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "', map(ARRAY['part'], ARRAY['test1']))", + "Partition filter is not supported for non-partitioned tables"); + + assertUpdate("DROP TABLE hive.tpch." + hiveTableName); + assertUpdate("DROP TABLE iceberg.tpch." + icebergTableName); + } + + @Test + void testAddFilesInvalidPartitionFilter() + { + String hiveTableName = "test_add_files_partition_" + randomNameSuffix(); + String icebergTableName = "test_add_files_partition_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + "(id int, part varchar) WITH (partitioning = ARRAY['part'])"); + assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + "(id int, part varchar) WITH (partitioned_by = ARRAY['part'])"); + + // Invalid partition key + assertQueryFails( + "ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "', map(ARRAY['invalid_part'], ARRAY['test1']))", + ".*Invalid partition: invalid_part=test1"); + // Invalid partition value + assertQueryFails( + "ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "', map(ARRAY['part'], ARRAY['invalid_value']))", + ".*Invalid partition: part=invalid_value"); + + assertUpdate("DROP TABLE hive.tpch." + hiveTableName); + assertUpdate("DROP TABLE iceberg.tpch." + icebergTableName); + } + + @Test + void testAddFilesNestedPartitionFilter() + { + String hiveTableName = "test_add_files_nested_partition_" + randomNameSuffix(); + String icebergTableName = "test_add_files_nested_partition_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + "(id int, parent varchar, child varchar) WITH (partitioning = ARRAY['parent', 'child'])"); + assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + "(id int, parent varchar, child varchar) WITH (partitioned_by = ARRAY['parent', 'child'])"); + assertUpdate("INSERT INTO hive.tpch." + hiveTableName + " VALUES (1, 'parent1', 'child1'), (2, 'parent1', 'child2'), (3, 'parent2', 'child3'), (4, 'parent2', 'child4')", 4); + + assertUpdate("ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "', map(ARRAY['parent', 'child'], ARRAY['parent1', 'child1']))"); + assertQuery("SELECT * FROM " + icebergTableName, "VALUES (1, 'parent1', 'child1')"); + + assertUpdate("ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "', map(ARRAY['parent', 'child'], ARRAY['parent1', 'child2']))"); + assertQuery("SELECT * FROM " + icebergTableName, "VALUES (1, 'parent1', 'child1'), (2, 'parent1', 'child2')"); + + // TODO: Add support for partial partition filters + assertQueryFails( + "ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "', map(ARRAY['parent'], ARRAY['parent2']))", + ".*partition value count must match partition column count.*"); + + assertUpdate("DROP TABLE hive.tpch." + hiveTableName); + assertUpdate("DROP TABLE iceberg.tpch." 
+ icebergTableName); + } + + @Test + void testAddFilesWithRecursiveDirectory() + throws Exception + { + String hiveTableName = "test_add_files_recursive_directory_" + randomNameSuffix(); + String icebergTableName = "test_add_files_recursive_directory_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " WITH (format = 'ORC') AS SELECT 1 x", 1); + assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + "(x int)"); + + // Move a file to the nested directory + String path = (String) computeScalar("SELECT \"$path\" FROM hive.tpch." + hiveTableName); + String fileName = Location.of(path).fileName(); + Path tableLocation = dataDirectory.resolve("tpch").resolve(hiveTableName); + Files.createDirectory(tableLocation.resolve("nested")); + Files.move(tableLocation.resolve(fileName), tableLocation.resolve("nested").resolve(fileName)); + + assertQueryFails( + "ALTER TABLE " + icebergTableName + " EXECUTE add_files('" + tableLocation + "', 'ORC')", + ".*Recursive directory must not exist when recursive_directory argument is 'fail'.*"); + assertQueryFails( + "ALTER TABLE " + icebergTableName + " EXECUTE add_files('" + tableLocation + "', 'ORC', 'fail')", + ".*Recursive directory must not exist when recursive_directory argument is 'fail'.*"); + + assertUpdate("ALTER TABLE " + icebergTableName + " EXECUTE add_files('" + tableLocation + "', 'ORC', 'false')"); + assertQueryReturnsEmptyResult("SELECT * FROM " + icebergTableName); + + assertUpdate("ALTER TABLE " + icebergTableName + " EXECUTE add_files('" + tableLocation + "', 'ORC', 'true')"); + assertQuery("SELECT * FROM " + icebergTableName, "VALUES 1"); + + assertUpdate("DROP TABLE hive.tpch." + hiveTableName); + assertUpdate("DROP TABLE iceberg.tpch." + icebergTableName); + } + + @Test + void testAddFilesDirectoryLocation() + { + String hiveTableName = "test_add_files_location_" + randomNameSuffix(); + String icebergTableName = "test_add_files_location_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + " AS SELECT 1 x", 1); + assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " AS SELECT 2 x", 1); + + String path = (String) computeScalar("SELECT \"$path\" FROM hive.tpch." + hiveTableName); + String directory = Location.of(path).parentDirectory().toString(); + + assertUpdate("ALTER TABLE " + icebergTableName + " EXECUTE add_files('" + directory + "', 'ORC')"); + + assertQuery("SELECT * FROM " + icebergTableName, "VALUES 1, 2"); + + assertUpdate("DROP TABLE hive.tpch." + hiveTableName); + assertUpdate("DROP TABLE iceberg.tpch." + icebergTableName); + } + + @Test + void testAddFilesFileLocation() + { + // Spark allows adding files from a specific file location + String hiveTableName = "test_add_files_location_" + randomNameSuffix(); + String icebergTableName = "test_add_files_location_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + " AS SELECT 1 x", 1); + assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " AS SELECT 2 x", 1); + + String path = (String) computeScalar("SELECT \"$path\" FROM hive.tpch." + hiveTableName); + + assertUpdate("ALTER TABLE " + icebergTableName + " EXECUTE add_files('" + path + "', 'ORC')"); + + assertQuery("SELECT * FROM iceberg.tpch." + icebergTableName, "VALUES 1, 2"); + + assertUpdate("DROP TABLE hive.tpch." + hiveTableName); + assertUpdate("DROP TABLE iceberg.tpch." 
+ icebergTableName); + } + + @Test + void testAddFilesToPartitionTableWithLocation() + { + String hiveTableName = "test_add_files_location_" + randomNameSuffix(); + String icebergTableName = "test_add_files_location_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + " WITH (partitioning = ARRAY['part']) AS SELECT 1 x, 'test' part", 1); + assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " AS SELECT 2 x", 1); + + String path = (String) computeScalar("SELECT \"$path\" FROM hive.tpch." + hiveTableName); + String directory = Location.of(path).parentDirectory().toString(); + + assertQueryFails( + "ALTER TABLE " + icebergTableName + " EXECUTE add_files('" + directory + "', 'ORC')", + ".*The procedure does not support partitioned tables"); + + assertUpdate("DROP TABLE hive.tpch." + hiveTableName); + assertUpdate("DROP TABLE iceberg.tpch." + icebergTableName); + } + + @Test + void testAddFilesLocationWithWrongFormat() + { + String hiveTableName = "test_add_files_location_" + randomNameSuffix(); + String icebergTableName = "test_add_files_location_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + " AS SELECT 1 x", 1); + assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " WITH (format = 'ORC') AS SELECT 2 x", 1); + + String path = (String) computeScalar("SELECT \"$path\" FROM hive.tpch." + hiveTableName); + String location = Location.of(path).parentDirectory().toString(); + + assertQueryFails( + "ALTER TABLE " + icebergTableName + " EXECUTE add_files('" + location + "', 'TEXTFILE')", + ".* The procedure does not support storage format: TEXTFILE"); + assertQueryFails( + "ALTER TABLE " + icebergTableName + " EXECUTE add_files(location=>'" + location + "', format=>'PARQUET')", + ".*Failed to read file footer.*"); + + assertUpdate("DROP TABLE hive.tpch." + hiveTableName); + assertUpdate("DROP TABLE iceberg.tpch." + icebergTableName); + } + + @Test + void testAddFilesUnsupportedFileFormat() + { + String hiveTableName = "test_add_files_" + randomNameSuffix(); + String icebergTableName = "test_add_files_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + " AS SELECT '1' x", 1); + assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " WITH (format = 'TEXTFILE') AS SELECT '2' x", 1); + + assertQueryFails("ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "')", ".*Unsupported storage format: TEXTFILE.*"); + + assertQuery("SELECT * FROM " + icebergTableName, "VALUES '1'"); + + assertUpdate("DROP TABLE hive.tpch." + hiveTableName); + assertUpdate("DROP TABLE iceberg.tpch." + icebergTableName); + } + + @Test + void testAddFilesToNonIcebergTable() + { + String sourceHiveTableName = "test_add_files_" + randomNameSuffix(); + String targetHiveTableName = "test_add_files_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE hive.tpch." + sourceHiveTableName + " AS SELECT 1 x", 1); + assertUpdate("CREATE TABLE hive.tpch." + targetHiveTableName + " AS SELECT 2 x", 1); + + assertQueryFails( + "ALTER TABLE " + targetHiveTableName + " EXECUTE add_files_from_table('tpch', '" + sourceHiveTableName + "')", + "Not an Iceberg table: .*"); + + assertQuery("SELECT * FROM hive.tpch." + sourceHiveTableName, "VALUES 1"); + assertQuery("SELECT * FROM hive.tpch." + targetHiveTableName, "VALUES 2"); + + assertUpdate("DROP TABLE hive.tpch." + sourceHiveTableName); + assertUpdate("DROP TABLE hive.tpch." 
+ targetHiveTableName); + } + + @Test + void testAddFilesToView() + { + String sourceViewName = "test_add_files_" + randomNameSuffix(); + String targetIcebergTableName = "test_add_files_" + randomNameSuffix(); + + assertUpdate("CREATE VIEW iceberg.tpch." + sourceViewName + " AS SELECT 1 x"); + assertUpdate("CREATE TABLE iceberg.tpch." + targetIcebergTableName + " AS SELECT 2 x", 1); + + assertQueryFails( + "ALTER TABLE " + targetIcebergTableName + " EXECUTE add_files_from_table('tpch', '" + sourceViewName + "')", + "The procedure doesn't support adding files from VIRTUAL_VIEW table type"); + + assertQuery("SELECT * FROM iceberg.tpch." + sourceViewName, "VALUES 1"); + assertQuery("SELECT * FROM iceberg.tpch." + targetIcebergTableName, "VALUES 2"); + + assertUpdate("DROP VIEW iceberg.tpch." + sourceViewName); + assertUpdate("DROP TABLE iceberg.tpch." + targetIcebergTableName); + } + + @Test + void testAddFilesFromIcebergTable() + { + String sourceIcebergTableName = "test_add_files_" + randomNameSuffix(); + String targetIcebergTableName = "test_add_files_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE iceberg.tpch." + sourceIcebergTableName + " AS SELECT 1 x", 1); + assertUpdate("CREATE TABLE iceberg.tpch." + targetIcebergTableName + " AS SELECT 2 x", 1); + + assertQueryFails( + "ALTER TABLE " + targetIcebergTableName + " EXECUTE add_files_from_table('tpch', '" + sourceIcebergTableName + "')", + "Adding files from non-Hive tables is unsupported"); + + assertQuery("SELECT * FROM iceberg.tpch." + sourceIcebergTableName, "VALUES 1"); + assertQuery("SELECT * FROM iceberg.tpch." + targetIcebergTableName, "VALUES 2"); + + assertUpdate("DROP TABLE iceberg.tpch." + sourceIcebergTableName); + assertUpdate("DROP TABLE iceberg.tpch." + targetIcebergTableName); + } + + @Test + void testAddDuplicatedFiles() + { + // TODO Consider adding 'check_duplicate_files' option like Spark + String hiveTableName = "test_add_files_" + randomNameSuffix(); + String icebergTableName = "test_add_files_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " AS SELECT 1 x", 1); + assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + " AS SELECT 2 x", 1); + + assertUpdate("ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "')"); + assertUpdate("ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "')"); + + assertQuery("SELECT * FROM iceberg.tpch." + icebergTableName, "VALUES 1, 2, 1"); + + assertUpdate("DROP TABLE hive.tpch." + hiveTableName); + assertUpdate("DROP TABLE iceberg.tpch." + icebergTableName); + } + + @Test + void testAddFilesTargetTableNotFound() + { + String hiveTableName = "test_add_files_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " AS SELECT 1 x", 1); + assertQueryFails( + "ALTER TABLE table_not_found EXECUTE add_files_from_table('tpch', '" + hiveTableName + "')", + ".* Table 'iceberg.tpch.table_not_found' does not exist"); + assertUpdate("DROP TABLE hive.tpch." + hiveTableName); + } + + @Test + void testAddFilesSourceTableNotFound() + { + String icebergTableName = "test_add_files_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + "(x int)"); + assertQueryFails( + "ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', 'table_not_found')", + "Table 'tpch.table_not_found' not found"); + assertUpdate("DROP TABLE iceberg.tpch." 
+ icebergTableName); + } + + @Test + void testAddFilesLocationNotFound() + { + String tableName = "test_add_files_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE iceberg.tpch." + tableName + "(x int)"); + assertQueryFails( + "ALTER TABLE " + tableName + " EXECUTE add_files('file:///location-not-found', 'ORC')", + ".*Location not found.*"); + assertUpdate("DROP TABLE iceberg.tpch." + tableName); + } + + @Test + void testAddFilesInvalidArguments() + { + String tableName = "test_add_files_" + randomNameSuffix(); + assertUpdate("CREATE TABLE iceberg.tpch." + tableName + "(x int)"); + + assertQueryFails( + "ALTER TABLE " + tableName + " EXECUTE add_files_from_table(schema_name=>'tpch')", + "Required procedure argument 'table_name' is missing"); + assertQueryFails( + "ALTER TABLE " + tableName + " EXECUTE add_files_from_table(table_name=>'test')", + "Required procedure argument 'schema_name' is missing"); + + assertQueryFails( + "ALTER TABLE " + tableName + " EXECUTE add_files(location=>'file:///tmp')", + "Required procedure argument 'format' is missing"); + assertQueryFails( + "ALTER TABLE " + tableName + " EXECUTE add_files(format=>'ORC')", + "Required procedure argument 'location' is missing"); + + assertUpdate("DROP TABLE iceberg.tpch." + tableName); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMigrateProcedure.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/procedure/TestIcebergMigrateProcedure.java similarity index 99% rename from plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMigrateProcedure.java rename to plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/procedure/TestIcebergMigrateProcedure.java index c0ac0f89ceb82..46003cae5961d 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMigrateProcedure.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/procedure/TestIcebergMigrateProcedure.java @@ -12,10 +12,12 @@ * limitations under the License. */ -package io.trino.plugin.iceberg; +package io.trino.plugin.iceberg.procedure; import com.google.common.collect.ImmutableMap; import io.trino.plugin.hive.TestingHivePlugin; +import io.trino.plugin.iceberg.IcebergFileFormat; +import io.trino.plugin.iceberg.IcebergQueryRunner; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.QueryRunner; import org.junit.jupiter.api.Test; diff --git a/plugin/trino-iceberg/src/test/java/org/apache/iceberg/snowflake/TestTrinoSnowflakeCatalog.java b/plugin/trino-iceberg/src/test/java/org/apache/iceberg/snowflake/TestTrinoSnowflakeCatalog.java index 7c371b8ae7554..0185f15329947 100644 --- a/plugin/trino-iceberg/src/test/java/org/apache/iceberg/snowflake/TestTrinoSnowflakeCatalog.java +++ b/plugin/trino-iceberg/src/test/java/org/apache/iceberg/snowflake/TestTrinoSnowflakeCatalog.java @@ -216,7 +216,9 @@ public void testNonLowercaseNamespace() (connectorIdentity, fileIOProperties) -> { throw new UnsupportedOperationException(); }, - new TableStatisticsWriter(new NodeVersion("test-version"))); + new TableStatisticsWriter(new NodeVersion("test-version")), + Optional.empty(), + false); assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)") .isTrue(); assertThat(icebergMetadata.schemaExists(SESSION, schema)).as("icebergMetadata.schemaExists(schema)")