Skip to content

Commit

Permalink
Update Iceberg table statistics on writes
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi committed Mar 20, 2023
1 parent 1370c2e commit bf04a72
Show file tree
Hide file tree
Showing 20 changed files with 943 additions and 410 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableMap;
import org.apache.datasketches.theta.CompactSketch;

import java.util.Map;

import static java.util.Objects.requireNonNull;

public record CollectedStatistics(Map<Integer, CompactSketch> ndvSketches)
{
public CollectedStatistics
{
ndvSketches = ImmutableMap.copyOf(requireNonNull(ndvSketches, "ndvSketches is null"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class IcebergConfig
public static final int FORMAT_VERSION_SUPPORT_MAX = 2;
public static final String EXTENDED_STATISTICS_CONFIG = "iceberg.extended-statistics.enabled";
public static final String EXTENDED_STATISTICS_DESCRIPTION = "Enable collection (ANALYZE) and use of extended statistics.";
public static final String COLLECT_EXTENDED_STATISTICS_ON_WRITE_DESCRIPTION = "Collect extended statistics during writes";
public static final String EXPIRE_SNAPSHOTS_MIN_RETENTION = "iceberg.expire_snapshots.min-retention";
public static final String REMOVE_ORPHAN_FILES_MIN_RETENTION = "iceberg.remove_orphan_files.min-retention";

Expand All @@ -58,6 +59,7 @@ public class IcebergConfig
private Duration dynamicFilteringWaitTimeout = new Duration(0, SECONDS);
private boolean tableStatisticsEnabled = true;
private boolean extendedStatisticsEnabled = true;
private boolean collectExtendedStatisticsOnWrite = true;
private boolean projectionPushdownEnabled = true;
private boolean registerTableProcedureEnabled;
private Optional<String> hiveCatalogName = Optional.empty();
Expand Down Expand Up @@ -202,6 +204,19 @@ public IcebergConfig setExtendedStatisticsEnabled(boolean extendedStatisticsEnab
return this;
}

public boolean isCollectExtendedStatisticsOnWrite()
{
return collectExtendedStatisticsOnWrite;
}

@Config("iceberg.extended-statistics.collect-on-write")
@ConfigDescription(COLLECT_EXTENDED_STATISTICS_ON_WRITE_DESCRIPTION)
public IcebergConfig setCollectExtendedStatisticsOnWrite(boolean collectExtendedStatisticsOnWrite)
{
this.collectExtendedStatisticsOnWrite = collectExtendedStatisticsOnWrite;
return this;
}

public boolean isProjectionPushdownEnabled()
{
return projectionPushdownEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -203,6 +204,7 @@
import static io.trino.plugin.iceberg.IcebergMetadataColumn.isMetadataColumnId;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getExpireSnapshotMinRetention;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getRemoveOrphanFilesMinRetention;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isCollectExtendedStatisticsOnWrite;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isExtendedStatisticsEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isMergeManifestsOnWrite;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isProjectionPushdownEnabled;
Expand Down Expand Up @@ -232,6 +234,8 @@
import static io.trino.plugin.iceberg.SortFieldUtils.parseSortFields;
import static io.trino.plugin.iceberg.TableStatisticsReader.TRINO_STATS_COLUMN_ID_PATTERN;
import static io.trino.plugin.iceberg.TableStatisticsReader.TRINO_STATS_PREFIX;
import static io.trino.plugin.iceberg.TableStatisticsWriter.StatsUpdateMode.INCREMENTAL_UPDATE;
import static io.trino.plugin.iceberg.TableStatisticsWriter.StatsUpdateMode.REPLACE;
import static io.trino.plugin.iceberg.TableType.DATA;
import static io.trino.plugin.iceberg.TypeConverter.toIcebergTypeForNewColumn;
import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
Expand Down Expand Up @@ -851,6 +855,7 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,

IcebergWritableTableHandle table = (IcebergWritableTableHandle) insertHandle;
Table icebergTable = transaction.table();
Optional<Long> beforeWriteSnapshotId = Optional.ofNullable(icebergTable.currentSnapshot()).map(Snapshot::snapshotId);
Schema schema = icebergTable.schema();
Type[] partitionColumnTypes = icebergTable.spec().fields().stream()
.map(field -> field.transform().getResultType(
Expand Down Expand Up @@ -883,8 +888,39 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,

commit(appendFiles, session);
transaction.commitTransaction();
// TODO (https://github.com/trinodb/trino/issues/15439) this may not exactly be the snapshot we committed, if there is another writer
long newSnapshotId = transaction.table().currentSnapshot().snapshotId();
transaction = null;

// TODO (https://github.com/trinodb/trino/issues/15439): it would be good to publish data and stats atomically
beforeWriteSnapshotId.ifPresent(previous ->
verify(previous != newSnapshotId, "Failed to get new snapshot ID "));

if (!computedStatistics.isEmpty()) {
try {
beginTransaction(catalog.loadTable(session, table.getName()));
Table reloadedTable = transaction.table();
CollectedStatistics collectedStatistics = processComputedTableStatistics(reloadedTable, computedStatistics);
StatisticsFile statisticsFile = tableStatisticsWriter.writeStatisticsFile(
session,
reloadedTable,
newSnapshotId,
INCREMENTAL_UPDATE,
collectedStatistics);
transaction.updateStatistics()
.setStatistics(newSnapshotId, statisticsFile)
.commit();

transaction.commitTransaction();
}
catch (Exception e) {
// Write was committed, so at this point we cannot fail the query
// TODO (https://github.com/trinodb/trino/issues/15439): it would be good to publish data and stats atomically
log.error(e, "Failed to save table statistics");
}
transaction = null;
}

return Optional.of(new HiveWrittenPartitions(commitTasks.stream()
.map(CommitTaskData::getPath)
.collect(toImmutableList())));
Expand Down Expand Up @@ -1740,6 +1776,30 @@ private List<ColumnMetadata> getColumnMetadatas(Schema schema)
return columns.build();
}

@Override
public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(ConnectorSession session, ConnectorTableMetadata tableMetadata)
{
if (!isExtendedStatisticsEnabled(session) || !isCollectExtendedStatisticsOnWrite(session)) {
return TableStatisticsMetadata.empty();
}

IcebergTableHandle tableHandle = getTableHandle(session, tableMetadata.getTable(), Optional.empty(), Optional.empty());
if (tableHandle == null) {
// Assume new table (CTAS), collect all stats possible
return getStatisticsCollectionMetadata(tableMetadata, Optional.empty(), availableColumnNames -> {});
}
TableStatistics tableStatistics = getTableStatistics(session, tableHandle);
if (tableStatistics.getRowCount().getValue() == 0.0) {
// Table has no data (empty, or wiped out). Collect all stats possible
return getStatisticsCollectionMetadata(tableMetadata, Optional.empty(), availableColumnNames -> {});
}
Set<String> columnsWithExtendedStatistics = tableStatistics.getColumnStatistics().entrySet().stream()
.filter(entry -> !entry.getValue().getDistinctValuesCount().isUnknown())
.map(entry -> ((IcebergColumnHandle) entry.getKey()).getName())
.collect(toImmutableSet());
return getStatisticsCollectionMetadata(tableMetadata, Optional.of(columnsWithExtendedStatistics), availableColumnNames -> {});
}

@Override
public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, Map<String, Object> analyzeProperties)
{
Expand All @@ -1758,35 +1818,53 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession
}

ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle);
Set<String> allScalarColumnNames = tableMetadata.getColumns().stream()
.filter(column -> !column.isHidden())
.filter(column -> column.getType().getTypeParameters().isEmpty()) // is scalar type
.map(ColumnMetadata::getName)
.collect(toImmutableSet());

Set<String> analyzeColumnNames = getColumnNames(analyzeProperties)
Optional<Set<String>> analyzeColumnNames = getColumnNames(analyzeProperties)
.map(columnNames -> {
// validate that proper column names are passed via `columns` analyze property
if (columnNames.isEmpty()) {
throw new TrinoException(INVALID_ANALYZE_PROPERTY, "Cannot specify empty list of columns for analysis");
}
if (!allScalarColumnNames.containsAll(columnNames)) {
throw new TrinoException(
INVALID_ANALYZE_PROPERTY,
format("Invalid columns specified for analysis: %s", Sets.difference(columnNames, allScalarColumnNames)));
}
return columnNames;
})
.orElse(allScalarColumnNames);
});

return new ConnectorAnalyzeMetadata(
tableHandle,
getStatisticsCollectionMetadata(
tableMetadata,
analyzeColumnNames,
availableColumnNames -> {
throw new TrinoException(
INVALID_ANALYZE_PROPERTY,
format("Invalid columns specified for analysis: %s", Sets.difference(analyzeColumnNames.orElseThrow(), availableColumnNames)));
}));
}

private TableStatisticsMetadata getStatisticsCollectionMetadata(
ConnectorTableMetadata tableMetadata,
Optional<Set<String>> selectedColumnNames,
Consumer<Set<String>> unsatisfiableSelectedColumnsHandler)
{
Set<String> allScalarColumnNames = tableMetadata.getColumns().stream()
.filter(column -> !column.isHidden())
.filter(column -> column.getType().getTypeParameters().isEmpty()) // is scalar type
.map(ColumnMetadata::getName)
.collect(toImmutableSet());

selectedColumnNames.ifPresent(columnNames -> {
if (!allScalarColumnNames.containsAll(columnNames)) {
unsatisfiableSelectedColumnsHandler.accept(allScalarColumnNames);
}
});

Set<ColumnStatisticMetadata> columnStatistics = tableMetadata.getColumns().stream()
.filter(column -> analyzeColumnNames.contains(column.getName()))
.filter(columnMetadata -> allScalarColumnNames.contains(columnMetadata.getName()))
.filter(selectedColumnNames
.map(columnNames -> (Predicate<ColumnMetadata>) columnMetadata -> columnNames.contains(columnMetadata.getName()))
.orElse(columnMetadata -> true))
.map(column -> new ColumnStatisticMetadata(column.getName(), NUMBER_OF_DISTINCT_VALUES_NAME, NUMBER_OF_DISTINCT_VALUES_FUNCTION))
.collect(toImmutableSet());

return new ConnectorAnalyzeMetadata(
tableHandle,
new TableStatisticsMetadata(columnStatistics, ImmutableSet.of(), ImmutableList.of()));
return new TableStatisticsMetadata(columnStatistics, ImmutableSet.of(), ImmutableList.of());
}

@Override
Expand Down Expand Up @@ -1824,9 +1902,9 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH
}
long snapshotId = handle.getSnapshotId().orElseThrow();

