feat: Don't fail when reading non-identity partitioning field (deepha…
malhotrashivam authored Dec 11, 2024
1 parent f32a930 commit 2da9dd5
Showing 14 changed files with 211 additions and 34 deletions.
IcebergKeyValuePartitionedLayout.java
@@ -11,33 +11,31 @@
 import io.deephaven.iceberg.util.IcebergTableAdapter;
 import io.deephaven.iceberg.internal.DataInstructionsProviderLoader;
 import io.deephaven.util.type.TypeUtils;
-import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.iceberg.*;
 import org.apache.iceberg.data.IdentityPartitionConverters;
 import org.jetbrains.annotations.NotNull;
 
 import java.net.URI;
 import java.util.*;
-import java.util.stream.Collectors;
 
 /**
  * Iceberg {@link TableLocationKeyFinder location finder} for tables with partitions that will discover data files from
  * a {@link Snapshot}
  */
 public final class IcebergKeyValuePartitionedLayout extends IcebergBaseLayout {
-    private static class ColumnData {
+    private static class IdentityPartitioningColData {
         final String name;
         final Class<?> type;
-        final int index;
+        final int index; // position in the partition spec
 
-        public ColumnData(String name, Class<?> type, int index) {
+        private IdentityPartitioningColData(String name, Class<?> type, int index) {
             this.name = name;
             this.type = type;
             this.index = index;
         }
     }
 
-    private final List<ColumnData> outputPartitioningColumns;
+    private final List<IdentityPartitioningColData> identityPartitioningColumns;
 
     /**
      * @param tableAdapter The {@link IcebergTableAdapter} that will be used to access the table.
@@ -53,33 +51,26 @@ public IcebergKeyValuePartitionedLayout(

         // We can assume due to upstream validation that there are no duplicate names (after renaming) that are included
         // in the output definition, so we can ignore duplicates.
-        final MutableInt icebergIndex = new MutableInt(0);
-        final Map<String, Integer> availablePartitioningColumns = partitionSpec.fields().stream()
-                .peek(partitionField -> {
-                    // TODO (deephaven-core#6438): Add support to handle non-identity transforms
-                    if (!partitionField.transform().isIdentity()) {
-                        throw new TableDataException("Partition field " + partitionField.name() + " has a " +
-                                "non-identity transform: " + partitionField.transform() + ", which is not supported");
-                    }
-                })
-                .map(PartitionField::name)
-                .map(name -> instructions.columnRenames().getOrDefault(name, name))
-                .collect(Collectors.toMap(
-                        name -> name,
-                        name -> icebergIndex.getAndIncrement(),
-                        (v1, v2) -> v1,
-                        LinkedHashMap::new));
+        final List<PartitionField> partitionFields = partitionSpec.fields();
+        final int numPartitionFields = partitionFields.size();
+        identityPartitioningColumns = new ArrayList<>(numPartitionFields);
+        for (int fieldId = 0; fieldId < numPartitionFields; ++fieldId) {
+            final PartitionField partitionField = partitionFields.get(fieldId);
+            if (!partitionField.transform().isIdentity()) {
+                // TODO (DH-18160): Improve support for handling non-identity transforms
+                continue;
+            }
+            final String icebergColName = partitionField.name();
+            final String dhColName = instructions.columnRenames().getOrDefault(icebergColName, icebergColName);
+            final ColumnDefinition<?> columnDef = tableDef.getColumn(dhColName);
+            if (columnDef == null) {
+                // Table definition provided by the user doesn't have this column, so skip.
+                continue;
+            }
+            identityPartitioningColumns.add(new IdentityPartitioningColData(dhColName,
+                    TypeUtils.getBoxedType(columnDef.getDataType()), fieldId));
 
-        outputPartitioningColumns = tableDef.getColumnStream()
-                .map((final ColumnDefinition<?> columnDef) -> {
-                    final Integer index = availablePartitioningColumns.get(columnDef.getName());
-                    if (index == null) {
-                        return null;
-                    }
-                    return new ColumnData(columnDef.getName(), TypeUtils.getBoxedType(columnDef.getDataType()), index);
-                })
-                .filter(Objects::nonNull)
-                .collect(Collectors.toList());
+        }
     }
 
     @Override
@@ -95,12 +86,11 @@ IcebergTableLocationKey keyFromDataFile(
         final Map<String, Comparable<?>> partitions = new LinkedHashMap<>();
 
         final PartitionData partitionData = (PartitionData) dataFile.partition();
-        for (final ColumnData colData : outputPartitioningColumns) {
+        for (final IdentityPartitioningColData colData : identityPartitioningColumns) {
             final String colName = colData.name;
             final Object colValue;
             final Object valueFromPartitionData = partitionData.get(colData.index);
             if (valueFromPartitionData != null) {
-                // TODO (deephaven-core#6438): Assuming identity transform here
                 colValue = IdentityPartitionConverters.convertConstant(
                         partitionData.getType(colData.index), valueFromPartitionData);
                 if (!colData.type.isAssignableFrom(colValue.getClass())) {
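In effect, loading an Iceberg table whose partition spec contains a non-identity transform (such as day(datetime)) no longer throws a TableDataException; the transformed field is skipped and only identity partition fields surface as Deephaven partitioning columns. A minimal caller-side sketch, assuming an already-configured IcebergCatalogAdapter named catalogAdapter (the variable names and table identifier here are illustrative, not part of this diff):

// Sketch only: `catalogAdapter` is assumed to point at a catalog whose `trading.data`
// table is partitioned by day(datetime) and identity(symbol), as generated below.
IcebergTableAdapter tableAdapter = catalogAdapter.loadTable(TableIdentifier.of("trading", "data"));
// Before this commit the day-transformed field caused a TableDataException here;
// now it is skipped and only `symbol` is exposed as a partitioning column.
Table table = tableAdapter.table();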
Pyiceberg2Test.java
@@ -0,0 +1,108 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.iceberg;

import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.testutil.TstUtils;
import io.deephaven.engine.util.TableTools;
import io.deephaven.iceberg.sqlite.DbResource;
import io.deephaven.iceberg.util.IcebergCatalogAdapter;
import io.deephaven.iceberg.util.IcebergTableAdapter;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import java.time.LocalDateTime;
import java.net.URISyntaxException;
import java.util.List;

import static io.deephaven.util.QueryConstants.NULL_DOUBLE;
import static org.assertj.core.api.Assertions.assertThat;

/**
 * This test shows that we can integrate with data written by <a href="https://py.iceberg.apache.org/">pyiceberg</a>.
 * See TESTING.md and generate-pyiceberg-2.py for more details.
 */
@Tag("security-manager-allow")
class Pyiceberg2Test {
    private static final Namespace NAMESPACE = Namespace.of("trading");
    private static final TableIdentifier TRADING_DATA = TableIdentifier.of(NAMESPACE, "data");

    // This will need to be updated if the data is regenerated
    private static final long SNAPSHOT_1_ID = 2806418501596315192L;

    private static final TableDefinition TABLE_DEFINITION = TableDefinition.of(
            ColumnDefinition.fromGenericType("datetime", LocalDateTime.class),
            ColumnDefinition.ofString("symbol").withPartitioning(),
            ColumnDefinition.ofDouble("bid"),
            ColumnDefinition.ofDouble("ask"));

    private IcebergCatalogAdapter catalogAdapter;

    @BeforeEach
    void setUp() throws URISyntaxException {
        catalogAdapter = DbResource.openCatalog("pyiceberg-2");
    }

    @Test
    void catalogInfo() {
        assertThat(catalogAdapter.listNamespaces()).containsExactly(NAMESPACE);
        assertThat(catalogAdapter.listTables(NAMESPACE)).containsExactly(TRADING_DATA);

        final IcebergTableAdapter tableAdapter = catalogAdapter.loadTable(TRADING_DATA);
        final List<Snapshot> snapshots = tableAdapter.listSnapshots();
        assertThat(snapshots).hasSize(1);
        {
            final Snapshot snapshot = snapshots.get(0);
            assertThat(snapshot.parentId()).isNull();
            assertThat(snapshot.schemaId()).isEqualTo(0);
            assertThat(snapshot.sequenceNumber()).isEqualTo(1L);
            assertThat(snapshot.snapshotId()).isEqualTo(SNAPSHOT_1_ID);
        }
    }

    @Test
    void testDefinition() {
        final IcebergTableAdapter tableAdapter = catalogAdapter.loadTable(TRADING_DATA);
        final TableDefinition td = tableAdapter.definition();
        assertThat(td).isEqualTo(TABLE_DEFINITION);

        // Check the partition spec
        final PartitionSpec partitionSpec = tableAdapter.icebergTable().spec();
        assertThat(partitionSpec.fields().size()).isEqualTo(2);
        final PartitionField firstPartitionField = partitionSpec.fields().get(0);
        assertThat(firstPartitionField.name()).isEqualTo("datetime_day");
        assertThat(firstPartitionField.transform().toString()).isEqualTo("day");

        final PartitionField secondPartitionField = partitionSpec.fields().get(1);
        assertThat(secondPartitionField.name()).isEqualTo("symbol");
        assertThat(secondPartitionField.transform().toString()).isEqualTo("identity");
    }

    @Test
    void testData() {
        final IcebergTableAdapter tableAdapter = catalogAdapter.loadTable(TRADING_DATA);
        final Table fromIceberg = tableAdapter.table();
        assertThat(fromIceberg.size()).isEqualTo(5);
        final Table expectedData = TableTools.newTable(TABLE_DEFINITION,
                TableTools.col("datetime",
                        LocalDateTime.of(2024, 11, 27, 10, 0, 0),
                        LocalDateTime.of(2024, 11, 27, 10, 0, 0),
                        LocalDateTime.of(2024, 11, 26, 10, 1, 0),
                        LocalDateTime.of(2024, 11, 26, 10, 2, 0),
                        LocalDateTime.of(2024, 11, 28, 10, 3, 0)),
                TableTools.stringCol("symbol", "AAPL", "MSFT", "GOOG", "AMZN", "MSFT"),
                TableTools.doubleCol("bid", 150.25, 150.25, 2800.75, 3400.5, NULL_DOUBLE),
                TableTools.doubleCol("ask", 151.0, 151.0, 2810.5, 3420.0, 250.0));
        TstUtils.assertTableEquals(expectedData.sort("datetime", "symbol"),
                fromIceberg.sort("datetime", "symbol"));
    }
}
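Since `symbol` survives as an ordinary partitioning column, it can still be filtered like any other column. A hypothetical follow-on to testData(), reusing its tableAdapter (not part of the committed test):

// Hypothetical usage: filter on the identity partition column that the layout exposes.
final Table msftOnly = tableAdapter.table().where("symbol == `MSFT`");
assertThat(msftOnly.size()).isEqualTo(2);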
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
@@ -0,0 +1 @@
{"location":"catalogs/pyiceberg-2/trading.db/data","table-uuid":"d1f874d4-c065-432d-969b-39efb3e9eb1c","last-updated-ms":1733868694938,"last-column-id":4,"schemas":[{"type":"struct","fields":[{"id":1,"name":"datetime","type":"timestamp","required":false},{"id":2,"name":"symbol","type":"string","required":false},{"id":3,"name":"bid","type":"double","required":false},{"id":4,"name":"ask","type":"double","required":false}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"day","name":"datetime_day"},{"source-id":2,"field-id":1001,"transform":"identity","name":"symbol"}]}],"default-spec-id":0,"last-partition-id":1001,"properties":{},"snapshots":[],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":2,"last-sequence-number":0}
@@ -0,0 +1 @@
{"location":"catalogs/pyiceberg-2/trading.db/data","table-uuid":"d1f874d4-c065-432d-969b-39efb3e9eb1c","last-updated-ms":1733868695120,"last-column-id":4,"schemas":[{"type":"struct","fields":[{"id":1,"name":"datetime","type":"timestamp","required":false},{"id":2,"name":"symbol","type":"string","required":false},{"id":3,"name":"bid","type":"double","required":false},{"id":4,"name":"ask","type":"double","required":false}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"day","name":"datetime_day"},{"source-id":2,"field-id":1001,"transform":"identity","name":"symbol"}]}],"default-spec-id":0,"last-partition-id":1001,"properties":{},"current-snapshot-id":2806418501596315192,"snapshots":[{"snapshot-id":2806418501596315192,"sequence-number":1,"timestamp-ms":1733868695120,"manifest-list":"catalogs/pyiceberg-2/trading.db/data/metadata/snap-2806418501596315192-0-d9c06748-9892-404f-a744-7bbfd06d0eeb.avro","summary":{"operation":"append","added-files-size":"9816","added-data-files":"5","added-records":"5","changed-partition-count":"5","total-data-files":"5","total-delete-files":"0","total-records":"5","total-files-size":"9816","total-position-deletes":"0","total-equality-deletes":"0"},"schema-id":0}],"snapshot-log":[{"snapshot-id":2806418501596315192,"timestamp-ms":1733868695120}],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{"main":{"snapshot-id":2806418501596315192,"type":"branch"}},"format-version":2,"last-sequence-number":1}
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,3 +1,7 @@
+'''
+See TESTING.md for how to run this script.
+'''
+
 from pyiceberg.schema import Schema
 from pyiceberg.types import NestedField, StringType, DoubleType
 from pyiceberg.catalog.sql import SqlCatalog
generate-pyiceberg-2.py
@@ -0,0 +1,58 @@
'''
See TESTING.md for how to run this script.
'''

import pyarrow as pa
from datetime import datetime
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.schema import Schema
from pyiceberg.types import TimestampType, FloatType, DoubleType, StringType, NestedField, StructType
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform, IdentityTransform

catalog = SqlCatalog(
    "pyiceberg-2",
    **{
        "uri": f"sqlite:///dh-iceberg-test.db",
        "warehouse": f"catalogs/pyiceberg-2",
    },
)

schema = Schema(
    NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=False),
    NestedField(field_id=2, name="symbol", field_type=StringType(), required=False),
    NestedField(field_id=3, name="bid", field_type=DoubleType(), required=False),
    NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False),
)

partition_spec = PartitionSpec(
    PartitionField(
        source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day",
    ),
    PartitionField(
        source_id=2, field_id=1001, transform=IdentityTransform(), name="symbol",
    )
)

catalog.create_namespace("trading")

tbl = catalog.create_table(
    identifier="trading.data",
    schema=schema,
    partition_spec=partition_spec,
)

# Define the data according to your Iceberg schema
data = [
    {"datetime": datetime(2024, 11, 27, 10, 0, 0), "symbol": "AAPL", "bid": 150.25, "ask": 151.0},
    {"datetime": datetime(2024, 11, 27, 10, 0, 0), "symbol": "MSFT", "bid": 150.25, "ask": 151.0},
    {"datetime": datetime(2024, 11, 26, 10, 1, 0), "symbol": "GOOG", "bid": 2800.75, "ask": 2810.5},
    {"datetime": datetime(2024, 11, 26, 10, 2, 0), "symbol": "AMZN", "bid": 3400.5, "ask": 3420.0},
    {"datetime": datetime(2024, 11, 28, 10, 3, 0), "symbol": "MSFT", "bid": None, "ask": 250.0},
]

# Create a PyArrow Table
table = pa.Table.from_pylist(data)

# Append the table to the Iceberg table
tbl.append(table)
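For orientation, a minimal Java sketch (reusing the DbResource test helper from Pyiceberg2Test above; exception handling elided) that inspects which of the two partition fields Deephaven will expose; only the identity transform qualifies:

// Sketch: enumerate partition fields of the generated table and check their transforms.
IcebergCatalogAdapter catalog = DbResource.openCatalog("pyiceberg-2");
IcebergTableAdapter adapter = catalog.loadTable(TableIdentifier.of("trading", "data"));
for (PartitionField field : adapter.icebergTable().spec().fields()) {
    // Expected output: datetime_day identity=false (skipped), symbol identity=true (exposed)
    System.out.println(field.name() + " identity=" + field.transform().isIdentity());
}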
