Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support UnboundPartitionSpec #106

Merged
merged 5 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@
/target
/Cargo.lock
.idea
.vscode
**/.DS_Store
16 changes: 7 additions & 9 deletions crates/catalog/rest/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ mod _serde {

use serde_derive::{Deserialize, Serialize};

use iceberg::spec::{PartitionSpec, Schema, SortOrder, TableMetadata};
use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec};
use iceberg::{Error, ErrorKind, Namespace, TableIdent, TableRequirement, TableUpdate};

pub(super) const OK: u16 = 200u16;
Expand Down Expand Up @@ -660,7 +660,7 @@ mod _serde {
pub(super) name: String,
pub(super) location: Option<String>,
pub(super) schema: Schema,
pub(super) partition_spec: Option<PartitionSpec>,
pub(super) partition_spec: Option<UnboundPartitionSpec>,
pub(super) write_order: Option<SortOrder>,
pub(super) stage_create: Option<bool>,
pub(super) properties: Option<HashMap<String, String>>,
Expand All @@ -686,9 +686,9 @@ mod tests {
use chrono::{TimeZone, Utc};
use iceberg::spec::ManifestListLocation::ManifestListFile;
use iceberg::spec::{
FormatVersion, NestedField, NullOrder, Operation, PartitionField, PartitionSpec,
PrimitiveType, Schema, Snapshot, SnapshotLog, SortDirection, SortField, SortOrder, Summary,
Transform, Type,
FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type,
UnboundPartitionField, UnboundPartitionSpec,
};
use iceberg::transaction::Transaction;
use mockito::{Mock, Server, ServerGuard};
Expand Down Expand Up @@ -1233,14 +1233,12 @@ mod tests {
)
.properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
.partition_spec(
PartitionSpec::builder()
.with_fields(vec![PartitionField::builder()
UnboundPartitionSpec::builder()
.with_fields(vec![UnboundPartitionField::builder()
.source_id(1)
.field_id(1000)
.transform(Transform::Truncate(3))
.name("id".to_string())
.build()])
.with_spec_id(1)
.build()
.unwrap(),
)
Expand Down
36 changes: 15 additions & 21 deletions crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
use serde_derive::{Deserialize, Serialize};
use urlencoding::encode;

use crate::spec::{FormatVersion, PartitionSpec, Schema, Snapshot, SnapshotReference, SortOrder};
use crate::spec::{
FormatVersion, Schema, Snapshot, SnapshotReference, SortOrder, UnboundPartitionSpec,
};
use crate::table::Table;
use crate::{Error, ErrorKind, Result};
use async_trait::async_trait;
Expand Down Expand Up @@ -226,7 +228,7 @@ pub struct TableCreation {
pub schema: Schema,
/// The partition spec of the table, could be None.
#[builder(default, setter(strip_option))]
pub partition_spec: Option<PartitionSpec>,
pub partition_spec: Option<UnboundPartitionSpec>,
/// The sort order of the table.
#[builder(default, setter(strip_option))]
pub sort_order: Option<SortOrder>,
Expand Down Expand Up @@ -361,7 +363,7 @@ pub enum TableUpdate {
/// Add a new partition spec to the table
AddSpec {
/// The partition spec to add.
spec: PartitionSpec,
spec: UnboundPartitionSpec,
},
/// Set table's default spec
#[serde(rename_all = "kebab-case")]
Expand Down Expand Up @@ -429,9 +431,9 @@ pub enum TableUpdate {
mod tests {
use crate::spec::ManifestListLocation::ManifestListFile;
use crate::spec::{
FormatVersion, NestedField, NullOrder, Operation, PartitionField, PartitionSpec,
PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection,
SortField, SortOrder, Summary, Transform, Type,
FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Summary,
Transform, Type, UnboundPartitionField, UnboundPartitionSpec,
};
use crate::{NamespaceIdent, TableIdent, TableRequirement, TableUpdate};
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -758,23 +760,19 @@ mod tests {
{
"action": "add-spec",
"spec": {
"spec-id": 1,
"fields": [
{
"source-id": 4,
"field-id": 1000,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this field_id in request is optional?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like it is for historical reasons. For old specs they can be missing, and they'll just auto-increment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, cleaned up the spec a bit: apache/iceberg#9240

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the reference, I saw the comments now. But I think in the rest api request, it's reasonable to make it optional, and ask the server to assign a meaningful field id to it?

"name": "ts_day",
"transform": "day"
},
{
"source-id": 1,
"field-id": 1001,
"name": "id_bucket",
"transform": "bucket[16]"
},
{
"source-id": 2,
"field-id": 1002,
"name": "id_truncate",
"transform": "truncate[4]"
}
Expand All @@ -783,28 +781,24 @@ mod tests {
}
"#,
TableUpdate::AddSpec {
spec: PartitionSpec::builder()
.with_spec_id(1)
.with_partition_field(
PartitionField::builder()
spec: UnboundPartitionSpec::builder()
.with_unbound_partition_field(
UnboundPartitionField::builder()
.source_id(4)
.field_id(1000)
.name("ts_day".to_string())
.transform(Transform::Day)
.build(),
)
.with_partition_field(
PartitionField::builder()
.with_unbound_partition_field(
UnboundPartitionField::builder()
.source_id(1)
.field_id(1001)
.name("id_bucket".to_string())
.transform(Transform::Bucket(16))
.build(),
)
.with_partition_field(
PartitionField::builder()
.with_unbound_partition_field(
UnboundPartitionField::builder()
.source_id(2)
.field_id(1002)
.name("id_truncate".to_string())
.transform(Transform::Truncate(4))
.build(),
Expand Down
108 changes: 103 additions & 5 deletions crates/iceberg/src/spec/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ use super::transform::Transform;

/// Reference to [`PartitionSpec`].
pub type PartitionSpecRef = Arc<PartitionSpec>;
/// Partition fields capture the transform from table data to partition values.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)]
#[serde(rename_all = "kebab-case")]
/// Partition fields capture the transform from table data to partition values.
pub struct PartitionField {
/// A source column id from the table’s schema
pub source_id: i32,
Expand All @@ -41,10 +41,10 @@ pub struct PartitionField {
pub transform: Transform,
}

/// Partition spec that defines how to produce a tuple of partition values from a record.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder)]
#[serde(rename_all = "kebab-case")]
#[builder(setter(prefix = "with"))]
/// Partition spec that defines how to produce a tuple of partition values from a record.
pub struct PartitionSpec {
/// Identifier for PartitionSpec
pub spec_id: i32,
Expand All @@ -60,13 +60,51 @@ impl PartitionSpec {
}
}

/// Reference to [`UnboundPartitionSpec`].
pub type UnboundPartitionSpecRef = Arc<UnboundPartitionSpec>;
/// Unbound partition field can be built without a schema and later bound to a schema.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)]
#[serde(rename_all = "kebab-case")]
pub struct UnboundPartitionField {
/// A source column id from the table’s schema
pub source_id: i32,
/// A partition field id that is used to identify a partition field and is unique within a partition spec.
/// In v2 table metadata, it is unique across all partition specs.
#[builder(default, setter(strip_option))]
pub partition_id: Option<i32>,
/// A partition name.
pub name: String,
/// A transform that is applied to the source column to produce a partition value.
pub transform: Transform,
}

/// Unbound partition spec can be built without a schema and later bound to a schema.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use TypedBuilder?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't find a neat way to add #[builder(setter(each(name = "with_partition_field")))] in TypedBuilder.
Accoding to this, it may need to use mutators?
Maybe we can try later?

#[serde(rename_all = "kebab-case")]
#[builder(setter(prefix = "with"))]
pub struct UnboundPartitionSpec {
my-vegetable-has-exploded marked this conversation as resolved.
Show resolved Hide resolved
/// Identifier for PartitionSpec
#[builder(default, setter(strip_option))]
pub spec_id: Option<i32>,
/// Details of the partition spec
#[builder(setter(each(name = "with_unbound_partition_field")))]
pub fields: Vec<UnboundPartitionField>,
}

impl UnboundPartitionSpec {
/// Create unbound partition spec builer
pub fn builder() -> UnboundPartitionSpecBuilder {
UnboundPartitionSpecBuilder::default()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn partition_spec() {
let sort_order = r#"
fn test_partition_spec() {
let spec = r#"
{
"spec-id": 1,
"fields": [ {
Expand All @@ -88,7 +126,7 @@ mod tests {
}
"#;

let partition_spec: PartitionSpec = serde_json::from_str(sort_order).unwrap();
let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
assert_eq!(4, partition_spec.fields[0].source_id);
assert_eq!(1000, partition_spec.fields[0].field_id);
assert_eq!("ts_day", partition_spec.fields[0].name);
Expand All @@ -104,4 +142,64 @@ mod tests {
assert_eq!("id_truncate", partition_spec.fields[2].name);
assert_eq!(Transform::Truncate(4), partition_spec.fields[2].transform);
}

#[test]
fn test_unbound_partition_spec() {
let spec = r#"
{
"spec-id": 1,
"fields": [ {
"source-id": 4,
"partition-id": 1000,
"name": "ts_day",
"transform": "day"
}, {
"source-id": 1,
"partition-id": 1001,
"name": "id_bucket",
"transform": "bucket[16]"
}, {
"source-id": 2,
"partition-id": 1002,
"name": "id_truncate",
"transform": "truncate[4]"
} ]
}
"#;

let partition_spec: UnboundPartitionSpec = serde_json::from_str(spec).unwrap();
assert_eq!(Some(1), partition_spec.spec_id);

assert_eq!(4, partition_spec.fields[0].source_id);
assert_eq!(Some(1000), partition_spec.fields[0].partition_id);
assert_eq!("ts_day", partition_spec.fields[0].name);
assert_eq!(Transform::Day, partition_spec.fields[0].transform);

assert_eq!(1, partition_spec.fields[1].source_id);
assert_eq!(Some(1001), partition_spec.fields[1].partition_id);
assert_eq!("id_bucket", partition_spec.fields[1].name);
assert_eq!(Transform::Bucket(16), partition_spec.fields[1].transform);

assert_eq!(2, partition_spec.fields[2].source_id);
assert_eq!(Some(1002), partition_spec.fields[2].partition_id);
assert_eq!("id_truncate", partition_spec.fields[2].name);
assert_eq!(Transform::Truncate(4), partition_spec.fields[2].transform);

let spec = r#"
{
"fields": [ {
"source-id": 4,
"name": "ts_day",
"transform": "day"
} ]
}
"#;
let partition_spec: UnboundPartitionSpec = serde_json::from_str(spec).unwrap();
assert_eq!(None, partition_spec.spec_id);

assert_eq!(4, partition_spec.fields[0].source_id);
assert_eq!(None, partition_spec.fields[0].partition_id);
assert_eq!("ts_day", partition_spec.fields[0].name);
assert_eq!(Transform::Day, partition_spec.fields[0].transform);
}
}
Loading