Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix (manifest-list): added serde aliases to support both forms conventions #365

Merged
merged 4 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 57 additions & 5 deletions crates/iceberg/src/spec/manifest_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl ManifestList {
from_value::<_serde::ManifestListV1>(&values)?.try_into(partition_type_provider)
}
FormatVersion::V2 => {
let reader = Reader::with_schema(&MANIFEST_LIST_AVRO_SCHEMA_V2, bs)?;
let reader = Reader::new(bs)?;
let values = Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
from_value::<_serde::ManifestListV2>(&values)?.try_into(partition_type_provider)
}
Expand Down Expand Up @@ -802,6 +802,9 @@ pub(super) mod _serde {
pub key_metadata: Option<ByteBuf>,
}

// Serde aliases are attached to the fields that were renamed in Iceberg 1.5.0 (https://github.com/apache/iceberg/pull/5338) so that both naming conventions are supported.
// Deserialization currently matches on field names, so each of these fields may appear under either its old or its new name.
// See the issue that prompted this change: https://github.com/apache/iceberg-rust/issues/338
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(super) struct ManifestFileV2 {
pub manifest_path: String,
Expand All @@ -811,8 +814,11 @@ pub(super) mod _serde {
pub sequence_number: i64,
pub min_sequence_number: i64,
pub added_snapshot_id: i64,
#[serde(alias = "added_data_files_count", alias = "added_files_count")]
pub added_data_files_count: i32,
#[serde(alias = "existing_data_files_count", alias = "existing_files_count")]
pub existing_data_files_count: i32,
#[serde(alias = "deleted_data_files_count", alias = "deleted_files_count")]
pub deleted_data_files_count: i32,
pub added_rows_count: i64,
pub existing_rows_count: i64,
Expand Down Expand Up @@ -1089,16 +1095,16 @@ pub(super) mod _serde {

#[cfg(test)]
mod test {
use apache_avro::{Reader, Schema};
use std::{collections::HashMap, fs, sync::Arc};

use tempfile::TempDir;

use crate::{
io::FileIOBuilder,
spec::{
manifest_list::{_serde::ManifestListV1, UNASSIGNED_SEQUENCE_NUMBER},
FieldSummary, Literal, ManifestContentType, ManifestFile, ManifestList,
ManifestListWriter, NestedField, PrimitiveType, StructType, Type,
manifest_list::_serde::ManifestListV1, FieldSummary, Literal, ManifestContentType,
ManifestFile, ManifestList, ManifestListWriter, NestedField, PrimitiveType, StructType,
Type, UNASSIGNED_SEQUENCE_NUMBER,
},
};

Expand Down Expand Up @@ -1462,4 +1468,50 @@ mod test {

temp_dir.close().unwrap();
}

/// Checks that V2 manifest lists deserialize whether their Avro schema uses the
/// `*_data_files_count` field names or the shorter `*_files_count` names.
///
/// For each fixture the writer-schema field names are asserted first, so the test
/// fails loudly if a fixture does not carry the naming convention it is meant to cover.
#[tokio::test]
async fn test_manifest_list_v2_deserializer_aliases() {
    // Fixture generated by Iceberg 1.4.0 (`*_data_files_count` names).
    let legacy_bytes = fs::read("testdata/manifests_lists/manifest-list-v2-1.avro").unwrap();
    let legacy_fields = read_avro_schema_fields_as_str(legacy_bytes.clone()).await;
    assert_eq!(
        legacy_fields,
        "manifest_path, manifest_length, partition_spec_id, content, sequence_number, min_sequence_number, added_snapshot_id, added_data_files_count, existing_data_files_count, deleted_data_files_count, added_rows_count, existing_rows_count, deleted_rows_count, partitions"
    );

    // Fixture generated by Iceberg 1.5.0 (`*_files_count` names).
    let renamed_bytes = fs::read("testdata/manifests_lists/manifest-list-v2-2.avro").unwrap();
    let renamed_fields = read_avro_schema_fields_as_str(renamed_bytes.clone()).await;
    assert_eq!(
        renamed_fields,
        "manifest_path, manifest_length, partition_spec_id, content, sequence_number, min_sequence_number, added_snapshot_id, added_files_count, existing_files_count, deleted_files_count, added_rows_count, existing_rows_count, deleted_rows_count, partitions"
    );

    // Both fixtures must parse into a `ManifestList`; only success matters here,
    // so the parsed value is discarded. The order (1.4.0 first) is kept on purpose.
    for bytes in [&legacy_bytes, &renamed_bytes] {
        ManifestList::parse_with_version(bytes, crate::spec::FormatVersion::V2, |_id| {
            Ok(Some(StructType::new(vec![])))
        })
        .unwrap();
    }
}

/// Returns the field names of the Avro writer schema embedded in `bs`,
/// joined with `", "` in schema order.
///
/// Yields an empty string when the writer schema is not a record.
/// Panics if `bs` cannot be opened as an Avro container (test helper only).
async fn read_avro_schema_fields_as_str(bs: Vec<u8>) -> String {
    let avro_reader = Reader::new(bs.as_slice()).unwrap();

    if let Schema::Record(record_schema) = avro_reader.writer_schema() {
        // Borrow the names instead of cloning them; `join` builds the single output string.
        let names: Vec<&str> = record_schema
            .fields
            .iter()
            .map(|field| field.name.as_str())
            .collect();
        names.join(", ")
    } else {
        String::new()
    }
}
}
Binary file not shown.
Binary file not shown.
Loading