From a5ec80108da29f1e82fc84fbca8d2aaa1a8b5755 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Fri, 25 Aug 2023 11:27:04 +0800 Subject: [PATCH 1/5] feat: Introduce conversion between iceberg schema and avro schema --- crates/iceberg/Cargo.toml | 3 +- crates/iceberg/src/avro/mod.rs | 19 + crates/iceberg/src/avro/schema.rs | 917 ++++++++++++++++++ crates/iceberg/src/error.rs | 6 + crates/iceberg/src/lib.rs | 2 + crates/iceberg/src/spec/datatypes.rs | 59 +- crates/iceberg/src/spec/schema.rs | 1 + .../testdata/avro_schema_manifest_entry.json | 286 ++++++ .../avro_schema_manifest_file_v1.json | 139 +++ .../avro_schema_manifest_file_v2.json | 141 +++ 10 files changed, 1562 insertions(+), 11 deletions(-) create mode 100644 crates/iceberg/src/avro/mod.rs create mode 100644 crates/iceberg/src/avro/schema.rs create mode 100644 crates/iceberg/testdata/avro_schema_manifest_entry.json create mode 100644 crates/iceberg/testdata/avro_schema_manifest_file_v1.json create mode 100644 crates/iceberg/testdata/avro_schema_manifest_file_v2.json diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 053acf87b..c110147c6 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -27,7 +27,7 @@ license = "Apache-2.0" keywords = ["iceberg"] [dependencies] -apache-avro = "0.15.0" +apache-avro = "0.15" serde = {version = "^1.0", features = ["rc"]} serde_bytes = "0.11.8" serde_json = "^1.0" @@ -43,6 +43,7 @@ serde_repr = "0.1.16" itertools = "0.11" bimap = "0.6" derive_builder = "0.12.0" +either = "1" [dev-dependencies] diff --git a/crates/iceberg/src/avro/mod.rs b/crates/iceberg/src/avro/mod.rs new file mode 100644 index 000000000..487d9b4f8 --- /dev/null +++ b/crates/iceberg/src/avro/mod.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Avro related codes. +mod schema; diff --git a/crates/iceberg/src/avro/schema.rs b/crates/iceberg/src/avro/schema.rs new file mode 100644 index 000000000..1de4b7430 --- /dev/null +++ b/crates/iceberg/src/avro/schema.rs @@ -0,0 +1,917 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Conversion between iceberg and avro schema. +use crate::spec::{ + visit_schema, ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema, + SchemaVisitor, StructType, Type, DECIMAL_LENGTH, +}; +use crate::{ensure_data_valid, Error, ErrorKind, Result}; +use apache_avro::schema::{ + DecimalSchema, FixedSchema, Name, RecordField as AvroRecordField, RecordFieldOrder, + RecordSchema, UnionSchema, +}; +use apache_avro::Schema as AvroSchema; +use itertools::{Either, Itertools}; +use serde_json::{Number, Value}; + +const FILED_ID_PROP: &str = "field-id"; + +struct SchemaToAvroSchema { + schema: String, +} + +impl SchemaVisitor for SchemaToAvroSchema { + type T = Either; + + fn schema( + &mut self, + _schema: &Schema, + value: Either, + ) -> Result> { + let mut avro_schema = value.unwrap_left(); + + if let AvroSchema::Record(record) = &mut avro_schema { + record.name = Name::from(self.schema.as_str()); + } else { + return Err(Error::new( + ErrorKind::Unexpected, + "Schema result must be avro record!", + )); + } + + Ok(Either::Left(avro_schema)) + } + + fn field( + &mut self, + field: &NestedFieldRef, + avro_schema: Either, + ) -> Result> { + let mut field_schema = avro_schema.unwrap_left(); + if let AvroSchema::Record(record) = &mut field_schema { + record.name = Name::from(format!("r{}", field.id).as_str()); + } + + if !field.required { + field_schema = avro_optional(field_schema)?; + } + + let mut avro_record_field = AvroRecordField { + name: field.name.clone(), + schema: field_schema, + order: RecordFieldOrder::Ignore, + position: 0, + doc: field.doc.clone(), + aliases: None, + default: None, + custom_attributes: Default::default(), + }; + + if !field.required { + avro_record_field.default = Some(Value::Null); + } + avro_record_field.custom_attributes.insert( + FILED_ID_PROP.to_string(), + Value::Number(Number::from(field.id)), + ); + + Ok(Either::Right(avro_record_field)) + } + + fn r#struct( + &mut self, + _struct: &StructType, + results: Vec>, + ) -> Result> { + let avro_fields = results.into_iter().map(|r| r.unwrap_right()).collect(); + + Ok(Either::Left(AvroSchema::Record(RecordSchema { + // The name of this record schema should be determined later, by schema name or field + // name, here we use a temporary placeholder to do it. + name: Name::new("null")?, + aliases: None, + doc: None, + fields: avro_fields, + lookup: Default::default(), + attributes: Default::default(), + }))) + } + + fn list( + &mut self, + list: &ListType, + value: Either, + ) -> Result> { + let mut field_schema = value.unwrap_left(); + + if let AvroSchema::Record(record) = &mut field_schema { + record.name = Name::from(format!("r{}", list.element_field.id).as_str()); + } + + if !list.element_field.required { + field_schema = avro_optional(field_schema)?; + } + + // TODO: We need to add element id prop here, but rust's avro schema doesn't support property except record schema. + Ok(Either::Left(AvroSchema::Array(Box::new(field_schema)))) + } + + fn map( + &mut self, + map: &MapType, + key_value: Either, + value: Either, + ) -> Result> { + let key_field_schema = key_value.unwrap_left(); + let mut value_field_schema = value.unwrap_left(); + if !map.value_field.required { + value_field_schema = avro_optional(value_field_schema)?; + } + + if matches!(key_field_schema, AvroSchema::String) { + Ok(Either::Left(AvroSchema::Map(Box::new(value_field_schema)))) + } else { + // Avro map requires that key must be string type. Here we convert it to array if key is + // not string type. + let key_field = { + let mut field = AvroRecordField { + name: map.key_field.name.clone(), + doc: None, + aliases: None, + default: None, + schema: key_field_schema, + order: RecordFieldOrder::Ascending, + position: 0, + custom_attributes: Default::default(), + }; + field.custom_attributes.insert( + FILED_ID_PROP.to_string(), + Value::Number(Number::from(map.key_field.id)), + ); + field + }; + + let value_field = { + let mut field = AvroRecordField { + name: map.key_field.name.clone(), + doc: None, + aliases: None, + default: None, + schema: value_field_schema, + order: RecordFieldOrder::Ignore, + position: 0, + custom_attributes: Default::default(), + }; + field.custom_attributes.insert( + FILED_ID_PROP.to_string(), + Value::Number(Number::from(map.value_field.id)), + ); + field + }; + + let item_avro_schema = AvroSchema::Record(RecordSchema { + name: Name::from(format!("k{}_v{}", map.key_field.id, map.value_field.id).as_str()), + aliases: None, + doc: None, + fields: vec![key_field, value_field], + lookup: Default::default(), + attributes: Default::default(), + }); + + Ok(Either::Left(item_avro_schema)) + } + } + + fn primitive(&mut self, p: &PrimitiveType) -> Result> { + let avro_schema = match p { + PrimitiveType::Boolean => AvroSchema::Boolean, + PrimitiveType::Int => AvroSchema::Int, + PrimitiveType::Long => AvroSchema::Long, + PrimitiveType::Float => AvroSchema::Float, + PrimitiveType::Double => AvroSchema::Double, + PrimitiveType::Date => AvroSchema::Date, + PrimitiveType::Time => AvroSchema::TimeMicros, + PrimitiveType::Timestamp => AvroSchema::TimestampMicros, + PrimitiveType::Timestamptz => AvroSchema::TimestampMicros, + PrimitiveType::String => AvroSchema::String, + PrimitiveType::Uuid => AvroSchema::Uuid, + PrimitiveType::Fixed(len) => avro_fixed_schema((*len) as usize)?, + PrimitiveType::Binary => AvroSchema::Bytes, + PrimitiveType::Decimal { precision, scale } => { + avro_decimal_schema(*precision as usize, *scale as usize)? + } + }; + Ok(Either::Left(avro_schema)) + } +} + +/// Converting iceberg schema to avro schema. +pub(crate) fn schema_to_avro_schema(name: impl ToString, schema: &Schema) -> Result { + let mut converter = SchemaToAvroSchema { + schema: name.to_string(), + }; + + visit_schema(schema, &mut converter).map(Either::unwrap_left) +} + +pub(crate) fn avro_fixed_schema(len: usize) -> Result { + Ok(AvroSchema::Fixed(FixedSchema { + name: Name::new(format!("fixed_{len}").as_str())?, + aliases: None, + doc: None, + size: len, + attributes: Default::default(), + })) +} + +pub(crate) fn avro_decimal_schema(precision: usize, scale: usize) -> Result { + Ok(AvroSchema::Decimal(DecimalSchema { + precision, + scale, + inner: Box::new(avro_fixed_schema(DECIMAL_LENGTH)?), + })) +} + +fn avro_optional(avro_schema: AvroSchema) -> Result { + Ok(AvroSchema::Union(UnionSchema::new(vec![ + AvroSchema::Null, + avro_schema, + ])?)) +} + +fn is_avro_optional(avro_schema: &AvroSchema) -> bool { + match avro_schema { + AvroSchema::Union(union) => union.is_nullable(), + _ => false, + } +} + +/// Post order avro schema visitor. +pub trait AvroSchemaVisitor { + type T; + + fn record(&mut self, record: &RecordSchema, fields: Vec) -> Result; + + fn union(&mut self, union: &UnionSchema, options: Vec) -> Result; + + fn array(&mut self, array: &AvroSchema, item: Self::T) -> Result; + fn map(&mut self, map: &AvroSchema, value: Self::T) -> Result; + + fn primitive(&mut self, schema: &AvroSchema) -> Result; +} + +/// Visit avro schema in post order visitor. +pub fn visit(schema: &AvroSchema, visitor: &mut V) -> Result { + match schema { + AvroSchema::Record(record) => { + let field_results = record + .fields + .iter() + .map(|f| visit(&f.schema, visitor)) + .collect::>>()?; + + visitor.record(record, field_results) + } + AvroSchema::Union(union) => { + let option_results = union + .variants() + .iter() + .map(|f| visit(f, visitor)) + .collect::>>()?; + + visitor.union(union, option_results) + } + AvroSchema::Array(item) => { + let item_result = visit(item, visitor)?; + visitor.array(schema, item_result) + } + AvroSchema::Map(inner) => { + let item_result = visit(inner, visitor)?; + visitor.map(schema, item_result) + } + schema => visitor.primitive(schema), + } +} + +struct AvroSchemaToSchema { + next_id: i32, +} + +impl AvroSchemaToSchema { + fn next_field_id(&mut self) -> i32 { + self.next_id += 1; + self.next_id + } +} + +impl AvroSchemaVisitor for AvroSchemaToSchema { + // Only `AvroSchema::Null` will return `None` + type T = Option; + + fn record( + &mut self, + record: &RecordSchema, + field_types: Vec>, + ) -> Result> { + let mut fields = Vec::with_capacity(field_types.len()); + for (avro_field, typ) in record.fields.iter().zip_eq(field_types) { + let field_id = avro_field + .custom_attributes + .get(FILED_ID_PROP) + .and_then(Value::as_i64) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Can't convert field, missing field id: {avro_field:?}"), + ) + })?; + + let optional = is_avro_optional(&avro_field.schema); + + let mut field = if optional { + NestedField::optional(field_id as i32, &avro_field.name, typ.unwrap()) + } else { + NestedField::required(field_id as i32, &avro_field.name, typ.unwrap()) + }; + + if let Some(doc) = &avro_field.doc { + field = field.with_doc(doc); + } + + fields.push(field.into()); + } + + Ok(Some(Type::Struct(StructType::new(fields)))) + } + + fn union( + &mut self, + union: &UnionSchema, + mut options: Vec>, + ) -> Result> { + ensure_data_valid!( + options.len() <= 2 && !options.is_empty(), + "Can't convert avro union type {:?} to iceberg.", + union + ); + + if options.len() > 1 { + ensure_data_valid!( + options[0].is_none(), + "Can't convert avro union type {:?} to iceberg.", + union + ); + } + + if options.len() == 1 { + Ok(Some(options.remove(0).unwrap())) + } else { + Ok(Some(options.remove(1).unwrap())) + } + } + + fn array(&mut self, array: &AvroSchema, item: Option) -> Result { + if let AvroSchema::Array(item_schema) = array { + let element_field = NestedField::list_element( + self.next_field_id(), + item.unwrap(), + !is_avro_optional(item_schema), + ) + .into(); + Ok(Some(Type::List(ListType { element_field }))) + } else { + Err(Error::new( + ErrorKind::Unexpected, + "Expected avro array schema, but {array}", + )) + } + } + + fn map(&mut self, map: &AvroSchema, value: Option) -> Result> { + if let AvroSchema::Map(value_schema) = map { + // Due to avro rust implementation's limitation, we can't store attributes in map schema, + // we will fix it later when it has been resolved. + let key_field = NestedField::map_key_element( + self.next_field_id(), + Type::Primitive(PrimitiveType::String), + ); + let value_field = NestedField::map_value_element( + self.next_field_id(), + value.unwrap(), + !is_avro_optional(value_schema), + ); + Ok(Some(Type::Map(MapType { + key_field: key_field.into(), + value_field: value_field.into(), + }))) + } else { + Err(Error::new( + ErrorKind::Unexpected, + "Expected avro map schema, but {map}", + )) + } + } + + fn primitive(&mut self, schema: &AvroSchema) -> Result> { + let typ = match schema { + AvroSchema::Decimal(decimal) => { + Type::decimal(decimal.precision as u32, decimal.scale as u32)? + } + AvroSchema::Date => Type::Primitive(PrimitiveType::Date), + AvroSchema::TimeMicros => Type::Primitive(PrimitiveType::Time), + AvroSchema::TimestampMicros => Type::Primitive(PrimitiveType::Timestamp), + AvroSchema::Uuid => Type::Primitive(PrimitiveType::Uuid), + AvroSchema::Boolean => Type::Primitive(PrimitiveType::Boolean), + AvroSchema::Int => Type::Primitive(PrimitiveType::Int), + AvroSchema::Long => Type::Primitive(PrimitiveType::Long), + AvroSchema::Float => Type::Primitive(PrimitiveType::Float), + AvroSchema::Double => Type::Primitive(PrimitiveType::Double), + AvroSchema::String | AvroSchema::Enum(_) => Type::Primitive(PrimitiveType::String), + AvroSchema::Fixed(fixed) => Type::Primitive(PrimitiveType::Fixed(fixed.size as u64)), + AvroSchema::Bytes => Type::Primitive(PrimitiveType::Binary), + AvroSchema::Null => return Ok(None), + _ => { + return Err(Error::new( + ErrorKind::Unexpected, + "Unable to convert avro {schema} to iceberg primitive type.", + )) + } + }; + + Ok(Some(typ)) + } +} + +/// Converts avro schema to iceberg schema. +pub(crate) fn avro_schema_to_schema(avro_schema: &AvroSchema) -> Result { + if let AvroSchema::Record(_) = avro_schema { + let mut converter = AvroSchemaToSchema { next_id: 0 }; + let typ = visit(avro_schema, &mut converter)?.expect("Iceberg schema should not be none."); + if let Type::Struct(s) = typ { + Schema::builder() + .with_fields(s.fields().iter().cloned()) + .build() + } else { + Err(Error::new( + ErrorKind::Unexpected, + format!("Expected to convert avro record schema to struct type, but {typ}"), + )) + } + } else { + Err(Error::new( + ErrorKind::DataInvalid, + "Can't convert non record avro schema to iceberg schema: {avro_schema}", + )) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::avro::schema::AvroSchemaToSchema; + use crate::spec::{ListType, MapType, NestedField, PrimitiveType, Schema, StructType, Type}; + use apache_avro::schema::{Namespace, UnionSchema}; + use apache_avro::Schema as AvroSchema; + use std::fs::read_to_string; + + fn read_test_data_file_to_avro_schema(filename: &str) -> AvroSchema { + let input = read_to_string(format!( + "{}/testdata/{}", + env!("CARGO_MANIFEST_DIR"), + filename + )) + .unwrap(); + + println!("Input is {input}"); + AvroSchema::parse_str(input.as_str()).unwrap() + } + + fn check_schema_conversion( + avro_schema: AvroSchema, + expected_iceberg_schema: Schema, + check_avro_to_iceberg: bool, + ) { + if check_avro_to_iceberg { + let converted_iceberg_schema = avro_schema_to_schema(&avro_schema).unwrap(); + assert_eq!(expected_iceberg_schema, converted_iceberg_schema); + } + + let converted_avro_schema = schema_to_avro_schema( + avro_schema.name().unwrap().fullname(Namespace::None), + &expected_iceberg_schema, + ) + .unwrap(); + assert_eq!(avro_schema, converted_avro_schema); + } + + #[test] + fn test_manifest_file_v1_schema() { + let fields = vec![ + NestedField::required(500, "manifest_path", PrimitiveType::String.into()) + .with_doc("Location URI with FS scheme") + .into(), + NestedField::required(501, "manifest_length", PrimitiveType::Long.into()) + .with_doc("Total file size in bytes") + .into(), + NestedField::required(502, "partition_spec_id", PrimitiveType::Int.into()) + .with_doc("Spec ID used to write") + .into(), + NestedField::optional(503, "added_snapshot_id", PrimitiveType::Long.into()) + .with_doc("Snapshot ID that added the manifest") + .into(), + NestedField::optional(504, "added_data_files_count", PrimitiveType::Int.into()) + .with_doc("Added entry count") + .into(), + NestedField::optional(505, "existing_data_files_count", PrimitiveType::Int.into()) + .with_doc("Existing entry count") + .into(), + NestedField::optional(506, "deleted_data_files_count", PrimitiveType::Int.into()) + .with_doc("Deleted entry count") + .into(), + NestedField::optional( + 507, + "partitions", + ListType { + element_field: NestedField::list_element( + 508, + StructType::new(vec![ + NestedField::required( + 509, + "contains_null", + PrimitiveType::Boolean.into(), + ) + .with_doc("True if any file has a null partition value") + .into(), + NestedField::optional( + 518, + "contains_nan", + PrimitiveType::Boolean.into(), + ) + .with_doc("True if any file has a nan partition value") + .into(), + NestedField::optional(510, "lower_bound", PrimitiveType::Binary.into()) + .with_doc("Partition lower bound for all files") + .into(), + NestedField::optional(511, "upper_bound", PrimitiveType::Binary.into()) + .with_doc("Partition upper bound for all files") + .into(), + ]) + .into(), + true, + ) + .into(), + } + .into(), + ) + .with_doc("Summary for each partition") + .into(), + NestedField::optional(512, "added_rows_count", PrimitiveType::Long.into()) + .with_doc("Added rows count") + .into(), + NestedField::optional(513, "existing_rows_count", PrimitiveType::Long.into()) + .with_doc("Existing rows count") + .into(), + NestedField::optional(514, "deleted_rows_count", PrimitiveType::Long.into()) + .with_doc("Deleted rows count") + .into(), + ]; + + let iceberg_schema = Schema::builder().with_fields(fields).build().unwrap(); + check_schema_conversion( + read_test_data_file_to_avro_schema("avro_schema_manifest_file_v1.json"), + iceberg_schema, + false, + ); + } + + #[test] + fn test_avro_list_required_primitive() { + let avro_schema = { + AvroSchema::parse_str( + r#" +{ + "type": "record", + "name": "avro_schema", + "fields": [ + { + "name": "array_with_string", + "type": { + "type": "array", + "items": "string", + "default": [], + "element-id": 101 + }, + "field-id": 100 + } + ] +}"#, + ) + .unwrap() + }; + + let iceberg_schema = { + Schema::builder() + .with_fields(vec![NestedField::required( + 100, + "array_with_string", + ListType { + element_field: NestedField::list_element( + 101, + PrimitiveType::String.into(), + true, + ) + .into(), + } + .into(), + ) + .into()]) + .build() + .unwrap() + }; + + check_schema_conversion(avro_schema, iceberg_schema, false); + } + + #[test] + fn test_avro_list_wrapped_primitive() { + let avro_schema = { + AvroSchema::parse_str( + r#" +{ + "type": "record", + "name": "avro_schema", + "fields": [ + { + "name": "array_with_string", + "type": { + "type": "array", + "items": {"type": "string"}, + "default": [], + "element-id": 101 + }, + "field-id": 100 + } + ] +} +"#, + ) + .unwrap() + }; + + let iceberg_schema = { + Schema::builder() + .with_fields(vec![NestedField::required( + 100, + "array_with_string", + ListType { + element_field: NestedField::list_element( + 101, + PrimitiveType::String.into(), + true, + ) + .into(), + } + .into(), + ) + .into()]) + .build() + .unwrap() + }; + + check_schema_conversion(avro_schema, iceberg_schema, false); + } + + #[test] + fn test_avro_list_required_record() { + let avro_schema = { + AvroSchema::parse_str( + r#" +{ + "type": "record", + "name": "avro_schema", + "fields": [ + { + "name": "array_with_record", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "r101", + "fields": [ + { + "name": "contains_null", + "type": "boolean", + "field-id": 102 + }, + { + "name": "contains_nan", + "type": ["null", "boolean"], + "field-id": 103 + } + ] + }, + "element-id": 101 + }, + "field-id": 100 + } + ] +} +"#, + ) + .unwrap() + }; + + let iceberg_schema = { + Schema::builder() + .with_fields(vec![NestedField::required( + 100, + "array_with_record", + ListType { + element_field: NestedField::list_element( + 101, + StructType::new(vec![ + NestedField::required( + 102, + "contains_null", + PrimitiveType::Boolean.into(), + ) + .into(), + NestedField::optional( + 103, + "contains_nan", + PrimitiveType::Boolean.into(), + ) + .into(), + ]) + .into(), + true, + ) + .into(), + } + .into(), + ) + .into()]) + .build() + .unwrap() + }; + + check_schema_conversion(avro_schema, iceberg_schema, false); + } + + #[test] + fn test_resolve_union() { + let avro_schema = UnionSchema::new(vec![ + AvroSchema::Null, + AvroSchema::String, + AvroSchema::Boolean, + ]) + .unwrap(); + + let mut converter = AvroSchemaToSchema { next_id: 0 }; + + let options = avro_schema + .variants() + .iter() + .map(|v| converter.primitive(v).unwrap()) + .collect(); + assert!(converter.union(&avro_schema, options).is_err()); + } + + #[test] + fn test_string_type() { + let mut converter = AvroSchemaToSchema { next_id: 0 }; + let avro_schema = AvroSchema::String; + + assert_eq!( + Some(PrimitiveType::String.into()), + converter.primitive(&avro_schema).unwrap() + ); + } + + #[test] + fn test_map_type() { + let avro_schema = { + AvroSchema::parse_str( + r#" +{ + "type": "map", + "values": ["null", "long"], + "key-id": 101, + "value-id": 102 +} +"#, + ) + .unwrap() + }; + + let mut converter = AvroSchemaToSchema { next_id: 0 }; + let iceberg_type = Type::Map(MapType { + key_field: NestedField::map_key_element(1, PrimitiveType::String.into()).into(), + value_field: NestedField::map_value_element(2, PrimitiveType::Long.into(), false) + .into(), + }); + + assert_eq!( + iceberg_type, + converter + .map(&avro_schema, Some(PrimitiveType::Long.into())) + .unwrap() + .unwrap() + ); + } + + #[test] + fn test_fixed_type() { + let avro_schema = { + AvroSchema::parse_str( + r#" + {"name": "test", "type": "fixed", "size": 22} + "#, + ) + .unwrap() + }; + + let mut converter = AvroSchemaToSchema { next_id: 0 }; + + let iceberg_type = Type::from(PrimitiveType::Fixed(22)); + + assert_eq!( + iceberg_type, + converter.primitive(&avro_schema).unwrap().unwrap() + ); + } + + #[test] + fn test_unknonwn_primitive() { + let mut converter = AvroSchemaToSchema { next_id: 0 }; + + assert!(converter.primitive(&AvroSchema::Duration).is_err()); + } + + #[test] + fn test_no_field_id() { + let avro_schema = { + AvroSchema::parse_str( + r#" +{ + "type": "record", + "name": "avro_schema", + "fields": [ + { + "name": "array_with_string", + "type": "string" + } + ] +} +"#, + ) + .unwrap() + }; + + assert!(avro_schema_to_schema(&avro_schema).is_err()); + } + + #[test] + fn test_decimal_type() { + let avro_schema = { + AvroSchema::parse_str( + r#" + {"type": "bytes", "logicalType": "decimal", "precision": 25, "scale": 19} + "#, + ) + .unwrap() + }; + + let mut converter = AvroSchemaToSchema { next_id: 0 }; + + assert_eq!( + Type::decimal(25, 19).unwrap(), + converter.primitive(&avro_schema).unwrap().unwrap() + ); + } + + #[test] + fn test_date_type() { + let mut converter = AvroSchemaToSchema { next_id: 0 }; + + assert_eq!( + Type::from(PrimitiveType::Date), + converter.primitive(&AvroSchema::Date).unwrap().unwrap() + ); + } +} diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs index b2a4ba816..48bdebc53 100644 --- a/crates/iceberg/src/error.rs +++ b/crates/iceberg/src/error.rs @@ -289,6 +289,12 @@ define_from_err!( "Failed to convert between uuid und iceberg value" ); +define_from_err!( + apache_avro::Error, + ErrorKind::DataInvalid, + "Failure in conversion with avro" +); + /// Helper macro to check arguments. /// /// diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 8e77b9702..432718b53 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -18,6 +18,7 @@ //! Native Rust implementation of Apache Iceberg #![deny(missing_docs)] +#![allow(dead_code)] #[macro_use] extern crate derive_builder; @@ -27,4 +28,5 @@ pub use error::Error; pub use error::ErrorKind; pub use error::Result; +mod avro; pub mod spec; diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs index eef24e1df..9981d344f 100644 --- a/crates/iceberg/src/spec/datatypes.rs +++ b/crates/iceberg/src/spec/datatypes.rs @@ -18,6 +18,8 @@ /*! * Data Types */ +use crate::ensure_data_valid; +use crate::error::Result; use ::serde::de::{MapAccess, Visitor}; use serde::de::{Error, IntoDeserializer}; use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; @@ -34,6 +36,8 @@ pub(crate) const LIST_FILED_NAME: &str = "element"; pub(crate) const MAP_KEY_FIELD_NAME: &str = "key"; pub(crate) const MAP_VALUE_FIELD_NAME: &str = "value"; +pub(crate) const MAX_DECIMAL_PRECISION: u32 = 38; + #[derive(Debug, PartialEq, Eq, Clone)] /// All data types are either primitives or nested types, which are maps, lists, or structs. pub enum Type { @@ -70,6 +74,37 @@ impl Type { pub fn is_struct(&self) -> bool { matches!(self, Type::Struct(_)) } + + /// Creates decimal type. + #[inline(always)] + pub fn decimal(precision: u32, scale: u32) -> Result { + ensure_data_valid!(precision <= MAX_DECIMAL_PRECISION, "Decimals with precision larger than {MAX_DECIMAL_PRECISION} are not supported: {precision}",); + Ok(Type::Primitive(PrimitiveType::Decimal { precision, scale })) + } +} + +impl From for Type { + fn from(value: PrimitiveType) -> Self { + Self::Primitive(value) + } +} + +impl From for Type { + fn from(value: StructType) -> Self { + Type::Struct(value) + } +} + +impl From for Type { + fn from(value: ListType) -> Self { + Type::List(value) + } +} + +impl From for Type { + fn from(value: MapType) -> Self { + Type::Map(value) + } } /// Primitive data types @@ -112,7 +147,7 @@ pub enum PrimitiveType { } impl Serialize for Type { - fn serialize(&self, serializer: S) -> Result + fn serialize(&self, serializer: S) -> std::result::Result where S: Serializer, { @@ -122,7 +157,7 @@ impl Serialize for Type { } impl<'de> Deserialize<'de> for Type { - fn deserialize(deserializer: D) -> Result + fn deserialize(deserializer: D) -> std::result::Result where D: Deserializer<'de>, { @@ -132,7 +167,7 @@ impl<'de> Deserialize<'de> for Type { } impl<'de> Deserialize<'de> for PrimitiveType { - fn deserialize(deserializer: D) -> Result + fn deserialize(deserializer: D) -> std::result::Result where D: Deserializer<'de>, { @@ -148,7 +183,7 @@ impl<'de> Deserialize<'de> for PrimitiveType { } impl Serialize for PrimitiveType { - fn serialize(&self, serializer: S) -> Result + fn serialize(&self, serializer: S) -> std::result::Result where S: Serializer, { @@ -162,7 +197,7 @@ impl Serialize for PrimitiveType { } } -fn deserialize_decimal<'de, D>(deserializer: D) -> Result +fn deserialize_decimal<'de, D>(deserializer: D) -> std::result::Result where D: Deserializer<'de>, { @@ -179,14 +214,18 @@ where }) } -fn serialize_decimal(precision: &u32, scale: &u32, serializer: S) -> Result +fn serialize_decimal( + precision: &u32, + scale: &u32, + serializer: S, +) -> std::result::Result where S: Serializer, { serializer.serialize_str(&format!("decimal({precision},{scale})")) } -fn deserialize_fixed<'de, D>(deserializer: D) -> Result +fn deserialize_fixed<'de, D>(deserializer: D) -> std::result::Result where D: Deserializer<'de>, { @@ -201,7 +240,7 @@ where .map_err(D::Error::custom) } -fn serialize_fixed(value: &u64, serializer: S) -> Result +fn serialize_fixed(value: &u64, serializer: S) -> std::result::Result where S: Serializer, { @@ -243,7 +282,7 @@ pub struct StructType { } impl<'de> Deserialize<'de> for StructType { - fn deserialize(deserializer: D) -> Result + fn deserialize(deserializer: D) -> std::result::Result where D: Deserializer<'de>, { @@ -263,7 +302,7 @@ impl<'de> Deserialize<'de> for StructType { formatter.write_str("struct") } - fn visit_map(self, mut map: V) -> Result + fn visit_map(self, mut map: V) -> std::result::Result where V: MapAccess<'de>, { diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs index 865df25df..af8d71c23 100644 --- a/crates/iceberg/src/spec/schema.rs +++ b/crates/iceberg/src/spec/schema.rs @@ -30,6 +30,7 @@ use std::fmt::{Display, Formatter}; use std::sync::OnceLock; const DEFAULT_SCHEMA_ID: i32 = 0; +pub(crate) const DECIMAL_LENGTH: usize = 128; /// Defines schema in iceberg. #[derive(Debug, PartialEq, Eq, Clone)] diff --git a/crates/iceberg/testdata/avro_schema_manifest_entry.json b/crates/iceberg/testdata/avro_schema_manifest_entry.json new file mode 100644 index 000000000..876c5fa27 --- /dev/null +++ b/crates/iceberg/testdata/avro_schema_manifest_entry.json @@ -0,0 +1,286 @@ +{ + "type": "record", + "name": "manifest_entry", + "fields": [ + { + "name": "status", + "type": "int", + "field-id": 0 + }, + { + "name": "snapshot_id", + "type": [ + "null", + "long" + ], + "field-id": 1 + }, + { + "name": "data_file", + "type": { + "type": "record", + "name": "r2", + "fields": [ + { + "name": "file_path", + "type": "string", + "doc": "Location URI with FS scheme", + "field-id": 100 + }, + { + "name": "file_format", + "type": "string", + "doc": "File format name: avro, orc, or parquet", + "field-id": 101 + }, + { + "name": "partition", + "type": { + "type": "record", + "name": "r102", + "fields": [ + { + "field-id": 1000, + "name": "VendorID", + "type": [ + "null", + "int" + ] + }, + { + "field-id": 1001, + "name": "tpep_pickup_datetime", + "type": [ + "null", + { + "type": "int", + "logicalType": "date" + } + ] + } + ] + }, + "field-id": 102 + }, + { + "name": "record_count", + "type": "long", + "doc": "Number of records in the file", + "field-id": 103 + }, + { + "name": "file_size_in_bytes", + "type": "long", + "doc": "Total file size in bytes", + "field-id": 104 + }, + { + "name": "block_size_in_bytes", + "type": "long", + "field-id": 105 + }, + { + "name": "column_sizes", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "k117_v118", + "fields": [ + { + "name": "key", + "type": "int", + "field-id": 117 + }, + { + "name": "value", + "type": "long", + "field-id": 118 + } + ] + }, + "logicalType": "map" + } + ], + "doc": "Map of column id to total size on disk", + "field-id": 108 + }, + { + "name": "value_counts", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "k119_v120", + "fields": [ + { + "name": "key", + "type": "int", + "field-id": 119 + }, + { + "name": "value", + "type": "long", + "field-id": 120 + } + ] + }, + "logicalType": "map" + } + ], + "doc": "Map of column id to total count, including null and NaN", + "field-id": 109 + }, + { + "name": "null_value_counts", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "k121_v122", + "fields": [ + { + "name": "key", + "type": "int", + "field-id": 121 + }, + { + "name": "value", + "type": "long", + "field-id": 122 + } + ] + }, + "logicalType": "map" + } + ], + "doc": "Map of column id to null value count", + "field-id": 110 + }, + { + "name": "nan_value_counts", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "k138_v139", + "fields": [ + { + "name": "key", + "type": "int", + "field-id": 138 + }, + { + "name": "value", + "type": "long", + "field-id": 139 + } + ] + }, + "logicalType": "map" + } + ], + "doc": "Map of column id to number of NaN values in the column", + "field-id": 137 + }, + { + "name": "lower_bounds", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "k126_v127", + "fields": [ + { + "name": "key", + "type": "int", + "field-id": 126 + }, + { + "name": "value", + "type": "bytes", + "field-id": 127 + } + ] + }, + "logicalType": "map" + } + ], + "doc": "Map of column id to lower bound", + "field-id": 125 + }, + { + "name": "upper_bounds", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "k129_v130", + "fields": [ + { + "name": "key", + "type": "int", + "field-id": 129 + }, + { + "name": "value", + "type": "bytes", + "field-id": 130 + } + ] + }, + "logicalType": "map" + } + ], + "doc": "Map of column id to upper bound", + "field-id": 128 + }, + { + "name": "key_metadata", + "type": [ + "null", + "bytes" + ], + "doc": "Encryption key metadata blob", + "field-id": 131 + }, + { + "name": "split_offsets", + "type": [ + "null", + { + "type": "array", + "items": "long", + "element-id": 133 + } + ], + "doc": "Splittable offsets", + "field-id": 132 + }, + { + "name": "sort_order_id", + "type": [ + "null", + "int" + ], + "doc": "Sort order ID", + "field-id": 140 + } + ] + }, + "field-id": 2 + } + ] +} \ No newline at end of file diff --git a/crates/iceberg/testdata/avro_schema_manifest_file_v1.json b/crates/iceberg/testdata/avro_schema_manifest_file_v1.json new file mode 100644 index 000000000..b185094fd --- /dev/null +++ b/crates/iceberg/testdata/avro_schema_manifest_file_v1.json @@ -0,0 +1,139 @@ +{ + "type": "record", + "name": "manifest_file", + "fields": [ + { + "name": "manifest_path", + "type": "string", + "doc": "Location URI with FS scheme", + "field-id": 500 + }, + { + "name": "manifest_length", + "type": "long", + "doc": "Total file size in bytes", + "field-id": 501 + }, + { + "name": "partition_spec_id", + "type": "int", + "doc": "Spec ID used to write", + "field-id": 502 + }, + { + "name": "added_snapshot_id", + "type": [ + "null", + "long" + ], + "doc": "Snapshot ID that added the manifest", + "default": null, + "field-id": 503 + }, + { + "name": "added_data_files_count", + "type": [ + "null", + "int" + ], + "doc": "Added entry count", + "field-id": 504 + }, + { + "name": "existing_data_files_count", + "type": [ + "null", + "int" + ], + "doc": "Existing entry count", + "field-id": 505 + }, + { + "name": "deleted_data_files_count", + "type": [ + "null", + "int" + ], + "doc": "Deleted entry count", + "field-id": 506 + }, + { + "name": "partitions", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "r508", + "fields": [ + { + "name": "contains_null", + "type": "boolean", + "doc": "True if any file has a null partition value", + "field-id": 509 + }, + { + "name": "contains_nan", + "type": [ + "null", + "boolean" + ], + "doc": "True if any file has a nan partition value", + "field-id": 518 + }, + { + "name": "lower_bound", + "type": [ + "null", + "bytes" + ], + "doc": "Partition lower bound for all files", + "field-id": 510 + }, + { + "name": "upper_bound", + "type": [ + "null", + "bytes" + ], + "doc": "Partition upper bound for all files", + "field-id": 511 + } + ] + }, + "element-id": 508 + } + ], + "doc": "Summary for each partition", + "field-id": 507 + }, + { + "name": "added_rows_count", + "type": [ + "null", + "long" + ], + "doc": "Added rows count", + "field-id": 512 + }, + { + "name": "existing_rows_count", + "type": [ + "null", + "long" + ], + "doc": "Existing rows count", + "field-id": 513 + }, + { + "name": "deleted_rows_count", + "type": [ + "null", + "long" + ], + "doc": "Deleted rows count", + "field-id": 514 + } + ] +} diff --git a/crates/iceberg/testdata/avro_schema_manifest_file_v2.json b/crates/iceberg/testdata/avro_schema_manifest_file_v2.json new file mode 100644 index 000000000..34b97b9c6 --- /dev/null +++ b/crates/iceberg/testdata/avro_schema_manifest_file_v2.json @@ -0,0 +1,141 @@ +{ + "type": "record", + "name": "manifest_file", + "fields": [ + { + "name": "manifest_path", + "type": "string", + "doc": "Location URI with FS scheme", + "field-id": 500 + }, + { + "name": "manifest_length", + "type": "long", + "doc": "Total file size in bytes", + "field-id": 501 + }, + { + "name": "partition_spec_id", + "type": "int", + "doc": "Spec ID used to write", + "field-id": 502 + }, + { + "name": "content", + "type": "int", + "doc": "Contents of the manifest: 0=data, 1=deletes", + "field-id": 517 + }, + { + "name": "sequence_number", + "type": [ + "null", + "long" + ], + "doc": "Sequence number when the manifest was added", + "field-id": 515 + }, + { + "name": "min_sequence_number", + "type": [ + "null", + "long" + ], + "doc": "Lowest sequence number in the manifest", + "field-id": 516 + }, + { + "name": "added_snapshot_id", + "type": "long", + "doc": "Snapshot ID that added the manifest", + "field-id": 503 + }, + { + "name": "added_files_count", + "type": "int", + "doc": "Added entry count", + "field-id": 504 + }, + { + "name": "existing_files_count", + "type": "int", + "doc": "Existing entry count", + "field-id": 505 + }, + { + "name": "deleted_files_count", + "type": "int", + "doc": "Deleted entry count", + "field-id": 506 + }, + { + "name": "added_rows_count", + "type": "long", + "doc": "Added rows count", + "field-id": 512 + }, + { + "name": "existing_rows_count", + "type": "long", + "doc": "Existing rows count", + "field-id": 513 + }, + { + "name": "deleted_rows_count", + "type": "long", + "doc": "Deleted rows count", + "field-id": 514 + }, + { + "name": "partitions", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "r508", + "fields": [ + { + "name": "contains_null", + "type": "boolean", + "doc": "True if any file has a null partition value", + "field-id": 509 + }, + { + "name": "contains_nan", + "type": [ + "null", + "boolean" + ], + "doc": "True if any file has a nan partition value", + "field-id": 518 + }, + { + "name": "lower_bound", + "type": [ + "null", + "bytes" + ], + "doc": "Partition lower bound for all files", + "field-id": 510 + }, + { + "name": "upper_bound", + "type": [ + "null", + "bytes" + ], + "doc": "Partition upper bound for all files", + "field-id": 511 + } + ] + }, + "element-id": 508 + } + ], + "doc": "Summary for each partition", + "field-id": 507 + } + ] +} \ No newline at end of file From 16071127974aaff9e46a322a0fedf0e36d56fa4e Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Fri, 25 Aug 2023 11:32:15 +0800 Subject: [PATCH 2/5] Fix typo --- crates/iceberg/src/avro/schema.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/avro/schema.rs b/crates/iceberg/src/avro/schema.rs index 1de4b7430..7ba3f1967 100644 --- a/crates/iceberg/src/avro/schema.rs +++ b/crates/iceberg/src/avro/schema.rs @@ -857,7 +857,7 @@ mod tests { } #[test] - fn test_unknonwn_primitive() { + fn test_unknown_primitive() { let mut converter = AvroSchemaToSchema { next_id: 0 }; assert!(converter.primitive(&AvroSchema::Duration).is_err()); From 26fcfe5c9746b7a99f8460178e740d7b4f12ad02 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Tue, 29 Aug 2023 09:28:40 +0000 Subject: [PATCH 3/5] Fix comments --- crates/iceberg/src/avro/schema.rs | 36 +++++++++++++------------------ crates/iceberg/src/lib.rs | 1 - 2 files changed, 15 insertions(+), 22 deletions(-) diff --git a/crates/iceberg/src/avro/schema.rs b/crates/iceberg/src/avro/schema.rs index 7ba3f1967..2d5d87a57 100644 --- a/crates/iceberg/src/avro/schema.rs +++ b/crates/iceberg/src/avro/schema.rs @@ -35,14 +35,12 @@ struct SchemaToAvroSchema { schema: String, } +type AvroSchemaOrField = Either; + impl SchemaVisitor for SchemaToAvroSchema { - type T = Either; + type T = AvroSchemaOrField; - fn schema( - &mut self, - _schema: &Schema, - value: Either, - ) -> Result> { + fn schema(&mut self, _schema: &Schema, value: AvroSchemaOrField) -> Result { let mut avro_schema = value.unwrap_left(); if let AvroSchema::Record(record) = &mut avro_schema { @@ -60,8 +58,8 @@ impl SchemaVisitor for SchemaToAvroSchema { fn field( &mut self, field: &NestedFieldRef, - avro_schema: Either, - ) -> Result> { + avro_schema: AvroSchemaOrField, + ) -> Result { let mut field_schema = avro_schema.unwrap_left(); if let AvroSchema::Record(record) = &mut field_schema { record.name = Name::from(format!("r{}", field.id).as_str()); @@ -96,8 +94,8 @@ impl SchemaVisitor for SchemaToAvroSchema { fn r#struct( &mut self, _struct: &StructType, - results: Vec>, - ) -> Result> { + results: Vec, + ) -> Result { let avro_fields = results.into_iter().map(|r| r.unwrap_right()).collect(); Ok(Either::Left(AvroSchema::Record(RecordSchema { @@ -112,11 +110,7 @@ impl SchemaVisitor for SchemaToAvroSchema { }))) } - fn list( - &mut self, - list: &ListType, - value: Either, - ) -> Result> { + fn list(&mut self, list: &ListType, value: AvroSchemaOrField) -> Result { let mut field_schema = value.unwrap_left(); if let AvroSchema::Record(record) = &mut field_schema { @@ -134,9 +128,9 @@ impl SchemaVisitor for SchemaToAvroSchema { fn map( &mut self, map: &MapType, - key_value: Either, - value: Either, - ) -> Result> { + key_value: AvroSchemaOrField, + value: AvroSchemaOrField, + ) -> Result { let key_field_schema = key_value.unwrap_left(); let mut value_field_schema = value.unwrap_left(); if !map.value_field.required { @@ -197,7 +191,7 @@ impl SchemaVisitor for SchemaToAvroSchema { } } - fn primitive(&mut self, p: &PrimitiveType) -> Result> { + fn primitive(&mut self, p: &PrimitiveType) -> Result { let avro_schema = match p { PrimitiveType::Boolean => AvroSchema::Boolean, PrimitiveType::Int => AvroSchema::Int, @@ -262,7 +256,7 @@ fn is_avro_optional(avro_schema: &AvroSchema) -> bool { } /// Post order avro schema visitor. -pub trait AvroSchemaVisitor { +pub(crate) trait AvroSchemaVisitor { type T; fn record(&mut self, record: &RecordSchema, fields: Vec) -> Result; @@ -276,7 +270,7 @@ pub trait AvroSchemaVisitor { } /// Visit avro schema in post order visitor. -pub fn visit(schema: &AvroSchema, visitor: &mut V) -> Result { +pub(crate) fn visit(schema: &AvroSchema, visitor: &mut V) -> Result { match schema { AvroSchema::Record(record) => { let field_results = record diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 432718b53..5ef9ad300 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -18,7 +18,6 @@ //! Native Rust implementation of Apache Iceberg #![deny(missing_docs)] -#![allow(dead_code)] #[macro_use] extern crate derive_builder; From 3af3119fc685ccd29b894d2da9d7d0b58d6d9bb2 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Tue, 29 Aug 2023 09:31:07 +0000 Subject: [PATCH 4/5] Fix clippy --- crates/iceberg/src/avro/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/iceberg/src/avro/mod.rs b/crates/iceberg/src/avro/mod.rs index 487d9b4f8..a6bb0735f 100644 --- a/crates/iceberg/src/avro/mod.rs +++ b/crates/iceberg/src/avro/mod.rs @@ -16,4 +16,5 @@ // under the License. //! Avro related codes. +#[allow(dead_code)] mod schema; From 4b4123116ba81034f2c4626795abb47059cd2e52 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Thu, 31 Aug 2023 11:46:52 +0800 Subject: [PATCH 5/5] Calculate decimal bytes --- crates/iceberg/Cargo.toml | 1 + crates/iceberg/src/avro/schema.rs | 6 ++- crates/iceberg/src/spec/datatypes.rs | 75 +++++++++++++++++++++++++++- crates/iceberg/src/spec/schema.rs | 1 - 4 files changed, 79 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index c110147c6..665ccad4d 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -44,6 +44,7 @@ itertools = "0.11" bimap = "0.6" derive_builder = "0.12.0" either = "1" +lazy_static = "1" [dev-dependencies] diff --git a/crates/iceberg/src/avro/schema.rs b/crates/iceberg/src/avro/schema.rs index 2d5d87a57..244d2082e 100644 --- a/crates/iceberg/src/avro/schema.rs +++ b/crates/iceberg/src/avro/schema.rs @@ -18,7 +18,7 @@ //! Conversion between iceberg and avro schema. use crate::spec::{ visit_schema, ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema, - SchemaVisitor, StructType, Type, DECIMAL_LENGTH, + SchemaVisitor, StructType, Type, }; use crate::{ensure_data_valid, Error, ErrorKind, Result}; use apache_avro::schema::{ @@ -237,7 +237,9 @@ pub(crate) fn avro_decimal_schema(precision: usize, scale: usize) -> Result= ((i+1) as u32) { + *required_len = (j+1) as u32; + break; + } + } + } + + ret + }; + + } +} + #[derive(Debug, PartialEq, Eq, Clone)] /// All data types are either primitives or nested types, which are maps, lists, or structs. pub enum Type { @@ -75,10 +112,27 @@ impl Type { matches!(self, Type::Struct(_)) } + /// Return max precision for decimal given [`num_bytes`] bytes. + #[inline(always)] + pub fn decimal_max_precision(num_bytes: u32) -> Result { + ensure_data_valid!( + num_bytes > 0 && num_bytes <= MAX_DECIMAL_BYTES, + "Decimal length larger than {MAX_DECIMAL_BYTES} is not supported: {num_bytes}", + ); + Ok(MAX_PRECISION[num_bytes as usize - 1]) + } + + /// Returns minimum bytes required for decimal with [`precision`]. + #[inline(always)] + pub fn decimal_required_bytes(precision: u32) -> Result { + ensure_data_valid!(precision > 0 && precision <= MAX_DECIMAL_PRECISION, "Decimals with precision larger than {MAX_DECIMAL_PRECISION} are not supported: {precision}",); + Ok(REQUIRED_LENGTH[precision as usize - 1]) + } + /// Creates decimal type. #[inline(always)] pub fn decimal(precision: u32, scale: u32) -> Result { - ensure_data_valid!(precision <= MAX_DECIMAL_PRECISION, "Decimals with precision larger than {MAX_DECIMAL_PRECISION} are not supported: {precision}",); + ensure_data_valid!(precision > 0 && precision <= MAX_DECIMAL_PRECISION, "Decimals with precision larger than {MAX_DECIMAL_PRECISION} are not supported: {precision}",); Ok(Type::Primitive(PrimitiveType::Decimal { precision, scale })) } } @@ -672,6 +726,7 @@ pub struct MapType { #[cfg(test)] mod tests { + use pretty_assertions::assert_eq; use uuid::Uuid; use crate::spec::values::PrimitiveLiteral; @@ -952,4 +1007,22 @@ mod tests { }), ); } + + #[test] + fn test_decimal_precision() { + let expected_max_precision = [ + 2, 4, 6, 9, 11, 14, 16, 18, 21, 23, 26, 28, 31, 33, 35, 38, 40, 43, 45, 47, 50, 52, 55, + 57, + ]; + for (i, max_precision) in expected_max_precision.iter().enumerate() { + assert_eq!( + *max_precision, + Type::decimal_max_precision(i as u32 + 1).unwrap(), + "Failed calculate max precision for {i}" + ); + } + + assert_eq!(5, Type::decimal_required_bytes(10).unwrap()); + assert_eq!(16, Type::decimal_required_bytes(38).unwrap()); + } } diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs index af8d71c23..865df25df 100644 --- a/crates/iceberg/src/spec/schema.rs +++ b/crates/iceberg/src/spec/schema.rs @@ -30,7 +30,6 @@ use std::fmt::{Display, Formatter}; use std::sync::OnceLock; const DEFAULT_SCHEMA_ID: i32 = 0; -pub(crate) const DECIMAL_LENGTH: usize = 128; /// Defines schema in iceberg. #[derive(Debug, PartialEq, Eq, Clone)]