diff --git a/crates/iceberg/src/avro/schema.rs b/crates/iceberg/src/avro/schema.rs index eb24ab0c3..f8420d460 100644 --- a/crates/iceberg/src/avro/schema.rs +++ b/crates/iceberg/src/avro/schema.rs @@ -32,6 +32,11 @@ use itertools::{Either, Itertools}; use serde_json::{Number, Value}; const FILED_ID_PROP: &str = "field-id"; +const UUID_BYTES: usize = 16; +const UUID_LOGICAL_TYPE: &str = "uuid"; +// # TODO: https://github.com/apache/iceberg-rust/issues/86 +// This const may better to maintain in avro-rs. +const LOGICAL_TYPE: &str = "logicalType"; struct SchemaToAvroSchema { schema: String, @@ -98,26 +103,13 @@ impl SchemaVisitor for SchemaToAvroSchema { _struct: &StructType, results: Vec, ) -> Result { - let avro_fields: Vec = - results.into_iter().map(|r| r.unwrap_right()).collect(); + let avro_fields = results.into_iter().map(|r| r.unwrap_right()).collect_vec(); - let lookup = BTreeMap::from_iter( - avro_fields - .iter() - .enumerate() - .map(|(i, field)| (field.name.clone(), i)), - ); - - Ok(Either::Left(AvroSchema::Record(RecordSchema { + Ok(Either::Left( // The name of this record schema should be determined later, by schema name or field // name, here we use a temporary placeholder to do it. - name: Name::new("null")?, - aliases: None, - doc: None, - fields: avro_fields, - lookup, - attributes: Default::default(), - }))) + avro_record_schema("null", avro_fields)?, + )) } fn list(&mut self, list: &ListType, value: AvroSchemaOrField) -> Result { @@ -172,7 +164,7 @@ impl SchemaVisitor for SchemaToAvroSchema { let value_field = { let mut field = AvroRecordField { - name: map.key_field.name.clone(), + name: map.value_field.name.clone(), doc: None, aliases: None, default: None, @@ -188,16 +180,13 @@ impl SchemaVisitor for SchemaToAvroSchema { field }; - let item_avro_schema = AvroSchema::Record(RecordSchema { - name: Name::from(format!("k{}_v{}", map.key_field.id, map.value_field.id).as_str()), - aliases: None, - doc: None, - fields: vec![key_field, value_field], - lookup: Default::default(), - attributes: Default::default(), - }); + let fields = vec![key_field, value_field]; + let item_avro_schema = avro_record_schema( + format!("k{}_v{}", map.key_field.id, map.value_field.id).as_str(), + fields, + )?; - Ok(Either::Left(item_avro_schema)) + Ok(Either::Left(AvroSchema::Array(item_avro_schema.into()))) } } @@ -213,8 +202,8 @@ impl SchemaVisitor for SchemaToAvroSchema { PrimitiveType::Timestamp => AvroSchema::TimestampMicros, PrimitiveType::Timestamptz => AvroSchema::TimestampMicros, PrimitiveType::String => AvroSchema::String, - PrimitiveType::Uuid => AvroSchema::Uuid, - PrimitiveType::Fixed(len) => avro_fixed_schema((*len) as usize)?, + PrimitiveType::Uuid => avro_fixed_schema(UUID_BYTES, Some(UUID_LOGICAL_TYPE))?, + PrimitiveType::Fixed(len) => avro_fixed_schema((*len) as usize, None)?, PrimitiveType::Binary => AvroSchema::Bytes, PrimitiveType::Decimal { precision, scale } => { avro_decimal_schema(*precision as usize, *scale as usize)? @@ -233,13 +222,38 @@ pub(crate) fn schema_to_avro_schema(name: impl ToString, schema: &Schema) -> Res visit_schema(schema, &mut converter).map(Either::unwrap_left) } -pub(crate) fn avro_fixed_schema(len: usize) -> Result { +fn avro_record_schema(name: &str, fields: Vec) -> Result { + let lookup = fields + .iter() + .enumerate() + .map(|f| (f.1.name.clone(), f.0)) + .collect(); + + Ok(AvroSchema::Record(RecordSchema { + name: Name::new(name)?, + aliases: None, + doc: None, + fields, + lookup, + attributes: Default::default(), + })) +} + +pub(crate) fn avro_fixed_schema(len: usize, logical_type: Option<&str>) -> Result { + let attributes = if let Some(logical_type) = logical_type { + BTreeMap::from([( + LOGICAL_TYPE.to_string(), + Value::String(logical_type.to_string()), + )]) + } else { + Default::default() + }; Ok(AvroSchema::Fixed(FixedSchema { name: Name::new(format!("fixed_{len}").as_str())?, aliases: None, doc: None, size: len, - attributes: Default::default(), + attributes, })) } @@ -247,9 +261,7 @@ pub(crate) fn avro_decimal_schema(precision: usize, scale: usize) -> Result Type::Primitive(PrimitiveType::Date), AvroSchema::TimeMicros => Type::Primitive(PrimitiveType::Time), AvroSchema::TimestampMicros => Type::Primitive(PrimitiveType::Timestamp), - AvroSchema::Uuid => Type::Primitive(PrimitiveType::Uuid), AvroSchema::Boolean => Type::Primitive(PrimitiveType::Boolean), AvroSchema::Int => Type::Primitive(PrimitiveType::Int), AvroSchema::Long => Type::Primitive(PrimitiveType::Long), AvroSchema::Float => Type::Primitive(PrimitiveType::Float), AvroSchema::Double => Type::Primitive(PrimitiveType::Double), AvroSchema::String | AvroSchema::Enum(_) => Type::Primitive(PrimitiveType::String), - AvroSchema::Fixed(fixed) => Type::Primitive(PrimitiveType::Fixed(fixed.size as u64)), + AvroSchema::Fixed(fixed) => { + if let Some(logical_type) = fixed.attributes.get(LOGICAL_TYPE) { + let logical_type = logical_type.as_str().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "logicalType in attributes of avro schema is not a string type", + ) + })?; + match logical_type { + UUID_LOGICAL_TYPE => Type::Primitive(PrimitiveType::Uuid), + ty => { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + format!( + "Logical type {ty} is not support in iceberg primitive type.", + ), + )) + } + } + } else { + Type::Primitive(PrimitiveType::Fixed(fixed.size as u64)) + } + } AvroSchema::Bytes => Type::Primitive(PrimitiveType::Binary), AvroSchema::Null => return Ok(None), _ => { @@ -503,7 +536,6 @@ mod tests { )) .unwrap(); - println!("Input is {input}"); AvroSchema::parse_str(input.as_str()).unwrap() } diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs index d5fc3eafc..c041710e9 100644 --- a/crates/iceberg/src/spec/datatypes.rs +++ b/crates/iceberg/src/spec/datatypes.rs @@ -333,6 +333,8 @@ pub struct StructType { /// Lookup for index by field id #[serde(skip_serializing)] id_lookup: OnceLock>, + #[serde(skip_serializing)] + name_lookup: OnceLock>, } impl<'de> Deserialize<'de> for StructType { @@ -390,8 +392,10 @@ impl StructType { Self { fields, id_lookup: OnceLock::new(), + name_lookup: OnceLock::new(), } } + /// Get struct field with certain id pub fn field_by_id(&self, id: i32) -> Option<&NestedFieldRef> { self.field_id_to_index(id).map(|idx| &self.fields[idx]) @@ -406,6 +410,25 @@ impl StructType { .copied() } + /// Get struct field with certain field name + pub fn field_by_name(&self, name: &str) -> Option<&NestedFieldRef> { + self.field_name_to_index(name).map(|idx| &self.fields[idx]) + } + + fn field_name_to_index(&self, name: &str) -> Option { + self.name_lookup + .get_or_init(|| { + HashMap::from_iter( + self.fields + .iter() + .enumerate() + .map(|(i, x)| (x.name.clone(), i)), + ) + }) + .get(name) + .copied() + } + /// Get fields. pub fn fields(&self) -> &[NestedFieldRef] { &self.fields @@ -773,6 +796,7 @@ mod tests { ) .into()], id_lookup: OnceLock::default(), + name_lookup: OnceLock::default(), }), ) } @@ -803,6 +827,7 @@ mod tests { ) .into()], id_lookup: OnceLock::default(), + name_lookup: OnceLock::default(), }), ) } @@ -845,6 +870,7 @@ mod tests { NestedField::optional(2, "data", Type::Primitive(PrimitiveType::Int)).into(), ], id_lookup: HashMap::from([(1, 0), (2, 1)]).into(), + name_lookup: HashMap::from([("id".to_string(), 0), ("data".to_string(), 1)]).into(), }), ) } diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs index d924bd512..bd9f2e848 100644 --- a/crates/iceberg/src/spec/values.rs +++ b/crates/iceberg/src/spec/values.rs @@ -34,6 +34,9 @@ use uuid::Uuid; use crate::{Error, ErrorKind}; use super::datatypes::{PrimitiveType, Type}; +use super::MAX_DECIMAL_PRECISION; + +pub use _serde::RawLiteral; /// Values present in iceberg type #[derive(Clone, Debug, PartialEq, Hash, Eq, PartialOrd, Ord)] @@ -64,9 +67,8 @@ pub enum PrimitiveLiteral { Fixed(Vec), /// Binary value (without length) Binary(Vec), - /// Stores unscaled value as two’s-complement big-endian binary, - /// using the minimum number of bytes for the value - Decimal(Decimal), + /// Stores unscaled value as big int. According to iceberg spec, the precision must less than 38(`MAX_DECIMAL_PRECISION`) , so i128 is suit here. + Decimal(i128), } /// Values present in iceberg type @@ -420,7 +422,7 @@ impl Literal { } /// Creates a decimal literal. - pub fn decimal(decimal: Decimal) -> Self { + pub fn decimal(decimal: i128) -> Self { Self::Primitive(PrimitiveLiteral::Decimal(decimal)) } @@ -431,7 +433,7 @@ impl Literal { /// ```rust /// use rust_decimal::Decimal; /// use iceberg::spec::Literal; - /// let t1 = Literal::decimal(Decimal::new(12345, 2)); + /// let t1 = Literal::decimal(12345); /// let t2 = Literal::decimal_from_str("123.45").unwrap(); /// /// assert_eq!(t1, t2); @@ -440,7 +442,7 @@ impl Literal { let decimal = Decimal::from_str_exact(s.as_ref()).map_err(|e| { Error::new(ErrorKind::DataInvalid, "Can't parse decimal.").with_source(e) })?; - Ok(Self::decimal(decimal)) + Ok(Self::decimal(decimal.mantissa())) } } @@ -620,6 +622,36 @@ impl Struct { } } +/// An iterator that moves out of a struct. +pub struct StructValueIntoIter { + null_bitmap: bitvec::boxed::IntoIter, + fields: std::vec::IntoIter, +} + +impl Iterator for StructValueIntoIter { + type Item = Option; + + fn next(&mut self) -> Option { + match (self.null_bitmap.next(), self.fields.next()) { + (Some(null), Some(value)) => Some(if null { None } else { Some(value) }), + _ => None, + } + } +} + +impl IntoIterator for Struct { + type Item = Option; + + type IntoIter = StructValueIntoIter; + + fn into_iter(self) -> Self::IntoIter { + StructValueIntoIter { + null_bitmap: self.null_bitmap.into_iter(), + fields: self.fields.into_iter(), + } + } +} + impl FromIterator<(i32, Option, String)> for Struct { fn from_iter, String)>>(iter: I) -> Self { let mut fields = Vec::new(); @@ -893,9 +925,9 @@ impl Literal { PrimitiveLiteral::Binary(_) => Type::Primitive(PrimitiveType::Binary), PrimitiveLiteral::String(_) => Type::Primitive(PrimitiveType::String), PrimitiveLiteral::UUID(_) => Type::Primitive(PrimitiveType::Uuid), - PrimitiveLiteral::Decimal(dec) => Type::Primitive(PrimitiveType::Decimal { - precision: 38, - scale: dec.scale(), + PrimitiveLiteral::Decimal(_) => Type::Primitive(PrimitiveType::Decimal { + precision: MAX_DECIMAL_PRECISION, + scale: 0, }), }, _ => unimplemented!(), @@ -996,10 +1028,593 @@ mod timestamptz { } } +mod _serde { + use std::collections::BTreeMap; + + use crate::{ + spec::{PrimitiveType, Type, MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME}, + Error, ErrorKind, + }; + + use super::{Literal, PrimitiveLiteral}; + use serde::{ + de::Visitor, + ser::{SerializeMap, SerializeSeq, SerializeStruct}, + Deserialize, Serialize, + }; + use serde_bytes::ByteBuf; + use serde_derive::Deserialize as DeserializeDerive; + use serde_derive::Serialize as SerializeDerive; + + #[derive(SerializeDerive, DeserializeDerive, Debug)] + #[serde(transparent)] + /// Raw literal representation used for serde. The serialize way is used for Avro serializer. + pub struct RawLiteral(RawLiteralEnum); + + impl RawLiteral { + /// Covert literal to raw literal. + pub fn try_from(literal: Literal, ty: &Type) -> Result { + Ok(Self(RawLiteralEnum::try_from(literal, ty)?)) + } + + /// Convert raw literal to literal. + pub fn try_into(self, ty: &Type) -> Result, Error> { + self.0.try_into(ty) + } + } + + #[derive(SerializeDerive, Clone, Debug)] + #[serde(untagged)] + enum RawLiteralEnum { + Null, + Boolean(bool), + Int(i32), + Long(i64), + Float(f32), + Double(f64), + String(String), + Bytes(ByteBuf), + List(List), + StringMap(StringMap), + Record(Record), + } + + #[derive(Clone, Debug)] + struct Record { + required: Vec<(String, RawLiteralEnum)>, + optional: Vec<(String, Option)>, + } + + impl Serialize for Record { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let len = self.required.len() + self.optional.len(); + let mut record = serializer.serialize_struct("", len)?; + for (k, v) in &self.required { + record.serialize_field(Box::leak(k.clone().into_boxed_str()), &v)?; + } + for (k, v) in &self.optional { + record.serialize_field(Box::leak(k.clone().into_boxed_str()), &v)?; + } + record.end() + } + } + + #[derive(Clone, Debug)] + struct List { + list: Vec>, + required: bool, + } + + impl Serialize for List { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut seq = serializer.serialize_seq(Some(self.list.len()))?; + for value in &self.list { + if self.required { + seq.serialize_element(value.as_ref().ok_or_else(|| { + serde::ser::Error::custom( + "List element is required, element cannot be null", + ) + })?)?; + } else { + seq.serialize_element(&value)?; + } + } + seq.end() + } + } + + #[derive(Clone, Debug)] + struct StringMap { + raw: Vec<(String, Option)>, + required: bool, + } + + impl Serialize for StringMap { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut map = serializer.serialize_map(Some(self.raw.len()))?; + for (k, v) in &self.raw { + if self.required { + map.serialize_entry( + k, + v.as_ref().ok_or_else(|| { + serde::ser::Error::custom( + "Map element is required, element cannot be null", + ) + })?, + )?; + } else { + map.serialize_entry(k, v)?; + } + } + map.end() + } + } + + impl<'de> Deserialize<'de> for RawLiteralEnum { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct RawLiteralVisitor; + impl<'de> Visitor<'de> for RawLiteralVisitor { + type Value = RawLiteralEnum; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("expect") + } + + fn visit_bool(self, v: bool) -> Result + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Boolean(v)) + } + + fn visit_i32(self, v: i32) -> Result + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Int(v)) + } + + fn visit_i64(self, v: i64) -> Result + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Long(v)) + } + + /// Used in json + fn visit_u64(self, v: u64) -> Result + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Long(v as i64)) + } + + fn visit_f32(self, v: f32) -> Result + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Float(v)) + } + + fn visit_f64(self, v: f64) -> Result + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Double(v)) + } + + fn visit_str(self, v: &str) -> Result + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::String(v.to_string())) + } + + fn visit_bytes(self, v: &[u8]) -> Result + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Bytes(ByteBuf::from(v))) + } + + fn visit_borrowed_str(self, v: &'de str) -> Result + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::String(v.to_string())) + } + + fn visit_unit(self) -> Result + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Null) + } + + fn visit_map(self, mut map: A) -> Result + where + A: serde::de::MapAccess<'de>, + { + let mut required = Vec::new(); + while let Some(key) = map.next_key::()? { + let value = map.next_value::()?; + required.push((key, value)); + } + Ok(RawLiteralEnum::Record(Record { + required, + optional: Vec::new(), + })) + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: serde::de::SeqAccess<'de>, + { + let mut list = Vec::new(); + while let Some(value) = seq.next_element::()? { + list.push(Some(value)); + } + Ok(RawLiteralEnum::List(List { + list, + // `required` only used in serialize, just set default in deserialize. + required: false, + })) + } + } + deserializer.deserialize_any(RawLiteralVisitor) + } + } + + impl RawLiteralEnum { + pub fn try_from(literal: Literal, ty: &Type) -> Result { + let raw = match literal { + Literal::Primitive(prim) => match prim { + super::PrimitiveLiteral::Boolean(v) => RawLiteralEnum::Boolean(v), + super::PrimitiveLiteral::Int(v) => RawLiteralEnum::Int(v), + super::PrimitiveLiteral::Long(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::Float(v) => RawLiteralEnum::Float(v.0), + super::PrimitiveLiteral::Double(v) => RawLiteralEnum::Double(v.0), + super::PrimitiveLiteral::Date(v) => RawLiteralEnum::Int(v), + super::PrimitiveLiteral::Time(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::Timestamp(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::TimestampTZ(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::String(v) => RawLiteralEnum::String(v), + super::PrimitiveLiteral::UUID(v) => { + RawLiteralEnum::Bytes(ByteBuf::from(v.as_u128().to_be_bytes())) + } + super::PrimitiveLiteral::Fixed(v) => RawLiteralEnum::Bytes(ByteBuf::from(v)), + super::PrimitiveLiteral::Binary(v) => RawLiteralEnum::Bytes(ByteBuf::from(v)), + super::PrimitiveLiteral::Decimal(v) => { + RawLiteralEnum::Bytes(ByteBuf::from(v.to_be_bytes())) + } + }, + Literal::Struct(r#struct) => { + let mut required = Vec::new(); + let mut optional = Vec::new(); + if let Type::Struct(struct_ty) = ty { + for (value, field) in r#struct.into_iter().zip(struct_ty.fields()) { + if field.required { + if let Some(value) = value { + required.push(( + field.name.clone(), + RawLiteralEnum::try_from(value, &field.field_type)?, + )); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "Can't convert null to required field", + )); + } + } else if let Some(value) = value { + optional.push(( + field.name.clone(), + Some(RawLiteralEnum::try_from(value, &field.field_type)?), + )); + } else { + optional.push((field.name.clone(), None)); + } + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a struct", ty), + )); + } + RawLiteralEnum::Record(Record { required, optional }) + } + Literal::List(list) => { + if let Type::List(list_ty) = ty { + let list = list + .into_iter() + .map(|v| { + v.map(|v| { + RawLiteralEnum::try_from(v, &list_ty.element_field.field_type) + }) + .transpose() + }) + .collect::>()?; + RawLiteralEnum::List(List { + list, + required: list_ty.element_field.required, + }) + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a list", ty), + )); + } + } + Literal::Map(map) => { + if let Type::Map(map_ty) = ty { + if let Type::Primitive(PrimitiveType::String) = *map_ty.key_field.field_type + { + let mut raw = Vec::with_capacity(map.len()); + for (k, v) in map { + if let Literal::Primitive(PrimitiveLiteral::String(k)) = k { + raw.push(( + k, + v.map(|v| { + RawLiteralEnum::try_from( + v, + &map_ty.value_field.field_type, + ) + }) + .transpose()?, + )); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "literal type is inconsistent with type", + )); + } + } + RawLiteralEnum::StringMap(StringMap { + raw, + required: map_ty.value_field.required, + }) + } else { + let list = map.into_iter().map(|(k,v)| { + let raw_k = + RawLiteralEnum::try_from(k, &map_ty.key_field.field_type)?; + let raw_v = v + .map(|v| { + RawLiteralEnum::try_from(v, &map_ty.value_field.field_type) + }) + .transpose()?; + if map_ty.value_field.required { + Ok(Some(RawLiteralEnum::Record(Record { + required: vec![ + (MAP_KEY_FIELD_NAME.to_string(), raw_k), + (MAP_VALUE_FIELD_NAME.to_string(), raw_v.ok_or_else(||Error::new(ErrorKind::DataInvalid, "Map value is required, value cannot be null"))?), + ], + optional: vec![], + }))) + } else { + Ok(Some(RawLiteralEnum::Record(Record { + required: vec![ + (MAP_KEY_FIELD_NAME.to_string(), raw_k), + ], + optional: vec![ + (MAP_VALUE_FIELD_NAME.to_string(), raw_v) + ], + }))) + } + }).collect::>()?; + RawLiteralEnum::List(List { + list, + required: true, + }) + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a map", ty), + )); + } + } + }; + Ok(raw) + } + + pub fn try_into(self, ty: &Type) -> Result, Error> { + let invalid_err = |v: &str| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Unable to convert raw literal ({}) fail convert to type {} for: type mismatch", + v, ty + ), + ) + }; + let invalid_err_with_reason = |v: &str, reason: &str| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Unable to convert raw literal ({}) fail convert to type {} for: {}", + v, ty, reason + ), + ) + }; + match self { + RawLiteralEnum::Null => Ok(None), + RawLiteralEnum::Boolean(v) => Ok(Some(Literal::bool(v))), + RawLiteralEnum::Int(v) => match ty { + Type::Primitive(PrimitiveType::Int) => Ok(Some(Literal::int(v))), + Type::Primitive(PrimitiveType::Date) => Ok(Some(Literal::date(v))), + _ => Err(invalid_err("int")), + }, + RawLiteralEnum::Long(v) => match ty { + Type::Primitive(PrimitiveType::Long) => Ok(Some(Literal::long(v))), + Type::Primitive(PrimitiveType::Time) => Ok(Some(Literal::time(v))), + Type::Primitive(PrimitiveType::Timestamp) => Ok(Some(Literal::timestamp(v))), + Type::Primitive(PrimitiveType::Timestamptz) => { + Ok(Some(Literal::timestamptz(v))) + } + _ => Err(invalid_err("long")), + }, + RawLiteralEnum::Float(v) => match ty { + Type::Primitive(PrimitiveType::Float) => Ok(Some(Literal::float(v))), + _ => Err(invalid_err("float")), + }, + RawLiteralEnum::Double(v) => match ty { + Type::Primitive(PrimitiveType::Double) => Ok(Some(Literal::double(v))), + _ => Err(invalid_err("double")), + }, + RawLiteralEnum::String(v) => match ty { + Type::Primitive(PrimitiveType::String) => Ok(Some(Literal::string(v))), + _ => Err(invalid_err("string")), + }, + // # TODO:https://github.com/apache/iceberg-rust/issues/86 + // rust avro don't support deserialize any bytes representation now. + RawLiteralEnum::Bytes(_) => Err(invalid_err_with_reason( + "bytes", + "todo: rust avro doesn't support deserialize any bytes representation now", + )), + RawLiteralEnum::List(v) => { + match ty { + Type::List(ty) => Ok(Some(Literal::List( + v.list + .into_iter() + .map(|v| { + if let Some(v) = v { + v.try_into(&ty.element_field.field_type) + } else { + Ok(None) + } + }) + .collect::>()?, + ))), + Type::Map(map_ty) => { + let key_ty = map_ty.key_field.field_type.as_ref(); + let value_ty = map_ty.value_field.field_type.as_ref(); + let mut map = BTreeMap::new(); + for k_v in v.list { + let k_v = k_v.ok_or_else(|| invalid_err_with_reason("list","In deserialize, None will be represented as Some(RawLiteral::Null), all element in list must be valid"))?; + if let RawLiteralEnum::Record(Record { + required, + optional: _, + }) = k_v + { + if required.len() != 2 { + return Err(invalid_err_with_reason("list","Record must contains two element(key and value) of array")); + } + let mut key = None; + let mut value = None; + required.into_iter().for_each(|(k, v)| { + if k == MAP_KEY_FIELD_NAME { + key = Some(v); + } else if k == MAP_VALUE_FIELD_NAME { + value = Some(v); + } + }); + match (key, value) { + (Some(k), Some(v)) => { + let key = k.try_into(key_ty)?.ok_or_else(|| { + invalid_err_with_reason( + "list", + "Key element in Map must be valid", + ) + })?; + let value = v.try_into(value_ty)?; + if map_ty.value_field.required && value.is_none() { + return Err(invalid_err_with_reason( + "list", + "Value element is required in this Map", + )); + } + map.insert(key, value); + } + _ => return Err(invalid_err_with_reason( + "list", + "The elements of record in list are not key and value", + )), + } + } else { + return Err(invalid_err_with_reason( + "list", + "Map should represented as record array.", + )); + } + } + Ok(Some(Literal::Map(map))) + } + _ => Err(invalid_err("list")), + } + } + RawLiteralEnum::Record(Record { + required, + optional: _, + }) => match ty { + Type::Struct(struct_ty) => { + let iters: Vec<(i32, Option, String)> = required + .into_iter() + .map(|(field_name, value)| { + let field = struct_ty + .field_by_name(field_name.as_str()) + .ok_or_else(|| { + invalid_err_with_reason( + "record", + &format!("field {} is not exist", &field_name), + ) + })?; + let value = value.try_into(&field.field_type)?; + Ok((field.id, value, field.name.clone())) + }) + .collect::>()?; + Ok(Some(Literal::Struct(super::Struct::from_iter(iters)))) + } + Type::Map(map_ty) => { + if *map_ty.key_field.field_type != Type::Primitive(PrimitiveType::String) { + return Err(invalid_err_with_reason( + "record", + "Map key must be string", + )); + } + let mut map = BTreeMap::new(); + for (k, v) in required { + let value = v.try_into(&map_ty.value_field.field_type)?; + if map_ty.value_field.required && value.is_none() { + return Err(invalid_err_with_reason( + "record", + "Value element is required in this Map", + )); + } + map.insert(Literal::string(k), value); + } + Ok(Some(Literal::Map(map))) + } + _ => Err(invalid_err("record")), + }, + RawLiteralEnum::StringMap(_) => Err(invalid_err("string map")), + } + } + } +} + #[cfg(test)] mod tests { - use crate::spec::datatypes::{ListType, MapType, NestedField, StructType}; + use apache_avro::{to_value, types::Value}; + + use crate::{ + avro::schema_to_avro_schema, + spec::{ + datatypes::{ListType, MapType, NestedField, StructType}, + Schema, + }, + }; use super::*; @@ -1027,7 +1642,7 @@ mod tests { let mut writer = apache_avro::Writer::new(&schema, Vec::new()); writer.append_ser(ByteBuf::from(literal)).unwrap(); let encoded = writer.into_inner().unwrap(); - let reader = apache_avro::Reader::new(&*encoded).unwrap(); + let reader = apache_avro::Reader::with_schema(&schema, &*encoded).unwrap(); for record in reader { let result = apache_avro::from_value::(&record.unwrap()).unwrap(); @@ -1036,6 +1651,63 @@ mod tests { } } + fn check_convert_with_avro(expected_literal: Literal, expected_type: &Type) { + let fields = vec![NestedField::required(1, "col", expected_type.clone()).into()]; + let schema = Schema::builder() + .with_fields(fields.clone()) + .build() + .unwrap(); + let avro_schema = schema_to_avro_schema("test", &schema).unwrap(); + let struct_type = Type::Struct(StructType::new(fields)); + let struct_literal = Literal::Struct(Struct::from_iter(vec![( + 1, + Some(expected_literal.clone()), + "col".to_string(), + )])); + + let mut writer = apache_avro::Writer::new(&avro_schema, Vec::new()); + let raw_literal = RawLiteral::try_from(struct_literal.clone(), &struct_type).unwrap(); + writer.append_ser(raw_literal).unwrap(); + let encoded = writer.into_inner().unwrap(); + + let reader = apache_avro::Reader::new(&*encoded).unwrap(); + for record in reader { + let result = apache_avro::from_value::(&record.unwrap()).unwrap(); + let desered_literal = result.try_into(&struct_type).unwrap().unwrap(); + assert_eq!(desered_literal, struct_literal); + } + } + + fn check_serialize_avro(literal: Literal, ty: &Type, expect_value: Value) { + let expect_value = Value::Record(vec![("col".to_string(), expect_value)]); + + let fields = vec![NestedField::required(1, "col", ty.clone()).into()]; + let schema = Schema::builder() + .with_fields(fields.clone()) + .build() + .unwrap(); + let avro_schema = schema_to_avro_schema("test", &schema).unwrap(); + let struct_type = Type::Struct(StructType::new(fields)); + let struct_literal = Literal::Struct(Struct::from_iter(vec![( + 1, + Some(literal.clone()), + "col".to_string(), + )])); + let mut writer = apache_avro::Writer::new(&avro_schema, Vec::new()); + let raw_literal = RawLiteral::try_from(struct_literal.clone(), &struct_type).unwrap(); + let value = to_value(raw_literal) + .unwrap() + .resolve(&avro_schema) + .unwrap(); + writer.append_value_ref(&value).unwrap(); + let encoded = writer.into_inner().unwrap(); + + let reader = apache_avro::Reader::new(&*encoded).unwrap(); + for record in reader { + assert_eq!(record.unwrap(), expect_value); + } + } + #[test] fn json_boolean() { let record = r#"true"#; @@ -1309,4 +1981,277 @@ mod tests { &Type::Primitive(PrimitiveType::String), ); } + + #[test] + fn avro_convert_test_int() { + check_convert_with_avro( + Literal::Primitive(PrimitiveLiteral::Int(32)), + &Type::Primitive(PrimitiveType::Int), + ); + } + + #[test] + fn avro_convert_test_long() { + check_convert_with_avro( + Literal::Primitive(PrimitiveLiteral::Long(32)), + &Type::Primitive(PrimitiveType::Long), + ); + } + + #[test] + fn avro_convert_test_float() { + check_convert_with_avro( + Literal::Primitive(PrimitiveLiteral::Float(OrderedFloat(1.0))), + &Type::Primitive(PrimitiveType::Float), + ); + } + + #[test] + fn avro_convert_test_double() { + check_convert_with_avro( + Literal::Primitive(PrimitiveLiteral::Double(OrderedFloat(1.0))), + &Type::Primitive(PrimitiveType::Double), + ); + } + + #[test] + fn avro_convert_test_string() { + check_convert_with_avro( + Literal::Primitive(PrimitiveLiteral::String("iceberg".to_string())), + &Type::Primitive(PrimitiveType::String), + ); + } + + #[test] + fn avro_convert_test_date() { + check_convert_with_avro( + Literal::Primitive(PrimitiveLiteral::Date(17486)), + &Type::Primitive(PrimitiveType::Date), + ); + } + + #[test] + fn avro_convert_test_time() { + check_convert_with_avro( + Literal::Primitive(PrimitiveLiteral::Time(81068123456)), + &Type::Primitive(PrimitiveType::Time), + ); + } + + #[test] + fn avro_convert_test_timestamp() { + check_convert_with_avro( + Literal::Primitive(PrimitiveLiteral::Timestamp(1510871468123456)), + &Type::Primitive(PrimitiveType::Timestamp), + ); + } + + #[test] + fn avro_convert_test_timestamptz() { + check_convert_with_avro( + Literal::Primitive(PrimitiveLiteral::TimestampTZ(1510871468123456)), + &Type::Primitive(PrimitiveType::Timestamptz), + ); + } + + #[test] + fn avro_convert_test_list() { + check_convert_with_avro( + Literal::List(vec![ + Some(Literal::Primitive(PrimitiveLiteral::Int(1))), + Some(Literal::Primitive(PrimitiveLiteral::Int(2))), + Some(Literal::Primitive(PrimitiveLiteral::Int(3))), + None, + ]), + &Type::List(ListType { + element_field: NestedField::list_element( + 0, + Type::Primitive(PrimitiveType::Int), + false, + ) + .into(), + }), + ); + + check_convert_with_avro( + Literal::List(vec![ + Some(Literal::Primitive(PrimitiveLiteral::Int(1))), + Some(Literal::Primitive(PrimitiveLiteral::Int(2))), + Some(Literal::Primitive(PrimitiveLiteral::Int(3))), + ]), + &Type::List(ListType { + element_field: NestedField::list_element( + 0, + Type::Primitive(PrimitiveType::Int), + true, + ) + .into(), + }), + ); + } + + #[test] + fn avro_convert_test_map() { + check_convert_with_avro( + Literal::Map(BTreeMap::from([ + ( + Literal::Primitive(PrimitiveLiteral::Int(1)), + Some(Literal::Primitive(PrimitiveLiteral::Long(1))), + ), + ( + Literal::Primitive(PrimitiveLiteral::Int(2)), + Some(Literal::Primitive(PrimitiveLiteral::Long(2))), + ), + (Literal::Primitive(PrimitiveLiteral::Int(3)), None), + ])), + &Type::Map(MapType { + key_field: NestedField::map_key_element(0, Type::Primitive(PrimitiveType::Int)) + .into(), + value_field: NestedField::map_value_element( + 1, + Type::Primitive(PrimitiveType::Long), + false, + ) + .into(), + }), + ); + + check_convert_with_avro( + Literal::Map(BTreeMap::from([ + ( + Literal::Primitive(PrimitiveLiteral::Int(1)), + Some(Literal::Primitive(PrimitiveLiteral::Long(1))), + ), + ( + Literal::Primitive(PrimitiveLiteral::Int(2)), + Some(Literal::Primitive(PrimitiveLiteral::Long(2))), + ), + ( + Literal::Primitive(PrimitiveLiteral::Int(3)), + Some(Literal::Primitive(PrimitiveLiteral::Long(3))), + ), + ])), + &Type::Map(MapType { + key_field: NestedField::map_key_element(0, Type::Primitive(PrimitiveType::Int)) + .into(), + value_field: NestedField::map_value_element( + 1, + Type::Primitive(PrimitiveType::Long), + true, + ) + .into(), + }), + ); + } + + #[test] + fn avro_convert_test_string_map() { + check_convert_with_avro( + Literal::Map(BTreeMap::from([ + ( + Literal::Primitive(PrimitiveLiteral::String("a".to_string())), + Some(Literal::Primitive(PrimitiveLiteral::Int(1))), + ), + ( + Literal::Primitive(PrimitiveLiteral::String("b".to_string())), + Some(Literal::Primitive(PrimitiveLiteral::Int(2))), + ), + ( + Literal::Primitive(PrimitiveLiteral::String("c".to_string())), + None, + ), + ])), + &Type::Map(MapType { + key_field: NestedField::map_key_element(0, Type::Primitive(PrimitiveType::String)) + .into(), + value_field: NestedField::map_value_element( + 1, + Type::Primitive(PrimitiveType::Int), + false, + ) + .into(), + }), + ); + + check_convert_with_avro( + Literal::Map(BTreeMap::from([ + ( + Literal::Primitive(PrimitiveLiteral::String("a".to_string())), + Some(Literal::Primitive(PrimitiveLiteral::Int(1))), + ), + ( + Literal::Primitive(PrimitiveLiteral::String("b".to_string())), + Some(Literal::Primitive(PrimitiveLiteral::Int(2))), + ), + ( + Literal::Primitive(PrimitiveLiteral::String("c".to_string())), + Some(Literal::Primitive(PrimitiveLiteral::Int(3))), + ), + ])), + &Type::Map(MapType { + key_field: NestedField::map_key_element(0, Type::Primitive(PrimitiveType::String)) + .into(), + value_field: NestedField::map_value_element( + 1, + Type::Primitive(PrimitiveType::Int), + true, + ) + .into(), + }), + ); + } + + #[test] + fn avro_convert_test_record() { + check_convert_with_avro( + Literal::Struct(Struct::from_iter(vec![ + ( + 1, + Some(Literal::Primitive(PrimitiveLiteral::Int(1))), + "id".to_string(), + ), + ( + 2, + Some(Literal::Primitive(PrimitiveLiteral::String( + "bar".to_string(), + ))), + "name".to_string(), + ), + (3, None, "address".to_string()), + ])), + &Type::Struct(StructType::new(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::optional(3, "address", Type::Primitive(PrimitiveType::String)).into(), + ])), + ); + } + + // # TODO:https://github.com/apache/iceberg-rust/issues/86 + // rust avro don't support deserialize any bytes representation now: + // - binary + // - decimal + #[test] + fn avro_convert_test_binary_ser() { + let literal = Literal::Primitive(PrimitiveLiteral::Binary(vec![1, 2, 3, 4, 5])); + let ty = Type::Primitive(PrimitiveType::Binary); + let expect_value = Value::Bytes(vec![1, 2, 3, 4, 5]); + check_serialize_avro(literal, &ty, expect_value); + } + + #[test] + fn avro_convert_test_decimal_ser() { + let literal = Literal::decimal(12345); + let ty = Type::Primitive(PrimitiveType::Decimal { + precision: 9, + scale: 8, + }); + let expect_value = Value::Decimal(apache_avro::Decimal::from(12345_i128.to_be_bytes())); + check_serialize_avro(literal, &ty, expect_value); + } + + // # TODO:https://github.com/apache/iceberg-rust/issues/86 + // rust avro can't support to convert any byte-like type to fixed in avro now. + // - uuid ser/de + // - fixed ser/de }