Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support ser/deser of value #82

Merged
merged 9 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 69 additions & 37 deletions crates/iceberg/src/avro/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ use itertools::{Either, Itertools};
use serde_json::{Number, Value};

// Property key "field-id"; presumably carries the Iceberg field id on Avro fields (usage is not visible here).
// NOTE(review): the identifier looks like a typo of `FIELD_ID_PROP`; renaming requires updating every use.
const FILED_ID_PROP: &str = "field-id";
// Size, in bytes, of the fixed schema produced when converting `PrimitiveType::Uuid`.
const UUID_BYTES: usize = 16;
// Logical-type attribute value that identifies a fixed schema as a UUID.
const UUID_LOGICAL_TYPE: &str = "uuid";
// TODO: https://github.com/apache/iceberg-rust/issues/86
// This constant would be better maintained in avro-rs.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can create an issue for avro-rs and comment the issue link here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added #86 to track this.

const LOGICAL_TYPE: &str = "logicalType";

struct SchemaToAvroSchema {
schema: String,
Expand Down Expand Up @@ -98,26 +103,13 @@ impl SchemaVisitor for SchemaToAvroSchema {
_struct: &StructType,
results: Vec<AvroSchemaOrField>,
) -> Result<AvroSchemaOrField> {
let avro_fields: Vec<AvroRecordField> =
results.into_iter().map(|r| r.unwrap_right()).collect();
let avro_fields = results.into_iter().map(|r| r.unwrap_right()).collect_vec();

let lookup = BTreeMap::from_iter(
avro_fields
.iter()
.enumerate()
.map(|(i, field)| (field.name.clone(), i)),
);

Ok(Either::Left(AvroSchema::Record(RecordSchema {
Ok(Either::Left(
// The name of this record schema should be determined later, by schema name or field
// name, here we use a temporary placeholder to do it.
name: Name::new("null")?,
aliases: None,
doc: None,
fields: avro_fields,
lookup,
attributes: Default::default(),
})))
avro_record_schema("null", avro_fields)?,
))
}

fn list(&mut self, list: &ListType, value: AvroSchemaOrField) -> Result<AvroSchemaOrField> {
Expand Down Expand Up @@ -172,7 +164,7 @@ impl SchemaVisitor for SchemaToAvroSchema {

let value_field = {
let mut field = AvroRecordField {
name: map.key_field.name.clone(),
name: map.value_field.name.clone(),
doc: None,
aliases: None,
default: None,
Expand All @@ -188,16 +180,13 @@ impl SchemaVisitor for SchemaToAvroSchema {
field
};

let item_avro_schema = AvroSchema::Record(RecordSchema {
name: Name::from(format!("k{}_v{}", map.key_field.id, map.value_field.id).as_str()),
aliases: None,
doc: None,
fields: vec![key_field, value_field],
lookup: Default::default(),
attributes: Default::default(),
});
let fields = vec![key_field, value_field];
let item_avro_schema = avro_record_schema(
format!("k{}_v{}", map.key_field.id, map.value_field.id).as_str(),
fields,
)?;

Ok(Either::Left(item_avro_schema))
Ok(Either::Left(AvroSchema::Array(item_avro_schema.into())))
ZENOTME marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -213,8 +202,8 @@ impl SchemaVisitor for SchemaToAvroSchema {
PrimitiveType::Timestamp => AvroSchema::TimestampMicros,
PrimitiveType::Timestamptz => AvroSchema::TimestampMicros,
PrimitiveType::String => AvroSchema::String,
PrimitiveType::Uuid => AvroSchema::Uuid,
PrimitiveType::Fixed(len) => avro_fixed_schema((*len) as usize)?,
PrimitiveType::Uuid => avro_fixed_schema(UUID_BYTES, Some(UUID_LOGICAL_TYPE))?,
PrimitiveType::Fixed(len) => avro_fixed_schema((*len) as usize, None)?,
PrimitiveType::Binary => AvroSchema::Bytes,
PrimitiveType::Decimal { precision, scale } => {
avro_decimal_schema(*precision as usize, *scale as usize)?
Expand All @@ -233,23 +222,46 @@ pub(crate) fn schema_to_avro_schema(name: impl ToString, schema: &Schema) -> Res
visit_schema(schema, &mut converter).map(Either::unwrap_left)
}

pub(crate) fn avro_fixed_schema(len: usize) -> Result<AvroSchema> {
fn avro_record_schema(name: &str, fields: Vec<AvroRecordField>) -> Result<AvroSchema> {
let lookup = fields
.iter()
.enumerate()
.map(|f| (f.1.name.clone(), f.0))
.collect();

Ok(AvroSchema::Record(RecordSchema {
name: Name::new(name)?,
aliases: None,
doc: None,
fields,
lookup,
attributes: Default::default(),
}))
}

pub(crate) fn avro_fixed_schema(len: usize, logical_type: Option<&str>) -> Result<AvroSchema> {
let attributes = if let Some(logical_type) = logical_type {
BTreeMap::from([(
LOGICAL_TYPE.to_string(),
Value::String(logical_type.to_string()),
)])
Comment on lines +244 to +247
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just out of curiosity: is BTreeMap the default in Rust? Trees tend to have many pointers and therefore have faster lookups, in exchange for a larger memory footprint (compared to a HashMap).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's not the default. Here it's just because avro::FixedSchema uses BTreeMap. Indeed, HashMap can save more memory, but it seems that avro prefers to use BTreeMap. (I'm not sure; maybe they need the sorted order when iterating over the attributes during serialization?)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sometimes comes up if you want the value to be hashable. BTreeMap is hashable while HashMap is not hashable. I think it has something to do with requiring an order to be hashable.

} else {
Default::default()
};
Ok(AvroSchema::Fixed(FixedSchema {
name: Name::new(format!("fixed_{len}").as_str())?,
aliases: None,
doc: None,
size: len,
attributes: Default::default(),
attributes,
}))
}

pub(crate) fn avro_decimal_schema(precision: usize, scale: usize) -> Result<AvroSchema> {
Ok(AvroSchema::Decimal(DecimalSchema {
precision,
scale,
inner: Box::new(avro_fixed_schema(
Type::decimal_required_bytes(precision as u32)? as usize,
)?),
inner: Box::new(AvroSchema::Bytes),
}))
}

Expand Down Expand Up @@ -441,14 +453,35 @@ impl AvroSchemaVisitor for AvroSchemaToSchema {
AvroSchema::Date => Type::Primitive(PrimitiveType::Date),
AvroSchema::TimeMicros => Type::Primitive(PrimitiveType::Time),
AvroSchema::TimestampMicros => Type::Primitive(PrimitiveType::Timestamp),
AvroSchema::Uuid => Type::Primitive(PrimitiveType::Uuid),
AvroSchema::Boolean => Type::Primitive(PrimitiveType::Boolean),
AvroSchema::Int => Type::Primitive(PrimitiveType::Int),
AvroSchema::Long => Type::Primitive(PrimitiveType::Long),
AvroSchema::Float => Type::Primitive(PrimitiveType::Float),
AvroSchema::Double => Type::Primitive(PrimitiveType::Double),
AvroSchema::String | AvroSchema::Enum(_) => Type::Primitive(PrimitiveType::String),
AvroSchema::Fixed(fixed) => Type::Primitive(PrimitiveType::Fixed(fixed.size as u64)),
AvroSchema::Fixed(fixed) => {
if let Some(logical_type) = fixed.attributes.get(LOGICAL_TYPE) {
let logical_type = logical_type.as_str().ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
"logicalType in attributes of avro schema is not a string type",
)
})?;
match logical_type {
UUID_LOGICAL_TYPE => Type::Primitive(PrimitiveType::Uuid),
ty => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! We should add more checks here I think. I will do it later.

return Err(Error::new(
ErrorKind::FeatureUnsupported,
format!(
"Logical type {ty} is not support in iceberg primitive type.",
),
))
}
}
} else {
Type::Primitive(PrimitiveType::Fixed(fixed.size as u64))
}
}
AvroSchema::Bytes => Type::Primitive(PrimitiveType::Binary),
AvroSchema::Null => return Ok(None),
_ => {
Expand Down Expand Up @@ -503,7 +536,6 @@ mod tests {
))
.unwrap();

println!("Input is {input}");
AvroSchema::parse_str(input.as_str()).unwrap()
}

Expand Down
26 changes: 26 additions & 0 deletions crates/iceberg/src/spec/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@ pub struct StructType {
/// Lookup for index by field id
#[serde(skip_serializing)]
id_lookup: OnceLock<HashMap<i32, usize>>,
#[serde(skip_serializing)]
name_lookup: OnceLock<HashMap<String, usize>>,
}

impl<'de> Deserialize<'de> for StructType {
Expand Down Expand Up @@ -390,8 +392,10 @@ impl StructType {
Self {
fields,
id_lookup: OnceLock::new(),
name_lookup: OnceLock::new(),
}
}

/// Get struct field with certain id
pub fn field_by_id(&self, id: i32) -> Option<&NestedFieldRef> {
self.field_id_to_index(id).map(|idx| &self.fields[idx])
Expand All @@ -406,6 +410,25 @@ impl StructType {
.copied()
}

/// Looks up a field of this struct by its name.
///
/// Returns `None` when no field has the given name.
pub fn field_by_name(&self, name: &str) -> Option<&NestedFieldRef> {
    let position = self.field_name_to_index(name)?;
    Some(&self.fields[position])
}

fn field_name_to_index(&self, name: &str) -> Option<usize> {
self.name_lookup
.get_or_init(|| {
HashMap::from_iter(
self.fields
.iter()
.enumerate()
.map(|(i, x)| (x.name.clone(), i)),
)
})
.get(name)
.copied()
}

/// Get fields.
pub fn fields(&self) -> &[NestedFieldRef] {
&self.fields
Expand Down Expand Up @@ -773,6 +796,7 @@ mod tests {
)
.into()],
id_lookup: OnceLock::default(),
name_lookup: OnceLock::default(),
}),
)
}
Expand Down Expand Up @@ -803,6 +827,7 @@ mod tests {
)
.into()],
id_lookup: OnceLock::default(),
name_lookup: OnceLock::default(),
}),
)
}
Expand Down Expand Up @@ -845,6 +870,7 @@ mod tests {
NestedField::optional(2, "data", Type::Primitive(PrimitiveType::Int)).into(),
],
id_lookup: HashMap::from([(1, 0), (2, 1)]).into(),
name_lookup: HashMap::from([("id".to_string(), 0), ("data".to_string(), 1)]).into(),
}),
)
}
Expand Down
Loading
Loading