Map<String, Integer> columnNameToId = table.schema().columns().stream()
.collect(toImmutableMap(nestedField -> nestedField.name().toLowerCase(ENGLISH), Types.NestedField::fieldId));
Set<Integer> columnIds = ImmutableSet.copyOf(columnNameToId.values());
Set<Integer> columnIds = table.schema().columns().stream()
.map(Types.NestedField::fieldId)
.collect(toImmutableSet());

// TODO (https://github.com/trinodb/trino/issues/15397): remove support for Trino-specific statistics properties
// Drop stats for obsolete columns
Expand All @@ -1845,31 +1923,13 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH
.forEach(updateProperties::remove);
updateProperties.commit();

ImmutableMap.Builder<Integer, CompactSketch> ndvSketches = ImmutableMap.builder();
for (ComputedStatistics computedStatistic : computedStatistics) {
verify(computedStatistic.getGroupingColumns().isEmpty() && computedStatistic.getGroupingValues().isEmpty(), "Unexpected grouping");
verify(computedStatistic.getTableStatistics().isEmpty(), "Unexpected table statistics");
for (Map.Entry<ColumnStatisticMetadata, Block> entry : computedStatistic.getColumnStatistics().entrySet()) {
ColumnStatisticMetadata statisticMetadata = entry.getKey();
if (statisticMetadata.getConnectorAggregationId().equals(NUMBER_OF_DISTINCT_VALUES_NAME)) {
Integer columnId = verifyNotNull(
columnNameToId.get(statisticMetadata.getColumnName()),
"Column not found in table: [%s]",
statisticMetadata.getColumnName());
CompactSketch sketch = DataSketchStateSerializer.deserialize(entry.getValue(), 0);
ndvSketches.put(columnId, sketch);
}
else {
throw new UnsupportedOperationException("Unsupported statistic: " + statisticMetadata);
}
}
}

