From b8a206c492db8d68ceeab30ba2c8f7775360a216 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Sun, 3 Sep 2023 11:37:00 +0800 Subject: [PATCH 1/5] support read manifest-list --- crates/iceberg/src/spec/manifest_list.rs | 581 ++++++++++++++++++ crates/iceberg/src/spec/mod.rs | 2 + crates/iceberg/src/spec/snapshot.rs | 28 +- crates/iceberg/src/spec/table_metadata.rs | 8 +- .../testdata/simple_manifest_list_v1.avro | Bin 0 -> 3759 bytes .../testdata/simple_manifest_list_v2.avro | Bin 0 -> 4257 bytes 6 files changed, 602 insertions(+), 17 deletions(-) create mode 100644 crates/iceberg/src/spec/manifest_list.rs create mode 100644 crates/iceberg/testdata/simple_manifest_list_v1.avro create mode 100644 crates/iceberg/testdata/simple_manifest_list_v2.avro diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs new file mode 100644 index 000000000..2310b7bd2 --- /dev/null +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -0,0 +1,581 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! ManifestList for Iceberg. + +use apache_avro::{from_value, Reader}; + +use crate::{spec::Literal, Error}; + +use super::{FormatVersion, StructType}; + +/// Snapshots are embedded in table metadata, but the list of manifests for a +/// snapshot are stored in a separate manifest list file. +/// +/// A new manifest list is written for each attempt to commit a snapshot +/// because the list of manifests always changes to produce a new snapshot. +/// When a manifest list is written, the (optimistic) sequence number of the +/// snapshot is written for all new manifest files tracked by the list. +/// +/// A manifest list includes summary metadata that can be used to avoid +/// scanning all of the manifests in a snapshot when planning a table scan. +/// This includes the number of added, existing, and deleted files, and a +/// summary of values for each field of the partition spec used to write the +/// manifest. +#[derive(Debug, Clone)] +pub struct ManifestList { + version: FormatVersion, + /// Entries in a manifest list. + entries: Vec, +} + +impl ManifestList { + /// Parse manifest list from bytes. + /// + /// QUESTION: Will we have more than one manifest list in a single file? + pub fn parse( + bs: &[u8], + version: FormatVersion, + partition_type: &StructType, + ) -> Result { + let reader = Reader::new(bs)?; + let entries = match version { + FormatVersion::V2 => reader + .map(|v| { + v.and_then(|value| from_value::<_serde::ManifestListEntryV2>(&value)) + .map_err(|e| { + Error::new( + crate::ErrorKind::DataInvalid, + "Failed to parse manifest list entry", + ) + .with_source(e) + }) + }) + .map(|v| v.and_then(|v| v.try_into(partition_type))) + .collect::, Error>>()?, + FormatVersion::V1 => reader + .map(|v| { + v.and_then(|value| from_value::<_serde::ManifestListEntryV1>(&value)) + .map_err(|e| { + Error::new( + crate::ErrorKind::DataInvalid, + "Failed to parse manifest list entry", + ) + .with_source(e) + }) + }) + .map(|v| v.and_then(|v| v.try_into(partition_type))) + .collect::, Error>>()?, + }; + Ok(ManifestList { version, entries }) + } + + /// Get the entries in the manifest list. + pub fn entries(&self) -> &[ManifestListEntry] { + &self.entries + } + + /// Get the version of the manifest list. + pub fn version(&self) -> &FormatVersion { + &self.version + } +} + +/// Entry in a manifest list. +#[derive(Debug, PartialEq, Clone)] +pub struct ManifestListEntry { + /// field: 500 + /// + /// Location of the manifest file + manifest_path: String, + /// field: 501 + /// + /// Length of the manifest file in bytes + manifest_length: i64, + /// field: 502 + /// + /// ID of a partition spec used to write the manifest; must be listed + /// in table metadata partition-specs + partition_spec_id: i32, + /// field: 517 + /// + /// The type of files tracked by the manifest, either data or delete + /// files; 0 for all v1 manifests + content: ManifestContentType, + /// field: 515 + /// + /// The sequence number when the manifest was added to the table; use 0 + /// when reading v1 manifest lists + sequence_number: i64, + /// field: 516 + /// + /// The minimum data sequence number of all live data or delete files in + /// the manifest; use 0 when reading v1 manifest lists + min_sequence_number: i64, + /// field: 503 + /// + /// ID of the snapshot where the manifest file was added + added_snapshot_id: i64, + /// field: 504 + /// + /// Number of entries in the manifest that have status ADDED, when null + /// this is assumed to be non-zero + added_data_files_count: Option, + /// field: 505 + /// + /// Number of entries in the manifest that have status EXISTING (0), + /// when null this is assumed to be non-zero + existing_data_files_count: Option, + /// field: 506 + /// + /// Number of entries in the manifest that have status DELETED (2), + /// when null this is assumed to be non-zero + deleted_data_files_count: Option, + /// field: 512 + /// + /// Number of rows in all of files in the manifest that have status + /// ADDED, when null this is assumed to be non-zero + added_rows_count: Option, + /// field: 513 + /// + /// Number of rows in all of files in the manifest that have status + /// EXISTING, when null this is assumed to be non-zero + existing_rows_count: Option, + /// field: 514 + /// + /// Number of rows in all of files in the manifest that have status + /// DELETED, when null this is assumed to be non-zero + deleted_rows_count: Option, + /// field: 507 + /// element_field: 508 + /// + /// A list of field summaries for each partition field in the spec. Each + /// field in the list corresponds to a field in the manifest file’s + /// partition spec. + partitions: Option>, + /// field: 519 + /// + /// Implementation-specific key metadata for encryption + key_metadata: Option>, +} + +/// The type of files tracked by the manifest, either data or delete files; Data(0) for all v1 manifests +#[derive(Debug, PartialEq, Clone)] +pub enum ManifestContentType { + /// The manifest content is data. + Data = 0, + /// The manifest content is deletes. + Deletes = 1, +} + +impl TryFrom for ManifestContentType { + type Error = Error; + + fn try_from(value: i32) -> Result { + match value { + 0 => Ok(ManifestContentType::Data), + 1 => Ok(ManifestContentType::Deletes), + _ => Err(Error::new( + crate::ErrorKind::DataInvalid, + format!( + "Invalid manifest content type. Expected 0 or 1, got {}", + value + ), + )), + } + } +} + +/// Field summary for partition field in the spec. +/// +/// Each field in the list corresponds to a field in the manifest file’s partition spec. +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct FieldSummary { + /// field: 509 + /// + /// Whether the manifest contains at least one partition with a null + /// value for the field + contains_null: bool, + /// field: 518 + /// Whether the manifest contains at least one partition with a NaN + /// value for the field + contains_nan: Option, + /// field: 510 + /// The minimum value for the field in the manifests + /// partitions. + lower_bound: Option, + /// field: 511 + /// The maximum value for the field in the manifests + /// partitions. + upper_bound: Option, +} + +pub(super) mod _serde { + /// This is a helper module that defines types to help with serialization/deserialization. + /// For deserialization the input first gets read into either the [ManifestListEntryV1] or [ManifestListEntryV2] struct + /// and then converted into the [ManifestListEntry] struct. Serialization works the other way around. + /// [ManifestListEntryV1] and [ManifestListEntryV2] are internal struct that are only used for serialization and deserialization. + pub use serde_bytes::ByteBuf; + use serde_derive::{Deserialize, Serialize}; + + use crate::{ + spec::{Literal, StructType, Type}, + Error, + }; + + use super::ManifestListEntry; + + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] + pub(super) struct ManifestListEntryV1 { + pub manifest_path: String, + pub manifest_length: i64, + pub partition_spec_id: i32, + pub added_snapshot_id: i64, + pub added_data_files_count: Option, + pub existing_data_files_count: Option, + pub deleted_data_files_count: Option, + pub added_rows_count: Option, + pub existing_rows_count: Option, + pub deleted_rows_count: Option, + pub partitions: Option>, + pub key_metadata: Option, + } + + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] + pub(super) struct ManifestListEntryV2 { + pub manifest_path: String, + pub manifest_length: i64, + pub partition_spec_id: i32, + pub content: i32, + pub sequence_number: i64, + pub min_sequence_number: i64, + pub added_snapshot_id: i64, + pub added_data_files_count: i32, + pub existing_data_files_count: i32, + pub deleted_data_files_count: i32, + pub added_rows_count: i64, + pub existing_rows_count: i64, + pub deleted_rows_count: i64, + pub partitions: Option>, + pub key_metadata: Option, + } + + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] + pub(super) struct FieldSummary { + pub contains_null: bool, + pub contains_nan: Option, + pub lower_bound: Option, + pub upper_bound: Option, + } + + impl FieldSummary { + /// Converts the [FieldSummary] into a [super::FieldSummary]. + /// [lower_bound] and [upper_bound] are converted into [Literal]s need the type info so use + /// this function instead of [std::TryFrom] trait. + pub fn try_into(self, r#type: &Type) -> Result { + Ok(super::FieldSummary { + contains_null: self.contains_null, + contains_nan: self.contains_nan, + lower_bound: self + .lower_bound + .map(|v| Literal::try_from_bytes(&v, r#type)) + .transpose()?, + upper_bound: self + .upper_bound + .map(|v| Literal::try_from_bytes(&v, r#type)) + .transpose()?, + }) + } + } + + impl ManifestListEntryV2 { + /// Converts the [ManifestListEntryV2] into a [ManifestListEntry]. + /// The convert of [partitions] need the partition_type info so use this function instead of [std::TryFrom] trait. + pub fn try_into(self, partition_type: &StructType) -> Result { + let partitions = if let Some(partitions) = self.partitions { + let partition_types = partition_type.fields(); + if partitions.len() != partition_types.len() { + return Err(Error::new( + crate::ErrorKind::DataInvalid, + format!( + "Invalid partition spec. Expected {} fields, got {}", + partition_types.len(), + partitions.len() + ), + )); + } + Some( + partitions + .into_iter() + .zip(partition_types) + .map(|(v, field)| v.try_into(&field.field_type)) + .collect::, _>>()?, + ) + } else { + None + }; + Ok(ManifestListEntry { + manifest_path: self.manifest_path, + manifest_length: self.manifest_length, + partition_spec_id: self.partition_spec_id, + content: self.content.try_into()?, + sequence_number: self.sequence_number, + min_sequence_number: self.min_sequence_number, + added_snapshot_id: self.added_snapshot_id, + added_data_files_count: Some(self.added_data_files_count), + existing_data_files_count: Some(self.existing_data_files_count), + deleted_data_files_count: Some(self.deleted_data_files_count), + added_rows_count: Some(self.added_rows_count), + existing_rows_count: Some(self.existing_rows_count), + deleted_rows_count: Some(self.deleted_rows_count), + partitions, + key_metadata: self.key_metadata.map(|b| b.into_vec()), + }) + } + } + + impl ManifestListEntryV1 { + /// Converts the [ManifestListEntryV1] into a [ManifestListEntry]. + /// The convert of [partitions] need the partition_type info so use this function instead of [std::TryFrom] trait. + pub fn try_into(self, partition_type: &StructType) -> Result { + let partitions = if let Some(partitions) = self.partitions { + let partition_types = partition_type.fields(); + if partitions.len() != partition_types.len() { + return Err(Error::new( + crate::ErrorKind::DataInvalid, + format!( + "Invalid partition spec. Expected {} fields, got {}", + partition_types.len(), + partitions.len() + ), + )); + } + Some( + partitions + .into_iter() + .zip(partition_types) + .map(|(v, field)| v.try_into(&field.field_type)) + .collect::, _>>()?, + ) + } else { + None + }; + Ok(ManifestListEntry { + manifest_path: self.manifest_path, + manifest_length: self.manifest_length, + partition_spec_id: self.partition_spec_id, + added_snapshot_id: self.added_snapshot_id, + added_data_files_count: self.added_data_files_count, + existing_data_files_count: self.existing_data_files_count, + deleted_data_files_count: self.deleted_data_files_count, + added_rows_count: self.added_rows_count, + existing_rows_count: self.existing_rows_count, + deleted_rows_count: self.deleted_rows_count, + partitions, + key_metadata: self.key_metadata.map(|b| b.into_vec()), + // as ref: https://iceberg.apache.org/spec/#partitioning + // use 0 when reading v1 manifest lists + content: super::ManifestContentType::Data, + sequence_number: 0, + min_sequence_number: 0, + }) + } + } + + impl TryFrom for ManifestListEntryV2 { + type Error = Error; + + fn try_from(value: ManifestListEntry) -> Result { + Ok(Self { + manifest_path: value.manifest_path, + manifest_length: value.manifest_length, + partition_spec_id: value.partition_spec_id, + content: value.content as i32, + sequence_number: value.sequence_number, + min_sequence_number: value.min_sequence_number, + added_snapshot_id: value.added_snapshot_id, + added_data_files_count: value.added_data_files_count.ok_or_else(|| { + Error::new( + crate::ErrorKind::DataInvalid, + "added_data_files_count in ManifestListEntryV2 shold be require", + ) + })?, + existing_data_files_count: value.existing_data_files_count.ok_or_else(|| { + Error::new( + crate::ErrorKind::DataInvalid, + "existing_data_files_count in ManifestListEntryV2 shold be require", + ) + })?, + deleted_data_files_count: value.deleted_data_files_count.ok_or_else(|| { + Error::new( + crate::ErrorKind::DataInvalid, + "deleted_data_files_count in ManifestListEntryV2 shold be require", + ) + })?, + added_rows_count: value.added_rows_count.ok_or_else(|| { + Error::new( + crate::ErrorKind::DataInvalid, + "added_rows_count in ManifestListEntryV2 shold be require", + ) + })?, + existing_rows_count: value.existing_rows_count.ok_or_else(|| { + Error::new( + crate::ErrorKind::DataInvalid, + "existing_rows_count in ManifestListEntryV2 shold be require", + ) + })?, + deleted_rows_count: value.deleted_rows_count.ok_or_else(|| { + Error::new( + crate::ErrorKind::DataInvalid, + "deleted_rows_count in ManifestListEntryV2 shold be require", + ) + })?, + partitions: value.partitions.map(|v| { + v.into_iter() + .map(|v| FieldSummary { + contains_null: v.contains_null, + contains_nan: v.contains_nan, + lower_bound: v.lower_bound.map(|v| v.into()), + upper_bound: v.upper_bound.map(|v| v.into()), + }) + .collect() + }), + key_metadata: value.key_metadata.map(ByteBuf::from), + }) + } + } + + impl From for ManifestListEntryV1 { + fn from(value: ManifestListEntry) -> Self { + Self { + manifest_path: value.manifest_path, + manifest_length: value.manifest_length, + partition_spec_id: value.partition_spec_id, + added_snapshot_id: value.added_snapshot_id, + added_data_files_count: value.added_data_files_count, + existing_data_files_count: value.existing_data_files_count, + deleted_data_files_count: value.deleted_data_files_count, + added_rows_count: value.added_rows_count, + existing_rows_count: value.existing_rows_count, + deleted_rows_count: value.deleted_rows_count, + partitions: value.partitions.map(|v| { + v.into_iter() + .map(|v| FieldSummary { + contains_null: v.contains_null, + contains_nan: v.contains_nan, + lower_bound: v.lower_bound.map(|v| v.into()), + upper_bound: v.upper_bound.map(|v| v.into()), + }) + .collect() + }), + key_metadata: value.key_metadata.map(ByteBuf::from), + } + } + } +} + +#[cfg(test)] +mod test { + use std::{fs, sync::Arc}; + + use crate::spec::{ + FieldSummary, Literal, ManifestContentType, ManifestList, ManifestListEntry, NestedField, + PrimitiveType, StructType, Type, + }; + + #[test] + fn test_parse_manifest_list_v1() { + let path = format!( + "{}/testdata/simple_manifest_list_v1.avro", + env!("CARGO_MANIFEST_DIR") + ); + + let bs = fs::read(path).expect("read_file must succeed"); + + let manifest_list = ManifestList::parse( + &bs, + crate::spec::FormatVersion::V1, + &StructType::new(vec![]), + ) + .unwrap(); + + assert_eq!(1, manifest_list.entries.len()); + assert_eq!( + manifest_list.entries[0], + ManifestListEntry { + manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(), + manifest_length: 5806, + partition_spec_id: 0, + content: ManifestContentType::Data, + sequence_number: 0, + min_sequence_number: 0, + added_snapshot_id: 1646658105718557341, + added_data_files_count: Some(3), + existing_data_files_count: Some(0), + deleted_data_files_count: Some(0), + added_rows_count: Some(3), + existing_rows_count: Some(0), + deleted_rows_count: Some(0), + partitions: Some(vec![]), + key_metadata: None, + } + ); + } + + #[test] + fn test_parse_manifest_list_v2() { + let path = format!( + "{}/testdata/simple_manifest_list_v2.avro", + env!("CARGO_MANIFEST_DIR") + ); + + let bs = fs::read(path).expect("read_file must succeed"); + + let manifest_list = ManifestList::parse( + &bs, + crate::spec::FormatVersion::V2, + &StructType::new(vec![Arc::new(NestedField::required( + 1, + "test", + Type::Primitive(PrimitiveType::Long), + ))]), + ) + .unwrap(); + + assert_eq!(1, manifest_list.entries.len()); + assert_eq!( + manifest_list.entries[0], + ManifestListEntry { + manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(), + manifest_length: 6926, + partition_spec_id: 0, + content: ManifestContentType::Data, + sequence_number: 1, + min_sequence_number: 1, + added_snapshot_id: 377075049360453639, + added_data_files_count: Some(1), + existing_data_files_count: Some(0), + deleted_data_files_count: Some(0), + added_rows_count: Some(3), + existing_rows_count: Some(0), + deleted_rows_count: Some(0), + partitions: Some(vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Literal::long(1)), upper_bound: Some(Literal::long(1))}]), + key_metadata: None, + } + ); + } +} diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs index dedde72e2..33d8bf958 100644 --- a/crates/iceberg/src/spec/mod.rs +++ b/crates/iceberg/src/spec/mod.rs @@ -18,6 +18,7 @@ //! Spec for Iceberg. mod datatypes; +mod manifest_list; mod partition; mod schema; mod snapshot; @@ -27,6 +28,7 @@ mod transform; mod values; pub use datatypes::*; +pub use manifest_list::*; pub use partition::*; pub use schema::*; pub use snapshot::*; diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index f38a605cf..d4e941b5e 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -73,7 +73,7 @@ pub struct Snapshot { timestamp_ms: i64, /// The location of a manifest list for this snapshot that /// tracks manifest files with additional metadata. - manifest_list: ManifestList, + manifest_list: ManifestListLocation, /// A string map that summarizes the snapshot changes, including operation. summary: Summary, /// ID of the table’s current schema when the snapshot was created. @@ -84,7 +84,7 @@ pub struct Snapshot { /// Type to distinguish between a path to a manifestlist file or a vector of manifestfile locations #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] #[serde(untagged)] -pub enum ManifestList { +pub enum ManifestListLocation { /// Location of manifestlist file ManifestListFile(String), /// Manifestfile locations @@ -104,7 +104,7 @@ impl Snapshot { } /// Get location of manifest_list file #[inline] - pub fn manifest_list(&self) -> &ManifestList { + pub fn manifest_list(&self) -> &ManifestListLocation { &self.manifest_list } /// Get summary of the snapshot @@ -141,7 +141,7 @@ pub(super) mod _serde { use crate::{Error, ErrorKind}; - use super::{ManifestList, Operation, Snapshot, Summary}; + use super::{ManifestListLocation, Operation, Snapshot, Summary}; #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "kebab-case")] @@ -183,7 +183,7 @@ pub(super) mod _serde { parent_snapshot_id: v2.parent_snapshot_id, sequence_number: v2.sequence_number, timestamp_ms: v2.timestamp_ms, - manifest_list: ManifestList::ManifestListFile(v2.manifest_list), + manifest_list: ManifestListLocation::ManifestListFile(v2.manifest_list), summary: v2.summary, schema_id: v2.schema_id, } @@ -198,8 +198,8 @@ pub(super) mod _serde { sequence_number: v2.sequence_number, timestamp_ms: v2.timestamp_ms, manifest_list: match v2.manifest_list { - ManifestList::ManifestListFile(file) => file, - ManifestList::ManifestFiles(_) => panic!("Wrong table format version. Can't convert a list of manifest files into a location of a manifest file.") + ManifestListLocation::ManifestListFile(file) => file, + ManifestListLocation::ManifestFiles(_) => panic!("Wrong table format version. Can't convert a list of manifest files into a location of a manifest file.") }, summary: v2.summary, schema_id: v2.schema_id, @@ -217,8 +217,8 @@ pub(super) mod _serde { sequence_number: 0, timestamp_ms: v1.timestamp_ms, manifest_list: match (v1.manifest_list, v1.manifests) { - (Some(file), _) => ManifestList::ManifestListFile(file), - (None, Some(files)) => ManifestList::ManifestFiles(files), + (Some(file), _) => ManifestListLocation::ManifestListFile(file), + (None, Some(files)) => ManifestListLocation::ManifestFiles(files), (None, None) => { return Err(Error::new( ErrorKind::DataInvalid, @@ -238,8 +238,8 @@ pub(super) mod _serde { impl From for SnapshotV1 { fn from(v2: Snapshot) -> Self { let (manifest_list, manifests) = match v2.manifest_list { - ManifestList::ManifestListFile(file) => (Some(file), None), - ManifestList::ManifestFiles(files) => (None, Some(files)), + ManifestListLocation::ManifestListFile(file) => (Some(file), None), + ManifestListLocation::ManifestFiles(files) => (None, Some(files)), }; SnapshotV1 { snapshot_id: v2.snapshot_id, @@ -309,7 +309,9 @@ pub enum SnapshotRetention { mod tests { use std::collections::HashMap; - use crate::spec::snapshot::{ManifestList, Operation, Snapshot, Summary, _serde::SnapshotV1}; + use crate::spec::snapshot::{ + ManifestListLocation, Operation, Snapshot, Summary, _serde::SnapshotV1, + }; #[test] fn schema() { @@ -339,7 +341,7 @@ mod tests { *result.summary() ); assert_eq!( - ManifestList::ManifestListFile("s3://b/wh/.../s1.avro".to_string()), + ManifestListLocation::ManifestListFile("s3://b/wh/.../s1.avro".to_string()), *result.manifest_list() ); } diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index f40b63e2e..ccd58f2ff 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -678,7 +678,7 @@ mod tests { use pretty_assertions::assert_eq; use crate::spec::{ - table_metadata::TableMetadata, ManifestList, NestedField, NullOrder, Operation, + table_metadata::TableMetadata, ManifestListLocation, NestedField, NullOrder, Operation, PartitionField, PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Summary, Transform, Type, }; @@ -936,7 +936,7 @@ mod tests { .with_timestamp_ms(1662532818843) .with_sequence_number(0) .with_schema_id(0) - .with_manifest_list(ManifestList::ManifestListFile("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string())) + .with_manifest_list(ManifestListLocation::ManifestListFile("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string())) .with_summary(Summary{operation: Operation::Append, other: HashMap::from_iter(vec![("spark.app.id".to_string(),"local-1662532784305".to_string()),("added-data-files".to_string(),"4".to_string()),("added-records".to_string(),"4".to_string()),("added-files-size".to_string(),"6001".to_string())])}) .build().unwrap(); @@ -1059,7 +1059,7 @@ mod tests { .with_snapshot_id(3051729675574597004) .with_timestamp_ms(1515100955770) .with_sequence_number(0) - .with_manifest_list(ManifestList::ManifestListFile( + .with_manifest_list(ManifestListLocation::ManifestListFile( "s3://a/b/1.avro".to_string(), )) .with_summary(Summary { @@ -1075,7 +1075,7 @@ mod tests { .with_timestamp_ms(1555100955770) .with_sequence_number(1) .with_schema_id(1) - .with_manifest_list(ManifestList::ManifestListFile( + .with_manifest_list(ManifestListLocation::ManifestListFile( "s3://a/b/2.avro".to_string(), )) .with_summary(Summary { diff --git a/crates/iceberg/testdata/simple_manifest_list_v1.avro b/crates/iceberg/testdata/simple_manifest_list_v1.avro new file mode 100644 index 0000000000000000000000000000000000000000..bcdd126de418800e16f19bf36146462aaf68079d GIT binary patch literal 3759 zcmbW3J!~9B6vvZP2t`yVMMyDPAyVL=+xYH$c88Q8MM5IN2AKqFGq*eE8$362%+A^z z(FxH|fP}(wQjBfIC<>Befzr^@MQJDzqM)OK7CI_s_G4!D_SWpVyN{XodvE6b-+$if zesK2u1-Q-GYoXsq1NiStcO5YrB4@?n$Y(ro790u(>iYmv+(V)8dYB;ja}OgDgwD$C zyQLLFDEe|m7AO=P(@pt3V7`p=Ci9`djDmOGz5#Zy=z}*_K|r0y z=%yW=Fe~;N6OaIvAPDit2w)1jBZ0y^-7K(c1s*~!u*$~^hsgINStpLE&{?m_Kf#S_ zAPP|c1Oq!9i|Kr?T47-jpuh_$9EN=+?69{T8WAD`Cz)^L$FiC&RS?mK0wfXSM+EYd zSIM9rjEHbnRE+EnvZ5AC^Z*K&+*auMEJ}(=uSBPyD%2Ugu2Kew3O)k)`V)9fi-$hK zp^%sC&5698?W<31S=0a#B;+a23B8tXtIup(Ox-T3r;4Ykl><44Bl)-NkbxA4{(V|e zyw&K)Wo04xj0%Wp=w+Rf38~8%L6E98NR+h3BiZ6T0O=_0`92H*1Zoq|d4g?7qUlau zwkk5PBy_PRCuiy|se(c}gIV1^ZpTW2upPv`uIy2xKHa0KfHnVSu2+!k0?97WV;n$P zwvxWOnG?YFREdVeQ-G6Q^Ml>-#(2Sz%{P!5%S785>%vx}!2n9>sZtR1`{p=Ltf#SL z9b`InINPaId~xWcl2_|%DBiW_n^t^%m==3oo;}fWt=GrpGOtT(%IkSCu|l5($bUh$ z5Py(^aX|pj)8l#&)P3Vea(qbXjZ6K5Wvg0NmXW3{zO2#4 z!?z80$G zRR&>ZPOj7W3AsdVz{dDxsYvw}$;YO2r03M@@ytJ8{`%mLFV3(1;x9k{{q)_> PPoF(K^Vyw?r8NEnIL8Rv literal 0 HcmV?d00001 diff --git a/crates/iceberg/testdata/simple_manifest_list_v2.avro b/crates/iceberg/testdata/simple_manifest_list_v2.avro new file mode 100644 index 0000000000000000000000000000000000000000..001bb549f7c4a3ea7bfb4d513d080c10fc6f70e1 GIT binary patch literal 4257 zcmbW4&uiR96vrK>hZIUq#ZU;zpybwdkajJvy^upmT5N37(oMnGgw3op>s8T@tu*p> z8~g|K>IO=kkfqRqZ%HBUv6!}}LZLs>oJ%P^hMs#Ud7~dQqm{Hjd|d75ecrq`^L<~v z+y3y}#Ra_Yd)EV_OC0>@@;!wPdPHd`J~2GsR4R roM7uGJv{ZFMZ0z~hcZY%@?A zyZ3S{dYE=$MG`2WzUA)0Z__g%&ZcKzYI!dD;Qe*fw`do=(?k)SM_sf`rJ<|pK_wsC zChndb-S(u|w>^qce-^$9_ii`oN9f;8PEuqFlWVV_zopH|Q#MhnP1am%GV>n8k( z)^DLOASR+7>iZTQ&9^FrH9VJ+=>4P$aPRKhQK`l$DD3w;=;$G18Xr>|AjU_8sS7;a0cn@PJYAr`$#g5K%&@#NR ztX*%h;vndzKS1d+sZi>Y6qkH#1r&~ZYr4>HC(Fk~m)j5@3UkWfx1u97F7tfdeXpOZ zMW*a=G1#)AT4?ee;I)_-l$W72uH3t{3KPSo1t^Uwca94IQ;Yc0?ka9*+X|>qa*46; z;{p5)ci#c&KQ&u-tq!esW1HaXV zyCrlrt*R1BGv~jTDlv-QLeX2Og8067vm8yuU)bX0uy5|8T!w!@PiDqIyBhSi|s(P z7d12MlhK^F1bl1{|Q(IY3x~DZNJ)_z{I%9x{sM*iMYT#+}x+%{WUy3`S*-qU*R7K$Ya8D_-K literal 0 HcmV?d00001 From 5beda7dd2cc57d289bec7b1df28b487e9dafe308 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Thu, 7 Sep 2023 14:23:04 +0800 Subject: [PATCH 2/5] support to read with schema --- crates/iceberg/src/avro/mod.rs | 1 + crates/iceberg/src/avro/schema.rs | 4 +- crates/iceberg/src/spec/manifest_list.rs | 510 ++++++++++++++++++----- 3 files changed, 408 insertions(+), 107 deletions(-) diff --git a/crates/iceberg/src/avro/mod.rs b/crates/iceberg/src/avro/mod.rs index a6bb0735f..59dfe97c6 100644 --- a/crates/iceberg/src/avro/mod.rs +++ b/crates/iceberg/src/avro/mod.rs @@ -18,3 +18,4 @@ //! Avro related codes. #[allow(dead_code)] mod schema; +pub use schema::*; diff --git a/crates/iceberg/src/avro/schema.rs b/crates/iceberg/src/avro/schema.rs index 244d2082e..9f0d75136 100644 --- a/crates/iceberg/src/avro/schema.rs +++ b/crates/iceberg/src/avro/schema.rs @@ -215,7 +215,7 @@ impl SchemaVisitor for SchemaToAvroSchema { } /// Converting iceberg schema to avro schema. -pub(crate) fn schema_to_avro_schema(name: impl ToString, schema: &Schema) -> Result { +pub fn schema_to_avro_schema(name: impl ToString, schema: &Schema) -> Result { let mut converter = SchemaToAvroSchema { schema: name.to_string(), }; @@ -454,7 +454,7 @@ impl AvroSchemaVisitor for AvroSchemaToSchema { } /// Converts avro schema to iceberg schema. -pub(crate) fn avro_schema_to_schema(avro_schema: &AvroSchema) -> Result { +pub fn avro_schema_to_schema(avro_schema: &AvroSchema) -> Result { if let AvroSchema::Record(_) = avro_schema { let mut converter = AvroSchemaToSchema { next_id: 0 }; let typ = visit(avro_schema, &mut converter)?.expect("Iceberg schema should not be none."); diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index 2310b7bd2..0498631b0 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -17,11 +17,12 @@ //! ManifestList for Iceberg. -use apache_avro::{from_value, Reader}; +use crate::{avro::schema_to_avro_schema, spec::Literal, Error}; +use apache_avro::{from_value, types::Value, Reader}; +use once_cell::sync::Lazy; +use std::sync::Arc; -use crate::{spec::Literal, Error}; - -use super::{FormatVersion, StructType}; +use super::{FormatVersion, ListType, NestedField, NestedFieldRef, Schema, StructType}; /// Snapshots are embedded in table metadata, but the list of manifests for a /// snapshot are stored in a separate manifest list file. @@ -38,7 +39,6 @@ use super::{FormatVersion, StructType}; /// manifest. #[derive(Debug, Clone)] pub struct ManifestList { - version: FormatVersion, /// Entries in a manifest list. entries: Vec, } @@ -47,41 +47,25 @@ impl ManifestList { /// Parse manifest list from bytes. /// /// QUESTION: Will we have more than one manifest list in a single file? - pub fn parse( + pub fn parse_with_version( bs: &[u8], version: FormatVersion, partition_type: &StructType, ) -> Result { - let reader = Reader::new(bs)?; - let entries = match version { - FormatVersion::V2 => reader - .map(|v| { - v.and_then(|value| from_value::<_serde::ManifestListEntryV2>(&value)) - .map_err(|e| { - Error::new( - crate::ErrorKind::DataInvalid, - "Failed to parse manifest list entry", - ) - .with_source(e) - }) - }) - .map(|v| v.and_then(|v| v.try_into(partition_type))) - .collect::, Error>>()?, - FormatVersion::V1 => reader - .map(|v| { - v.and_then(|value| from_value::<_serde::ManifestListEntryV1>(&value)) - .map_err(|e| { - Error::new( - crate::ErrorKind::DataInvalid, - "Failed to parse manifest list entry", - ) - .with_source(e) - }) - }) - .map(|v| v.and_then(|v| v.try_into(partition_type))) - .collect::, Error>>()?, - }; - Ok(ManifestList { version, entries }) + match version { + FormatVersion::V2 => { + let schema = schema_to_avro_schema("manifest_list", &Self::v2_schema()).unwrap(); + let reader = Reader::with_schema(&schema, bs)?; + let values = Value::Array(reader.collect::, _>>()?); + from_value::<_serde::ManifestListV2>(&values)?.try_into(partition_type) + } + FormatVersion::V1 => { + let schema = schema_to_avro_schema("manifest_list", &Self::v1_schema()).unwrap(); + let reader = Reader::with_schema(&schema, bs)?; + let values = Value::Array(reader.collect::, _>>()?); + from_value::<_serde::ManifestListV1>(&values)?.try_into(partition_type) + } + } } /// Get the entries in the manifest list. @@ -89,9 +73,262 @@ impl ManifestList { &self.entries } - /// Get the version of the manifest list. - pub fn version(&self) -> &FormatVersion { - &self.version + const MANIFEST_PATH: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::required( + 500, + "manifest_path", + super::Type::Primitive(super::PrimitiveType::String), + )) + }) + }; + const MANIFEST_LENGTH: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::required( + 501, + "manifest_length", + super::Type::Primitive(super::PrimitiveType::Long), + )) + }) + }; + const PARTITION_SPEC_ID: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::required( + 502, + "partition_spec_id", + super::Type::Primitive(super::PrimitiveType::Int), + )) + }) + }; + const CONTENT: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::required( + 517, + "content", + super::Type::Primitive(super::PrimitiveType::Int), + )) + }) + }; + const SEQUENCE_NUMBER: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::required( + 515, + "sequence_number", + super::Type::Primitive(super::PrimitiveType::Long), + )) + }) + }; + const MIN_SEQUENCE_NUMBER: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::required( + 516, + "min_sequence_number", + super::Type::Primitive(super::PrimitiveType::Long), + )) + }) + }; + const ADDED_SNAPSHOT_ID: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::required( + 503, + "added_snapshot_id", + super::Type::Primitive(super::PrimitiveType::Long), + )) + }) + }; + const ADDED_FILES_COUNT_V2: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::required( + 504, + "added_data_files_count", + super::Type::Primitive(super::PrimitiveType::Int), + )) + }) + }; + const ADDED_FILES_COUNT_V1: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 504, + "added_data_files_count", + super::Type::Primitive(super::PrimitiveType::Int), + )) + }) + }; + const EXISTING_FILES_COUNT_V2: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::required( + 505, + "existing_data_files_count", + super::Type::Primitive(super::PrimitiveType::Int), + )) + }) + }; + const EXISTING_FILES_COUNT_V1: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 505, + "existing_data_files_count", + super::Type::Primitive(super::PrimitiveType::Int), + )) + }) + }; + const DELETED_FILES_COUNT_V2: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::required( + 506, + "deleted_data_files_count", + super::Type::Primitive(super::PrimitiveType::Int), + )) + }) + }; + const DELETED_FILES_COUNT_V1: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 506, + "deleted_data_files_count", + super::Type::Primitive(super::PrimitiveType::Int), + )) + }) + }; + const ADDED_ROWS_COUNT_V2: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::required( + 512, + "added_rows_count", + super::Type::Primitive(super::PrimitiveType::Long), + )) + }) + }; + const ADDED_ROWS_COUNT_V1: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 512, + "added_rows_count", + super::Type::Primitive(super::PrimitiveType::Long), + )) + }) + }; + const EXISTING_ROWS_COUNT_V2: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::required( + 513, + "existing_rows_count", + super::Type::Primitive(super::PrimitiveType::Long), + )) + }) + }; + const EXISTING_ROWS_COUNT_V1: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 513, + "existing_rows_count", + super::Type::Primitive(super::PrimitiveType::Long), + )) + }) + }; + const DELETED_ROWS_COUNT_V2: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::required( + 514, + "deleted_rows_count", + super::Type::Primitive(super::PrimitiveType::Long), + )) + }) + }; + const DELETED_ROWS_COUNT_V1: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 514, + "deleted_rows_count", + super::Type::Primitive(super::PrimitiveType::Long), + )) + }) + }; + const PARTITIONS: Lazy = { + Lazy::new(|| { + // element type + let fields = vec![ + Arc::new(NestedField::required( + 509, + "contains_null", + super::Type::Primitive(super::PrimitiveType::Boolean), + )), + Arc::new(NestedField::optional( + 518, + "contains_nan", + super::Type::Primitive(super::PrimitiveType::Boolean), + )), + Arc::new(NestedField::optional( + 510, + "lower_bound", + super::Type::Primitive(super::PrimitiveType::Binary), + )), + Arc::new(NestedField::optional( + 511, + "upper_bound", + super::Type::Primitive(super::PrimitiveType::Binary), + )), + ]; + let element_field = Arc::new(NestedField::required( + 508, + "r_508", + super::Type::Struct(StructType::new(fields)), + )); + Arc::new(NestedField::optional( + 507, + "partitions", + super::Type::List(ListType { element_field }), + )) + }) + }; + const KEY_METADATA: Lazy = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 519, + "key_metadata", + super::Type::Primitive(super::PrimitiveType::Binary), + )) + }) + }; + + /// Get the v2 schema of the manifest list entry. + pub(crate) fn v2_schema() -> Schema { + let fields = vec![ + Self::MANIFEST_PATH.clone(), + Self::MANIFEST_LENGTH.clone(), + Self::PARTITION_SPEC_ID.clone(), + Self::CONTENT.clone(), + Self::SEQUENCE_NUMBER.clone(), + Self::MIN_SEQUENCE_NUMBER.clone(), + Self::ADDED_SNAPSHOT_ID.clone(), + Self::ADDED_FILES_COUNT_V2.clone(), + Self::EXISTING_FILES_COUNT_V2.clone(), + Self::DELETED_FILES_COUNT_V2.clone(), + Self::ADDED_ROWS_COUNT_V2.clone(), + Self::EXISTING_ROWS_COUNT_V2.clone(), + Self::DELETED_ROWS_COUNT_V2.clone(), + Self::PARTITIONS.clone(), + Self::KEY_METADATA.clone(), + ]; + Schema::builder().with_fields(fields).build().unwrap() + } + /// Get the v1 schema of the manifest list entry. + pub(crate) fn v1_schema() -> Schema { + let fields = vec![ + Self::MANIFEST_PATH.clone(), + Self::MANIFEST_LENGTH.clone(), + Self::PARTITION_SPEC_ID.clone(), + Self::ADDED_SNAPSHOT_ID.clone(), + Self::ADDED_FILES_COUNT_V1.clone().to_owned(), + Self::EXISTING_FILES_COUNT_V1.clone(), + Self::DELETED_FILES_COUNT_V1.clone(), + Self::ADDED_ROWS_COUNT_V1.clone(), + Self::EXISTING_ROWS_COUNT_V1.clone(), + Self::DELETED_ROWS_COUNT_V1.clone(), + Self::PARTITIONS.clone(), + Self::KEY_METADATA.clone(), + ]; + Schema::builder().with_fields(fields).build().unwrap() } } @@ -166,11 +403,11 @@ pub struct ManifestListEntry { /// A list of field summaries for each partition field in the spec. Each /// field in the list corresponds to a field in the manifest file’s /// partition spec. - partitions: Option>, + partitions: Vec, /// field: 519 /// /// Implementation-specific key metadata for encryption - key_metadata: Option>, + key_metadata: Vec, } /// The type of files tracked by the manifest, either data or delete files; Data(0) for all v1 manifests @@ -239,6 +476,68 @@ pub(super) mod _serde { use super::ManifestListEntry; + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] + #[serde(transparent)] + pub(crate) struct ManifestListV2 { + entries: Vec, + } + + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] + #[serde(transparent)] + pub(crate) struct ManifestListV1 { + entries: Vec, + } + + impl ManifestListV2 { + /// Converts the [ManifestListV2] into a [ManifestList]. + /// The convert of [entries] need the partition_type info so use this function instead of [std::TryFrom] trait. + pub fn try_into(self, partition_type: &StructType) -> Result { + Ok(super::ManifestList { + entries: self + .entries + .into_iter() + .map(|v| v.try_into(partition_type)) + .collect::, _>>()?, + }) + } + } + + impl TryFrom for ManifestListV2 { + type Error = Error; + + fn try_from(value: super::ManifestList) -> Result { + Ok(Self { + entries: value + .entries + .into_iter() + .map(TryInto::try_into) + .collect::, _>>()?, + }) + } + } + + impl ManifestListV1 { + /// Converts the [ManifestListV1] into a [ManifestList]. + /// The convert of [entries] need the partition_type info so use this function instead of [std::TryFrom] trait. + pub fn try_into(self, partition_type: &StructType) -> Result { + Ok(super::ManifestList { + entries: self + .entries + .into_iter() + .map(|v| v.try_into(partition_type)) + .collect::, _>>()?, + }) + } + } + + impl From for ManifestListV1 { + fn from(value: super::ManifestList) -> Self { + Self { + entries: value.entries.into_iter().map(Into::into).collect(), + } + } + } + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] pub(super) struct ManifestListEntryV1 { pub manifest_path: String, @@ -286,7 +585,7 @@ pub(super) mod _serde { /// Converts the [FieldSummary] into a [super::FieldSummary]. /// [lower_bound] and [upper_bound] are converted into [Literal]s need the type info so use /// this function instead of [std::TryFrom] trait. - pub fn try_into(self, r#type: &Type) -> Result { + pub(crate) fn try_into(self, r#type: &Type) -> Result { Ok(super::FieldSummary { contains_null: self.contains_null, contains_nan: self.contains_nan, @@ -302,32 +601,40 @@ pub(super) mod _serde { } } - impl ManifestListEntryV2 { - /// Converts the [ManifestListEntryV2] into a [ManifestListEntry]. - /// The convert of [partitions] need the partition_type info so use this function instead of [std::TryFrom] trait. - pub fn try_into(self, partition_type: &StructType) -> Result { - let partitions = if let Some(partitions) = self.partitions { + fn try_convert_to_field_sumary( + partitions: Option>, + partition_type: &StructType, + ) -> Result, Error> { + Ok(partitions + .and_then(|partitions| { let partition_types = partition_type.fields(); if partitions.len() != partition_types.len() { - return Err(Error::new( + return Some(Err(Error::new( crate::ErrorKind::DataInvalid, format!( "Invalid partition spec. Expected {} fields, got {}", partition_types.len(), partitions.len() ), - )); + ))); } Some( partitions .into_iter() .zip(partition_types) .map(|(v, field)| v.try_into(&field.field_type)) - .collect::, _>>()?, + .collect::, _>>(), ) - } else { - None - }; + }) + .transpose()? + .unwrap_or_default()) + } + + impl ManifestListEntryV2 { + /// Converts the [ManifestListEntryV2] into a [ManifestListEntry]. + /// The convert of [partitions] need the partition_type info so use this function instead of [std::TryFrom] trait. + pub fn try_into(self, partition_type: &StructType) -> Result { + let partitions = try_convert_to_field_sumary(self.partitions, partition_type)?; Ok(ManifestListEntry { manifest_path: self.manifest_path, manifest_length: self.manifest_length, @@ -343,7 +650,7 @@ pub(super) mod _serde { existing_rows_count: Some(self.existing_rows_count), deleted_rows_count: Some(self.deleted_rows_count), partitions, - key_metadata: self.key_metadata.map(|b| b.into_vec()), + key_metadata: self.key_metadata.map(|b| b.into_vec()).unwrap_or_default(), }) } } @@ -352,28 +659,7 @@ pub(super) mod _serde { /// Converts the [ManifestListEntryV1] into a [ManifestListEntry]. /// The convert of [partitions] need the partition_type info so use this function instead of [std::TryFrom] trait. pub fn try_into(self, partition_type: &StructType) -> Result { - let partitions = if let Some(partitions) = self.partitions { - let partition_types = partition_type.fields(); - if partitions.len() != partition_types.len() { - return Err(Error::new( - crate::ErrorKind::DataInvalid, - format!( - "Invalid partition spec. Expected {} fields, got {}", - partition_types.len(), - partitions.len() - ), - )); - } - Some( - partitions - .into_iter() - .zip(partition_types) - .map(|(v, field)| v.try_into(&field.field_type)) - .collect::, _>>()?, - ) - } else { - None - }; + let partitions = try_convert_to_field_sumary(self.partitions, partition_type)?; Ok(ManifestListEntry { manifest_path: self.manifest_path, manifest_length: self.manifest_length, @@ -386,7 +672,7 @@ pub(super) mod _serde { existing_rows_count: self.existing_rows_count, deleted_rows_count: self.deleted_rows_count, partitions, - key_metadata: self.key_metadata.map(|b| b.into_vec()), + key_metadata: self.key_metadata.map(|b| b.into_vec()).unwrap_or_default(), // as ref: https://iceberg.apache.org/spec/#partitioning // use 0 when reading v1 manifest lists content: super::ManifestContentType::Data, @@ -396,10 +682,40 @@ pub(super) mod _serde { } } + fn convert_to_serde_field_summary( + partitions: Vec, + ) -> Option> { + if partitions.is_empty() { + None + } else { + Some( + partitions + .into_iter() + .map(|v| FieldSummary { + contains_null: v.contains_null, + contains_nan: v.contains_nan, + lower_bound: v.lower_bound.map(|v| v.into()), + upper_bound: v.upper_bound.map(|v| v.into()), + }) + .collect(), + ) + } + } + + fn convert_to_serde_key_metadata(key_metadata: Vec) -> Option { + if key_metadata.is_empty() { + None + } else { + Some(ByteBuf::from(key_metadata)) + } + } + impl TryFrom for ManifestListEntryV2 { type Error = Error; fn try_from(value: ManifestListEntry) -> Result { + let partitions = convert_to_serde_field_summary(value.partitions); + let key_metadata = convert_to_serde_key_metadata(value.key_metadata); Ok(Self { manifest_path: value.manifest_path, manifest_length: value.manifest_length, @@ -444,23 +760,16 @@ pub(super) mod _serde { "deleted_rows_count in ManifestListEntryV2 shold be require", ) })?, - partitions: value.partitions.map(|v| { - v.into_iter() - .map(|v| FieldSummary { - contains_null: v.contains_null, - contains_nan: v.contains_nan, - lower_bound: v.lower_bound.map(|v| v.into()), - upper_bound: v.upper_bound.map(|v| v.into()), - }) - .collect() - }), - key_metadata: value.key_metadata.map(ByteBuf::from), + partitions, + key_metadata, }) } } impl From for ManifestListEntryV1 { fn from(value: ManifestListEntry) -> Self { + let partitions = convert_to_serde_field_summary(value.partitions); + let key_metadata = convert_to_serde_key_metadata(value.key_metadata); Self { manifest_path: value.manifest_path, manifest_length: value.manifest_length, @@ -472,17 +781,8 @@ pub(super) mod _serde { added_rows_count: value.added_rows_count, existing_rows_count: value.existing_rows_count, deleted_rows_count: value.deleted_rows_count, - partitions: value.partitions.map(|v| { - v.into_iter() - .map(|v| FieldSummary { - contains_null: v.contains_null, - contains_nan: v.contains_nan, - lower_bound: v.lower_bound.map(|v| v.into()), - upper_bound: v.upper_bound.map(|v| v.into()), - }) - .collect() - }), - key_metadata: value.key_metadata.map(ByteBuf::from), + partitions, + key_metadata, } } } @@ -506,7 +806,7 @@ mod test { let bs = fs::read(path).expect("read_file must succeed"); - let manifest_list = ManifestList::parse( + let manifest_list = ManifestList::parse_with_version( &bs, crate::spec::FormatVersion::V1, &StructType::new(vec![]), @@ -530,8 +830,8 @@ mod test { added_rows_count: Some(3), existing_rows_count: Some(0), deleted_rows_count: Some(0), - partitions: Some(vec![]), - key_metadata: None, + partitions: vec![], + key_metadata: vec![], } ); } @@ -545,7 +845,7 @@ mod test { let bs = fs::read(path).expect("read_file must succeed"); - let manifest_list = ManifestList::parse( + let manifest_list = ManifestList::parse_with_version( &bs, crate::spec::FormatVersion::V2, &StructType::new(vec![Arc::new(NestedField::required( @@ -573,8 +873,8 @@ mod test { added_rows_count: Some(3), existing_rows_count: Some(0), deleted_rows_count: Some(0), - partitions: Some(vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Literal::long(1)), upper_bound: Some(Literal::long(1))}]), - key_metadata: None, + partitions: vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Literal::long(1)), upper_bound: Some(Literal::long(1))}], + key_metadata: vec![], } ); } From 8b217c555a4d6109fd67ccafdf31c7b0576f00ba Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Mon, 11 Sep 2023 09:46:49 +0800 Subject: [PATCH 3/5] * add test for serialize * fix clippy --- crates/iceberg/src/avro/mod.rs | 2 +- crates/iceberg/src/avro/schema.rs | 4 +- crates/iceberg/src/spec/manifest_list.rs | 276 ++++++++++++++--------- 3 files changed, 173 insertions(+), 109 deletions(-) diff --git a/crates/iceberg/src/avro/mod.rs b/crates/iceberg/src/avro/mod.rs index 59dfe97c6..bdccb2ff4 100644 --- a/crates/iceberg/src/avro/mod.rs +++ b/crates/iceberg/src/avro/mod.rs @@ -18,4 +18,4 @@ //! Avro related codes. #[allow(dead_code)] mod schema; -pub use schema::*; +pub(crate) use schema::*; diff --git a/crates/iceberg/src/avro/schema.rs b/crates/iceberg/src/avro/schema.rs index 9f0d75136..244d2082e 100644 --- a/crates/iceberg/src/avro/schema.rs +++ b/crates/iceberg/src/avro/schema.rs @@ -215,7 +215,7 @@ impl SchemaVisitor for SchemaToAvroSchema { } /// Converting iceberg schema to avro schema. -pub fn schema_to_avro_schema(name: impl ToString, schema: &Schema) -> Result { +pub(crate) fn schema_to_avro_schema(name: impl ToString, schema: &Schema) -> Result { let mut converter = SchemaToAvroSchema { schema: name.to_string(), }; @@ -454,7 +454,7 @@ impl AvroSchemaVisitor for AvroSchemaToSchema { } /// Converts avro schema to iceberg schema. -pub fn avro_schema_to_schema(avro_schema: &AvroSchema) -> Result { +pub(crate) fn avro_schema_to_schema(avro_schema: &AvroSchema) -> Result { if let AvroSchema::Record(_) = avro_schema { let mut converter = AvroSchemaToSchema { next_id: 0 }; let typ = visit(avro_schema, &mut converter)?.expect("Iceberg schema should not be none."); diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index 0498631b0..3c525ee11 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -19,10 +19,8 @@ use crate::{avro::schema_to_avro_schema, spec::Literal, Error}; use apache_avro::{from_value, types::Value, Reader}; -use once_cell::sync::Lazy; -use std::sync::Arc; -use super::{FormatVersion, ListType, NestedField, NestedFieldRef, Schema, StructType}; +use super::{FormatVersion, Schema, StructType}; /// Snapshots are embedded in table metadata, but the list of manifests for a /// snapshot are stored in a separate manifest list file. @@ -73,263 +71,273 @@ impl ManifestList { &self.entries } - const MANIFEST_PATH: Lazy = { + /// Get the v2 schema of the manifest list entry. + pub(crate) fn v2_schema() -> Schema { + let fields = vec![ + _schema::MANIFEST_PATH.clone(), + _schema::MANIFEST_LENGTH.clone(), + _schema::PARTITION_SPEC_ID.clone(), + _schema::CONTENT.clone(), + _schema::SEQUENCE_NUMBER.clone(), + _schema::MIN_SEQUENCE_NUMBER.clone(), + _schema::ADDED_SNAPSHOT_ID.clone(), + _schema::ADDED_FILES_COUNT_V2.clone(), + _schema::EXISTING_FILES_COUNT_V2.clone(), + _schema::DELETED_FILES_COUNT_V2.clone(), + _schema::ADDED_ROWS_COUNT_V2.clone(), + _schema::EXISTING_ROWS_COUNT_V2.clone(), + _schema::DELETED_ROWS_COUNT_V2.clone(), + _schema::PARTITIONS.clone(), + _schema::KEY_METADATA.clone(), + ]; + Schema::builder().with_fields(fields).build().unwrap() + } + + /// Get the v1 schema of the manifest list entry. + pub(crate) fn v1_schema() -> Schema { + let fields = vec![ + _schema::MANIFEST_PATH.clone(), + _schema::MANIFEST_LENGTH.clone(), + _schema::PARTITION_SPEC_ID.clone(), + _schema::ADDED_SNAPSHOT_ID.clone(), + _schema::ADDED_FILES_COUNT_V1.clone().to_owned(), + _schema::EXISTING_FILES_COUNT_V1.clone(), + _schema::DELETED_FILES_COUNT_V1.clone(), + _schema::ADDED_ROWS_COUNT_V1.clone(), + _schema::EXISTING_ROWS_COUNT_V1.clone(), + _schema::DELETED_ROWS_COUNT_V1.clone(), + _schema::PARTITIONS.clone(), + _schema::KEY_METADATA.clone(), + ]; + Schema::builder().with_fields(fields).build().unwrap() + } +} + +/// This is a helper module that defines the schema field of the manifest list entry. +mod _schema { + use std::sync::Arc; + + use once_cell::sync::Lazy; + + use crate::spec::{ListType, NestedField, NestedFieldRef, PrimitiveType, StructType, Type}; + + pub(crate) static MANIFEST_PATH: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 500, "manifest_path", - super::Type::Primitive(super::PrimitiveType::String), + Type::Primitive(PrimitiveType::String), )) }) }; - const MANIFEST_LENGTH: Lazy = { + pub(crate) static MANIFEST_LENGTH: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 501, "manifest_length", - super::Type::Primitive(super::PrimitiveType::Long), + Type::Primitive(PrimitiveType::Long), )) }) }; - const PARTITION_SPEC_ID: Lazy = { + pub(crate) static PARTITION_SPEC_ID: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 502, "partition_spec_id", - super::Type::Primitive(super::PrimitiveType::Int), + Type::Primitive(PrimitiveType::Int), )) }) }; - const CONTENT: Lazy = { + pub(crate) static CONTENT: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 517, "content", - super::Type::Primitive(super::PrimitiveType::Int), + Type::Primitive(PrimitiveType::Int), )) }) }; - const SEQUENCE_NUMBER: Lazy = { + pub(crate) static SEQUENCE_NUMBER: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 515, "sequence_number", - super::Type::Primitive(super::PrimitiveType::Long), + Type::Primitive(PrimitiveType::Long), )) }) }; - const MIN_SEQUENCE_NUMBER: Lazy = { + pub(crate) static MIN_SEQUENCE_NUMBER: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 516, "min_sequence_number", - super::Type::Primitive(super::PrimitiveType::Long), + Type::Primitive(PrimitiveType::Long), )) }) }; - const ADDED_SNAPSHOT_ID: Lazy = { + pub(crate) static ADDED_SNAPSHOT_ID: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 503, "added_snapshot_id", - super::Type::Primitive(super::PrimitiveType::Long), + Type::Primitive(PrimitiveType::Long), )) }) }; - const ADDED_FILES_COUNT_V2: Lazy = { + pub(crate) static ADDED_FILES_COUNT_V2: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 504, "added_data_files_count", - super::Type::Primitive(super::PrimitiveType::Int), + Type::Primitive(PrimitiveType::Int), )) }) }; - const ADDED_FILES_COUNT_V1: Lazy = { + pub(crate) static ADDED_FILES_COUNT_V1: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 504, "added_data_files_count", - super::Type::Primitive(super::PrimitiveType::Int), + Type::Primitive(PrimitiveType::Int), )) }) }; - const EXISTING_FILES_COUNT_V2: Lazy = { + pub(crate) static EXISTING_FILES_COUNT_V2: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 505, "existing_data_files_count", - super::Type::Primitive(super::PrimitiveType::Int), + Type::Primitive(PrimitiveType::Int), )) }) }; - const EXISTING_FILES_COUNT_V1: Lazy = { + pub(crate) static EXISTING_FILES_COUNT_V1: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 505, "existing_data_files_count", - super::Type::Primitive(super::PrimitiveType::Int), + Type::Primitive(PrimitiveType::Int), )) }) }; - const DELETED_FILES_COUNT_V2: Lazy = { + pub(crate) static DELETED_FILES_COUNT_V2: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 506, "deleted_data_files_count", - super::Type::Primitive(super::PrimitiveType::Int), + Type::Primitive(PrimitiveType::Int), )) }) }; - const DELETED_FILES_COUNT_V1: Lazy = { + pub(crate) static DELETED_FILES_COUNT_V1: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 506, "deleted_data_files_count", - super::Type::Primitive(super::PrimitiveType::Int), + Type::Primitive(PrimitiveType::Int), )) }) }; - const ADDED_ROWS_COUNT_V2: Lazy = { + pub(crate) static ADDED_ROWS_COUNT_V2: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 512, "added_rows_count", - super::Type::Primitive(super::PrimitiveType::Long), + Type::Primitive(PrimitiveType::Long), )) }) }; - const ADDED_ROWS_COUNT_V1: Lazy = { + pub(crate) static ADDED_ROWS_COUNT_V1: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 512, "added_rows_count", - super::Type::Primitive(super::PrimitiveType::Long), + Type::Primitive(PrimitiveType::Long), )) }) }; - const EXISTING_ROWS_COUNT_V2: Lazy = { + pub(crate) static EXISTING_ROWS_COUNT_V2: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 513, "existing_rows_count", - super::Type::Primitive(super::PrimitiveType::Long), + Type::Primitive(PrimitiveType::Long), )) }) }; - const EXISTING_ROWS_COUNT_V1: Lazy = { + pub(crate) static EXISTING_ROWS_COUNT_V1: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 513, "existing_rows_count", - super::Type::Primitive(super::PrimitiveType::Long), + Type::Primitive(PrimitiveType::Long), )) }) }; - const DELETED_ROWS_COUNT_V2: Lazy = { + pub(crate) static DELETED_ROWS_COUNT_V2: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 514, "deleted_rows_count", - super::Type::Primitive(super::PrimitiveType::Long), + Type::Primitive(PrimitiveType::Long), )) }) }; - const DELETED_ROWS_COUNT_V1: Lazy = { + pub(crate) static DELETED_ROWS_COUNT_V1: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 514, "deleted_rows_count", - super::Type::Primitive(super::PrimitiveType::Long), + Type::Primitive(PrimitiveType::Long), )) }) }; - const PARTITIONS: Lazy = { + pub(crate) static PARTITIONS: Lazy = { Lazy::new(|| { // element type let fields = vec![ Arc::new(NestedField::required( 509, "contains_null", - super::Type::Primitive(super::PrimitiveType::Boolean), + Type::Primitive(PrimitiveType::Boolean), )), Arc::new(NestedField::optional( 518, "contains_nan", - super::Type::Primitive(super::PrimitiveType::Boolean), + Type::Primitive(PrimitiveType::Boolean), )), Arc::new(NestedField::optional( 510, "lower_bound", - super::Type::Primitive(super::PrimitiveType::Binary), + Type::Primitive(PrimitiveType::Binary), )), Arc::new(NestedField::optional( 511, "upper_bound", - super::Type::Primitive(super::PrimitiveType::Binary), + Type::Primitive(PrimitiveType::Binary), )), ]; let element_field = Arc::new(NestedField::required( 508, "r_508", - super::Type::Struct(StructType::new(fields)), + Type::Struct(StructType::new(fields)), )); Arc::new(NestedField::optional( 507, "partitions", - super::Type::List(ListType { element_field }), + Type::List(ListType { element_field }), )) }) }; - const KEY_METADATA: Lazy = { + pub(crate) static KEY_METADATA: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 519, "key_metadata", - super::Type::Primitive(super::PrimitiveType::Binary), + Type::Primitive(PrimitiveType::Binary), )) }) }; - - /// Get the v2 schema of the manifest list entry. - pub(crate) fn v2_schema() -> Schema { - let fields = vec![ - Self::MANIFEST_PATH.clone(), - Self::MANIFEST_LENGTH.clone(), - Self::PARTITION_SPEC_ID.clone(), - Self::CONTENT.clone(), - Self::SEQUENCE_NUMBER.clone(), - Self::MIN_SEQUENCE_NUMBER.clone(), - Self::ADDED_SNAPSHOT_ID.clone(), - Self::ADDED_FILES_COUNT_V2.clone(), - Self::EXISTING_FILES_COUNT_V2.clone(), - Self::DELETED_FILES_COUNT_V2.clone(), - Self::ADDED_ROWS_COUNT_V2.clone(), - Self::EXISTING_ROWS_COUNT_V2.clone(), - Self::DELETED_ROWS_COUNT_V2.clone(), - Self::PARTITIONS.clone(), - Self::KEY_METADATA.clone(), - ]; - Schema::builder().with_fields(fields).build().unwrap() - } - /// Get the v1 schema of the manifest list entry. - pub(crate) fn v1_schema() -> Schema { - let fields = vec![ - Self::MANIFEST_PATH.clone(), - Self::MANIFEST_LENGTH.clone(), - Self::PARTITION_SPEC_ID.clone(), - Self::ADDED_SNAPSHOT_ID.clone(), - Self::ADDED_FILES_COUNT_V1.clone().to_owned(), - Self::EXISTING_FILES_COUNT_V1.clone(), - Self::DELETED_FILES_COUNT_V1.clone(), - Self::ADDED_ROWS_COUNT_V1.clone(), - Self::EXISTING_ROWS_COUNT_V1.clone(), - Self::DELETED_ROWS_COUNT_V1.clone(), - Self::PARTITIONS.clone(), - Self::KEY_METADATA.clone(), - ]; - Schema::builder().with_fields(fields).build().unwrap() - } } /// Entry in a manifest list. @@ -461,11 +469,11 @@ pub struct FieldSummary { upper_bound: Option, } +/// This is a helper module that defines types to help with serialization/deserialization. +/// For deserialization the input first gets read into either the [ManifestListEntryV1] or [ManifestListEntryV2] struct +/// and then converted into the [ManifestListEntry] struct. Serialization works the other way around. +/// [ManifestListEntryV1] and [ManifestListEntryV2] are internal struct that are only used for serialization and deserialization. pub(super) mod _serde { - /// This is a helper module that defines types to help with serialization/deserialization. - /// For deserialization the input first gets read into either the [ManifestListEntryV1] or [ManifestListEntryV2] struct - /// and then converted into the [ManifestListEntry] struct. Serialization works the other way around. - /// [ManifestListEntryV1] and [ManifestListEntryV2] are internal struct that are only used for serialization and deserialization. pub use serde_bytes::ByteBuf; use serde_derive::{Deserialize, Serialize}; @@ -606,25 +614,23 @@ pub(super) mod _serde { partition_type: &StructType, ) -> Result, Error> { Ok(partitions - .and_then(|partitions| { + .map(|partitions| { let partition_types = partition_type.fields(); if partitions.len() != partition_types.len() { - return Some(Err(Error::new( + return Err(Error::new( crate::ErrorKind::DataInvalid, format!( "Invalid partition spec. Expected {} fields, got {}", partition_types.len(), partitions.len() ), - ))); + )); } - Some( - partitions - .into_iter() - .zip(partition_types) - .map(|(v, field)| v.try_into(&field.field_type)) - .collect::, _>>(), - ) + partitions + .into_iter() + .zip(partition_types) + .map(|(v, field)| v.try_into(&field.field_type)) + .collect::, _>>() }) .transpose()? .unwrap_or_default()) @@ -793,10 +799,12 @@ mod test { use std::{fs, sync::Arc}; use crate::spec::{ - FieldSummary, Literal, ManifestContentType, ManifestList, ManifestListEntry, NestedField, - PrimitiveType, StructType, Type, + manifest_list::_serde::ManifestListV1, FieldSummary, Literal, ManifestContentType, + ManifestList, ManifestListEntry, NestedField, PrimitiveType, StructType, Type, }; + use super::_serde::ManifestListV2; + #[test] fn test_parse_manifest_list_v1() { let path = format!( @@ -878,4 +886,60 @@ mod test { } ); } + + #[test] + fn test_serialize_manifest_list_v1() { + let manifest_list:ManifestListV1 = ManifestList { + entries: vec![ManifestListEntry { + manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(), + manifest_length: 5806, + partition_spec_id: 0, + content: ManifestContentType::Data, + sequence_number: 0, + min_sequence_number: 0, + added_snapshot_id: 1646658105718557341, + added_data_files_count: Some(3), + existing_data_files_count: Some(0), + deleted_data_files_count: Some(0), + added_rows_count: Some(3), + existing_rows_count: Some(0), + deleted_rows_count: Some(0), + partitions: vec![], + key_metadata: vec![], + }] + }.into(); + let result = serde_json::to_string(&manifest_list).unwrap(); + assert_eq!( + result, + r#"[{"manifest_path":"/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro","manifest_length":5806,"partition_spec_id":0,"added_snapshot_id":1646658105718557341,"added_data_files_count":3,"existing_data_files_count":0,"deleted_data_files_count":0,"added_rows_count":3,"existing_rows_count":0,"deleted_rows_count":0,"partitions":null,"key_metadata":null}]"# + ); + } + + #[test] + fn test_serialize_manifest_list_v2() { + let manifest_list:ManifestListV2 = ManifestList { + entries: vec![ManifestListEntry { + manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(), + manifest_length: 6926, + partition_spec_id: 0, + content: ManifestContentType::Data, + sequence_number: 1, + min_sequence_number: 1, + added_snapshot_id: 377075049360453639, + added_data_files_count: Some(1), + existing_data_files_count: Some(0), + deleted_data_files_count: Some(0), + added_rows_count: Some(3), + existing_rows_count: Some(0), + deleted_rows_count: Some(0), + partitions: vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Literal::long(1)), upper_bound: Some(Literal::long(1))}], + key_metadata: vec![], + }] + }.try_into().unwrap(); + let result = serde_json::to_string(&manifest_list).unwrap(); + assert_eq!( + result, + r#"[{"manifest_path":"s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro","manifest_length":6926,"partition_spec_id":0,"content":0,"sequence_number":1,"min_sequence_number":1,"added_snapshot_id":377075049360453639,"added_data_files_count":1,"existing_data_files_count":0,"deleted_data_files_count":0,"added_rows_count":3,"existing_rows_count":0,"deleted_rows_count":0,"partitions":[{"contains_null":false,"contains_nan":false,"lower_bound":[1,0,0,0,0,0,0,0],"upper_bound":[1,0,0,0,0,0,0,0]}],"key_metadata":null}]"# + ); + } } From a801d0addc2a639636a33f7c171db8d40c56215d Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Fri, 22 Sep 2023 10:11:26 +0800 Subject: [PATCH 4/5] fix typos --- crates/iceberg/src/spec/manifest_list.rs | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index 3c525ee11..cb6fdb263 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -43,8 +43,6 @@ pub struct ManifestList { impl ManifestList { /// Parse manifest list from bytes. - /// - /// QUESTION: Will we have more than one manifest list in a single file? pub fn parse_with_version( bs: &[u8], version: FormatVersion, @@ -609,7 +607,7 @@ pub(super) mod _serde { } } - fn try_convert_to_field_sumary( + fn try_convert_to_field_summary( partitions: Option>, partition_type: &StructType, ) -> Result, Error> { @@ -640,7 +638,7 @@ pub(super) mod _serde { /// Converts the [ManifestListEntryV2] into a [ManifestListEntry]. /// The convert of [partitions] need the partition_type info so use this function instead of [std::TryFrom] trait. pub fn try_into(self, partition_type: &StructType) -> Result { - let partitions = try_convert_to_field_sumary(self.partitions, partition_type)?; + let partitions = try_convert_to_field_summary(self.partitions, partition_type)?; Ok(ManifestListEntry { manifest_path: self.manifest_path, manifest_length: self.manifest_length, @@ -665,7 +663,7 @@ pub(super) mod _serde { /// Converts the [ManifestListEntryV1] into a [ManifestListEntry]. /// The convert of [partitions] need the partition_type info so use this function instead of [std::TryFrom] trait. pub fn try_into(self, partition_type: &StructType) -> Result { - let partitions = try_convert_to_field_sumary(self.partitions, partition_type)?; + let partitions = try_convert_to_field_summary(self.partitions, partition_type)?; Ok(ManifestListEntry { manifest_path: self.manifest_path, manifest_length: self.manifest_length, @@ -733,37 +731,37 @@ pub(super) mod _serde { added_data_files_count: value.added_data_files_count.ok_or_else(|| { Error::new( crate::ErrorKind::DataInvalid, - "added_data_files_count in ManifestListEntryV2 shold be require", + "added_data_files_count in ManifestListEntryV2 should be require", ) })?, existing_data_files_count: value.existing_data_files_count.ok_or_else(|| { Error::new( crate::ErrorKind::DataInvalid, - "existing_data_files_count in ManifestListEntryV2 shold be require", + "existing_data_files_count in ManifestListEntryV2 should be require", ) })?, deleted_data_files_count: value.deleted_data_files_count.ok_or_else(|| { Error::new( crate::ErrorKind::DataInvalid, - "deleted_data_files_count in ManifestListEntryV2 shold be require", + "deleted_data_files_count in ManifestListEntryV2 should be require", ) })?, added_rows_count: value.added_rows_count.ok_or_else(|| { Error::new( crate::ErrorKind::DataInvalid, - "added_rows_count in ManifestListEntryV2 shold be require", + "added_rows_count in ManifestListEntryV2 should be require", ) })?, existing_rows_count: value.existing_rows_count.ok_or_else(|| { Error::new( crate::ErrorKind::DataInvalid, - "existing_rows_count in ManifestListEntryV2 shold be require", + "existing_rows_count in ManifestListEntryV2 should be require", ) })?, deleted_rows_count: value.deleted_rows_count.ok_or_else(|| { Error::new( crate::ErrorKind::DataInvalid, - "deleted_rows_count in ManifestListEntryV2 shold be require", + "deleted_rows_count in ManifestListEntryV2 should be require", ) })?, partitions, From b68196c9cde50a05bf8badee5af240fe29f2b4c0 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Fri, 22 Sep 2023 13:44:12 +0800 Subject: [PATCH 5/5] rename _schema to _const_fields --- crates/iceberg/src/spec/manifest_list.rs | 56 ++++++++++++------------ 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index cb6fdb263..71e232594 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -72,21 +72,21 @@ impl ManifestList { /// Get the v2 schema of the manifest list entry. pub(crate) fn v2_schema() -> Schema { let fields = vec![ - _schema::MANIFEST_PATH.clone(), - _schema::MANIFEST_LENGTH.clone(), - _schema::PARTITION_SPEC_ID.clone(), - _schema::CONTENT.clone(), - _schema::SEQUENCE_NUMBER.clone(), - _schema::MIN_SEQUENCE_NUMBER.clone(), - _schema::ADDED_SNAPSHOT_ID.clone(), - _schema::ADDED_FILES_COUNT_V2.clone(), - _schema::EXISTING_FILES_COUNT_V2.clone(), - _schema::DELETED_FILES_COUNT_V2.clone(), - _schema::ADDED_ROWS_COUNT_V2.clone(), - _schema::EXISTING_ROWS_COUNT_V2.clone(), - _schema::DELETED_ROWS_COUNT_V2.clone(), - _schema::PARTITIONS.clone(), - _schema::KEY_METADATA.clone(), + _const_fields::MANIFEST_PATH.clone(), + _const_fields::MANIFEST_LENGTH.clone(), + _const_fields::PARTITION_SPEC_ID.clone(), + _const_fields::CONTENT.clone(), + _const_fields::SEQUENCE_NUMBER.clone(), + _const_fields::MIN_SEQUENCE_NUMBER.clone(), + _const_fields::ADDED_SNAPSHOT_ID.clone(), + _const_fields::ADDED_FILES_COUNT_V2.clone(), + _const_fields::EXISTING_FILES_COUNT_V2.clone(), + _const_fields::DELETED_FILES_COUNT_V2.clone(), + _const_fields::ADDED_ROWS_COUNT_V2.clone(), + _const_fields::EXISTING_ROWS_COUNT_V2.clone(), + _const_fields::DELETED_ROWS_COUNT_V2.clone(), + _const_fields::PARTITIONS.clone(), + _const_fields::KEY_METADATA.clone(), ]; Schema::builder().with_fields(fields).build().unwrap() } @@ -94,25 +94,25 @@ impl ManifestList { /// Get the v1 schema of the manifest list entry. pub(crate) fn v1_schema() -> Schema { let fields = vec![ - _schema::MANIFEST_PATH.clone(), - _schema::MANIFEST_LENGTH.clone(), - _schema::PARTITION_SPEC_ID.clone(), - _schema::ADDED_SNAPSHOT_ID.clone(), - _schema::ADDED_FILES_COUNT_V1.clone().to_owned(), - _schema::EXISTING_FILES_COUNT_V1.clone(), - _schema::DELETED_FILES_COUNT_V1.clone(), - _schema::ADDED_ROWS_COUNT_V1.clone(), - _schema::EXISTING_ROWS_COUNT_V1.clone(), - _schema::DELETED_ROWS_COUNT_V1.clone(), - _schema::PARTITIONS.clone(), - _schema::KEY_METADATA.clone(), + _const_fields::MANIFEST_PATH.clone(), + _const_fields::MANIFEST_LENGTH.clone(), + _const_fields::PARTITION_SPEC_ID.clone(), + _const_fields::ADDED_SNAPSHOT_ID.clone(), + _const_fields::ADDED_FILES_COUNT_V1.clone().to_owned(), + _const_fields::EXISTING_FILES_COUNT_V1.clone(), + _const_fields::DELETED_FILES_COUNT_V1.clone(), + _const_fields::ADDED_ROWS_COUNT_V1.clone(), + _const_fields::EXISTING_ROWS_COUNT_V1.clone(), + _const_fields::DELETED_ROWS_COUNT_V1.clone(), + _const_fields::PARTITIONS.clone(), + _const_fields::KEY_METADATA.clone(), ]; Schema::builder().with_fields(fields).build().unwrap() } } /// This is a helper module that defines the schema field of the manifest list entry. -mod _schema { +mod _const_fields { use std::sync::Arc; use once_cell::sync::Lazy;