diff --git a/crates/iceberg/src/avro/mod.rs b/crates/iceberg/src/avro/mod.rs index a6bb0735f..bdccb2ff4 100644 --- a/crates/iceberg/src/avro/mod.rs +++ b/crates/iceberg/src/avro/mod.rs @@ -18,3 +18,4 @@ //! Avro related codes. #[allow(dead_code)] mod schema; +pub(crate) use schema::*; diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs new file mode 100644 index 000000000..71e232594 --- /dev/null +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -0,0 +1,943 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! ManifestList for Iceberg. + +use crate::{avro::schema_to_avro_schema, spec::Literal, Error}; +use apache_avro::{from_value, types::Value, Reader}; + +use super::{FormatVersion, Schema, StructType}; + +/// Snapshots are embedded in table metadata, but the list of manifests for a +/// snapshot are stored in a separate manifest list file. +/// +/// A new manifest list is written for each attempt to commit a snapshot +/// because the list of manifests always changes to produce a new snapshot. +/// When a manifest list is written, the (optimistic) sequence number of the +/// snapshot is written for all new manifest files tracked by the list. +/// +/// A manifest list includes summary metadata that can be used to avoid +/// scanning all of the manifests in a snapshot when planning a table scan. +/// This includes the number of added, existing, and deleted files, and a +/// summary of values for each field of the partition spec used to write the +/// manifest. +#[derive(Debug, Clone)] +pub struct ManifestList { + /// Entries in a manifest list. + entries: Vec, +} + +impl ManifestList { + /// Parse manifest list from bytes. + pub fn parse_with_version( + bs: &[u8], + version: FormatVersion, + partition_type: &StructType, + ) -> Result { + match version { + FormatVersion::V2 => { + let schema = schema_to_avro_schema("manifest_list", &Self::v2_schema()).unwrap(); + let reader = Reader::with_schema(&schema, bs)?; + let values = Value::Array(reader.collect::, _>>()?); + from_value::<_serde::ManifestListV2>(&values)?.try_into(partition_type) + } + FormatVersion::V1 => { + let schema = schema_to_avro_schema("manifest_list", &Self::v1_schema()).unwrap(); + let reader = Reader::with_schema(&schema, bs)?; + let values = Value::Array(reader.collect::, _>>()?); + from_value::<_serde::ManifestListV1>(&values)?.try_into(partition_type) + } + } + } + + /// Get the entries in the manifest list. + pub fn entries(&self) -> &[ManifestListEntry] { + &self.entries + } + + /// Get the v2 schema of the manifest list entry. + pub(crate) fn v2_schema() -> Schema { + let fields = vec![ + _const_fields::MANIFEST_PATH.clone(), + _const_fields::MANIFEST_LENGTH.clone(), + _const_fields::PARTITION_SPEC_ID.clone(), + _const_fields::CONTENT.clone(), + _const_fields::SEQUENCE_NUMBER.clone(), + _const_fields::MIN_SEQUENCE_NUMBER.clone(), + _const_fields::ADDED_SNAPSHOT_ID.clone(), + _const_fields::ADDED_FILES_COUNT_V2.clone(), + _const_fields::EXISTING_FILES_COUNT_V2.clone(), + _const_fields::DELETED_FILES_COUNT_V2.clone(), + _const_fields::ADDED_ROWS_COUNT_V2.clone(), + _const_fields::EXISTING_ROWS_COUNT_V2.clone(), + _const_fields::DELETED_ROWS_COUNT_V2.clone(), + _const_fields::PARTITIONS.clone(), + _const_fields::KEY_METADATA.clone(), + ]; + Schema::builder().with_fields(fields).build().unwrap() + } + + /// Get the v1 schema of the manifest list entry. + pub(crate) fn v1_schema() -> Schema { + let fields = vec![ + _const_fields::MANIFEST_PATH.clone(), + _const_fields::MANIFEST_LENGTH.clone(), + _const_fields::PARTITION_SPEC_ID.clone(), + _const_fields::ADDED_SNAPSHOT_ID.clone(), + _const_fields::ADDED_FILES_COUNT_V1.clone().to_owned(), + _const_fields::EXISTING_FILES_COUNT_V1.clone(), + _const_fields::DELETED_FILES_COUNT_V1.clone(), + _const_fields::ADDED_ROWS_COUNT_V1.clone(), + _const_fields::EXISTING_ROWS_COUNT_V1.clone(), + _const_fields::DELETED_ROWS_COUNT_V1.clone(), + _const_fields::PARTITIONS.clone(), + _const_fields::KEY_METADATA.clone(), + ]; + Schema::builder().with_fields(fields).build().unwrap() + } +} + +/// This is a helper module that defines the schema field of the manifest list entry. +mod _const_fields { + use std::sync::Arc; + + use once_cell::sync::Lazy; + + use crate::spec::{ListType, NestedField, NestedFieldRef, PrimitiveType, StructType, Type}; + + pub(crate) static MANIFEST_PATH: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::required( + 500, + "manifest_path", + Type::Primitive(PrimitiveType::String), + )) + }) + }; + pub(crate) static MANIFEST_LENGTH: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::required( + 501, + "manifest_length", + Type::Primitive(PrimitiveType::Long), + )) + }) + }; + pub(crate) static PARTITION_SPEC_ID: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::required( + 502, + "partition_spec_id", + Type::Primitive(PrimitiveType::Int), + )) + }) + }; + pub(crate) static CONTENT: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::required( + 517, + "content", + Type::Primitive(PrimitiveType::Int), + )) + }) + }; + pub(crate) static SEQUENCE_NUMBER: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::required( + 515, + "sequence_number", + Type::Primitive(PrimitiveType::Long), + )) + }) + }; + pub(crate) static MIN_SEQUENCE_NUMBER: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::required( + 516, + "min_sequence_number", + Type::Primitive(PrimitiveType::Long), + )) + }) + }; + pub(crate) static ADDED_SNAPSHOT_ID: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::required( + 503, + "added_snapshot_id", + Type::Primitive(PrimitiveType::Long), + )) + }) + }; + pub(crate) static ADDED_FILES_COUNT_V2: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::required( + 504, + "added_data_files_count", + Type::Primitive(PrimitiveType::Int), + )) + }) + }; + pub(crate) static ADDED_FILES_COUNT_V1: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 504, + "added_data_files_count", + Type::Primitive(PrimitiveType::Int), + )) + }) + }; + pub(crate) static EXISTING_FILES_COUNT_V2: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::required( + 505, + "existing_data_files_count", + Type::Primitive(PrimitiveType::Int), + )) + }) + }; + pub(crate) static EXISTING_FILES_COUNT_V1: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 505, + "existing_data_files_count", + Type::Primitive(PrimitiveType::Int), + )) + }) + }; + pub(crate) static DELETED_FILES_COUNT_V2: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::required( + 506, + "deleted_data_files_count", + Type::Primitive(PrimitiveType::Int), + )) + }) + }; + pub(crate) static DELETED_FILES_COUNT_V1: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 506, + "deleted_data_files_count", + Type::Primitive(PrimitiveType::Int), + )) + }) + }; + pub(crate) static ADDED_ROWS_COUNT_V2: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::required( + 512, + "added_rows_count", + Type::Primitive(PrimitiveType::Long), + )) + }) + }; + pub(crate) static ADDED_ROWS_COUNT_V1: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 512, + "added_rows_count", + Type::Primitive(PrimitiveType::Long), + )) + }) + }; + pub(crate) static EXISTING_ROWS_COUNT_V2: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::required( + 513, + "existing_rows_count", + Type::Primitive(PrimitiveType::Long), + )) + }) + }; + pub(crate) static EXISTING_ROWS_COUNT_V1: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 513, + "existing_rows_count", + Type::Primitive(PrimitiveType::Long), + )) + }) + }; + pub(crate) static DELETED_ROWS_COUNT_V2: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::required( + 514, + "deleted_rows_count", + Type::Primitive(PrimitiveType::Long), + )) + }) + }; + pub(crate) static DELETED_ROWS_COUNT_V1: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 514, + "deleted_rows_count", + Type::Primitive(PrimitiveType::Long), + )) + }) + }; + pub(crate) static PARTITIONS: Lazy = { + Lazy::new(|| { + // element type + let fields = vec![ + Arc::new(NestedField::required( + 509, + "contains_null", + Type::Primitive(PrimitiveType::Boolean), + )), + Arc::new(NestedField::optional( + 518, + "contains_nan", + Type::Primitive(PrimitiveType::Boolean), + )), + Arc::new(NestedField::optional( + 510, + "lower_bound", + Type::Primitive(PrimitiveType::Binary), + )), + Arc::new(NestedField::optional( + 511, + "upper_bound", + Type::Primitive(PrimitiveType::Binary), + )), + ]; + let element_field = Arc::new(NestedField::required( + 508, + "r_508", + Type::Struct(StructType::new(fields)), + )); + Arc::new(NestedField::optional( + 507, + "partitions", + Type::List(ListType { element_field }), + )) + }) + }; + pub(crate) static KEY_METADATA: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 519, + "key_metadata", + Type::Primitive(PrimitiveType::Binary), + )) + }) + }; +} + +/// Entry in a manifest list. +#[derive(Debug, PartialEq, Clone)] +pub struct ManifestListEntry { + /// field: 500 + /// + /// Location of the manifest file + manifest_path: String, + /// field: 501 + /// + /// Length of the manifest file in bytes + manifest_length: i64, + /// field: 502 + /// + /// ID of a partition spec used to write the manifest; must be listed + /// in table metadata partition-specs + partition_spec_id: i32, + /// field: 517 + /// + /// The type of files tracked by the manifest, either data or delete + /// files; 0 for all v1 manifests + content: ManifestContentType, + /// field: 515 + /// + /// The sequence number when the manifest was added to the table; use 0 + /// when reading v1 manifest lists + sequence_number: i64, + /// field: 516 + /// + /// The minimum data sequence number of all live data or delete files in + /// the manifest; use 0 when reading v1 manifest lists + min_sequence_number: i64, + /// field: 503 + /// + /// ID of the snapshot where the manifest file was added + added_snapshot_id: i64, + /// field: 504 + /// + /// Number of entries in the manifest that have status ADDED, when null + /// this is assumed to be non-zero + added_data_files_count: Option, + /// field: 505 + /// + /// Number of entries in the manifest that have status EXISTING (0), + /// when null this is assumed to be non-zero + existing_data_files_count: Option, + /// field: 506 + /// + /// Number of entries in the manifest that have status DELETED (2), + /// when null this is assumed to be non-zero + deleted_data_files_count: Option, + /// field: 512 + /// + /// Number of rows in all of files in the manifest that have status + /// ADDED, when null this is assumed to be non-zero + added_rows_count: Option, + /// field: 513 + /// + /// Number of rows in all of files in the manifest that have status + /// EXISTING, when null this is assumed to be non-zero + existing_rows_count: Option, + /// field: 514 + /// + /// Number of rows in all of files in the manifest that have status + /// DELETED, when null this is assumed to be non-zero + deleted_rows_count: Option, + /// field: 507 + /// element_field: 508 + /// + /// A list of field summaries for each partition field in the spec. Each + /// field in the list corresponds to a field in the manifest file’s + /// partition spec. + partitions: Vec, + /// field: 519 + /// + /// Implementation-specific key metadata for encryption + key_metadata: Vec, +} + +/// The type of files tracked by the manifest, either data or delete files; Data(0) for all v1 manifests +#[derive(Debug, PartialEq, Clone)] +pub enum ManifestContentType { + /// The manifest content is data. + Data = 0, + /// The manifest content is deletes. + Deletes = 1, +} + +impl TryFrom for ManifestContentType { + type Error = Error; + + fn try_from(value: i32) -> Result { + match value { + 0 => Ok(ManifestContentType::Data), + 1 => Ok(ManifestContentType::Deletes), + _ => Err(Error::new( + crate::ErrorKind::DataInvalid, + format!( + "Invalid manifest content type. Expected 0 or 1, got {}", + value + ), + )), + } + } +} + +/// Field summary for partition field in the spec. +/// +/// Each field in the list corresponds to a field in the manifest file’s partition spec. +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct FieldSummary { + /// field: 509 + /// + /// Whether the manifest contains at least one partition with a null + /// value for the field + contains_null: bool, + /// field: 518 + /// Whether the manifest contains at least one partition with a NaN + /// value for the field + contains_nan: Option, + /// field: 510 + /// The minimum value for the field in the manifests + /// partitions. + lower_bound: Option, + /// field: 511 + /// The maximum value for the field in the manifests + /// partitions. + upper_bound: Option, +} + +/// This is a helper module that defines types to help with serialization/deserialization. +/// For deserialization the input first gets read into either the [ManifestListEntryV1] or [ManifestListEntryV2] struct +/// and then converted into the [ManifestListEntry] struct. Serialization works the other way around. +/// [ManifestListEntryV1] and [ManifestListEntryV2] are internal struct that are only used for serialization and deserialization. +pub(super) mod _serde { + pub use serde_bytes::ByteBuf; + use serde_derive::{Deserialize, Serialize}; + + use crate::{ + spec::{Literal, StructType, Type}, + Error, + }; + + use super::ManifestListEntry; + + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] + #[serde(transparent)] + pub(crate) struct ManifestListV2 { + entries: Vec, + } + + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] + #[serde(transparent)] + pub(crate) struct ManifestListV1 { + entries: Vec, + } + + impl ManifestListV2 { + /// Converts the [ManifestListV2] into a [ManifestList]. + /// The convert of [entries] need the partition_type info so use this function instead of [std::TryFrom] trait. + pub fn try_into(self, partition_type: &StructType) -> Result { + Ok(super::ManifestList { + entries: self + .entries + .into_iter() + .map(|v| v.try_into(partition_type)) + .collect::, _>>()?, + }) + } + } + + impl TryFrom for ManifestListV2 { + type Error = Error; + + fn try_from(value: super::ManifestList) -> Result { + Ok(Self { + entries: value + .entries + .into_iter() + .map(TryInto::try_into) + .collect::, _>>()?, + }) + } + } + + impl ManifestListV1 { + /// Converts the [ManifestListV1] into a [ManifestList]. + /// The convert of [entries] need the partition_type info so use this function instead of [std::TryFrom] trait. + pub fn try_into(self, partition_type: &StructType) -> Result { + Ok(super::ManifestList { + entries: self + .entries + .into_iter() + .map(|v| v.try_into(partition_type)) + .collect::, _>>()?, + }) + } + } + + impl From for ManifestListV1 { + fn from(value: super::ManifestList) -> Self { + Self { + entries: value.entries.into_iter().map(Into::into).collect(), + } + } + } + + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] + pub(super) struct ManifestListEntryV1 { + pub manifest_path: String, + pub manifest_length: i64, + pub partition_spec_id: i32, + pub added_snapshot_id: i64, + pub added_data_files_count: Option, + pub existing_data_files_count: Option, + pub deleted_data_files_count: Option, + pub added_rows_count: Option, + pub existing_rows_count: Option, + pub deleted_rows_count: Option, + pub partitions: Option>, + pub key_metadata: Option, + } + + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] + pub(super) struct ManifestListEntryV2 { + pub manifest_path: String, + pub manifest_length: i64, + pub partition_spec_id: i32, + pub content: i32, + pub sequence_number: i64, + pub min_sequence_number: i64, + pub added_snapshot_id: i64, + pub added_data_files_count: i32, + pub existing_data_files_count: i32, + pub deleted_data_files_count: i32, + pub added_rows_count: i64, + pub existing_rows_count: i64, + pub deleted_rows_count: i64, + pub partitions: Option>, + pub key_metadata: Option, + } + + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] + pub(super) struct FieldSummary { + pub contains_null: bool, + pub contains_nan: Option, + pub lower_bound: Option, + pub upper_bound: Option, + } + + impl FieldSummary { + /// Converts the [FieldSummary] into a [super::FieldSummary]. + /// [lower_bound] and [upper_bound] are converted into [Literal]s need the type info so use + /// this function instead of [std::TryFrom] trait. + pub(crate) fn try_into(self, r#type: &Type) -> Result { + Ok(super::FieldSummary { + contains_null: self.contains_null, + contains_nan: self.contains_nan, + lower_bound: self + .lower_bound + .map(|v| Literal::try_from_bytes(&v, r#type)) + .transpose()?, + upper_bound: self + .upper_bound + .map(|v| Literal::try_from_bytes(&v, r#type)) + .transpose()?, + }) + } + } + + fn try_convert_to_field_summary( + partitions: Option>, + partition_type: &StructType, + ) -> Result, Error> { + Ok(partitions + .map(|partitions| { + let partition_types = partition_type.fields(); + if partitions.len() != partition_types.len() { + return Err(Error::new( + crate::ErrorKind::DataInvalid, + format!( + "Invalid partition spec. Expected {} fields, got {}", + partition_types.len(), + partitions.len() + ), + )); + } + partitions + .into_iter() + .zip(partition_types) + .map(|(v, field)| v.try_into(&field.field_type)) + .collect::, _>>() + }) + .transpose()? + .unwrap_or_default()) + } + + impl ManifestListEntryV2 { + /// Converts the [ManifestListEntryV2] into a [ManifestListEntry]. + /// The convert of [partitions] need the partition_type info so use this function instead of [std::TryFrom] trait. + pub fn try_into(self, partition_type: &StructType) -> Result { + let partitions = try_convert_to_field_summary(self.partitions, partition_type)?; + Ok(ManifestListEntry { + manifest_path: self.manifest_path, + manifest_length: self.manifest_length, + partition_spec_id: self.partition_spec_id, + content: self.content.try_into()?, + sequence_number: self.sequence_number, + min_sequence_number: self.min_sequence_number, + added_snapshot_id: self.added_snapshot_id, + added_data_files_count: Some(self.added_data_files_count), + existing_data_files_count: Some(self.existing_data_files_count), + deleted_data_files_count: Some(self.deleted_data_files_count), + added_rows_count: Some(self.added_rows_count), + existing_rows_count: Some(self.existing_rows_count), + deleted_rows_count: Some(self.deleted_rows_count), + partitions, + key_metadata: self.key_metadata.map(|b| b.into_vec()).unwrap_or_default(), + }) + } + } + + impl ManifestListEntryV1 { + /// Converts the [ManifestListEntryV1] into a [ManifestListEntry]. + /// The convert of [partitions] need the partition_type info so use this function instead of [std::TryFrom] trait. + pub fn try_into(self, partition_type: &StructType) -> Result { + let partitions = try_convert_to_field_summary(self.partitions, partition_type)?; + Ok(ManifestListEntry { + manifest_path: self.manifest_path, + manifest_length: self.manifest_length, + partition_spec_id: self.partition_spec_id, + added_snapshot_id: self.added_snapshot_id, + added_data_files_count: self.added_data_files_count, + existing_data_files_count: self.existing_data_files_count, + deleted_data_files_count: self.deleted_data_files_count, + added_rows_count: self.added_rows_count, + existing_rows_count: self.existing_rows_count, + deleted_rows_count: self.deleted_rows_count, + partitions, + key_metadata: self.key_metadata.map(|b| b.into_vec()).unwrap_or_default(), + // as ref: https://iceberg.apache.org/spec/#partitioning + // use 0 when reading v1 manifest lists + content: super::ManifestContentType::Data, + sequence_number: 0, + min_sequence_number: 0, + }) + } + } + + fn convert_to_serde_field_summary( + partitions: Vec, + ) -> Option> { + if partitions.is_empty() { + None + } else { + Some( + partitions + .into_iter() + .map(|v| FieldSummary { + contains_null: v.contains_null, + contains_nan: v.contains_nan, + lower_bound: v.lower_bound.map(|v| v.into()), + upper_bound: v.upper_bound.map(|v| v.into()), + }) + .collect(), + ) + } + } + + fn convert_to_serde_key_metadata(key_metadata: Vec) -> Option { + if key_metadata.is_empty() { + None + } else { + Some(ByteBuf::from(key_metadata)) + } + } + + impl TryFrom for ManifestListEntryV2 { + type Error = Error; + + fn try_from(value: ManifestListEntry) -> Result { + let partitions = convert_to_serde_field_summary(value.partitions); + let key_metadata = convert_to_serde_key_metadata(value.key_metadata); + Ok(Self { + manifest_path: value.manifest_path, + manifest_length: value.manifest_length, + partition_spec_id: value.partition_spec_id, + content: value.content as i32, + sequence_number: value.sequence_number, + min_sequence_number: value.min_sequence_number, + added_snapshot_id: value.added_snapshot_id, + added_data_files_count: value.added_data_files_count.ok_or_else(|| { + Error::new( + crate::ErrorKind::DataInvalid, + "added_data_files_count in ManifestListEntryV2 should be require", + ) + })?, + existing_data_files_count: value.existing_data_files_count.ok_or_else(|| { + Error::new( + crate::ErrorKind::DataInvalid, + "existing_data_files_count in ManifestListEntryV2 should be require", + ) + })?, + deleted_data_files_count: value.deleted_data_files_count.ok_or_else(|| { + Error::new( + crate::ErrorKind::DataInvalid, + "deleted_data_files_count in ManifestListEntryV2 should be require", + ) + })?, + added_rows_count: value.added_rows_count.ok_or_else(|| { + Error::new( + crate::ErrorKind::DataInvalid, + "added_rows_count in ManifestListEntryV2 should be require", + ) + })?, + existing_rows_count: value.existing_rows_count.ok_or_else(|| { + Error::new( + crate::ErrorKind::DataInvalid, + "existing_rows_count in ManifestListEntryV2 should be require", + ) + })?, + deleted_rows_count: value.deleted_rows_count.ok_or_else(|| { + Error::new( + crate::ErrorKind::DataInvalid, + "deleted_rows_count in ManifestListEntryV2 should be require", + ) + })?, + partitions, + key_metadata, + }) + } + } + + impl From for ManifestListEntryV1 { + fn from(value: ManifestListEntry) -> Self { + let partitions = convert_to_serde_field_summary(value.partitions); + let key_metadata = convert_to_serde_key_metadata(value.key_metadata); + Self { + manifest_path: value.manifest_path, + manifest_length: value.manifest_length, + partition_spec_id: value.partition_spec_id, + added_snapshot_id: value.added_snapshot_id, + added_data_files_count: value.added_data_files_count, + existing_data_files_count: value.existing_data_files_count, + deleted_data_files_count: value.deleted_data_files_count, + added_rows_count: value.added_rows_count, + existing_rows_count: value.existing_rows_count, + deleted_rows_count: value.deleted_rows_count, + partitions, + key_metadata, + } + } + } +} + +#[cfg(test)] +mod test { + use std::{fs, sync::Arc}; + + use crate::spec::{ + manifest_list::_serde::ManifestListV1, FieldSummary, Literal, ManifestContentType, + ManifestList, ManifestListEntry, NestedField, PrimitiveType, StructType, Type, + }; + + use super::_serde::ManifestListV2; + + #[test] + fn test_parse_manifest_list_v1() { + let path = format!( + "{}/testdata/simple_manifest_list_v1.avro", + env!("CARGO_MANIFEST_DIR") + ); + + let bs = fs::read(path).expect("read_file must succeed"); + + let manifest_list = ManifestList::parse_with_version( + &bs, + crate::spec::FormatVersion::V1, + &StructType::new(vec![]), + ) + .unwrap(); + + assert_eq!(1, manifest_list.entries.len()); + assert_eq!( + manifest_list.entries[0], + ManifestListEntry { + manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(), + manifest_length: 5806, + partition_spec_id: 0, + content: ManifestContentType::Data, + sequence_number: 0, + min_sequence_number: 0, + added_snapshot_id: 1646658105718557341, + added_data_files_count: Some(3), + existing_data_files_count: Some(0), + deleted_data_files_count: Some(0), + added_rows_count: Some(3), + existing_rows_count: Some(0), + deleted_rows_count: Some(0), + partitions: vec![], + key_metadata: vec![], + } + ); + } + + #[test] + fn test_parse_manifest_list_v2() { + let path = format!( + "{}/testdata/simple_manifest_list_v2.avro", + env!("CARGO_MANIFEST_DIR") + ); + + let bs = fs::read(path).expect("read_file must succeed"); + + let manifest_list = ManifestList::parse_with_version( + &bs, + crate::spec::FormatVersion::V2, + &StructType::new(vec![Arc::new(NestedField::required( + 1, + "test", + Type::Primitive(PrimitiveType::Long), + ))]), + ) + .unwrap(); + + assert_eq!(1, manifest_list.entries.len()); + assert_eq!( + manifest_list.entries[0], + ManifestListEntry { + manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(), + manifest_length: 6926, + partition_spec_id: 0, + content: ManifestContentType::Data, + sequence_number: 1, + min_sequence_number: 1, + added_snapshot_id: 377075049360453639, + added_data_files_count: Some(1), + existing_data_files_count: Some(0), + deleted_data_files_count: Some(0), + added_rows_count: Some(3), + existing_rows_count: Some(0), + deleted_rows_count: Some(0), + partitions: vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Literal::long(1)), upper_bound: Some(Literal::long(1))}], + key_metadata: vec![], + } + ); + } + + #[test] + fn test_serialize_manifest_list_v1() { + let manifest_list:ManifestListV1 = ManifestList { + entries: vec![ManifestListEntry { + manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(), + manifest_length: 5806, + partition_spec_id: 0, + content: ManifestContentType::Data, + sequence_number: 0, + min_sequence_number: 0, + added_snapshot_id: 1646658105718557341, + added_data_files_count: Some(3), + existing_data_files_count: Some(0), + deleted_data_files_count: Some(0), + added_rows_count: Some(3), + existing_rows_count: Some(0), + deleted_rows_count: Some(0), + partitions: vec![], + key_metadata: vec![], + }] + }.into(); + let result = serde_json::to_string(&manifest_list).unwrap(); + assert_eq!( + result, + r#"[{"manifest_path":"/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro","manifest_length":5806,"partition_spec_id":0,"added_snapshot_id":1646658105718557341,"added_data_files_count":3,"existing_data_files_count":0,"deleted_data_files_count":0,"added_rows_count":3,"existing_rows_count":0,"deleted_rows_count":0,"partitions":null,"key_metadata":null}]"# + ); + } + + #[test] + fn test_serialize_manifest_list_v2() { + let manifest_list:ManifestListV2 = ManifestList { + entries: vec![ManifestListEntry { + manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(), + manifest_length: 6926, + partition_spec_id: 0, + content: ManifestContentType::Data, + sequence_number: 1, + min_sequence_number: 1, + added_snapshot_id: 377075049360453639, + added_data_files_count: Some(1), + existing_data_files_count: Some(0), + deleted_data_files_count: Some(0), + added_rows_count: Some(3), + existing_rows_count: Some(0), + deleted_rows_count: Some(0), + partitions: vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Literal::long(1)), upper_bound: Some(Literal::long(1))}], + key_metadata: vec![], + }] + }.try_into().unwrap(); + let result = serde_json::to_string(&manifest_list).unwrap(); + assert_eq!( + result, + r#"[{"manifest_path":"s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro","manifest_length":6926,"partition_spec_id":0,"content":0,"sequence_number":1,"min_sequence_number":1,"added_snapshot_id":377075049360453639,"added_data_files_count":1,"existing_data_files_count":0,"deleted_data_files_count":0,"added_rows_count":3,"existing_rows_count":0,"deleted_rows_count":0,"partitions":[{"contains_null":false,"contains_nan":false,"lower_bound":[1,0,0,0,0,0,0,0],"upper_bound":[1,0,0,0,0,0,0,0]}],"key_metadata":null}]"# + ); + } +} diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs index dedde72e2..33d8bf958 100644 --- a/crates/iceberg/src/spec/mod.rs +++ b/crates/iceberg/src/spec/mod.rs @@ -18,6 +18,7 @@ //! Spec for Iceberg. mod datatypes; +mod manifest_list; mod partition; mod schema; mod snapshot; @@ -27,6 +28,7 @@ mod transform; mod values; pub use datatypes::*; +pub use manifest_list::*; pub use partition::*; pub use schema::*; pub use snapshot::*; diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index f38a605cf..d4e941b5e 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -73,7 +73,7 @@ pub struct Snapshot { timestamp_ms: i64, /// The location of a manifest list for this snapshot that /// tracks manifest files with additional metadata. - manifest_list: ManifestList, + manifest_list: ManifestListLocation, /// A string map that summarizes the snapshot changes, including operation. summary: Summary, /// ID of the table’s current schema when the snapshot was created. @@ -84,7 +84,7 @@ pub struct Snapshot { /// Type to distinguish between a path to a manifestlist file or a vector of manifestfile locations #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] #[serde(untagged)] -pub enum ManifestList { +pub enum ManifestListLocation { /// Location of manifestlist file ManifestListFile(String), /// Manifestfile locations @@ -104,7 +104,7 @@ impl Snapshot { } /// Get location of manifest_list file #[inline] - pub fn manifest_list(&self) -> &ManifestList { + pub fn manifest_list(&self) -> &ManifestListLocation { &self.manifest_list } /// Get summary of the snapshot @@ -141,7 +141,7 @@ pub(super) mod _serde { use crate::{Error, ErrorKind}; - use super::{ManifestList, Operation, Snapshot, Summary}; + use super::{ManifestListLocation, Operation, Snapshot, Summary}; #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "kebab-case")] @@ -183,7 +183,7 @@ pub(super) mod _serde { parent_snapshot_id: v2.parent_snapshot_id, sequence_number: v2.sequence_number, timestamp_ms: v2.timestamp_ms, - manifest_list: ManifestList::ManifestListFile(v2.manifest_list), + manifest_list: ManifestListLocation::ManifestListFile(v2.manifest_list), summary: v2.summary, schema_id: v2.schema_id, } @@ -198,8 +198,8 @@ pub(super) mod _serde { sequence_number: v2.sequence_number, timestamp_ms: v2.timestamp_ms, manifest_list: match v2.manifest_list { - ManifestList::ManifestListFile(file) => file, - ManifestList::ManifestFiles(_) => panic!("Wrong table format version. Can't convert a list of manifest files into a location of a manifest file.") + ManifestListLocation::ManifestListFile(file) => file, + ManifestListLocation::ManifestFiles(_) => panic!("Wrong table format version. Can't convert a list of manifest files into a location of a manifest file.") }, summary: v2.summary, schema_id: v2.schema_id, @@ -217,8 +217,8 @@ pub(super) mod _serde { sequence_number: 0, timestamp_ms: v1.timestamp_ms, manifest_list: match (v1.manifest_list, v1.manifests) { - (Some(file), _) => ManifestList::ManifestListFile(file), - (None, Some(files)) => ManifestList::ManifestFiles(files), + (Some(file), _) => ManifestListLocation::ManifestListFile(file), + (None, Some(files)) => ManifestListLocation::ManifestFiles(files), (None, None) => { return Err(Error::new( ErrorKind::DataInvalid, @@ -238,8 +238,8 @@ pub(super) mod _serde { impl From for SnapshotV1 { fn from(v2: Snapshot) -> Self { let (manifest_list, manifests) = match v2.manifest_list { - ManifestList::ManifestListFile(file) => (Some(file), None), - ManifestList::ManifestFiles(files) => (None, Some(files)), + ManifestListLocation::ManifestListFile(file) => (Some(file), None), + ManifestListLocation::ManifestFiles(files) => (None, Some(files)), }; SnapshotV1 { snapshot_id: v2.snapshot_id, @@ -309,7 +309,9 @@ pub enum SnapshotRetention { mod tests { use std::collections::HashMap; - use crate::spec::snapshot::{ManifestList, Operation, Snapshot, Summary, _serde::SnapshotV1}; + use crate::spec::snapshot::{ + ManifestListLocation, Operation, Snapshot, Summary, _serde::SnapshotV1, + }; #[test] fn schema() { @@ -339,7 +341,7 @@ mod tests { *result.summary() ); assert_eq!( - ManifestList::ManifestListFile("s3://b/wh/.../s1.avro".to_string()), + ManifestListLocation::ManifestListFile("s3://b/wh/.../s1.avro".to_string()), *result.manifest_list() ); } diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index f40b63e2e..ccd58f2ff 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -678,7 +678,7 @@ mod tests { use pretty_assertions::assert_eq; use crate::spec::{ - table_metadata::TableMetadata, ManifestList, NestedField, NullOrder, Operation, + table_metadata::TableMetadata, ManifestListLocation, NestedField, NullOrder, Operation, PartitionField, PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Summary, Transform, Type, }; @@ -936,7 +936,7 @@ mod tests { .with_timestamp_ms(1662532818843) .with_sequence_number(0) .with_schema_id(0) - .with_manifest_list(ManifestList::ManifestListFile("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string())) + .with_manifest_list(ManifestListLocation::ManifestListFile("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string())) .with_summary(Summary{operation: Operation::Append, other: HashMap::from_iter(vec![("spark.app.id".to_string(),"local-1662532784305".to_string()),("added-data-files".to_string(),"4".to_string()),("added-records".to_string(),"4".to_string()),("added-files-size".to_string(),"6001".to_string())])}) .build().unwrap(); @@ -1059,7 +1059,7 @@ mod tests { .with_snapshot_id(3051729675574597004) .with_timestamp_ms(1515100955770) .with_sequence_number(0) - .with_manifest_list(ManifestList::ManifestListFile( + .with_manifest_list(ManifestListLocation::ManifestListFile( "s3://a/b/1.avro".to_string(), )) .with_summary(Summary { @@ -1075,7 +1075,7 @@ mod tests { .with_timestamp_ms(1555100955770) .with_sequence_number(1) .with_schema_id(1) - .with_manifest_list(ManifestList::ManifestListFile( + .with_manifest_list(ManifestListLocation::ManifestListFile( "s3://a/b/2.avro".to_string(), )) .with_summary(Summary { diff --git a/crates/iceberg/testdata/simple_manifest_list_v1.avro b/crates/iceberg/testdata/simple_manifest_list_v1.avro new file mode 100644 index 000000000..bcdd126de Binary files /dev/null and b/crates/iceberg/testdata/simple_manifest_list_v1.avro differ diff --git a/crates/iceberg/testdata/simple_manifest_list_v2.avro b/crates/iceberg/testdata/simple_manifest_list_v2.avro new file mode 100644 index 000000000..001bb549f Binary files /dev/null and b/crates/iceberg/testdata/simple_manifest_list_v2.avro differ