CollectedStatistics collectedStatistics = processComputedTableStatistics(table, computedStatistics);
StatisticsFile statisticsFile = tableStatisticsWriter.writeStatisticsFile(
session,
table,
snapshotId,
ndvSketches.buildOrThrow());
REPLACE,
collectedStatistics);
transaction.updateStatistics()
.setStatistics(snapshotId, statisticsFile)
.commit();
Expand Down Expand Up @@ -2721,6 +2781,34 @@ public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session,
return catalog.redirectTable(session, tableName);
}

private static CollectedStatistics processComputedTableStatistics(Table table, Collection<ComputedStatistics> computedStatistics)
{
Map<String, Integer> columnNameToId = table.schema().columns().stream()
.collect(toImmutableMap(nestedField -> nestedField.name().toLowerCase(ENGLISH), Types.NestedField::fieldId));

ImmutableMap.Builder<Integer, CompactSketch> ndvSketches = ImmutableMap.builder();
for (ComputedStatistics computedStatistic : computedStatistics) {
verify(computedStatistic.getGroupingColumns().isEmpty() && computedStatistic.getGroupingValues().isEmpty(), "Unexpected grouping");
verify(computedStatistic.getTableStatistics().isEmpty(), "Unexpected table statistics");
for (Map.Entry<ColumnStatisticMetadata, Block> entry : computedStatistic.getColumnStatistics().entrySet()) {
ColumnStatisticMetadata statisticMetadata = entry.getKey();
if (statisticMetadata.getConnectorAggregationId().equals(NUMBER_OF_DISTINCT_VALUES_NAME)) {
Integer columnId = verifyNotNull(
columnNameToId.get(statisticMetadata.getColumnName()),
"Column not found in table: [%s]",
statisticMetadata.getColumnName());
CompactSketch sketch = DataSketchStateSerializer.deserialize(entry.getValue(), 0);
ndvSketches.put(columnId, sketch);
}
else {
throw new UnsupportedOperationException("Unsupported statistic: " + statisticMetadata);
}
}
}

return new CollectedStatistics(ndvSketches.buildOrThrow());
}

