diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java index 86e63f7dbb9..288d27b064b 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java @@ -11,33 +11,31 @@ import io.deephaven.iceberg.util.IcebergTableAdapter; import io.deephaven.iceberg.internal.DataInstructionsProviderLoader; import io.deephaven.util.type.TypeUtils; -import org.apache.commons.lang3.mutable.MutableInt; import org.apache.iceberg.*; import org.apache.iceberg.data.IdentityPartitionConverters; import org.jetbrains.annotations.NotNull; import java.net.URI; import java.util.*; -import java.util.stream.Collectors; /** * Iceberg {@link TableLocationKeyFinder location finder} for tables with partitions that will discover data files from * a {@link Snapshot} */ public final class IcebergKeyValuePartitionedLayout extends IcebergBaseLayout { - private static class ColumnData { + private static class IdentityPartitioningColData { final String name; final Class type; - final int index; + final int index; // position in the partition spec - public ColumnData(String name, Class type, int index) { + private IdentityPartitioningColData(String name, Class type, int index) { this.name = name; this.type = type; this.index = index; } } - private final List outputPartitioningColumns; + private final List identityPartitioningColumns; /** * @param tableAdapter The {@link IcebergTableAdapter} that will be used to access the table. @@ -53,33 +51,26 @@ public IcebergKeyValuePartitionedLayout( // We can assume due to upstream validation that there are no duplicate names (after renaming) that are included // in the output definition, so we can ignore duplicates. - final MutableInt icebergIndex = new MutableInt(0); - final Map availablePartitioningColumns = partitionSpec.fields().stream() - .peek(partitionField -> { - // TODO (deephaven-core#6438): Add support to handle non-identity transforms - if (!partitionField.transform().isIdentity()) { - throw new TableDataException("Partition field " + partitionField.name() + " has a " + - "non-identity transform: " + partitionField.transform() + ", which is not supported"); - } - }) - .map(PartitionField::name) - .map(name -> instructions.columnRenames().getOrDefault(name, name)) - .collect(Collectors.toMap( - name -> name, - name -> icebergIndex.getAndIncrement(), - (v1, v2) -> v1, - LinkedHashMap::new)); + final List partitionFields = partitionSpec.fields(); + final int numPartitionFields = partitionFields.size(); + identityPartitioningColumns = new ArrayList<>(numPartitionFields); + for (int fieldId = 0; fieldId < numPartitionFields; ++fieldId) { + final PartitionField partitionField = partitionFields.get(fieldId); + if (!partitionField.transform().isIdentity()) { + // TODO (DH-18160): Improve support for handling non-identity transforms + continue; + } + final String icebergColName = partitionField.name(); + final String dhColName = instructions.columnRenames().getOrDefault(icebergColName, icebergColName); + final ColumnDefinition columnDef = tableDef.getColumn(dhColName); + if (columnDef == null) { + // Table definition provided by the user doesn't have this column, so skip. + continue; + } + identityPartitioningColumns.add(new IdentityPartitioningColData(dhColName, + TypeUtils.getBoxedType(columnDef.getDataType()), fieldId)); - outputPartitioningColumns = tableDef.getColumnStream() - .map((final ColumnDefinition columnDef) -> { - final Integer index = availablePartitioningColumns.get(columnDef.getName()); - if (index == null) { - return null; - } - return new ColumnData(columnDef.getName(), TypeUtils.getBoxedType(columnDef.getDataType()), index); - }) - .filter(Objects::nonNull) - .collect(Collectors.toList()); + } } @Override @@ -95,12 +86,11 @@ IcebergTableLocationKey keyFromDataFile( final Map> partitions = new LinkedHashMap<>(); final PartitionData partitionData = (PartitionData) dataFile.partition(); - for (final ColumnData colData : outputPartitioningColumns) { + for (final IdentityPartitioningColData colData : identityPartitioningColumns) { final String colName = colData.name; final Object colValue; final Object valueFromPartitionData = partitionData.get(colData.index); if (valueFromPartitionData != null) { - // TODO (deephaven-core#6438): Assuming identity transform here colValue = IdentityPartitionConverters.convertConstant( partitionData.getType(colData.index), valueFromPartitionData); if (!colData.type.isAssignableFrom(colValue.getClass())) { diff --git a/extensions/iceberg/src/test/java/io/deephaven/iceberg/Pyiceberg2Test.java b/extensions/iceberg/src/test/java/io/deephaven/iceberg/Pyiceberg2Test.java new file mode 100644 index 00000000000..afcb7af1779 --- /dev/null +++ b/extensions/iceberg/src/test/java/io/deephaven/iceberg/Pyiceberg2Test.java @@ -0,0 +1,108 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg; + +import io.deephaven.engine.table.ColumnDefinition; +import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.TableDefinition; +import io.deephaven.engine.testutil.TstUtils; +import io.deephaven.engine.util.TableTools; +import io.deephaven.iceberg.sqlite.DbResource; +import io.deephaven.iceberg.util.IcebergCatalogAdapter; +import io.deephaven.iceberg.util.IcebergTableAdapter; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.time.LocalDateTime; +import java.net.URISyntaxException; +import java.util.List; +import static io.deephaven.util.QueryConstants.NULL_DOUBLE; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * This test shows that we can integrate with data written by pyiceberg. + * See TESTING.md and generate-pyiceberg-2.py for more details. + */ +@Tag("security-manager-allow") +class Pyiceberg2Test { + private static final Namespace NAMESPACE = Namespace.of("trading"); + private static final TableIdentifier TRADING_DATA = TableIdentifier.of(NAMESPACE, "data"); + + // This will need to be updated if the data is regenerated + private static final long SNAPSHOT_1_ID = 2806418501596315192L; + + private static final TableDefinition TABLE_DEFINITION = TableDefinition.of( + ColumnDefinition.fromGenericType("datetime", LocalDateTime.class), + ColumnDefinition.ofString("symbol").withPartitioning(), + ColumnDefinition.ofDouble("bid"), + ColumnDefinition.ofDouble("ask")); + + private IcebergCatalogAdapter catalogAdapter; + + @BeforeEach + void setUp() throws URISyntaxException { + catalogAdapter = DbResource.openCatalog("pyiceberg-2"); + } + + @Test + void catalogInfo() { + assertThat(catalogAdapter.listNamespaces()).containsExactly(NAMESPACE); + assertThat(catalogAdapter.listTables(NAMESPACE)).containsExactly(TRADING_DATA); + + final IcebergTableAdapter tableAdapter = catalogAdapter.loadTable(TRADING_DATA); + final List snapshots = tableAdapter.listSnapshots(); + assertThat(snapshots).hasSize(1); + { + final Snapshot snapshot = snapshots.get(0); + assertThat(snapshot.parentId()).isNull(); + assertThat(snapshot.schemaId()).isEqualTo(0); + assertThat(snapshot.sequenceNumber()).isEqualTo(1L); + assertThat(snapshot.snapshotId()).isEqualTo(SNAPSHOT_1_ID); + } + } + + @Test + void testDefinition() { + final IcebergTableAdapter tableAdapter = catalogAdapter.loadTable(TRADING_DATA); + final TableDefinition td = tableAdapter.definition(); + assertThat(td).isEqualTo(TABLE_DEFINITION); + + // Check the partition spec + final PartitionSpec partitionSpec = tableAdapter.icebergTable().spec(); + assertThat(partitionSpec.fields().size()).isEqualTo(2); + final PartitionField firstPartitionField = partitionSpec.fields().get(0); + assertThat(firstPartitionField.name()).isEqualTo("datetime_day"); + assertThat(firstPartitionField.transform().toString()).isEqualTo("day"); + + final PartitionField secondPartitionField = partitionSpec.fields().get(1); + assertThat(secondPartitionField.name()).isEqualTo("symbol"); + assertThat(secondPartitionField.transform().toString()).isEqualTo("identity"); + } + + @Test + void testData() { + final IcebergTableAdapter tableAdapter = catalogAdapter.loadTable(TRADING_DATA); + final Table fromIceberg = tableAdapter.table(); + assertThat(fromIceberg.size()).isEqualTo(5); + final Table expectedData = TableTools.newTable(TABLE_DEFINITION, + TableTools.col("datetime", + LocalDateTime.of(2024, 11, 27, 10, 0, 0), + LocalDateTime.of(2024, 11, 27, 10, 0, 0), + LocalDateTime.of(2024, 11, 26, 10, 1, 0), + LocalDateTime.of(2024, 11, 26, 10, 2, 0), + LocalDateTime.of(2024, 11, 28, 10, 3, 0)), + TableTools.stringCol("symbol", "AAPL", "MSFT", "GOOG", "AMZN", "MSFT"), + TableTools.doubleCol("bid", 150.25, 150.25, 2800.75, 3400.5, NULL_DOUBLE), + TableTools.doubleCol("ask", 151.0, 151.0, 2810.5, 3420.0, 250.0)); + TstUtils.assertTableEquals(expectedData.sort("datetime", "symbol"), + fromIceberg.sort("datetime", "symbol")); + } +} diff --git a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-26/symbol=AMZN/00000-0-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-26/symbol=AMZN/00000-0-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet new file mode 100644 index 00000000000..c9dd7a2aca7 --- /dev/null +++ b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-26/symbol=AMZN/00000-0-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:aef59cba214467bee2cb2d91118aadc6114718be02ab4f04d7742471f9436955 +size 1990 diff --git a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-26/symbol=GOOG/00000-1-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-26/symbol=GOOG/00000-1-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet new file mode 100644 index 00000000000..8d7be97be08 --- /dev/null +++ b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-26/symbol=GOOG/00000-1-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:9481c165c6356f03b3d4fe5df967a03422557d8d2bfc1c58d8dc052fe18fec06 +size 1990 diff --git a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-27/symbol=AAPL/00000-2-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-27/symbol=AAPL/00000-2-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet new file mode 100644 index 00000000000..4106ca81801 --- /dev/null +++ b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-27/symbol=AAPL/00000-2-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:e1568c608054bbd0de727d4909b29a3bdb777e2b6723a74d1641c6326d3b35cd +size 1990 diff --git a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-27/symbol=MSFT/00000-3-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-27/symbol=MSFT/00000-3-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet new file mode 100644 index 00000000000..38515dcd35b --- /dev/null +++ b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-27/symbol=MSFT/00000-3-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:79f26f9cb4e2b00548491122eb8a0efb04a8ff37d0f5bce65350c38b7b17af19 +size 1990 diff --git a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-28/symbol=MSFT/00000-4-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-28/symbol=MSFT/00000-4-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet new file mode 100644 index 00000000000..d3b061d46b0 --- /dev/null +++ b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-28/symbol=MSFT/00000-4-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:a6a8ee2286b0cdf702fef4745471375adf9b819883726d659129e0ba95e0c8dd +size 1856 diff --git a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/metadata/00000-0956ea6c-b522-447f-a2f4-5c6e7b104783.metadata.json b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/metadata/00000-0956ea6c-b522-447f-a2f4-5c6e7b104783.metadata.json new file mode 100644 index 00000000000..0bf00e06545 --- /dev/null +++ b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/metadata/00000-0956ea6c-b522-447f-a2f4-5c6e7b104783.metadata.json @@ -0,0 +1 @@ +{"location":"catalogs/pyiceberg-2/trading.db/data","table-uuid":"d1f874d4-c065-432d-969b-39efb3e9eb1c","last-updated-ms":1733868694938,"last-column-id":4,"schemas":[{"type":"struct","fields":[{"id":1,"name":"datetime","type":"timestamp","required":false},{"id":2,"name":"symbol","type":"string","required":false},{"id":3,"name":"bid","type":"double","required":false},{"id":4,"name":"ask","type":"double","required":false}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"day","name":"datetime_day"},{"source-id":2,"field-id":1001,"transform":"identity","name":"symbol"}]}],"default-spec-id":0,"last-partition-id":1001,"properties":{},"snapshots":[],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":2,"last-sequence-number":0} \ No newline at end of file diff --git a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/metadata/00001-4e3fe6dc-5e3e-4da1-9da3-666cbad70ace.metadata.json b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/metadata/00001-4e3fe6dc-5e3e-4da1-9da3-666cbad70ace.metadata.json new file mode 100644 index 00000000000..fea345f31a8 --- /dev/null +++ b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/metadata/00001-4e3fe6dc-5e3e-4da1-9da3-666cbad70ace.metadata.json @@ -0,0 +1 @@ +{"location":"catalogs/pyiceberg-2/trading.db/data","table-uuid":"d1f874d4-c065-432d-969b-39efb3e9eb1c","last-updated-ms":1733868695120,"last-column-id":4,"schemas":[{"type":"struct","fields":[{"id":1,"name":"datetime","type":"timestamp","required":false},{"id":2,"name":"symbol","type":"string","required":false},{"id":3,"name":"bid","type":"double","required":false},{"id":4,"name":"ask","type":"double","required":false}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"day","name":"datetime_day"},{"source-id":2,"field-id":1001,"transform":"identity","name":"symbol"}]}],"default-spec-id":0,"last-partition-id":1001,"properties":{},"current-snapshot-id":2806418501596315192,"snapshots":[{"snapshot-id":2806418501596315192,"sequence-number":1,"timestamp-ms":1733868695120,"manifest-list":"catalogs/pyiceberg-2/trading.db/data/metadata/snap-2806418501596315192-0-d9c06748-9892-404f-a744-7bbfd06d0eeb.avro","summary":{"operation":"append","added-files-size":"9816","added-data-files":"5","added-records":"5","changed-partition-count":"5","total-data-files":"5","total-delete-files":"0","total-records":"5","total-files-size":"9816","total-position-deletes":"0","total-equality-deletes":"0"},"schema-id":0}],"snapshot-log":[{"snapshot-id":2806418501596315192,"timestamp-ms":1733868695120}],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{"main":{"snapshot-id":2806418501596315192,"type":"branch"}},"format-version":2,"last-sequence-number":1} \ No newline at end of file diff --git a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/metadata/d9c06748-9892-404f-a744-7bbfd06d0eeb-m0.avro b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/metadata/d9c06748-9892-404f-a744-7bbfd06d0eeb-m0.avro new file mode 100644 index 00000000000..36d7d3ca191 Binary files /dev/null and b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/metadata/d9c06748-9892-404f-a744-7bbfd06d0eeb-m0.avro differ diff --git a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/metadata/snap-2806418501596315192-0-d9c06748-9892-404f-a744-7bbfd06d0eeb.avro b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/metadata/snap-2806418501596315192-0-d9c06748-9892-404f-a744-7bbfd06d0eeb.avro new file mode 100644 index 00000000000..20d65fea166 Binary files /dev/null and b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/metadata/snap-2806418501596315192-0-d9c06748-9892-404f-a744-7bbfd06d0eeb.avro differ diff --git a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/dh-iceberg-test.db b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/dh-iceberg-test.db index a59334f571f..44c557bd98e 100644 Binary files a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/dh-iceberg-test.db and b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/dh-iceberg-test.db differ diff --git a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/generate-pyiceberg-1.py b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/generate-pyiceberg-1.py index 86c6c2728c8..25b9c88da8d 100644 --- a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/generate-pyiceberg-1.py +++ b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/generate-pyiceberg-1.py @@ -1,3 +1,7 @@ +''' +See TESTING.md for how to run this script. +''' + from pyiceberg.schema import Schema from pyiceberg.types import NestedField, StringType, DoubleType from pyiceberg.catalog.sql import SqlCatalog diff --git a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/generate-pyiceberg-2.py b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/generate-pyiceberg-2.py new file mode 100644 index 00000000000..fc7b2c5a588 --- /dev/null +++ b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/generate-pyiceberg-2.py @@ -0,0 +1,58 @@ +''' +See TESTING.md for how to run this script. +''' + +import pyarrow as pa +from datetime import datetime +from pyiceberg.catalog.sql import SqlCatalog +from pyiceberg.schema import Schema +from pyiceberg.types import TimestampType, FloatType, DoubleType, StringType, NestedField, StructType +from pyiceberg.partitioning import PartitionSpec, PartitionField +from pyiceberg.transforms import DayTransform, IdentityTransform + +catalog = SqlCatalog( + "pyiceberg-2", + **{ + "uri": f"sqlite:///dh-iceberg-test.db", + "warehouse": f"catalogs/pyiceberg-2", + }, +) + +schema = Schema( + NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=False), + NestedField(field_id=2, name="symbol", field_type=StringType(), required=False), + NestedField(field_id=3, name="bid", field_type=DoubleType(), required=False), + NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False), +) + +partition_spec = PartitionSpec( + PartitionField( + source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day", + ), + PartitionField( + source_id=2, field_id=1001, transform=IdentityTransform(), name="symbol", + ) +) + +catalog.create_namespace("trading") + +tbl = catalog.create_table( + identifier="trading.data", + schema=schema, + partition_spec=partition_spec, +) + +# Define the data according to your Iceberg schema +data = [ + {"datetime": datetime(2024, 11, 27, 10, 0, 0), "symbol": "AAPL", "bid": 150.25, "ask": 151.0}, + {"datetime": datetime(2024, 11, 27, 10, 0, 0), "symbol": "MSFT", "bid": 150.25, "ask": 151.0}, + {"datetime": datetime(2024, 11, 26, 10, 1, 0), "symbol": "GOOG", "bid": 2800.75, "ask": 2810.5}, + {"datetime": datetime(2024, 11, 26, 10, 2, 0), "symbol": "AMZN", "bid": 3400.5, "ask": 3420.0}, + {"datetime": datetime(2024, 11, 28, 10, 3, 0), "symbol": "MSFT", "bid": None, "ask": 250.0}, +] + +# Create a PyArrow Table +table = pa.Table.from_pylist(data) + +# Append the table to the Iceberg table +tbl.append(table)