From 79805ad80e80cedcda36c726478506af05ead590 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Mon, 7 Aug 2023 18:11:48 +0800 Subject: [PATCH 1/2] Define schema post order visitor --- crates/iceberg/Cargo.toml | 5 +- crates/iceberg/src/error.rs | 25 + crates/iceberg/src/spec/datatypes.rs | 99 ++- crates/iceberg/src/spec/schema.rs | 980 ++++++++++++++++++++++++++- 4 files changed, 1065 insertions(+), 44 deletions(-) diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index e1b5de2cb..0b27a5622 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -28,7 +28,7 @@ keywords = ["iceberg"] [dependencies] apache-avro = "0.15.0" -serde = "^1.0" +serde = {version = "^1.0", features = ["rc"]} serde_bytes = "0.11.8" serde_json = "^1.0" serde_derive = "^1.0" @@ -39,6 +39,9 @@ chrono = "0.4" uuid = "1.4.1" ordered-float = "3.7.0" bitvec = "1.0.1" +itertools = "0.11" +bimap = "0.6" + [dev-dependencies] pretty_assertions = "1.4.0" diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs index 6cb41db79..b2a4ba816 100644 --- a/crates/iceberg/src/error.rs +++ b/crates/iceberg/src/error.rs @@ -241,6 +241,12 @@ impl Error { pub fn kind(&self) -> ErrorKind { self.kind } + + /// Return error's message. + #[inline] + pub fn message(&self) -> &str { + self.message.as_str() + } } macro_rules! define_from_err { @@ -283,6 +289,25 @@ define_from_err!( "Failed to convert between uuid und iceberg value" ); +/// Helper macro to check arguments. +/// +/// +/// Example: +/// +/// Following example check `a > 0`, otherwise returns an error. +/// ```ignore +/// use iceberg::check; +/// ensure_data_valid!(a > 0, "{} is not positive.", a); +/// ``` +#[macro_export] +macro_rules! 
ensure_data_valid { + ($cond: expr, $fmt: literal, $($arg:tt)*) => { + if !$cond { + return Err($crate::error::Error::new($crate::error::ErrorKind::DataInvalid, format!($fmt, $($arg)*))) + } + }; +} + #[cfg(test)] mod tests { use anyhow::anyhow; diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs index 5750ab968..6c9f3723c 100644 --- a/crates/iceberg/src/spec/datatypes.rs +++ b/crates/iceberg/src/spec/datatypes.rs @@ -23,12 +23,13 @@ use serde::de::{Error, IntoDeserializer}; use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; use std::cell::OnceCell; use std::slice::Iter; +use std::sync::Arc; use std::{collections::HashMap, fmt, ops::Index}; /// Field name for list type. -const LIST_FILED_NAME: &str = "element"; -const MAP_KEY_FIELD_NAME: &str = "key"; -const MAP_VALUE_FIELD_NAME: &str = "value"; +pub(crate) const LIST_FILED_NAME: &str = "element"; +pub(crate) const MAP_KEY_FIELD_NAME: &str = "key"; +pub(crate) const MAP_VALUE_FIELD_NAME: &str = "value"; #[derive(Debug, PartialEq, Eq, Clone)] /// All data types are either primitives or nested types, which are maps, lists, or structs. @@ -54,6 +55,20 @@ impl fmt::Display for Type { } } +impl Type { + /// Whether the type is primitive type. + #[inline(always)] + pub fn is_primitive(&self) -> bool { + matches!(self, Type::Primitive(_)) + } + + /// Whether the type is struct type. 
+ #[inline(always)] + pub fn is_struct(&self) -> bool { + matches!(self, Type::Struct(_)) + } +} + /// Primitive data types #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] #[serde(rename_all = "lowercase", remote = "Self")] @@ -218,7 +233,7 @@ impl fmt::Display for PrimitiveType { #[serde(rename = "struct", tag = "type")] pub struct StructType { /// Struct fields - fields: Vec, + fields: Vec, /// Lookup for index by field id #[serde(skip_serializing)] id_lookup: OnceCell>, @@ -261,7 +276,7 @@ impl<'de> Deserialize<'de> for StructType { } } } - let fields: Vec = + let fields: Vec = fields.ok_or_else(|| de::Error::missing_field("fields"))?; Ok(StructType::new(fields)) @@ -275,14 +290,14 @@ impl<'de> Deserialize<'de> for StructType { impl StructType { /// Creates a struct type with the given fields. - pub fn new(fields: Vec) -> Self { + pub fn new(fields: Vec) -> Self { Self { fields, id_lookup: OnceCell::default(), } } /// Get struct field with certain id - pub fn field_by_id(&self, id: i32) -> Option<&NestedField> { + pub fn field_by_id(&self, id: i32) -> Option<&NestedFieldRef> { self.field_id_to_index(id).map(|idx| &self.fields[idx]) } @@ -298,6 +313,11 @@ impl StructType { pub fn iter(&self) -> Iter { self.fields.iter() } + + /// Get fields. + pub fn fields(&self) -> &[NestedFieldRef] { + &self.fields + } } impl PartialEq for StructType { @@ -352,6 +372,9 @@ pub struct NestedField { pub write_default: Option, } +/// Reference to nested field. +pub type NestedFieldRef = Arc; + impl NestedField { /// Construct a required field. pub fn required(id: i32, name: impl ToString, field_type: Type) -> Self { @@ -443,13 +466,15 @@ impl fmt::Display for NestedField { /// Elements can be either optional or required. Element types may be any type. pub struct ListType { /// Element field of list type. - pub element_field: NestedField, + pub element_field: NestedFieldRef, } /// Module for type serialization/deserialization. 
pub(super) mod _serde { use crate::spec::datatypes::Type::Map; - use crate::spec::datatypes::{ListType, MapType, NestedField, PrimitiveType, StructType, Type}; + use crate::spec::datatypes::{ + ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, StructType, Type, + }; use serde_derive::{Deserialize, Serialize}; use std::borrow::Cow; @@ -466,7 +491,7 @@ pub(super) mod _serde { }, Struct { r#type: String, - fields: Cow<'a, Vec>, + fields: Cow<'a, Vec>, }, #[serde(rename_all = "kebab-case")] Map { @@ -493,7 +518,8 @@ pub(super) mod _serde { element_id, element.into_owned(), element_required, - ), + ) + .into(), }), SerdeType::Map { r#type: _, @@ -503,12 +529,13 @@ pub(super) mod _serde { value_required, value, } => Map(MapType { - key_field: NestedField::map_key_element(key_id, key.into_owned()), + key_field: NestedField::map_key_element(key_id, key.into_owned()).into(), value_field: NestedField::map_value_element( value_id, value.into_owned(), value_required, - ), + ) + .into(), }), SerdeType::Struct { r#type: _, fields } => { Self::Struct(StructType::new(fields.into_owned())) @@ -552,9 +579,9 @@ pub(super) mod _serde { /// Both map keys and map values may be any type, including nested types. pub struct MapType { /// Field for key. - pub key_field: NestedField, + pub key_field: NestedFieldRef, /// Field for value. 
- pub value_field: NestedField, + pub value_field: NestedFieldRef, } #[cfg(test)] @@ -598,7 +625,8 @@ mod tests { precision: 9, scale: 2, }), - )], + ) + .into()], id_lookup: OnceCell::default(), }), ) @@ -627,7 +655,8 @@ mod tests { 1, "id", Type::Primitive(PrimitiveType::Fixed(8)), - )], + ) + .into()], id_lookup: OnceCell::default(), }), ) @@ -662,8 +691,9 @@ mod tests { fields: vec![ NestedField::required(1, "id", Type::Primitive(PrimitiveType::Uuid)) .with_initial_default("0db3e2a8-9d1d-42b9-aa7b-74ebe558dceb") - .with_write_default("ec5911be-b0a7-458c-8438-c9a3e53cffae"), - NestedField::optional(2, "data", Type::Primitive(PrimitiveType::Int)), + .with_write_default("ec5911be-b0a7-458c-8438-c9a3e53cffae") + .into(), + NestedField::optional(2, "data", Type::Primitive(PrimitiveType::Int)).into(), ], id_lookup: HashMap::from([(1, 0), (2, 1)]).into(), }), @@ -725,17 +755,21 @@ mod tests { let struct_type = Type::Struct(StructType::new(vec![ NestedField::required(1, "id", Type::Primitive(PrimitiveType::Uuid)) .with_initial_default("0db3e2a8-9d1d-42b9-aa7b-74ebe558dceb") - .with_write_default("ec5911be-b0a7-458c-8438-c9a3e53cffae"), - NestedField::optional(2, "data", Type::Primitive(PrimitiveType::Int)), + .with_write_default("ec5911be-b0a7-458c-8438-c9a3e53cffae") + .into(), + NestedField::optional(2, "data", Type::Primitive(PrimitiveType::Int)).into(), NestedField::required( 3, "address", Type::Struct(StructType::new(vec![ - NestedField::required(4, "street", Type::Primitive(PrimitiveType::String)), - NestedField::optional(5, "province", Type::Primitive(PrimitiveType::String)), - NestedField::required(6, "zip", Type::Primitive(PrimitiveType::Int)), + NestedField::required(4, "street", Type::Primitive(PrimitiveType::String)) + .into(), + NestedField::optional(5, "province", Type::Primitive(PrimitiveType::String)) + .into(), + NestedField::required(6, "zip", Type::Primitive(PrimitiveType::Int)).into(), ])), - ), + ) + .into(), ])); check_type_serde(record, 
struct_type) @@ -759,7 +793,8 @@ mod tests { 3, Type::Primitive(PrimitiveType::String), true, - ), + ) + .into(), }), ); } @@ -780,12 +815,14 @@ mod tests { check_type_serde( record, Type::Map(MapType { - key_field: NestedField::map_key_element(4, Type::Primitive(PrimitiveType::String)), + key_field: NestedField::map_key_element(4, Type::Primitive(PrimitiveType::String)) + .into(), value_field: NestedField::map_value_element( 5, Type::Primitive(PrimitiveType::Double), false, - ), + ) + .into(), }), ); } @@ -806,12 +843,14 @@ mod tests { check_type_serde( record, Type::Map(MapType { - key_field: NestedField::map_key_element(4, Type::Primitive(PrimitiveType::Int)), + key_field: NestedField::map_key_element(4, Type::Primitive(PrimitiveType::Int)) + .into(), value_field: NestedField::map_value_element( 5, Type::Primitive(PrimitiveType::String), false, - ), + ) + .into(), }), ); } diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs index 1e80e7cc6..e1c6f2c77 100644 --- a/crates/iceberg/src/spec/schema.rs +++ b/crates/iceberg/src/spec/schema.rs @@ -17,7 +17,17 @@ //! This module defines schema in iceberg. -use crate::spec::datatypes::{NestedField, StructType}; +use crate::error::Result; +use crate::spec::datatypes::{ + ListType, MapType, NestedFieldRef, PrimitiveType, StructType, Type, LIST_FILED_NAME, + MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME, +}; +use crate::{ensure_data_valid, Error, ErrorKind}; +use bimap::BiHashMap; +use itertools::Itertools; +use once_cell::sync::OnceCell; +use std::collections::{HashMap, HashSet}; +use std::fmt::{Display, Formatter}; const DEFAULT_SCHEMA_ID: i32 = 0; @@ -27,17 +37,27 @@ pub struct Schema { r#struct: StructType, schema_id: i32, highest_field_id: i32, + identifier_field_ids: HashSet, + + alias_to_id: BiHashMap, + id_to_field: HashMap, + + name_to_id: HashMap, + id_to_name: HashMap, + lower_case_name_to_id: OnceCell>, } /// Schema builder. 
pub struct SchemaBuilder { schema_id: i32, - fields: Vec, + fields: Vec, + alias_to_id: BiHashMap, + identifier_field_ids: HashSet, } impl SchemaBuilder { - /// Add fields to schem builder - pub fn with_fields(mut self, fields: impl IntoIterator) -> Self { + /// Add fields to schem builder. + pub fn with_fields(mut self, fields: impl IntoIterator) -> Self { self.fields.extend(fields.into_iter()); self } @@ -48,14 +68,105 @@ impl SchemaBuilder { self } + /// Set identifier field ids. + pub fn with_identifier_field_ids(mut self, ids: impl IntoIterator) -> Self { + self.identifier_field_ids.extend(ids); + self + } + + /// Set alias to filed id mapping. + pub fn with_alias(mut self, alias_to_id: BiHashMap) -> Self { + self.alias_to_id = alias_to_id; + self + } + /// Builds the schema. - pub fn build(self) -> Schema { + pub fn build(self) -> Result { let highest_field_id = self.fields.iter().map(|f| f.id).max().unwrap_or(0); - Schema { - r#struct: StructType::new(self.fields), + + let r#struct = StructType::new(self.fields); + let id_to_field = index_by_id(&r#struct)?; + + Self::validate_identifier_ids( + &r#struct, + &id_to_field, + self.identifier_field_ids.iter().copied(), + )?; + + let (name_to_id, id_to_name) = { + let mut index = IndexByName::default(); + visit_struct(&r#struct, &mut index)?; + index.indexes() + }; + + Ok(Schema { + r#struct, schema_id: self.schema_id, highest_field_id, + identifier_field_ids: self.identifier_field_ids, + + alias_to_id: self.alias_to_id, + id_to_field, + + name_to_id, + id_to_name, + lower_case_name_to_id: OnceCell::default(), + }) + } + + fn validate_identifier_ids( + r#struct: &StructType, + id_to_field: &HashMap, + identifier_field_ids: impl Iterator, + ) -> Result<()> { + let id_to_parent = index_parents(r#struct); + for identifier_field_id in identifier_field_ids { + let field = id_to_field.get(&identifier_field_id).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot add identifier field 
{identifier_field_id}: field does not exist" + ), + ) + })?; + ensure_data_valid!( + field.required, + "Cannot add identifier field: {} is an optional field", + field.name + ); + if let Type::Primitive(p) = field.field_type.as_ref() { + ensure_data_valid!( + !matches!(p, PrimitiveType::Double | PrimitiveType::Float), + "Cannot add identifier field {}: cannot be a float or double type", + field.name + ); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot add field {} as an identifier field: not a primitive type field", + field.name + ), + )); + } + + let mut cur_field_id = identifier_field_id; + while let Some(parent) = id_to_parent.get(&cur_field_id) { + let parent_field = id_to_field + .get(parent) + .expect("Field id should not disappear."); + ensure_data_valid!( + parent_field.field_type.is_struct(), + "Cannot add field {} as an identifier field: must not be nested in {:?}", + field.name, + parent_field + ); + ensure_data_valid!(parent_field.required, "Cannot add field {} as an identifier field: must not be nested in an optional field {}", field.name, parent_field); + cur_field_id = *parent; + } } + + Ok(()) } } @@ -65,12 +176,30 @@ impl Schema { SchemaBuilder { schema_id: DEFAULT_SCHEMA_ID, fields: vec![], + identifier_field_ids: HashSet::default(), + alias_to_id: BiHashMap::default(), } } /// Get field by field id. - pub fn field_by_id(&self, field_id: i32) -> Option<&NestedField> { - self.r#struct.field_by_id(field_id) + pub fn field_by_id(&self, field_id: i32) -> Option<&NestedFieldRef> { + self.id_to_field.get(&field_id) + } + + /// Get field by field name. + /// + /// Both full name and short name could work here. + pub fn field_by_name(&self, field_name: &str) -> Option<&NestedFieldRef> { + self.name_to_id + .get(field_name) + .and_then(|id| self.field_by_id(*id)) + } + + /// Get field by alias. 
+ pub fn field_by_alias(&self, alias: &str) -> Option<&NestedFieldRef> { + self.alias_to_id + .get_by_left(alias) + .and_then(|id| self.field_by_id(*id)) } /// Returns [`highest_field_id`]. @@ -84,23 +213,424 @@ impl Schema { pub fn schema_id(&self) -> i32 { self.schema_id } + + /// Returns [`r#struct`]. + #[inline] + pub fn as_struct(&self) -> &StructType { + &self.r#struct + } + + /// Get field id by full name. + pub fn field_id_by_name(&self, name: &str) -> Option { + self.name_to_id.get(name).copied() + } + + /// Get field id by full name. + pub fn name_by_field_id(&self, field_id: i32) -> Option<&str> { + self.id_to_name.get(&field_id).map(String::as_str) + } +} + +impl Display for Schema { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + writeln!(f, "table {{")?; + for field in self.as_struct().fields() { + writeln!(f, " {}", field)?; + } + writeln!(f, "}}") + } +} + +/// A post order schema visitor. +/// +/// For order of methods called, please refer to [`visit_schema`]. +pub trait SchemaVisitor { + /// Return type of this visitor. + type T; + + /// Called before struct field. + fn before_struct_field(&mut self, _field: &NestedFieldRef) -> Result<()> { + Ok(()) + } + /// Called after struct field. + fn after_struct_field(&mut self, _field: &NestedFieldRef) -> Result<()> { + Ok(()) + } + /// Called before list field. + fn before_list_element(&mut self, _field: &NestedFieldRef) -> Result<()> { + Ok(()) + } + /// Called after list field. + fn after_list_element(&mut self, _field: &NestedFieldRef) -> Result<()> { + Ok(()) + } + /// Called before map key field. + fn before_map_key(&mut self, _field: &NestedFieldRef) -> Result<()> { + Ok(()) + } + /// Called after map key field. + fn after_map_key(&mut self, _field: &NestedFieldRef) -> Result<()> { + Ok(()) + } + /// Called before map value field. + fn before_map_value(&mut self, _field: &NestedFieldRef) -> Result<()> { + Ok(()) + } + /// Called after map value field. 
+ fn after_map_value(&mut self, _field: &NestedFieldRef) -> Result<()> { + Ok(()) + } + + /// Called after schema's type visited. + fn schema(&mut self, schema: &Schema, value: Self::T) -> Result; + /// Called after struct's field type visited. + fn field(&mut self, field: &NestedFieldRef, value: Self::T) -> Result; + /// Called after struct's fields visited. + fn r#struct(&mut self, r#struct: &StructType, results: Vec) -> Result; + /// Called after list fields visited. + fn list(&mut self, list: &ListType, value: Self::T) -> Result; + /// Called after map's key and value fields visited. + fn map(&mut self, map: &MapType, key_value: Self::T, value: Self::T) -> Result; + /// Called when see a primitive type. + fn primitive(&mut self, p: &PrimitiveType) -> Result; +} + +/// Visiting a type in post order. +pub fn visit_type(r#type: &Type, visitor: &mut V) -> Result { + match r#type { + Type::Primitive(p) => visitor.primitive(p), + Type::List(list) => { + visitor.before_list_element(&list.element_field)?; + let value = visit_type(&list.element_field.field_type, visitor)?; + visitor.after_list_element(&list.element_field)?; + visitor.list(list, value) + } + Type::Map(map) => { + let key_result = { + visitor.before_map_key(&map.key_field)?; + let ret = visit_type(&map.key_field.field_type, visitor)?; + visitor.after_map_key(&map.key_field)?; + ret + }; + + let value_result = { + visitor.before_map_value(&map.value_field)?; + let ret = visit_type(&map.value_field.field_type, visitor)?; + visitor.after_map_value(&map.value_field)?; + ret + }; + + visitor.map(map, key_result, value_result) + } + Type::Struct(s) => visit_struct(s, visitor), + } +} + +/// Visit struct type in post order. 
+pub fn visit_struct(s: &StructType, visitor: &mut V) -> Result { + let mut results = Vec::with_capacity(s.fields().len()); + for field in s.fields() { + visitor.before_struct_field(field)?; + let result = visit_type(&field.field_type, visitor)?; + visitor.after_struct_field(field)?; + let result = visitor.field(field, result)?; + results.push(result); + } + + visitor.r#struct(s, results) +} + +/// Visit schema in post order. +pub fn visit_schema(schema: &Schema, visitor: &mut V) -> Result { + let result = visit_struct(&schema.r#struct, visitor)?; + visitor.schema(schema, result) +} + +/// Creates an field id to field map. +pub fn index_by_id(r#struct: &StructType) -> Result> { + struct IndexById(HashMap); + + impl SchemaVisitor for IndexById { + type T = (); + + fn schema(&mut self, _schema: &Schema, _value: ()) -> Result<()> { + Ok(()) + } + + fn field(&mut self, field: &NestedFieldRef, _value: ()) -> Result<()> { + self.0.insert(field.id, field.clone()); + Ok(()) + } + + fn r#struct(&mut self, _struct: &StructType, _results: Vec) -> Result { + Ok(()) + } + + fn list(&mut self, list: &ListType, _value: Self::T) -> Result { + self.0 + .insert(list.element_field.id, list.element_field.clone()); + Ok(()) + } + + fn map(&mut self, map: &MapType, _key_value: Self::T, _value: Self::T) -> Result { + self.0.insert(map.key_field.id, map.key_field.clone()); + self.0.insert(map.value_field.id, map.value_field.clone()); + Ok(()) + } + + fn primitive(&mut self, _: &PrimitiveType) -> Result { + Ok(()) + } + } + + let mut index = IndexById(HashMap::new()); + visit_struct(r#struct, &mut index)?; + Ok(index.0) +} + +/// Creates a field id to parent field id map. 
+pub fn index_parents(r#struct: &StructType) -> HashMap { + struct IndexByParent { + parents: Vec, + result: HashMap, + } + + impl SchemaVisitor for IndexByParent { + type T = (); + + fn before_struct_field(&mut self, field: &NestedFieldRef) -> Result<()> { + self.parents.push(field.id); + Ok(()) + } + + fn after_struct_field(&mut self, _field: &NestedFieldRef) -> Result<()> { + self.parents.pop(); + Ok(()) + } + + fn before_list_element(&mut self, field: &NestedFieldRef) -> Result<()> { + self.parents.push(field.id); + Ok(()) + } + + fn after_list_element(&mut self, _field: &NestedFieldRef) -> Result<()> { + self.parents.pop(); + Ok(()) + } + + fn before_map_key(&mut self, field: &NestedFieldRef) -> Result<()> { + self.parents.push(field.id); + Ok(()) + } + + fn after_map_key(&mut self, _field: &NestedFieldRef) -> Result<()> { + self.parents.pop(); + Ok(()) + } + + fn before_map_value(&mut self, field: &NestedFieldRef) -> Result<()> { + self.parents.push(field.id); + Ok(()) + } + + fn after_map_value(&mut self, _field: &NestedFieldRef) -> Result<()> { + self.parents.pop(); + Ok(()) + } + + fn schema(&mut self, _schema: &Schema, _value: Self::T) -> Result { + Ok(()) + } + + fn field(&mut self, field: &NestedFieldRef, _value: Self::T) -> Result { + if let Some(parent) = self.parents.last().copied() { + self.result.insert(field.id, parent); + } + Ok(()) + } + + fn r#struct(&mut self, _struct: &StructType, _results: Vec) -> Result { + Ok(()) + } + + fn list(&mut self, _list: &ListType, _value: Self::T) -> Result { + Ok(()) + } + + fn map(&mut self, _map: &MapType, _key_value: Self::T, _value: Self::T) -> Result { + Ok(()) + } + + fn primitive(&mut self, _p: &PrimitiveType) -> Result { + Ok(()) + } + } + + let mut index = IndexByParent { + parents: vec![], + result: HashMap::new(), + }; + visit_struct(r#struct, &mut index).unwrap(); + index.result +} + +#[derive(Default)] +struct IndexByName { + // Maybe radix tree is better here? 
+ name_to_id: HashMap, + short_name_to_id: HashMap, + + field_names: Vec, + short_field_names: Vec, +} + +impl IndexByName { + fn add_field(&mut self, name: &str, field_id: i32) -> Result<()> { + let full_name = self + .field_names + .iter() + .map(String::as_str) + .chain(vec![name]) + .join("."); + if let Some(existing_field_id) = self.name_to_id.get(full_name.as_str()) { + return Err(Error::new(ErrorKind::DataInvalid, format!("Invalid schema: multiple fields for name {full_name}: {field_id} and {existing_field_id}"))); + } else { + self.name_to_id.insert(full_name, field_id); + } + + let full_short_name = self + .short_field_names + .iter() + .map(String::as_str) + .chain(vec![name]) + .join("."); + self.short_name_to_id + .entry(full_short_name) + .or_insert_with(|| field_id); + Ok(()) + } + + /// Returns two indexes: full name to field id, and id to full name. + /// + /// In the first index, short names are returned. + /// In second index, short names are not returned. + pub fn indexes(mut self) -> (HashMap, HashMap) { + self.short_name_to_id.reserve(self.name_to_id.len()); + for (name, id) in &self.name_to_id { + self.short_name_to_id.insert(name.clone(), *id); + } + + let id_to_name = self.name_to_id.into_iter().map(|e| (e.1, e.0)).collect(); + (self.short_name_to_id, id_to_name) + } +} + +impl SchemaVisitor for IndexByName { + type T = (); + + fn before_struct_field(&mut self, field: &NestedFieldRef) -> Result<()> { + self.field_names.push(field.name.to_string()); + self.short_field_names.push(field.name.to_string()); + Ok(()) + } + + fn after_struct_field(&mut self, _field: &NestedFieldRef) -> Result<()> { + self.field_names.pop(); + self.short_field_names.pop(); + Ok(()) + } + + fn before_list_element(&mut self, field: &NestedFieldRef) -> Result<()> { + self.field_names.push(field.name.clone()); + if !field.field_type.is_struct() { + self.short_field_names.push(field.name.to_string()); + } + + Ok(()) + } + + fn after_list_element(&mut self, field: 
&NestedFieldRef) -> Result<()> { + self.field_names.pop(); + if !field.field_type.is_struct() { + self.short_field_names.pop(); + } + + Ok(()) + } + + fn before_map_key(&mut self, field: &NestedFieldRef) -> Result<()> { + self.before_struct_field(field) + } + + fn after_map_key(&mut self, field: &NestedFieldRef) -> Result<()> { + self.after_struct_field(field) + } + + fn before_map_value(&mut self, field: &NestedFieldRef) -> Result<()> { + self.field_names.push(field.name.to_string()); + if !field.field_type.is_struct() { + self.short_field_names.push(field.name.to_string()); + } + Ok(()) + } + + fn after_map_value(&mut self, field: &NestedFieldRef) -> Result<()> { + self.field_names.pop(); + if !field.field_type.is_struct() { + self.short_field_names.pop(); + } + + Ok(()) + } + + fn schema(&mut self, _schema: &Schema, _value: Self::T) -> Result { + Ok(()) + } + + fn field(&mut self, field: &NestedFieldRef, _value: Self::T) -> Result { + self.add_field(field.name.as_str(), field.id) + } + + fn r#struct(&mut self, _struct: &StructType, _results: Vec) -> Result { + Ok(()) + } + + fn list(&mut self, list: &ListType, _value: Self::T) -> Result { + self.add_field(LIST_FILED_NAME, list.element_field.id) + } + + fn map(&mut self, map: &MapType, _key_value: Self::T, _value: Self::T) -> Result { + self.add_field(MAP_KEY_FIELD_NAME, map.key_field.id)?; + self.add_field(MAP_VALUE_FIELD_NAME, map.value_field.id) + } + + fn primitive(&mut self, _p: &PrimitiveType) -> Result { + Ok(()) + } } #[cfg(test)] mod tests { - use crate::spec::datatypes::{NestedField, PrimitiveType, Type}; + use crate::spec::datatypes::Type::{List, Map, Primitive, Struct}; + use crate::spec::datatypes::{ + ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, StructType, Type, + }; use crate::spec::schema::Schema; + use std::collections::HashMap; #[test] fn test_construct_schema() { - let field1 = NestedField::required(1, "f1", Type::Primitive(PrimitiveType::Boolean)); - let field2 = 
NestedField::optional(2, "f2", Type::Primitive(PrimitiveType::Int)); + let field1: NestedFieldRef = + NestedField::required(1, "f1", Type::Primitive(PrimitiveType::Boolean)).into(); + let field2: NestedFieldRef = + NestedField::optional(2, "f2", Type::Primitive(PrimitiveType::Int)).into(); let schema = Schema::builder() .with_fields(vec![field1.clone()]) .with_fields(vec![field2.clone()]) .with_schema_id(3) - .build(); + .build() + .unwrap(); assert_eq!(3, schema.schema_id()); assert_eq!(2, schema.highest_field_id()); @@ -108,4 +638,428 @@ mod tests { assert_eq!(Some(&field2), schema.field_by_id(2)); assert_eq!(None, schema.field_by_id(3)); } + + fn table_schema_simple() -> Schema { + Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![2]) + .with_fields(vec![ + NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), + ]) + .build() + .unwrap() + } + + fn table_schema_nested() -> Schema { + Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![2]) + .with_fields(vec![ + NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), + NestedField::required( + 4, + "qux", + Type::List(ListType { + element_field: NestedField::list_element( + 5, + Type::Primitive(PrimitiveType::String), + true, + ) + .into(), + }), + ) + .into(), + NestedField::required( + 6, + "quux", + Type::Map(MapType { + key_field: NestedField::map_key_element( + 7, + Type::Primitive(PrimitiveType::String), + ) + .into(), + value_field: NestedField::map_value_element( + 8, + Type::Map(MapType { + key_field: NestedField::map_key_element( + 9, + Type::Primitive(PrimitiveType::String), + ) + .into(), + 
value_field: NestedField::map_value_element( + 10, + Type::Primitive(PrimitiveType::Int), + true, + ) + .into(), + }), + true, + ) + .into(), + }), + ) + .into(), + NestedField::required( + 11, + "location", + Type::List(ListType { + element_field: NestedField::list_element( + 12, + Type::Struct(StructType::new(vec![ + NestedField::optional( + 13, + "latitude", + Type::Primitive(PrimitiveType::Float), + ) + .into(), + NestedField::optional( + 14, + "longitude", + Type::Primitive(PrimitiveType::Float), + ) + .into(), + ])), + true, + ) + .into(), + }), + ) + .into(), + NestedField::optional( + 15, + "person", + Type::Struct(StructType::new(vec![ + NestedField::optional(16, "name", Type::Primitive(PrimitiveType::String)) + .into(), + NestedField::required(17, "age", Type::Primitive(PrimitiveType::Int)) + .into(), + ])), + ) + .into(), + ]) + .build() + .unwrap() + } + + #[test] + fn test_schema_display() { + let expected_str = r#" +table { + 1: foo: optional string + 2: bar: required int + 3: baz: optional boolean +} +"#; + + assert_eq!(expected_str, format!("\n{}", table_schema_simple())); + } + + #[test] + fn test_schema_build_failed_on_duplicate_names() { + let ret = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![1]) + .with_fields(vec![ + NestedField::required(1, "foo", Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "bar", Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "baz", Primitive(PrimitiveType::Boolean)).into(), + NestedField::optional(4, "baz", Primitive(PrimitiveType::Boolean)).into(), + ]) + .build(); + + assert!(ret + .unwrap_err() + .message() + .contains("Invalid schema: multiple fields for name baz")); + } + + #[test] + fn test_schema_index_by_name() { + let expected_name_to_id = HashMap::from( + [ + ("foo", 1), + ("bar", 2), + ("baz", 3), + ("qux", 4), + ("qux.element", 5), + ("quux", 6), + ("quux.key", 7), + ("quux.value", 8), + ("quux.value.key", 9), + ("quux.value.value", 10), + 
("location", 11), + ("location.element", 12), + ("location.element.latitude", 13), + ("location.element.longitude", 14), + ("location.latitude", 13), + ("location.longitude", 14), + ("person", 15), + ("person.name", 16), + ("person.age", 17), + ] + .map(|e| (e.0.to_string(), e.1)), + ); + + let schema = table_schema_nested(); + assert_eq!(&expected_name_to_id, &schema.name_to_id); + } + + #[test] + fn test_schema_find_column_name() { + let expected_column_name = HashMap::from([ + (1, "foo"), + (2, "bar"), + (3, "baz"), + (4, "qux"), + (5, "qux.element"), + (6, "quux"), + (7, "quux.key"), + (8, "quux.value"), + (9, "quux.value.key"), + (10, "quux.value.value"), + (11, "location"), + (12, "location.element"), + (13, "location.element.latitude"), + (14, "location.element.longitude"), + ]); + + let schema = table_schema_nested(); + for (id, name) in expected_column_name { + assert_eq!( + Some(name), + schema.name_by_field_id(id), + "Column name for field id {} not match.", + id + ); + } + } + + #[test] + fn test_schema_find_column_name_not_found() { + let schema = table_schema_nested(); + + assert!(schema.name_by_field_id(99).is_none()); + } + + #[test] + fn test_schema_find_column_name_by_id_simple() { + let expected_id_to_name = HashMap::from([(1, "foo"), (2, "bar"), (3, "baz")]); + + let schema = table_schema_simple(); + + for (id, name) in expected_id_to_name { + assert_eq!( + Some(name), + schema.name_by_field_id(id), + "Column name for field id {} not match.", + id + ); + } + } + + #[test] + fn test_schema_find_simple() { + let schema = table_schema_simple(); + + assert_eq!( + Some(schema.r#struct.fields()[0].clone()), + schema.field_by_id(1).cloned() + ); + assert_eq!( + Some(schema.r#struct.fields()[1].clone()), + schema.field_by_id(2).cloned() + ); + assert_eq!( + Some(schema.r#struct.fields()[2].clone()), + schema.field_by_id(3).cloned() + ); + + assert!(schema.field_by_id(4).is_none()); + assert!(schema.field_by_name("non exist").is_none()); + } + + #[test] 
+    fn test_schema_find_nested() {
+        // Expected result of `field_by_id` for every field id from 1 through 17. List elements
+        // and map keys/values carry ids of their own (5, 7-10, 12), so they are listed as
+        // separate entries in addition to appearing inside their parent field's type.
+        // The key type is left to inference; it is fixed by the `field_by_id(id)` call below.
+        let expected_id_to_field: HashMap<_, NestedField> = HashMap::from([
+            (
+                1,
+                NestedField::optional(1, "foo", Primitive(PrimitiveType::String)),
+            ),
+            (
+                2,
+                NestedField::required(2, "bar", Primitive(PrimitiveType::Int)),
+            ),
+            (
+                3,
+                NestedField::optional(3, "baz", Primitive(PrimitiveType::Boolean)),
+            ),
+            (
+                4,
+                NestedField::required(
+                    4,
+                    "qux",
+                    Type::List(ListType {
+                        element_field: NestedField::list_element(
+                            5,
+                            Type::Primitive(PrimitiveType::String),
+                            true,
+                        )
+                        .into(),
+                    }),
+                ),
+            ),
+            (
+                5,
+                NestedField::required(5, "element", Primitive(PrimitiveType::String)),
+            ),
+            (
+                6,
+                NestedField::required(
+                    6,
+                    "quux",
+                    Map(MapType {
+                        key_field: NestedField::map_key_element(
+                            7,
+                            Primitive(PrimitiveType::String),
+                        )
+                        .into(),
+                        value_field: NestedField::map_value_element(
+                            8,
+                            Map(MapType {
+                                key_field: NestedField::map_key_element(
+                                    9,
+                                    Primitive(PrimitiveType::String),
+                                )
+                                .into(),
+                                value_field: NestedField::map_value_element(
+                                    10,
+                                    Primitive(PrimitiveType::Int),
+                                    true,
+                                )
+                                .into(),
+                            }),
+                            true,
+                        )
+                        .into(),
+                    }),
+                ),
+            ),
+            (
+                7,
+                NestedField::required(7, "key", Primitive(PrimitiveType::String)),
+            ),
+            (
+                8,
+                NestedField::required(
+                    8,
+                    "value",
+                    Map(MapType {
+                        key_field: NestedField::map_key_element(
+                            9,
+                            Primitive(PrimitiveType::String),
+                        )
+                        .into(),
+                        value_field: NestedField::map_value_element(
+                            10,
+                            Primitive(PrimitiveType::Int),
+                            true,
+                        )
+                        .into(),
+                    }),
+                ),
+            ),
+            (
+                9,
+                NestedField::required(9, "key", Primitive(PrimitiveType::String)),
+            ),
+            (
+                10,
+                NestedField::required(10, "value", Primitive(PrimitiveType::Int)),
+            ),
+            (
+                11,
+                NestedField::required(
+                    11,
+                    "location",
+                    List(ListType {
+                        element_field: NestedField::list_element(
+                            12,
+                            Struct(StructType::new(vec![
+                                NestedField::optional(
+                                    13,
+                                    "latitude",
+                                    Primitive(PrimitiveType::Float),
+                                )
+                                .into(),
+                                NestedField::optional(
+                                    14,
+                                    "longitude",
+                                    Primitive(PrimitiveType::Float),
+                                )
+                                .into(),
+                            ])),
+                            true,
+                        )
+                        .into(),
+                    }),
+                ),
+            ),
+            (
+                12,
+                NestedField::list_element(
+                    12,
+                    Struct(StructType::new(vec![
+                        NestedField::optional(13, "latitude", Primitive(PrimitiveType::Float))
+                            .into(),
+                        NestedField::optional(14, "longitude", Primitive(PrimitiveType::Float))
+                            .into(),
+                    ])),
+                    true,
+                ),
+            ),
+            (
+                13,
+                NestedField::optional(13, "latitude", Primitive(PrimitiveType::Float)),
+            ),
+            (
+                14,
+                NestedField::optional(14, "longitude", Primitive(PrimitiveType::Float)),
+            ),
+            (
+                15,
+                NestedField::optional(
+                    15,
+                    "person",
+                    Type::Struct(StructType::new(vec![
+                        NestedField::optional(16, "name", Type::Primitive(PrimitiveType::String))
+                            .into(),
+                        NestedField::required(17, "age", Type::Primitive(PrimitiveType::Int))
+                            .into(),
+                    ])),
+                ),
+            ),
+            (
+                16,
+                NestedField::optional(16, "name", Type::Primitive(PrimitiveType::String)),
+            ),
+            (
+                17,
+                NestedField::required(17, "age", Type::Primitive(PrimitiveType::Int)),
+            ),
+        ]);
+
+        // NOTE(review): `table_schema_nested` is defined outside this hunk; presumably it builds
+        // the schema that the table above mirrors - confirm against the fixture.
+        let schema = table_schema_nested();
+        for (id, field) in expected_id_to_field {
+            // `as_ref` turns the handle returned by `field_by_id` into a plain `&NestedField`
+            // so both sides of the comparison are `Option<&NestedField>`.
+            assert_eq!(
+                Some(&field),
+                schema.field_by_id(id).map(|f| f.as_ref()),
+                "Field for {} not match.",
+                id
+            );
+        }
+    }
 }