private void beginTransaction(Table icebergTable)
{
verify(transaction == null, "transaction already set");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.plugin.base.session.PropertyMetadataUtil.dataSizeProperty;
import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty;
import static io.trino.plugin.iceberg.IcebergConfig.COLLECT_EXTENDED_STATISTICS_ON_WRITE_DESCRIPTION;
import static io.trino.plugin.iceberg.IcebergConfig.EXTENDED_STATISTICS_DESCRIPTION;
import static io.trino.spi.StandardErrorCode.INVALID_SESSION_PROPERTY;
import static io.trino.spi.session.PropertyMetadata.booleanProperty;
Expand Down Expand Up @@ -77,6 +78,7 @@ public final class IcebergSessionProperties
public static final String EXTENDED_STATISTICS_ENABLED = "extended_statistics_enabled";
private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled";
private static final String TARGET_MAX_FILE_SIZE = "target_max_file_size";
public static final String COLLECT_EXTENDED_STATISTICS_ON_WRITE = "collect_extended_statistics_on_write";
private static final String HIVE_CATALOG_NAME = "hive_catalog_name";
private static final String MINIMUM_ASSIGNED_SPLIT_WEIGHT = "minimum_assigned_split_weight";
public static final String EXPIRE_SNAPSHOTS_MIN_RETENTION = "expire_snapshots_min_retention";
Expand Down Expand Up @@ -257,6 +259,11 @@ public IcebergSessionProperties(
"Target maximum size of written files; the actual size may be larger",
icebergConfig.getTargetMaxFileSize(),
false))
.add(booleanProperty(
COLLECT_EXTENDED_STATISTICS_ON_WRITE,
COLLECT_EXTENDED_STATISTICS_ON_WRITE_DESCRIPTION,
icebergConfig.isCollectExtendedStatisticsOnWrite(),
false))
.add(stringProperty(
HIVE_CATALOG_NAME,
"Catalog to redirect to when a Hive table is referenced",
Expand Down Expand Up @@ -440,6 +447,11 @@ public static boolean isExtendedStatisticsEnabled(ConnectorSession session)
return session.getProperty(EXTENDED_STATISTICS_ENABLED, Boolean.class);
}

public static boolean isCollectExtendedStatisticsOnWrite(ConnectorSession session)
{
return session.getProperty(COLLECT_EXTENDED_STATISTICS_ON_WRITE, Boolean.class);
}

public static boolean isProjectionPushdownEnabled(ConnectorSession session)
{
return session.getProperty(PROJECTION_PUSHDOWN_ENABLED, Boolean.class);
Expand Down
Loading

0 comments on commit bf04a72

Please sign in to comment.