
Avoid reading min/max/null statistics for planning iceberg inserts #23757

Merged · 2 commits · Oct 13, 2024
@@ -291,6 +291,7 @@
import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields;
import static io.trino.plugin.iceberg.PartitionFields.toPartitionFields;
import static io.trino.plugin.iceberg.SortFieldUtils.parseSortFields;
import static io.trino.plugin.iceberg.TableStatisticsReader.readNdvs;
import static io.trino.plugin.iceberg.TableStatisticsWriter.StatsUpdateMode.INCREMENTAL_UPDATE;
import static io.trino.plugin.iceberg.TableStatisticsWriter.StatsUpdateMode.REPLACE;
import static io.trino.plugin.iceberg.TableType.DATA;
@@ -346,6 +347,7 @@
import static org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP;
import static org.apache.iceberg.SnapshotSummary.REMOVED_EQ_DELETES_PROP;
import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP;
import static org.apache.iceberg.SnapshotSummary.TOTAL_RECORDS_PROP;
Contributor
The calling code in getStatisticsCollectionMetadataForWrite was not using all of the statistics, so it is simplified to fetch only NDVs.

For regression-prevention purposes: can we ensure through a test that the calling code does not depend on min/max stats retrieved from io.trino.plugin.iceberg.TableStatisticsReader#makeTableStatistics?

I remember raising the idea promoted by this PR a while ago, and @findepi was rather cautious about making this change because of potential regressions.

Member Author

The important thing here is to avoid going through all manifest files when planning inserts, rather than to avoid using min/max/null stats as such. There are tests which assert on manifest file accesses on the filesystem (see the test sketch after this comment). If this code were getting those stats through some other, cheaper means, we wouldn't care.
At worst, there are a couple of ways of making a mistake in this code:

  1. We generate NDV stats even though we don't know the existing NDVs, and end up undercounting NDVs by recording only the NDVs collected on write. The other min/max/null stats would still be correct, so the CBO may produce worse plans, but it's not the end of the world. Eventually the stats will become more accurate, or a call to ANALYZE will fix the whole thing.
  2. We fail to detect that the table is empty or that NDVs are already known, and skip generating them on write. Again, we may get worse plans from the CBO while the other non-NDV stats stay intact, and a call to ANALYZE will fix the problem.

Either way, these are much more tolerable problems than the one caused by the current code, which can bottleneck INSERT queries on planning for minutes.
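For reference, a file-access assertion of the kind mentioned above can be sketched roughly as follows. This is only a condensed, hypothetical variant of the testInsert test added further down in this diff (the table and method names here are made up; the operation counts are copied from that test). The point is that the assertion pins the exact multiset of file operations, so any change that starts opening manifest files while planning an INSERT would fail the test.

```java
@Test
public void testInsertPlanningDoesNotReadManifests()
{
    // hypothetical table name, used only for this sketch
    assertUpdate("CREATE TABLE test_insert_planning (id VARCHAR, age INT)");

    assertFileSystemAccesses(
            "INSERT INTO test_insert_planning VALUES ('a', 1)",
            ImmutableMultiset.<FileOperation>builder()
                    .addCopies(new FileOperation(METADATA_JSON, "OutputFile.create"), 2)
                    .addCopies(new FileOperation(METADATA_JSON, "InputFile.newStream"), 3)
                    .addCopies(new FileOperation(SNAPSHOT, "InputFile.length"), 3)
                    .addCopies(new FileOperation(SNAPSHOT, "InputFile.newStream"), 3)
                    .add(new FileOperation(SNAPSHOT, "OutputFile.create"))
                    // no MANIFEST InputFile.newStream entries: a regression that reads
                    // manifests (e.g. to derive min/max/null stats) during planning
                    // would show up here as an unexpected file operation
                    .add(new FileOperation(MANIFEST, "OutputFile.create"))
                    .add(new FileOperation(STATS, "OutputFile.create"))
                    .build());
}
```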

import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
@@ -2461,21 +2463,34 @@ public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(Connector

ConnectorTableHandle tableHandle = getTableHandle(session, tableMetadata.getTable(), Optional.empty(), Optional.empty());
if (tableHandle == null) {
// Assume new table (CTAS), collect all stats possible
// Assume new table (CTAS), collect NDV stats on all columns
return getStatisticsCollectionMetadata(tableMetadata, Optional.empty(), availableColumnNames -> {});
}
IcebergTableHandle table = checkValidTableHandle(tableHandle);
Schema schema = SchemaParser.fromJson(table.getTableSchemaJson());
TableStatistics tableStatistics = getTableStatistics(
session,
table.withProjectedColumns(ImmutableSet.copyOf(getTopLevelColumns(schema, typeManager))));
if (tableStatistics.getRowCount().getValue() == 0.0) {
// Table has no data (empty, or wiped out). Collect all stats possible
if (table.getSnapshotId().isEmpty()) {
// Table has no data (empty, or wiped out). Collect NDV stats on all columns
return getStatisticsCollectionMetadata(tableMetadata, Optional.empty(), availableColumnNames -> {});
}

Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
long snapshotId = table.getSnapshotId().orElseThrow();
Snapshot snapshot = icebergTable.snapshot(snapshotId);
String totalRecords = snapshot.summary().get(TOTAL_RECORDS_PROP);
if (totalRecords != null && Long.parseLong(totalRecords) == 0) {
// Table has no data (empty, or wiped out). Collect NDV stats on all columns
return getStatisticsCollectionMetadata(tableMetadata, Optional.empty(), availableColumnNames -> {});
}
Set<String> columnsWithExtendedStatistics = tableStatistics.getColumnStatistics().entrySet().stream()
.filter(entry -> !entry.getValue().getDistinctValuesCount().isUnknown())
.map(entry -> ((IcebergColumnHandle) entry.getKey()).getName())

Schema schema = SchemaParser.fromJson(table.getTableSchemaJson());
List<IcebergColumnHandle> columns = getTopLevelColumns(schema, typeManager);
Set<Integer> columnIds = columns.stream()
.map(IcebergColumnHandle::getId)
.collect(toImmutableSet());
Map<Integer, Long> ndvs = readNdvs(icebergTable, snapshotId, columnIds, true);
// Avoid collecting NDV stats on columns where we don't know the existing NDV count
Set<String> columnsWithExtendedStatistics = columns.stream()
.filter(column -> ndvs.containsKey(column.getId()))
.map(IcebergColumnHandle::getName)
.collect(toImmutableSet());
return getStatisticsCollectionMetadata(tableMetadata, Optional.of(columnsWithExtendedStatistics), availableColumnNames -> {});
}
@@ -240,7 +240,7 @@ else if (columnHandle.getBaseType() == VARBINARY) {
return new TableStatistics(Estimate.of(recordCount), columnHandleBuilder.buildOrThrow());
}

private static Map<Integer, Long> readNdvs(Table icebergTable, long snapshotId, Set<Integer> columnIds, boolean extendedStatisticsEnabled)
public static Map<Integer, Long> readNdvs(Table icebergTable, long snapshotId, Set<Integer> columnIds, boolean extendedStatisticsEnabled)
{
if (!extendedStatisticsEnabled) {
return ImmutableMap.of();
@@ -97,9 +97,9 @@ public void testCacheFileOperations()
.add(new CacheOperation("Alluxio.writeCache", METADATA_JSON))
.addCopies(new CacheOperation("Alluxio.readCached", SNAPSHOT), 2)
.add(new CacheOperation("InputFile.length", SNAPSHOT))
.add(new CacheOperation("Alluxio.readExternalStream", MANIFEST))
.addCopies(new CacheOperation("Alluxio.readExternalStream", MANIFEST), 2)
.addCopies(new CacheOperation("Alluxio.readCached", MANIFEST), 4)
.add(new CacheOperation("Alluxio.writeCache", MANIFEST))
.addCopies(new CacheOperation("Alluxio.writeCache", MANIFEST), 2)
.build());

assertFileSystemAccesses(
@@ -129,9 +129,9 @@ public void testCacheFileOperations()
.add(new CacheOperation("Alluxio.writeCache", METADATA_JSON))
.addCopies(new CacheOperation("Alluxio.readCached", SNAPSHOT), 2)
.add(new CacheOperation("InputFile.length", SNAPSHOT))
.add(new CacheOperation("Alluxio.readExternalStream", MANIFEST))
.addCopies(new CacheOperation("Alluxio.readExternalStream", MANIFEST), 3)
.addCopies(new CacheOperation("Alluxio.readCached", MANIFEST), 10)
.add(new CacheOperation("Alluxio.writeCache", MANIFEST))
.addCopies(new CacheOperation("Alluxio.writeCache", MANIFEST), 3)
.build());
assertFileSystemAccesses(
"SELECT * FROM test_cache_file_operations",
@@ -198,11 +198,41 @@ public void testCreateOrReplaceTableAsSelect()
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(METADATA_JSON, "OutputFile.create"), 2)
.addCopies(new FileOperation(METADATA_JSON, "InputFile.newStream"), 2)
.addCopies(new FileOperation(SNAPSHOT, "InputFile.newStream"), 2)
.addCopies(new FileOperation(SNAPSHOT, "InputFile.length"), 2)
.add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
.add(new FileOperation(SNAPSHOT, "InputFile.length"))
.add(new FileOperation(SNAPSHOT, "OutputFile.create"))
.add(new FileOperation(MANIFEST, "OutputFile.create"))
.add(new FileOperation(STATS, "OutputFile.create"))
.build());
}

@Test
public void testInsert()
{
assertUpdate("CREATE TABLE test_insert (id VARCHAR, age INT)");

assertFileSystemAccesses(
"INSERT INTO test_insert VALUES('a', 1)",
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(METADATA_JSON, "OutputFile.create"), 2)
.addCopies(new FileOperation(METADATA_JSON, "InputFile.newStream"), 3)
.addCopies(new FileOperation(SNAPSHOT, "InputFile.length"), 3)
.addCopies(new FileOperation(SNAPSHOT, "InputFile.newStream"), 3)
.add(new FileOperation(SNAPSHOT, "OutputFile.create"))
.add(new FileOperation(MANIFEST, "OutputFile.create"))
.add(new FileOperation(STATS, "OutputFile.create"))
.build());

assertFileSystemAccesses(
"INSERT INTO test_insert VALUES('b', 2)",
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(METADATA_JSON, "OutputFile.create"), 2)
.addCopies(new FileOperation(METADATA_JSON, "InputFile.newStream"), 3)
.addCopies(new FileOperation(SNAPSHOT, "InputFile.newStream"), 3)
.addCopies(new FileOperation(SNAPSHOT, "InputFile.length"), 3)
.add(new FileOperation(STATS, "InputFile.newStream"))
.add(new FileOperation(SNAPSHOT, "OutputFile.create"))
.add(new FileOperation(MANIFEST, "OutputFile.create"))
.add(new FileOperation(MANIFEST, "InputFile.newStream"))
.add(new FileOperation(STATS, "OutputFile.create"))
.build());
}
@@ -88,7 +88,7 @@ public void testCacheFileOperations()
.add(new CacheOperation("FileSystemCache.cacheStream", METADATA_JSON))
.add(new CacheOperation("FileSystemCache.cacheLength", SNAPSHOT))
.add(new CacheOperation("FileSystemCache.cacheStream", SNAPSHOT))
.add(new CacheOperation("Input.readTail", MANIFEST))
.addCopies(new CacheOperation("Input.readTail", MANIFEST), 2)
.addCopies(new CacheOperation("FileSystemCache.cacheStream", MANIFEST), 2)
.build());

@@ -116,7 +116,7 @@ public void testCacheFileOperations()
.add(new CacheOperation("FileSystemCache.cacheStream", METADATA_JSON))
.add(new CacheOperation("FileSystemCache.cacheLength", SNAPSHOT))
.add(new CacheOperation("FileSystemCache.cacheStream", SNAPSHOT))
.add(new CacheOperation("Input.readTail", MANIFEST))
.addCopies(new CacheOperation("Input.readTail", MANIFEST), 3)
.addCopies(new CacheOperation("FileSystemCache.cacheStream", MANIFEST), 5)
.build());
