Skip to content

Commit

Permalink
Allow Iceberg MV with partitioning transforms on timestamptz
Browse files Browse the repository at this point in the history
Allow creation of Iceberg Materialized Views partitioned with a
temporal partitioning function on a `timestamp with time zone` column.

In MVs, the `timestamp with time zone` columns are generally stored as
text to preserve time zone information. However, this prevents use of
temporal partitioning functions on these columns. The commit keeps
`timestamp with time zone` columns with partitioning applied on them as
`timestamp with time zone` in the storage table.

An obvious downside to this approach is that the time zone information
is erased and it is not known whether this aligns with user intention or
not. A better solution would be to introduce a point-in-time type
(#2273) to distinguish
cases where time zone information is important (like Java's
`ZonedDateTime`) from cases where only the point in time matters (like
Java's `Instant`).
  • Loading branch information
findepi committed Mar 29, 2023
1 parent 7e01216 commit 54395fd
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public static ColumnTransform getColumnTransform(PartitionField field, Type sour

/**
 * Builds the identity transform for {@code type}: the block transform is
 * {@link Function#identity()} and the value transform is
 * {@code ValueTransform.identity(type)}.
 *
 * NOTE(review): this is a diff view, so both the pre-change (5-argument) and the
 * post-change (6-argument) return statements appear below. The post-change form
 * passes {@code false} in the position of the new {@code temporal} constructor flag.
 */
private static ColumnTransform identity(Type type)
{
return new ColumnTransform(type, false, true, Function.identity(), ValueTransform.identity(type));
return new ColumnTransform(type, false, true, false, Function.identity(), ValueTransform.identity(type));
}

@VisibleForTesting
Expand All @@ -174,6 +174,7 @@ static ColumnTransform bucket(Type type, int count)
INTEGER,
false,
false,
false,
block -> bucketBlock(block, count, hasher),
(block, position) -> {
if (block.isNull(position)) {
Expand Down Expand Up @@ -230,6 +231,7 @@ private static ColumnTransform yearsFromDate()
INTEGER,
false,
true,
true,
block -> transformBlock(DATE, INTEGER, block, transform),
ValueTransform.from(DATE, transform));
}
Expand All @@ -241,6 +243,7 @@ private static ColumnTransform monthsFromDate()
INTEGER,
false,
true,
true,
block -> transformBlock(DATE, INTEGER, block, transform),
ValueTransform.from(DATE, transform));
}
Expand All @@ -252,6 +255,7 @@ private static ColumnTransform daysFromDate()
INTEGER,
false,
true,
true,
block -> transformBlock(DATE, INTEGER, block, transform),
ValueTransform.from(DATE, transform));
}
Expand All @@ -263,6 +267,7 @@ private static ColumnTransform yearsFromTimestamp()
INTEGER,
false,
true,
true,
block -> transformBlock(TIMESTAMP_MICROS, INTEGER, block, transform),
ValueTransform.from(TIMESTAMP_MICROS, transform));
}
Expand All @@ -274,6 +279,7 @@ private static ColumnTransform monthsFromTimestamp()
INTEGER,
false,
true,
true,
block -> transformBlock(TIMESTAMP_MICROS, INTEGER, block, transform),
ValueTransform.from(TIMESTAMP_MICROS, transform));
}
Expand All @@ -285,6 +291,7 @@ private static ColumnTransform daysFromTimestamp()
INTEGER,
false,
true,
true,
block -> transformBlock(TIMESTAMP_MICROS, INTEGER, block, transform),
ValueTransform.from(TIMESTAMP_MICROS, transform));
}
Expand All @@ -296,6 +303,7 @@ private static ColumnTransform hoursFromTimestamp()
INTEGER,
false,
true,
true,
block -> transformBlock(TIMESTAMP_MICROS, INTEGER, block, transform),
ValueTransform.from(TIMESTAMP_MICROS, transform));
}
Expand All @@ -307,6 +315,7 @@ private static ColumnTransform yearsFromTimestampWithTimeZone()
INTEGER,
false,
true,
true,
block -> extractTimestampWithTimeZone(block, transform),
ValueTransform.fromTimestampTzTransform(transform));
}
Expand All @@ -318,6 +327,7 @@ private static ColumnTransform monthsFromTimestampWithTimeZone()
INTEGER,
false,
true,
true,
block -> extractTimestampWithTimeZone(block, transform),
ValueTransform.fromTimestampTzTransform(transform));
}
Expand All @@ -329,6 +339,7 @@ private static ColumnTransform daysFromTimestampWithTimeZone()
INTEGER,
false,
true,
true,
block -> extractTimestampWithTimeZone(block, transform),
ValueTransform.fromTimestampTzTransform(transform));
}
Expand All @@ -340,6 +351,7 @@ private static ColumnTransform hoursFromTimestampWithTimeZone()
INTEGER,
false,
true,
true,
block -> extractTimestampWithTimeZone(block, transform),
ValueTransform.fromTimestampTzTransform(transform));
}
Expand Down Expand Up @@ -453,6 +465,7 @@ private static ColumnTransform truncateInteger(int width)
INTEGER,
false,
true,
false,
block -> truncateInteger(block, width),
(block, position) -> {
if (block.isNull(position)) {
Expand Down Expand Up @@ -487,6 +500,7 @@ private static ColumnTransform truncateBigint(int width)
BIGINT,
false,
true,
false,
block -> truncateBigint(block, width),
(block, position) -> {
if (block.isNull(position)) {
Expand Down Expand Up @@ -522,6 +536,7 @@ private static ColumnTransform truncateShortDecimal(Type type, int width, Decima
type,
false,
true,
false,
block -> truncateShortDecimal(decimal, block, unscaledWidth),
(block, position) -> {
if (block.isNull(position)) {
Expand Down Expand Up @@ -559,6 +574,7 @@ private static ColumnTransform truncateLongDecimal(Type type, int width, Decimal
type,
false,
true,
false,
block -> truncateLongDecimal(decimal, block, unscaledWidth),
(block, position) -> {
if (block.isNull(position)) {
Expand Down Expand Up @@ -606,6 +622,7 @@ private static ColumnTransform truncateVarchar(int width)
VARCHAR,
false,
true,
false,
block -> truncateVarchar(block, width),
(block, position) -> {
if (block.isNull(position)) {
Expand Down Expand Up @@ -647,6 +664,7 @@ private static ColumnTransform truncateVarbinary(int width)
VARBINARY,
false,
true,
false,
block -> truncateVarbinary(block, width),
(block, position) -> {
if (block.isNull(position)) {
Expand Down Expand Up @@ -685,6 +703,7 @@ private static ColumnTransform voidTransform(Type type)
type,
true,
true,
false,
block -> RunLengthEncodedBlock.create(nullBlock, block.getPositionCount()),
(block, position) -> null);
}
Expand Down Expand Up @@ -739,14 +758,16 @@ public static class ColumnTransform
private final Type type;
private final boolean preservesNonNull;
private final boolean monotonic;
private final boolean temporal;
private final Function<Block, Block> blockTransform;
private final ValueTransform valueTransform;

public ColumnTransform(Type type, boolean preservesNonNull, boolean monotonic, Function<Block, Block> blockTransform, ValueTransform valueTransform)
/**
 * Creates a column transform descriptor.
 *
 * @param type type recorded for this transform (the factories in this file pass,
 *         for example, {@code INTEGER} for the bucket and year/month/day/hour transforms)
 * @param preservesNonNull flag stored as-is
 * @param monotonic flag stored as-is and reported by {@link #isMonotonic()}
 * @param temporal flag reported by {@link #isTemporal()}; the factories in this file pass
 *         {@code true} only for the year/month/day/hour transforms
 * @param blockTransform function applied to a whole {@code Block}
 * @param valueTransform transform applied to a single position of a block
 * @throws NullPointerException if {@code type}, {@code blockTransform} or
 *         {@code valueTransform} is null
 */
public ColumnTransform(Type type, boolean preservesNonNull, boolean monotonic, boolean temporal, Function<Block, Block> blockTransform, ValueTransform valueTransform)
{
    this.type = requireNonNull(type, "type is null");
    this.preservesNonNull = preservesNonNull;
    this.monotonic = monotonic;
    this.temporal = temporal;
    // Message names the actual parameter, consistent with the other null checks
    this.blockTransform = requireNonNull(blockTransform, "blockTransform is null");
    this.valueTransform = requireNonNull(valueTransform, "valueTransform is null");
}
Expand All @@ -769,6 +790,11 @@ public boolean isMonotonic()
return monotonic;
}

/**
 * Reports the {@code temporal} flag this transform was constructed with.
 * In this file the flag is set for the year, month, day and hour transforms
 * and cleared for identity, bucket, truncate and void transforms.
 *
 * @return {@code true} if the transform was created as a temporal transform
 */
public boolean isTemporal()
{
    return this.temporal;
}

public Function<Block, Block> getBlockTransform()
{
return blockTransform;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.trino.plugin.iceberg.ColumnIdentity;
import io.trino.plugin.iceberg.IcebergMaterializedViewDefinition;
import io.trino.plugin.iceberg.IcebergUtil;
import io.trino.plugin.iceberg.PartitionTransforms.ColumnTransform;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.CatalogSchemaTableName;
import io.trino.spi.connector.ColumnMetadata;
Expand All @@ -48,6 +49,7 @@
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.types.Types;

import java.io.IOException;
import java.time.Duration;
Expand All @@ -56,25 +58,35 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;

import static com.google.common.base.Throwables.throwIfUnchecked;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.plugin.hive.HiveMetadata.STORAGE_TABLE;
import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
import static io.trino.plugin.hive.ViewReaderUtil.ICEBERG_MATERIALIZED_VIEW_COMMENT;
import static io.trino.plugin.hive.ViewReaderUtil.PRESTO_VIEW_FLAG;
import static io.trino.plugin.hive.metastore.glue.converter.GlueToTrinoConverter.mappedCopy;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static io.trino.plugin.iceberg.IcebergMaterializedViewAdditionalProperties.STORAGE_SCHEMA;
import static io.trino.plugin.iceberg.IcebergMaterializedViewAdditionalProperties.getStorageSchema;
import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.decodeMaterializedViewData;
import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning;
import static io.trino.plugin.iceberg.IcebergUtil.commit;
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableProperties;
import static io.trino.plugin.iceberg.IcebergUtil.schemaFromMetadata;
import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields;
import static io.trino.plugin.iceberg.PartitionTransforms.getColumnTransform;
import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimeType.TIME_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS;
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.String.format;
Expand Down Expand Up @@ -217,9 +229,46 @@ protected SchemaTableName createMaterializedViewStorageTable(ConnectorSession se

String storageSchema = getStorageSchema(definition.getProperties()).orElse(viewName.getSchemaName());
SchemaTableName storageTable = new SchemaTableName(storageSchema, storageTableName);
List<ColumnMetadata> columns = definition.getColumns().stream()
.map(column -> new ColumnMetadata(column.getName(), typeForMaterializedViewStorageTable(typeManager.getType(column.getType()))))
.collect(toImmutableList());

Schema schemaWithTimestampTzPreserved = schemaFromMetadata(mappedCopy(
definition.getColumns(),
column -> {
Type type = typeManager.getType(column.getType());
if (type instanceof TimestampWithTimeZoneType timestampTzType && timestampTzType.getPrecision() <= 6) {
// For now preserve timestamptz columns so that we can parse partitioning
type = TIMESTAMP_TZ_MICROS;
}
else {
type = typeForMaterializedViewStorageTable(type);
}
return new ColumnMetadata(column.getName(), type);
}));
PartitionSpec partitionSpec = parsePartitionFields(schemaWithTimestampTzPreserved, getPartitioning(definition.getProperties()));
Set<String> temporalPartitioningSources = partitionSpec.fields().stream()
.flatMap(partitionField -> {
Types.NestedField sourceField = schemaWithTimestampTzPreserved.findField(partitionField.sourceId());
Type sourceType = toTrinoType(sourceField.type(), typeManager);
ColumnTransform columnTransform = getColumnTransform(partitionField, sourceType);
if (!columnTransform.isTemporal()) {
return Stream.of();
}
return Stream.of(sourceField.name());
})
.collect(toImmutableSet());

List<ColumnMetadata> columns = mappedCopy(
definition.getColumns(),
column -> {
Type type = typeManager.getType(column.getType());
if (type instanceof TimestampWithTimeZoneType timestampTzType && timestampTzType.getPrecision() <= 6 && temporalPartitioningSources.contains(column.getName())) {
// Apply point-in-time semantics to maintain partitioning capabilities
type = TIMESTAMP_TZ_MICROS;
}
else {
type = typeForMaterializedViewStorageTable(type);
}
return new ColumnMetadata(column.getName(), type);
});

ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(storageTable, columns, storageTableProperties, Optional.empty());
Transaction transaction = IcebergUtil.newCreateTableTransaction(this, tableMetadata, session);
Expand Down Expand Up @@ -258,9 +307,7 @@ private Type typeForMaterializedViewStorageTable(Type type)
: VARCHAR;
}
if (type instanceof TimestampWithTimeZoneType) {
// Iceberg does not store the time zone
// TODO allow temporal partitioning on these columns, or MV property to
// drop zone info and use timestamptz directly
// Iceberg does not store the time zone.
return VARCHAR;
}
if (type instanceof ArrayType arrayType) {
Expand Down
Loading

0 comments on commit 54395fd

Please sign in to comment.