Avoid reading min/max/null statistics for planning iceberg inserts
For large tables, going through statistics for all files can be slow.
The calling code in getStatisticsCollectionMetadataForWrite was not
using all of the statistics, so it is simplified to fetch only NDVs.
raunaqmorarka committed Oct 11, 2024
1 parent 847bd1f commit 7e669a2
Showing 5 changed files with 34 additions and 21 deletions.
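The substance of the change is in IcebergMetadata.getStatisticsCollectionMetadataForWrite: rather than computing full TableStatistics, which aggregates min/max/null-count metadata across every data file, planning now answers only two cheap questions — is the table empty (readable from the snapshot summary), and which columns already have NDV counts. A condensed sketch of the new flow, assembled from the hunks below (class context and the CTAS branch elided, so read this as a sketch rather than the complete method):

    // Empty table: no snapshot, or a snapshot whose summary reports zero records
    if (table.getSnapshotId().isEmpty()) {
        return getStatisticsCollectionMetadata(tableMetadata, Optional.empty(), availableColumnNames -> {});
    }
    Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
    long snapshotId = table.getSnapshotId().orElseThrow();
    String totalRecords = icebergTable.snapshot(snapshotId).summary().get(TOTAL_RECORDS_PROP);
    if (totalRecords != null && Long.parseLong(totalRecords) == 0) {
        return getStatisticsCollectionMetadata(tableMetadata, Optional.empty(), availableColumnNames -> {});
    }
    // Fetch only NDVs; min/max/null statistics are never read during insert planning.
    // columnIds holds the field IDs of the table's top-level columns.
    Map<Integer, Long> ndvs = readNdvs(icebergTable, snapshotId, columnIds, true);

Columns missing from the ndvs map have no known distinct-value count, so extended statistics collection is requested only for the columns that are present.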
@@ -291,6 +291,7 @@
 import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields;
 import static io.trino.plugin.iceberg.PartitionFields.toPartitionFields;
 import static io.trino.plugin.iceberg.SortFieldUtils.parseSortFields;
+import static io.trino.plugin.iceberg.TableStatisticsReader.readNdvs;
 import static io.trino.plugin.iceberg.TableStatisticsWriter.StatsUpdateMode.INCREMENTAL_UPDATE;
 import static io.trino.plugin.iceberg.TableStatisticsWriter.StatsUpdateMode.REPLACE;
 import static io.trino.plugin.iceberg.TableType.DATA;
@@ -346,6 +347,7 @@
 import static org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP;
 import static org.apache.iceberg.SnapshotSummary.REMOVED_EQ_DELETES_PROP;
 import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP;
+import static org.apache.iceberg.SnapshotSummary.TOTAL_RECORDS_PROP;
 import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
 import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT;
 import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
@@ -2461,21 +2463,34 @@ public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(Connector

         ConnectorTableHandle tableHandle = getTableHandle(session, tableMetadata.getTable(), Optional.empty(), Optional.empty());
         if (tableHandle == null) {
-            // Assume new table (CTAS), collect all stats possible
+            // Assume new table (CTAS), collect NDV stats on all columns
             return getStatisticsCollectionMetadata(tableMetadata, Optional.empty(), availableColumnNames -> {});
         }
         IcebergTableHandle table = checkValidTableHandle(tableHandle);
-        Schema schema = SchemaParser.fromJson(table.getTableSchemaJson());
-        TableStatistics tableStatistics = getTableStatistics(
-                session,
-                table.withProjectedColumns(ImmutableSet.copyOf(getTopLevelColumns(schema, typeManager))));
-        if (tableStatistics.getRowCount().getValue() == 0.0) {
-            // Table has no data (empty, or wiped out). Collect all stats possible
+        if (table.getSnapshotId().isEmpty()) {
+            // Table has no data (empty, or wiped out). Collect NDV stats on all columns
             return getStatisticsCollectionMetadata(tableMetadata, Optional.empty(), availableColumnNames -> {});
         }
+
+        Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
+        long snapshotId = table.getSnapshotId().orElseThrow();
+        Snapshot snapshot = icebergTable.snapshot(snapshotId);
+        String totalRecords = snapshot.summary().get(TOTAL_RECORDS_PROP);
+        if (totalRecords != null && Long.parseLong(totalRecords) == 0) {
+            // Table has no data (empty, or wiped out). Collect NDV stats on all columns
+            return getStatisticsCollectionMetadata(tableMetadata, Optional.empty(), availableColumnNames -> {});
+        }
-        Set<String> columnsWithExtendedStatistics = tableStatistics.getColumnStatistics().entrySet().stream()
-                .filter(entry -> !entry.getValue().getDistinctValuesCount().isUnknown())
-                .map(entry -> ((IcebergColumnHandle) entry.getKey()).getName())
+
+        Schema schema = SchemaParser.fromJson(table.getTableSchemaJson());
+        List<IcebergColumnHandle> projectedColumns = getTopLevelColumns(schema, typeManager);
+        Set<Integer> columnIds = projectedColumns.stream()
+                .map(IcebergColumnHandle::getId)
+                .collect(toImmutableSet());
+        Map<Integer, Long> ndvs = readNdvs(icebergTable, snapshotId, columnIds, true);
+        // Avoid collecting NDV stats on columns where we don't know the existing NDV count
+        Set<String> columnsWithExtendedStatistics = projectedColumns.stream()
+                .filter(column -> ndvs.containsKey(column.getId()))
+                .map(IcebergColumnHandle::getName)
                 .collect(toImmutableSet());
         return getStatisticsCollectionMetadata(tableMetadata, Optional.of(columnsWithExtendedStatistics), availableColumnNames -> {});
     }
@@ -240,7 +240,7 @@ else if (columnHandle.getBaseType() == VARBINARY) {
         return new TableStatistics(Estimate.of(recordCount), columnHandleBuilder.buildOrThrow());
     }

-    private static Map<Integer, Long> readNdvs(Table icebergTable, long snapshotId, Set<Integer> columnIds, boolean extendedStatisticsEnabled)
+    public static Map<Integer, Long> readNdvs(Table icebergTable, long snapshotId, Set<Integer> columnIds, boolean extendedStatisticsEnabled)
     {
         if (!extendedStatisticsEnabled) {
             return ImmutableMap.of();
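Widening readNdvs from private to public is what lets IcebergMetadata fetch NDVs directly. A minimal hypothetical call site (variable names are illustrative, not from this commit):

    // Keys of the returned map are Iceberg field IDs; a column with no
    // recorded NDV is simply absent rather than mapped to a sentinel value.
    Set<Integer> columnIds = ImmutableSet.of(1, 2, 3);
    Map<Integer, Long> ndvs = TableStatisticsReader.readNdvs(icebergTable, snapshotId, columnIds, true);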
@@ -97,9 +97,9 @@ public void testCacheFileOperations()
                         .add(new CacheOperation("Alluxio.writeCache", METADATA_JSON))
                         .addCopies(new CacheOperation("Alluxio.readCached", SNAPSHOT), 2)
                         .add(new CacheOperation("InputFile.length", SNAPSHOT))
-                        .add(new CacheOperation("Alluxio.readExternalStream", MANIFEST))
+                        .addCopies(new CacheOperation("Alluxio.readExternalStream", MANIFEST), 2)
                         .addCopies(new CacheOperation("Alluxio.readCached", MANIFEST), 4)
-                        .add(new CacheOperation("Alluxio.writeCache", MANIFEST))
+                        .addCopies(new CacheOperation("Alluxio.writeCache", MANIFEST), 2)
                         .build());

         assertFileSystemAccesses(
@@ -129,9 +129,9 @@ public void testCacheFileOperations()
                         .add(new CacheOperation("Alluxio.writeCache", METADATA_JSON))
                         .addCopies(new CacheOperation("Alluxio.readCached", SNAPSHOT), 2)
                         .add(new CacheOperation("InputFile.length", SNAPSHOT))
-                        .add(new CacheOperation("Alluxio.readExternalStream", MANIFEST))
+                        .addCopies(new CacheOperation("Alluxio.readExternalStream", MANIFEST), 3)
                         .addCopies(new CacheOperation("Alluxio.readCached", MANIFEST), 10)
-                        .add(new CacheOperation("Alluxio.writeCache", MANIFEST))
+                        .addCopies(new CacheOperation("Alluxio.writeCache", MANIFEST), 3)
                         .build());
         assertFileSystemAccesses(
                 "SELECT * FROM test_cache_file_operations",
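A note on the test updates above and below: the expected file operations are modeled as Guava multisets, and .add(x) is shorthand for .addCopies(x, 1), so each of these hunks merely bumps an expected operation count. A toy illustration using com.google.common.collect.ImmutableMultiset, with strings standing in for the tests' CacheOperation values:

    Multiset<String> expected = ImmutableMultiset.<String>builder()
            .addCopies("Alluxio.readExternalStream", 2) // previously .add(...), i.e. one copy
            .addCopies("Alluxio.readCached", 4)
            .build();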
@@ -198,11 +198,10 @@ public void testCreateOrReplaceTableAsSelect()
                 ImmutableMultiset.<FileOperation>builder()
                         .addCopies(new FileOperation(METADATA_JSON, "OutputFile.create"), 2)
                         .addCopies(new FileOperation(METADATA_JSON, "InputFile.newStream"), 2)
-                        .addCopies(new FileOperation(SNAPSHOT, "InputFile.newStream"), 2)
-                        .addCopies(new FileOperation(SNAPSHOT, "InputFile.length"), 2)
+                        .add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
+                        .add(new FileOperation(SNAPSHOT, "InputFile.length"))
                         .add(new FileOperation(SNAPSHOT, "OutputFile.create"))
                         .add(new FileOperation(MANIFEST, "OutputFile.create"))
-                        .add(new FileOperation(MANIFEST, "InputFile.newStream"))
                         .add(new FileOperation(STATS, "OutputFile.create"))
                         .build());
     }
@@ -234,7 +233,6 @@ public void testInsert()
                         .add(new FileOperation(STATS, "InputFile.newStream"))
                         .add(new FileOperation(SNAPSHOT, "OutputFile.create"))
                         .add(new FileOperation(MANIFEST, "OutputFile.create"))
-                        .add(new FileOperation(MANIFEST, "InputFile.newStream"))
                         .add(new FileOperation(STATS, "OutputFile.create"))
                         .build());
     }
@@ -88,7 +88,7 @@ public void testCacheFileOperations()
                         .add(new CacheOperation("FileSystemCache.cacheStream", METADATA_JSON))
                         .add(new CacheOperation("FileSystemCache.cacheLength", SNAPSHOT))
                         .add(new CacheOperation("FileSystemCache.cacheStream", SNAPSHOT))
-                        .add(new CacheOperation("Input.readTail", MANIFEST))
+                        .addCopies(new CacheOperation("Input.readTail", MANIFEST), 2)
                         .addCopies(new CacheOperation("FileSystemCache.cacheStream", MANIFEST), 2)
                         .build());

@@ -116,7 +116,7 @@ public void testCacheFileOperations()
                         .add(new CacheOperation("FileSystemCache.cacheStream", METADATA_JSON))
                         .add(new CacheOperation("FileSystemCache.cacheLength", SNAPSHOT))
                         .add(new CacheOperation("FileSystemCache.cacheStream", SNAPSHOT))
-                        .add(new CacheOperation("Input.readTail", MANIFEST))
+                        .addCopies(new CacheOperation("Input.readTail", MANIFEST), 3)
                         .addCopies(new CacheOperation("FileSystemCache.cacheStream", MANIFEST), 5)
                         .build());