From cd2e32032884268a4278ab315c8a1a6afb088d89 Mon Sep 17 00:00:00 2001
From: Renjie Liu
Date: Wed, 9 Aug 2023 19:22:30 +0800
Subject: [PATCH 2/2] Resolve conflicts

---
 crates/iceberg/src/spec/datatypes.rs |  5 --
 crates/iceberg/src/spec/values.rs    | 79 ++++++++--------------------
 2 files changed, 21 insertions(+), 63 deletions(-)

diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs
index 6c9f3723c..9b4c986a8 100644
--- a/crates/iceberg/src/spec/datatypes.rs
+++ b/crates/iceberg/src/spec/datatypes.rs
@@ -22,7 +22,6 @@ use ::serde::de::{MapAccess, Visitor};
 use serde::de::{Error, IntoDeserializer};
 use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
 use std::cell::OnceCell;
-use std::slice::Iter;
 use std::sync::Arc;
 use std::{collections::HashMap, fmt, ops::Index};

@@ -309,10
+308,6 @@ impl StructType { .get(&field_id) .copied() } - /// Returns an iteratorr over the struct fields - pub fn iter(&self) -> Iter { - self.fields.iter() - } /// Get fields. pub fn fields(&self) -> &[NestedFieldRef] { diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs index 771ffd88e..5d81f95cc 100644 --- a/crates/iceberg/src/spec/values.rs +++ b/crates/iceberg/src/spec/values.rs @@ -407,8 +407,8 @@ impl Literal { }, Type::Struct(schema) => { if let JsonValue::Object(mut object) = value { - Ok(Some(Literal::Struct(Struct::from_iter(schema.iter().map( - |field| { + Ok(Some(Literal::Struct(Struct::from_iter( + schema.fields().iter().map(|field| { ( field.id, object.remove(&field.id.to_string()).and_then(|value| { @@ -423,8 +423,8 @@ impl Literal { }), field.name.clone(), ) - }, - ))))) + }), + )))) } else { Err(Error::new( crate::ErrorKind::DataInvalid, @@ -796,33 +796,9 @@ mod tests { (3, None, "address".to_string()), ])), &Type::Struct(StructType::new(vec![ - NestedField { - id: 1, - name: "id".to_string(), - required: true, - field_type: Box::new(Type::Primitive(PrimitiveType::Int)), - doc: None, - initial_default: None, - write_default: None, - }, - NestedField { - id: 2, - name: "name".to_string(), - required: false, - field_type: Box::new(Type::Primitive(PrimitiveType::String)), - doc: None, - initial_default: None, - write_default: None, - }, - NestedField { - id: 3, - name: "address".to_string(), - required: false, - field_type: Box::new(Type::Primitive(PrimitiveType::String)), - doc: None, - initial_default: None, - write_default: None, - }, + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::optional(3, "address", Type::Primitive(PrimitiveType::String)).into(), ])), ); } @@ -840,15 +816,12 @@ mod tests { None, ]), &Type::List(ListType { - element_field: NestedField { - id: 0, - name: "".to_string(), 
- required: true, - field_type: Box::new(Type::Primitive(PrimitiveType::Int)), - doc: None, - initial_default: None, - write_default: None, - }, + element_field: NestedField::list_element( + 0, + Type::Primitive(PrimitiveType::Int), + true, + ) + .into(), }), ); } @@ -874,24 +847,14 @@ mod tests { ), ])), &Type::Map(MapType { - key_field: NestedField { - id: 0, - name: "key".to_string(), - required: true, - field_type: Box::new(Type::Primitive(PrimitiveType::String)), - doc: None, - initial_default: None, - write_default: None, - }, - value_field: NestedField { - id: 1, - name: "value".to_string(), - required: true, - field_type: Box::new(Type::Primitive(PrimitiveType::Int)), - doc: None, - initial_default: None, - write_default: None, - }, + key_field: NestedField::map_key_element(0, Type::Primitive(PrimitiveType::String)) + .into(), + value_field: NestedField::map_value_element( + 1, + Type::Primitive(PrimitiveType::Int), + true, + ) + .into(), }), ); }