Skip to content

Commit

Permalink
refactor: remove support of manifest list format as a list of file paths (apache#201)
Browse files Browse the repository at this point in the history

* refactor: remove support of manifest list format as a list of file paths (#158)

* refactor: add field definition to manifest list

* refactor: delete duplicated function

* refactor: fix duplicate function name
  • Loading branch information
Dysprosium0626 authored and shaeqahmed committed Dec 9, 2024
1 parent 426b670 commit 36fb5d4
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 101 deletions.
3 changes: 1 addition & 2 deletions crates/catalog/rest/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,6 @@ mod _serde {
#[cfg(test)]
mod tests {
use chrono::{TimeZone, Utc};
use iceberg::spec::ManifestListLocation::ManifestListFile;
use iceberg::spec::{
FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type,
Expand Down Expand Up @@ -1146,7 +1145,7 @@ mod tests {
assert_eq!(vec![&Arc::new(Snapshot::builder()
.with_snapshot_id(3497810964824022504)
.with_timestamp_ms(1646787054459)
.with_manifest_list(ManifestListFile("s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro".to_string()))
.with_manifest_list("s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro")
.with_sequence_number(0)
.with_schema_id(0)
.with_summary(Summary {
Expand Down
3 changes: 1 addition & 2 deletions crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,6 @@ pub enum TableUpdate {

#[cfg(test)]
mod tests {
use crate::spec::ManifestListLocation::ManifestListFile;
use crate::spec::{
FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Summary,
Expand Down Expand Up @@ -911,7 +910,7 @@ mod tests {
.with_parent_snapshot_id(Some(3051729675574597000))
.with_timestamp_ms(1555100955770)
.with_sequence_number(1)
.with_manifest_list(ManifestListFile("s3://a/b/2.avro".to_string()))
.with_manifest_list("s3://a/b/2.avro")
.with_schema_id(1)
.with_summary(Summary {
operation: Operation::Append,
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ mod tests {
fixture
.table
.file_io()
.new_output(current_snapshot.manifest_list_file_path().unwrap())
.new_output(current_snapshot.manifest_list())
.unwrap(),
current_snapshot.snapshot_id(),
current_snapshot
Expand Down
131 changes: 45 additions & 86 deletions crates/iceberg/src/spec/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,24 +84,16 @@ pub struct Snapshot {
timestamp_ms: i64,
/// The location of a manifest list for this snapshot that
/// tracks manifest files with additional metadata.
manifest_list: ManifestListLocation,
/// Currently we only support manifest list file, and manifest files are not supported.
#[builder(setter(into))]
manifest_list: String,
/// A string map that summarizes the snapshot changes, including operation.
summary: Summary,
/// ID of the table’s current schema when the snapshot was created.
#[builder(setter(strip_option), default = None)]
schema_id: Option<SchemaId>,
}

/// Type to distinguish between a path to a manifestlist file or a vector of manifestfile locations
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(untagged)]
pub enum ManifestListLocation {
/// Location of manifestlist file
ManifestListFile(String),
/// Manifestfile locations
ManifestFiles(Vec<String>),
}

impl Snapshot {
/// Get the id of the snapshot
#[inline]
Expand All @@ -122,23 +114,10 @@ impl Snapshot {
}
/// Get location of manifest_list file
#[inline]
pub fn manifest_list(&self) -> &ManifestListLocation {
pub fn manifest_list(&self) -> &str {
&self.manifest_list
}

/// Return the manifest list file path.
///
/// It will return an error if the manifest list is not a file but a list of manifest file paths.
#[inline]
pub fn manifest_list_file_path(&self) -> Result<&str> {
match &self.manifest_list {
ManifestListLocation::ManifestListFile(s) => Ok(s),
_ => Err(Error::new(
ErrorKind::DataInvalid,
"Manifest list is not a file but a list of manifest files.",
)),
}
}
/// Get summary of the snapshot
#[inline]
pub fn summary(&self) -> &Summary {
Expand Down Expand Up @@ -187,31 +166,28 @@ impl Snapshot {
file_io: &FileIO,
table_metadata: &TableMetadata,
) -> Result<ManifestList> {
match &self.manifest_list {
ManifestListLocation::ManifestListFile(file) => {
let mut manifest_list_content= Vec::new();
file_io
.new_input(file)?
.reader().await?
.read_to_end(&mut manifest_list_content)
.await?;

let schema = self.schema(table_metadata)?;

let partition_type_provider = |partition_spec_id: i32| -> Result<Option<StructType>> {
table_metadata.partition_spec_by_id(partition_spec_id).map(|partition_spec| {
partition_spec.partition_type(&schema)
}).transpose()
};

ManifestList::parse_with_version(&manifest_list_content, table_metadata.format_version(),
partition_type_provider, )
}
ManifestListLocation::ManifestFiles(_) => Err(Error::new(
ErrorKind::FeatureUnsupported,
"Loading manifests from `manifests` is currently not supported, we only support loading from `manifest-list` file, see https://iceberg.apache.org/spec/#snapshots for more information.",
)),
}
let mut manifest_list_content = Vec::new();
file_io
.new_input(&self.manifest_list)?
.reader()
.await?
.read_to_end(&mut manifest_list_content)
.await?;

let schema = self.schema(table_metadata)?;

let partition_type_provider = |partition_spec_id: i32| -> Result<Option<StructType>> {
table_metadata
.partition_spec_by_id(partition_spec_id)
.map(|partition_spec| partition_spec.partition_type(&schema))
.transpose()
};

ManifestList::parse_with_version(
&manifest_list_content,
table_metadata.format_version(),
partition_type_provider,
)
}

pub(crate) fn log(&self) -> SnapshotLog {
Expand All @@ -232,9 +208,9 @@ pub(super) mod _serde {
use serde::{Deserialize, Serialize};

use crate::spec::SchemaId;
use crate::{Error, ErrorKind};
use crate::Error;

use super::{ManifestListLocation, Operation, Snapshot, Summary};
use super::{Operation, Snapshot, Summary};

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
Expand Down Expand Up @@ -276,7 +252,7 @@ pub(super) mod _serde {
parent_snapshot_id: v2.parent_snapshot_id,
sequence_number: v2.sequence_number,
timestamp_ms: v2.timestamp_ms,
manifest_list: ManifestListLocation::ManifestListFile(v2.manifest_list),
manifest_list: v2.manifest_list,
summary: v2.summary,
schema_id: v2.schema_id,
}
Expand All @@ -286,17 +262,14 @@ pub(super) mod _serde {
impl From<Snapshot> for SnapshotV2 {
fn from(v2: Snapshot) -> Self {
SnapshotV2 {
snapshot_id: v2.snapshot_id,
parent_snapshot_id: v2.parent_snapshot_id,
sequence_number: v2.sequence_number,
timestamp_ms: v2.timestamp_ms,
manifest_list: match v2.manifest_list {
ManifestListLocation::ManifestListFile(file) => file,
ManifestListLocation::ManifestFiles(_) => panic!("Wrong table format version. Can't convert a list of manifest files into a location of a manifest file.")
},
summary: v2.summary,
schema_id: v2.schema_id,
}
snapshot_id: v2.snapshot_id,
parent_snapshot_id: v2.parent_snapshot_id,
sequence_number: v2.sequence_number,
timestamp_ms: v2.timestamp_ms,
manifest_list: v2.manifest_list,
summary: v2.summary,
schema_id: v2.schema_id,
}
}
}

Expand All @@ -310,15 +283,10 @@ pub(super) mod _serde {
sequence_number: 0,
timestamp_ms: v1.timestamp_ms,
manifest_list: match (v1.manifest_list, v1.manifests) {
(Some(file), _) => ManifestListLocation::ManifestListFile(file),
(None, Some(files)) => ManifestListLocation::ManifestFiles(files),
(None, None) => {
return Err(Error::new(
ErrorKind::DataInvalid,
"Neither manifestlist file or manifest files are provided.",
))
}
},
(Some(file), None) => file,
(Some(_), Some(_)) => "Invalid v1 snapshot, when manifest list provided, manifest files should be omitted".to_string(),
(None, _) => "Unsupported v1 snapshot, only manifest list is supported".to_string()
},
summary: v1.summary.unwrap_or(Summary {
operation: Operation::default(),
other: HashMap::new(),
Expand All @@ -330,18 +298,14 @@ pub(super) mod _serde {

impl From<Snapshot> for SnapshotV1 {
fn from(v2: Snapshot) -> Self {
let (manifest_list, manifests) = match v2.manifest_list {
ManifestListLocation::ManifestListFile(file) => (Some(file), None),
ManifestListLocation::ManifestFiles(files) => (None, Some(files)),
};
SnapshotV1 {
snapshot_id: v2.snapshot_id,
parent_snapshot_id: v2.parent_snapshot_id,
timestamp_ms: v2.timestamp_ms,
manifest_list,
manifests,
manifest_list: Some(v2.manifest_list),
summary: Some(v2.summary),
schema_id: v2.schema_id,
manifests: None,
}
}
}
Expand Down Expand Up @@ -403,9 +367,7 @@ mod tests {
use chrono::{TimeZone, Utc};
use std::collections::HashMap;

use crate::spec::snapshot::{
ManifestListLocation, Operation, Snapshot, Summary, _serde::SnapshotV1,
};
use crate::spec::snapshot::{Operation, Snapshot, Summary, _serde::SnapshotV1};

#[test]
fn schema() {
Expand Down Expand Up @@ -437,9 +399,6 @@ mod tests {
},
*result.summary()
);
assert_eq!(
ManifestListLocation::ManifestListFile("s3://b/wh/.../s1.avro".to_string()),
*result.manifest_list()
);
assert_eq!("s3://b/wh/.../s1.avro".to_string(), *result.manifest_list());
}
}
16 changes: 6 additions & 10 deletions crates/iceberg/src/spec/table_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -839,9 +839,9 @@ mod tests {
use pretty_assertions::assert_eq;

use crate::spec::{
table_metadata::TableMetadata, ManifestListLocation, NestedField, NullOrder, Operation,
PartitionField, PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotReference,
SnapshotRetention, SortDirection, SortField, SortOrder, Summary, Transform, Type,
table_metadata::TableMetadata, NestedField, NullOrder, Operation, PartitionField,
PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention,
SortDirection, SortField, SortOrder, Summary, Transform, Type,
};

use super::{FormatVersion, MetadataLog, SnapshotLog};
Expand Down Expand Up @@ -1104,7 +1104,7 @@ mod tests {
.with_timestamp_ms(1662532818843)
.with_sequence_number(0)
.with_schema_id(0)
.with_manifest_list(ManifestListLocation::ManifestListFile("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string()))
.with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro")
.with_summary(Summary { operation: Operation::Append, other: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
.build();

Expand Down Expand Up @@ -1228,9 +1228,7 @@ mod tests {
.with_snapshot_id(3051729675574597004)
.with_timestamp_ms(1515100955770)
.with_sequence_number(0)
.with_manifest_list(ManifestListLocation::ManifestListFile(
"s3://a/b/1.avro".to_string(),
))
.with_manifest_list("s3://a/b/1.avro")
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::new(),
Expand All @@ -1243,9 +1241,7 @@ mod tests {
.with_timestamp_ms(1555100955770)
.with_sequence_number(1)
.with_schema_id(1)
.with_manifest_list(ManifestListLocation::ManifestListFile(
"s3://a/b/2.avro".to_string(),
))
.with_manifest_list("s3://a/b/2.avro")
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::new(),
Expand Down

0 comments on commit 36fb5d4

Please sign in to comment.