diff --git a/crates/iceberg/src/avro/schema.rs b/crates/iceberg/src/avro/schema.rs index fb5eee9a4..971096203 100644 --- a/crates/iceberg/src/avro/schema.rs +++ b/crates/iceberg/src/avro/schema.rs @@ -261,10 +261,7 @@ pub(crate) fn avro_decimal_schema(precision: usize, scale: usize) -> Result Type::Primitive(PrimitiveType::Uuid), + UUID_LOGICAL_TYPE => Type::Primitive(PrimitiveType::Uuid), ty => { return Err(Error::new( ErrorKind::FeatureUnsupported, diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs index 29e2795c2..7e54fb08f 100644 --- a/crates/iceberg/src/spec/values.rs +++ b/crates/iceberg/src/spec/values.rs @@ -626,30 +626,21 @@ impl Struct { pub struct StructValueIntoIter { null_bitmap: bitvec::boxed::IntoIter, fields: std::vec::IntoIter, - field_ids: std::vec::IntoIter, - field_names: std::vec::IntoIter, } impl Iterator for StructValueIntoIter { - type Item = (i32, Option, String); + type Item = Option; fn next(&mut self) -> Option { - match ( - self.null_bitmap.next(), - self.fields.next(), - self.field_ids.next(), - self.field_names.next(), - ) { - (Some(null), Some(value), Some(id), Some(name)) => { - Some((id, if null { None } else { Some(value) }, name)) - } + match (self.null_bitmap.next(), self.fields.next()) { + (Some(null), Some(value)) => Some(if null { None } else { Some(value) }), _ => None, } } } impl IntoIterator for Struct { - type Item = (i32, Option, String); + type Item = Option; type IntoIter = StructValueIntoIter; @@ -657,8 +648,6 @@ impl IntoIterator for Struct { StructValueIntoIter { null_bitmap: self.null_bitmap.into_iter(), fields: self.fields.into_iter(), - field_ids: self.field_ids.into_iter(), - field_names: self.field_names.into_iter(), } } } @@ -1043,7 +1032,7 @@ mod _serde { use std::collections::BTreeMap; use crate::{ - spec::{PrimitiveType, Type}, + spec::{PrimitiveType, Type, MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME}, Error, ErrorKind, }; @@ -1057,7 +1046,7 @@ mod _serde { use serde_derive::Deserialize as DeserializeDerive; use serde_derive::Serialize as SerializeDerive; - #[derive(SerializeDerive, DeserializeDerive)] + #[derive(SerializeDerive, DeserializeDerive, Debug)] #[serde(transparent)] /// Raw literal representation used for serde. The serialize way is used for Avro serializer. pub struct RawLiteral(RawLiteralEnum); @@ -1074,7 +1063,7 @@ mod _serde { } } - #[derive(SerializeDerive, Clone)] + #[derive(SerializeDerive, Clone, Debug)] #[serde(untagged)] enum RawLiteralEnum { Null, @@ -1090,7 +1079,7 @@ mod _serde { Record(Record), } - #[derive(Clone)] + #[derive(Clone, Debug)] struct Record { required: Vec<(String, RawLiteralEnum)>, optional: Vec<(String, Option)>, @@ -1113,7 +1102,7 @@ mod _serde { } } - #[derive(Clone)] + #[derive(Clone, Debug)] struct List { list: Vec>, required: bool, @@ -1140,7 +1129,7 @@ mod _serde { } } - #[derive(Clone)] + #[derive(Clone, Debug)] struct StringMap { raw: Vec<(String, Option)>, required: bool, @@ -1308,21 +1297,18 @@ mod _serde { super::PrimitiveLiteral::Fixed(v) => RawLiteralEnum::Bytes(ByteBuf::from(v)), super::PrimitiveLiteral::Binary(v) => RawLiteralEnum::Bytes(ByteBuf::from(v)), super::PrimitiveLiteral::Decimal(v) => { - RawLiteralEnum::Bytes(ByteBuf::from(v.to_le_bytes())) + RawLiteralEnum::Bytes(ByteBuf::from(v.to_be_bytes())) } }, Literal::Struct(r#struct) => { let mut required = Vec::new(); let mut optional = Vec::new(); - if let Type::Struct(sturct_ty) = ty { - for (id, value, field_name) in r#struct.into_iter() { - let field = sturct_ty.field_by_id(id).ok_or_else(|| { - Error::new(ErrorKind::DataInvalid, "field not found") - })?; + if let Type::Struct(struct_ty) = ty { + for (value, field) in r#struct.into_iter().zip(struct_ty.fields()) { if field.required { if let Some(value) = value { required.push(( - field_name, + field.name.clone(), RawLiteralEnum::try_from(value, &field.field_type)?, )); } else { @@ -1333,11 +1319,11 @@ mod _serde { } } else if let Some(value) = value { optional.push(( - field_name, + field.name.clone(), Some(RawLiteralEnum::try_from(value, &field.field_type)?), )); } else { - optional.push((field_name, None)); + optional.push((field.name.clone(), None)); } } } else { @@ -1410,18 +1396,18 @@ mod _serde { if map_ty.value_field.required { Ok(Some(RawLiteralEnum::Record(Record { required: vec![ - ("key".to_string(), raw_k), - ("value".to_string(), raw_v.ok_or_else(||Error::new(ErrorKind::DataInvalid, "Map value is required, value cannot be null"))?), + (MAP_KEY_FIELD_NAME.to_string(), raw_k), + (MAP_VALUE_FIELD_NAME.to_string(), raw_v.ok_or_else(||Error::new(ErrorKind::DataInvalid, "Map value is required, value cannot be null"))?), ], optional: vec![], }))) } else { Ok(Some(RawLiteralEnum::Record(Record { required: vec![ - ("key".to_string(), raw_k), + (MAP_KEY_FIELD_NAME.to_string(), raw_k), ], optional: vec![ - ("value".to_string(), raw_v) + (MAP_VALUE_FIELD_NAME.to_string(), raw_v) ], }))) } @@ -1447,11 +1433,20 @@ mod _serde { Error::new( ErrorKind::DataInvalid, format!( - "Unable to convert raw literal ({}) fail convert to type {}", + "Unable to convert raw literal ({}) fail convert to type {} for: type mismatch", v, ty ), ) }; + let invalid_err_with_reason = |v: &str, reason: &str| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Unable to convert raw literal ({}) fail convert to type {} for: {}", + v, ty, reason + ), + ) + }; match self { RawLiteralEnum::Null => Ok(None), RawLiteralEnum::Boolean(v) => Ok(Some(Literal::bool(v))), @@ -1483,66 +1478,81 @@ mod _serde { }, // # TODO // rust avro don't support deserialize any bytes representation now. - RawLiteralEnum::Bytes(_) => Err(invalid_err("bytes")), - RawLiteralEnum::List(v) => match ty { - Type::List(ty) => Ok(Some(Literal::List( - v.list - .into_iter() - .map(|v| { - if let Some(v) = v { - v.try_into(&ty.element_field.field_type) - } else { - Ok(None) - } - }) - .collect::>()?, - ))), - Type::Map(map_ty) => { - let key_ty = map_ty.key_field.field_type.as_ref(); - let value_ty = map_ty.value_field.field_type.as_ref(); - let mut map = BTreeMap::new(); - for k_v in v.list { - // In deserialize, the element always be `Some`. `None` will be represented - // as `Some(RawLiteral::Null)` - let k_v = k_v.ok_or_else(|| invalid_err("list"))?; - if let RawLiteralEnum::Record(Record { - required, - optional: _, - }) = k_v - { - if required.len() != 2 { - return Err(invalid_err("list")); - } - let mut key = None; - let mut value = None; - required.into_iter().for_each(|(k, v)| { - if k == "key" { - key = Some(v); - } else if k == "value" { - value = Some(v); + RawLiteralEnum::Bytes(_) => Err(invalid_err_with_reason( + "bytes", + "todo: rust avro doesn't support deserialize any bytes representation now", + )), + RawLiteralEnum::List(v) => { + match ty { + Type::List(ty) => Ok(Some(Literal::List( + v.list + .into_iter() + .map(|v| { + if let Some(v) = v { + v.try_into(&ty.element_field.field_type) + } else { + Ok(None) + } + }) + .collect::>()?, + ))), + Type::Map(map_ty) => { + let key_ty = map_ty.key_field.field_type.as_ref(); + let value_ty = map_ty.value_field.field_type.as_ref(); + let mut map = BTreeMap::new(); + for k_v in v.list { + let k_v = k_v.ok_or_else(|| invalid_err_with_reason("list","In deserialize, None will be represented as Some(RawLiteral::Null), all element in list must be valid"))?; + if let RawLiteralEnum::Record(Record { + required, + optional: _, + }) = k_v + { + if required.len() != 2 { + return Err(invalid_err_with_reason("list","Record must contains two element(key and value) of array")); } - }); - match (key, value) { - (Some(k), Some(v)) => { - let key = k - .try_into(key_ty)? - .ok_or_else(|| invalid_err("list"))?; - let value = v.try_into(value_ty)?; - if map_ty.value_field.required && value.is_none() { - return Err(invalid_err("list")); + let mut key = None; + let mut value = None; + required.into_iter().for_each(|(k, v)| { + if k == MAP_KEY_FIELD_NAME { + key = Some(v); + } else if k == MAP_VALUE_FIELD_NAME { + value = Some(v); } - map.insert(key, value); + }); + match (key, value) { + (Some(k), Some(v)) => { + let key = k.try_into(key_ty)?.ok_or_else(|| { + invalid_err_with_reason( + "list", + "Key element in Map must be valid", + ) + })?; + let value = v.try_into(value_ty)?; + if map_ty.value_field.required && value.is_none() { + return Err(invalid_err_with_reason( + "list", + "Value element is required in this Map", + )); + } + map.insert(key, value); + } + _ => return Err(invalid_err_with_reason( + "list", + "The elements of record in list are not key and value", + )), } - _ => return Err(invalid_err("list")), + } else { + return Err(invalid_err_with_reason( + "list", + "Map should represented as record array.", + )); } - } else { - return Err(invalid_err("list")); } + Ok(Some(Literal::Map(map))) } - Ok(Some(Literal::Map(map))) + _ => Err(invalid_err("list")), } - _ => Err(invalid_err("list")), - }, + } RawLiteralEnum::Record(Record { required, optional: _, @@ -1553,7 +1563,12 @@ mod _serde { .map(|(field_name, value)| { let field = struct_ty .field_by_name(field_name.as_str()) - .ok_or_else(|| invalid_err("record"))?; + .ok_or_else(|| { + invalid_err_with_reason( + "record", + &format!("field {} is not exist", &field_name), + ) + })?; let value = value.try_into(&field.field_type)?; Ok((field.id, value, field.name.clone())) }) @@ -1562,13 +1577,19 @@ mod _serde { } Type::Map(map_ty) => { if *map_ty.key_field.field_type != Type::Primitive(PrimitiveType::String) { - return Err(invalid_err("record")); + return Err(invalid_err_with_reason( + "record", + "Map key must be string", + )); } let mut map = BTreeMap::new(); for (k, v) in required { let value = v.try_into(&map_ty.value_field.field_type)?; if map_ty.value_field.required && value.is_none() { - return Err(invalid_err("record")); + return Err(invalid_err_with_reason( + "record", + "Value element is required in this Map", + )); } map.insert(Literal::string(k), value); } @@ -1585,6 +1606,8 @@ mod _serde { #[cfg(test)] mod tests { + use apache_avro::{to_value, types::Value}; + use crate::{ avro::schema_to_avro_schema, spec::{ @@ -1655,6 +1678,36 @@ mod tests { } } + fn check_serialzie_avro(literal: Literal, ty: &Type, expect_value: Value) { + let expect_value = Value::Record(vec![("col".to_string(), expect_value)]); + + let fields = vec![NestedField::required(1, "col", ty.clone()).into()]; + let schema = Schema::builder() + .with_fields(fields.clone()) + .build() + .unwrap(); + let avro_schema = schema_to_avro_schema("test", &schema).unwrap(); + let struct_type = Type::Struct(StructType::new(fields)); + let struct_literal = Literal::Struct(Struct::from_iter(vec![( + 1, + Some(literal.clone()), + "col".to_string(), + )])); + let mut writer = apache_avro::Writer::new(&avro_schema, Vec::new()); + let raw_literal = RawLiteral::try_from(struct_literal.clone(), &struct_type).unwrap(); + let value = to_value(raw_literal) + .unwrap() + .resolve(&avro_schema) + .unwrap(); + writer.append_value_ref(&value).unwrap(); + let encoded = writer.into_inner().unwrap(); + + let reader = apache_avro::Reader::new(&*encoded).unwrap(); + for record in reader { + assert_eq!(record.unwrap(), expect_value); + } + } + #[test] fn json_boolean() { let record = r#"true"#; @@ -2177,7 +2230,28 @@ mod tests { // # TODO // rust avro don't support deserialize any bytes representation now: // - binary - // - fixed // - decimal - // - uuid + #[test] + fn avro_convert_test_binary_ser() { + let literal = Literal::Primitive(PrimitiveLiteral::Binary(vec![1, 2, 3, 4, 5])); + let ty = Type::Primitive(PrimitiveType::Binary); + let expect_value = Value::Bytes(vec![1, 2, 3, 4, 5]); + check_serialzie_avro(literal, &ty, expect_value); + } + + #[test] + fn avro_convert_test_decimal_ser() { + let literal = Literal::decimal(12345); + let ty = Type::Primitive(PrimitiveType::Decimal { + precision: 9, + scale: 8, + }); + let expect_value = Value::Decimal(apache_avro::Decimal::from(12345_i128.to_be_bytes())); + check_serialzie_avro(literal, &ty, expect_value); + } + + // # TODO + // rust avro can't support to convert any byte-like type to fixed in avro now. + // - uuid ser/de + // - fixed ser/de }