From 4ee05b4d6a3101af424de7e44a8d3eeb63e688bd Mon Sep 17 00:00:00 2001 From: JanKaul Date: Wed, 9 Aug 2023 12:52:36 +0200 Subject: [PATCH] feat: Implement Iceberg values (#20) * implement values * improve getters * fix clippy warnings * fix clippy warnings * change into bytebuf to from * add license header * Use Long instead of LongInt Co-authored-by: Renjie Liu * use more general error kind * use Naivetime * use naivedate * use naivedatetime * fix clippy warnings * use uuid * use orderedfloat * fix clippy warnings * fix tests * use datatime utz * fix docs * rename value to literal * introduce primitive literal * remove length from fixed * serialize json via serde_json value * remove derive serialize/deserialize * implement From Literal for ByteBuf * implement From Literal for JsonValue * fix From Literal for JsonValue * implement struct * fix clippy warnings * add avro tests for some types * fix clippy warnings * fix nested field * fix nested field * implement list test * implement map test * fix error * fix clippy warnings * change timestamps to int/long * convert nulls to None * add tests for null * null test for struct * fix clippy warning * convert json null to option --------- Co-authored-by: Renjie Liu --- crates/iceberg/Cargo.toml | 5 + crates/iceberg/src/error.rs | 24 + crates/iceberg/src/spec/datatypes.rs | 5 + crates/iceberg/src/spec/mod.rs | 1 + crates/iceberg/src/spec/values.rs | 964 +++++++++++++++++++++++++++ 5 files changed, 999 insertions(+) create mode 100644 crates/iceberg/src/spec/values.rs diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 72c10c7cb..e1b5de2cb 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -34,6 +34,11 @@ serde_json = "^1.0" serde_derive = "^1.0" anyhow = "1.0.72" once_cell = "1" +rust_decimal = "1.31.0" +chrono = "0.4" +uuid = "1.4.1" +ordered-float = "3.7.0" +bitvec = "1.0.1" [dev-dependencies] pretty_assertions = "1.4.0" diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs index c64d35193..6cb41db79 100644 --- a/crates/iceberg/src/error.rs +++ b/crates/iceberg/src/error.rs @@ -259,6 +259,30 @@ define_from_err!( "handling invalid utf-8 characters" ); +define_from_err!( + std::array::TryFromSliceError, + ErrorKind::DataInvalid, + "failed to convert byte slive to array" +); + +define_from_err!( + std::num::TryFromIntError, + ErrorKind::DataInvalid, + "failed to convert integer" +); + +define_from_err!( + chrono::ParseError, + ErrorKind::DataInvalid, + "Failed to parse string to date or time" +); + +define_from_err!( + uuid::Error, + ErrorKind::DataInvalid, + "Failed to convert between uuid und iceberg value" +); + #[cfg(test)] mod tests { use anyhow::anyhow; diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs index 301415cef..5750ab968 100644 --- a/crates/iceberg/src/spec/datatypes.rs +++ b/crates/iceberg/src/spec/datatypes.rs @@ -22,6 +22,7 @@ use ::serde::de::{MapAccess, Visitor}; use serde::de::{Error, IntoDeserializer}; use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; use std::cell::OnceCell; +use std::slice::Iter; use std::{collections::HashMap, fmt, ops::Index}; /// Field name for list type. @@ -293,6 +294,10 @@ impl StructType { .get(&field_id) .copied() } + /// Returns an iteratorr over the struct fields + pub fn iter(&self) -> Iter { + self.fields.iter() + } } impl PartialEq for StructType { diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs index 3b998a773..eb66abcdf 100644 --- a/crates/iceberg/src/spec/mod.rs +++ b/crates/iceberg/src/spec/mod.rs @@ -19,3 +19,4 @@ pub mod datatypes; pub mod schema; +pub mod values; diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs new file mode 100644 index 000000000..771ffd88e --- /dev/null +++ b/crates/iceberg/src/spec/values.rs @@ -0,0 +1,964 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/*! + * Value in iceberg + */ + +use std::{any::Any, collections::BTreeMap}; + +use bitvec::vec::BitVec; +use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc}; +use ordered_float::OrderedFloat; +use rust_decimal::Decimal; +use serde_bytes::ByteBuf; +use serde_json::{Map as JsonMap, Number, Value as JsonValue}; +use uuid::Uuid; + +use crate::{Error, ErrorKind}; + +use super::datatypes::{PrimitiveType, Type}; + +/// Values present in iceberg type +#[derive(Clone, Debug, PartialEq, Hash, Eq, PartialOrd, Ord)] +pub enum PrimitiveLiteral { + /// 0x00 for false, non-zero byte for true + Boolean(bool), + /// Stored as 4-byte little-endian + Int(i32), + /// Stored as 8-byte little-endian + Long(i64), + /// Stored as 4-byte little-endian + Float(OrderedFloat), + /// Stored as 8-byte little-endian + Double(OrderedFloat), + /// Stores days from the 1970-01-01 in an 4-byte little-endian int + Date(i32), + /// Stores microseconds from midnight in an 8-byte little-endian long + Time(i64), + /// Timestamp without timezone + Timestamp(i64), + /// Timestamp with timezone + TimestampTZ(i64), + /// UTF-8 bytes (without length) + String(String), + /// 16-byte big-endian value + UUID(Uuid), + /// Binary value + Fixed(Vec), + /// Binary value (without length) + Binary(Vec), + /// Stores unscaled value as two’s-complement big-endian binary, + /// using the minimum number of bytes for the value + Decimal(Decimal), +} + +/// Values present in iceberg type +#[derive(Clone, Debug, PartialEq, Hash, Eq, PartialOrd, Ord)] +pub enum Literal { + /// A primitive value + Primitive(PrimitiveLiteral), + /// A struct is a tuple of typed values. Each field in the tuple is named and has an integer id that is unique in the table schema. + /// Each field can be either optional or required, meaning that values can (or cannot) be null. Fields may be any type. + /// Fields may have an optional comment or doc string. Fields can have default values. + Struct(Struct), + /// A list is a collection of values with some element type. + /// The element field has an integer id that is unique in the table schema. + /// Elements can be either optional or required. Element types may be any type. + List(Vec>), + /// A map is a collection of key-value pairs with a key type and a value type. + /// Both the key field and value field each have an integer id that is unique in the table schema. + /// Map keys are required and map values can be either optional or required. Both map keys and map values may be any type, including nested types. + Map(BTreeMap>), +} + +impl From for ByteBuf { + fn from(value: Literal) -> Self { + match value { + Literal::Primitive(prim) => match prim { + PrimitiveLiteral::Boolean(val) => { + if val { + ByteBuf::from([0u8]) + } else { + ByteBuf::from([1u8]) + } + } + PrimitiveLiteral::Int(val) => ByteBuf::from(val.to_le_bytes()), + PrimitiveLiteral::Long(val) => ByteBuf::from(val.to_le_bytes()), + PrimitiveLiteral::Float(val) => ByteBuf::from(val.to_le_bytes()), + PrimitiveLiteral::Double(val) => ByteBuf::from(val.to_le_bytes()), + PrimitiveLiteral::Date(val) => ByteBuf::from(val.to_le_bytes()), + PrimitiveLiteral::Time(val) => ByteBuf::from(val.to_le_bytes()), + PrimitiveLiteral::Timestamp(val) => ByteBuf::from(val.to_le_bytes()), + PrimitiveLiteral::TimestampTZ(val) => ByteBuf::from(val.to_le_bytes()), + PrimitiveLiteral::String(val) => ByteBuf::from(val.as_bytes()), + PrimitiveLiteral::UUID(val) => ByteBuf::from(val.as_u128().to_be_bytes()), + PrimitiveLiteral::Fixed(val) => ByteBuf::from(val), + PrimitiveLiteral::Binary(val) => ByteBuf::from(val), + PrimitiveLiteral::Decimal(_) => todo!(), + }, + _ => unimplemented!(), + } + } +} + +impl From<&Literal> for JsonValue { + fn from(value: &Literal) -> Self { + match value { + Literal::Primitive(prim) => match prim { + PrimitiveLiteral::Boolean(val) => JsonValue::Bool(*val), + PrimitiveLiteral::Int(val) => JsonValue::Number((*val).into()), + PrimitiveLiteral::Long(val) => JsonValue::Number((*val).into()), + PrimitiveLiteral::Float(val) => match Number::from_f64(val.0 as f64) { + Some(number) => JsonValue::Number(number), + None => JsonValue::Null, + }, + PrimitiveLiteral::Double(val) => match Number::from_f64(val.0) { + Some(number) => JsonValue::Number(number), + None => JsonValue::Null, + }, + PrimitiveLiteral::Date(val) => { + JsonValue::String(date::days_to_date(*val).to_string()) + } + PrimitiveLiteral::Time(val) => { + JsonValue::String(time::microseconds_to_time(*val).to_string()) + } + PrimitiveLiteral::Timestamp(val) => JsonValue::String( + timestamp::microseconds_to_datetime(*val) + .format("%Y-%m-%dT%H:%M:%S%.f") + .to_string(), + ), + PrimitiveLiteral::TimestampTZ(val) => JsonValue::String( + timestamptz::microseconds_to_datetimetz(*val) + .format("%Y-%m-%dT%H:%M:%S%.f+00:00") + .to_string(), + ), + PrimitiveLiteral::String(val) => JsonValue::String(val.clone()), + PrimitiveLiteral::UUID(val) => JsonValue::String(val.to_string()), + PrimitiveLiteral::Fixed(val) => { + JsonValue::String(val.iter().fold(String::new(), |mut acc, x| { + acc.push_str(&format!("{:x}", x)); + acc + })) + } + PrimitiveLiteral::Binary(val) => { + JsonValue::String(val.iter().fold(String::new(), |mut acc, x| { + acc.push_str(&format!("{:x}", x)); + acc + })) + } + PrimitiveLiteral::Decimal(_) => todo!(), + }, + Literal::Struct(s) => { + JsonValue::Object(JsonMap::from_iter(s.iter().map(|(id, value, _)| { + let json: JsonValue = match value { + Some(val) => val.into(), + None => JsonValue::Null, + }; + (id.to_string(), json) + }))) + } + Literal::List(list) => JsonValue::Array( + list.iter() + .map(|opt| match opt { + Some(literal) => literal.into(), + None => JsonValue::Null, + }) + .collect(), + ), + Literal::Map(map) => { + let mut object = JsonMap::with_capacity(2); + object.insert( + "keys".to_string(), + JsonValue::Array(map.keys().map(|literal| literal.into()).collect()), + ); + object.insert( + "values".to_string(), + JsonValue::Array( + map.values() + .map(|literal| match literal { + Some(literal) => literal.into(), + None => JsonValue::Null, + }) + .collect(), + ), + ); + JsonValue::Object(object) + } + } + } +} + +/// The partition struct stores the tuple of partition values for each file. +/// Its type is derived from the partition fields of the partition spec used to write the manifest file. +/// In v2, the partition struct’s field ids must match the ids from the partition spec. +#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)] +pub struct Struct { + /// Vector to store the field values + fields: Vec, + /// Vector to store the field ids + field_ids: Vec, + /// Vector to store the field names + field_names: Vec, + /// Null bitmap + null_bitmap: BitVec, +} + +impl Struct { + /// Create a iterator to read the field in order of (field_id, field_value, field_name). + pub fn iter(&self) -> impl Iterator, &str)> { + self.null_bitmap + .iter() + .zip(self.fields.iter()) + .zip(self.field_ids.iter()) + .zip(self.field_names.iter()) + .map(|(((null, value), id), name)| { + (id, if *null { None } else { Some(value) }, name.as_str()) + }) + } +} + +impl FromIterator<(i32, Option, String)> for Struct { + fn from_iter, String)>>(iter: I) -> Self { + let mut fields = Vec::new(); + let mut field_ids = Vec::new(); + let mut field_names = Vec::new(); + let mut null_bitmap = BitVec::new(); + + for (id, value, name) in iter.into_iter() { + field_ids.push(id); + field_names.push(name); + match value { + Some(value) => { + fields.push(value); + null_bitmap.push(false) + } + None => { + fields.push(Literal::Primitive(PrimitiveLiteral::Boolean(false))); + null_bitmap.push(true) + } + } + } + Struct { + fields, + field_ids, + field_names, + null_bitmap, + } + } +} + +impl Literal { + /// Create iceberg value from bytes + pub fn try_from_bytes(bytes: &[u8], data_type: &Type) -> Result { + match data_type { + Type::Primitive(primitive) => match primitive { + PrimitiveType::Boolean => { + if bytes.len() == 1 && bytes[0] == 0u8 { + Ok(Literal::Primitive(PrimitiveLiteral::Boolean(false))) + } else { + Ok(Literal::Primitive(PrimitiveLiteral::Boolean(true))) + } + } + PrimitiveType::Int => Ok(Literal::Primitive(PrimitiveLiteral::Int( + i32::from_le_bytes(bytes.try_into()?), + ))), + PrimitiveType::Long => Ok(Literal::Primitive(PrimitiveLiteral::Long( + i64::from_le_bytes(bytes.try_into()?), + ))), + PrimitiveType::Float => Ok(Literal::Primitive(PrimitiveLiteral::Float( + OrderedFloat(f32::from_le_bytes(bytes.try_into()?)), + ))), + PrimitiveType::Double => Ok(Literal::Primitive(PrimitiveLiteral::Double( + OrderedFloat(f64::from_le_bytes(bytes.try_into()?)), + ))), + PrimitiveType::Date => Ok(Literal::Primitive(PrimitiveLiteral::Date( + i32::from_le_bytes(bytes.try_into()?), + ))), + PrimitiveType::Time => Ok(Literal::Primitive(PrimitiveLiteral::Time( + i64::from_le_bytes(bytes.try_into()?), + ))), + PrimitiveType::Timestamp => Ok(Literal::Primitive(PrimitiveLiteral::Timestamp( + i64::from_le_bytes(bytes.try_into()?), + ))), + PrimitiveType::Timestamptz => Ok(Literal::Primitive( + PrimitiveLiteral::TimestampTZ(i64::from_le_bytes(bytes.try_into()?)), + )), + PrimitiveType::String => Ok(Literal::Primitive(PrimitiveLiteral::String( + std::str::from_utf8(bytes)?.to_string(), + ))), + PrimitiveType::Uuid => Ok(Literal::Primitive(PrimitiveLiteral::UUID( + Uuid::from_u128(u128::from_be_bytes(bytes.try_into()?)), + ))), + PrimitiveType::Fixed(_) => Ok(Literal::Primitive(PrimitiveLiteral::Fixed( + Vec::from(bytes), + ))), + PrimitiveType::Binary => Ok(Literal::Primitive(PrimitiveLiteral::Binary( + Vec::from(bytes), + ))), + PrimitiveType::Decimal { + precision: _, + scale: _, + } => todo!(), + }, + _ => Err(Error::new( + crate::ErrorKind::DataInvalid, + "Converting bytes to non-primitive types is not supported.", + )), + } + } + + /// Create iceberg value from a json value + pub fn try_from_json(value: JsonValue, data_type: &Type) -> Result, Error> { + match data_type { + Type::Primitive(primitive) => match (primitive, value) { + (PrimitiveType::Boolean, JsonValue::Bool(bool)) => { + Ok(Some(Literal::Primitive(PrimitiveLiteral::Boolean(bool)))) + } + (PrimitiveType::Int, JsonValue::Number(number)) => { + Ok(Some(Literal::Primitive(PrimitiveLiteral::Int( + number + .as_i64() + .ok_or(Error::new( + crate::ErrorKind::DataInvalid, + "Failed to convert json number to int", + ))? + .try_into()?, + )))) + } + (PrimitiveType::Long, JsonValue::Number(number)) => Ok(Some(Literal::Primitive( + PrimitiveLiteral::Long(number.as_i64().ok_or(Error::new( + crate::ErrorKind::DataInvalid, + "Failed to convert json number to long", + ))?), + ))), + (PrimitiveType::Float, JsonValue::Number(number)) => Ok(Some(Literal::Primitive( + PrimitiveLiteral::Float(OrderedFloat(number.as_f64().ok_or(Error::new( + crate::ErrorKind::DataInvalid, + "Failed to convert json number to float", + ))? as f32)), + ))), + (PrimitiveType::Double, JsonValue::Number(number)) => Ok(Some(Literal::Primitive( + PrimitiveLiteral::Double(OrderedFloat(number.as_f64().ok_or(Error::new( + crate::ErrorKind::DataInvalid, + "Failed to convert json number to double", + ))?)), + ))), + (PrimitiveType::Date, JsonValue::String(s)) => { + Ok(Some(Literal::Primitive(PrimitiveLiteral::Date( + date::date_to_days(&NaiveDate::parse_from_str(&s, "%Y-%m-%d")?), + )))) + } + (PrimitiveType::Time, JsonValue::String(s)) => { + Ok(Some(Literal::Primitive(PrimitiveLiteral::Time( + time::time_to_microseconds(&NaiveTime::parse_from_str(&s, "%H:%M:%S%.f")?), + )))) + } + (PrimitiveType::Timestamp, JsonValue::String(s)) => Ok(Some(Literal::Primitive( + PrimitiveLiteral::Timestamp(timestamp::datetime_to_microseconds( + &NaiveDateTime::parse_from_str(&s, "%Y-%m-%dT%H:%M:%S%.f")?, + )), + ))), + (PrimitiveType::Timestamptz, JsonValue::String(s)) => { + Ok(Some(Literal::Primitive(PrimitiveLiteral::TimestampTZ( + timestamptz::datetimetz_to_microseconds(&DateTime::from_utc( + NaiveDateTime::parse_from_str(&s, "%Y-%m-%dT%H:%M:%S%.f+00:00")?, + Utc, + )), + )))) + } + (PrimitiveType::String, JsonValue::String(s)) => { + Ok(Some(Literal::Primitive(PrimitiveLiteral::String(s)))) + } + (PrimitiveType::Uuid, JsonValue::String(s)) => Ok(Some(Literal::Primitive( + PrimitiveLiteral::UUID(Uuid::parse_str(&s)?), + ))), + (PrimitiveType::Fixed(_), JsonValue::String(_)) => todo!(), + (PrimitiveType::Binary, JsonValue::String(_)) => todo!(), + ( + PrimitiveType::Decimal { + precision: _, + scale: _, + }, + JsonValue::String(_), + ) => todo!(), + (_, JsonValue::Null) => Ok(None), + (i, j) => Err(Error::new( + crate::ErrorKind::DataInvalid, + format!( + "The json value {} doesn't fit to the iceberg type {}.", + j, i + ), + )), + }, + Type::Struct(schema) => { + if let JsonValue::Object(mut object) = value { + Ok(Some(Literal::Struct(Struct::from_iter(schema.iter().map( + |field| { + ( + field.id, + object.remove(&field.id.to_string()).and_then(|value| { + Literal::try_from_json(value, &field.field_type) + .and_then(|value| { + value.ok_or(Error::new( + ErrorKind::DataInvalid, + "Key of map cannot be null", + )) + }) + .ok() + }), + field.name.clone(), + ) + }, + ))))) + } else { + Err(Error::new( + crate::ErrorKind::DataInvalid, + "The json value for a struct type must be an object.", + )) + } + } + Type::List(list) => { + if let JsonValue::Array(array) = value { + Ok(Some(Literal::List( + array + .into_iter() + .map(|value| { + Literal::try_from_json(value, &list.element_field.field_type) + }) + .collect::, Error>>()?, + ))) + } else { + Err(Error::new( + crate::ErrorKind::DataInvalid, + "The json value for a list type must be an array.", + )) + } + } + Type::Map(map) => { + if let JsonValue::Object(mut object) = value { + if let (Some(JsonValue::Array(keys)), Some(JsonValue::Array(values))) = + (object.remove("keys"), object.remove("values")) + { + Ok(Some(Literal::Map(BTreeMap::from_iter( + keys.into_iter() + .zip(values.into_iter()) + .map(|(key, value)| { + Ok(( + Literal::try_from_json(key, &map.key_field.field_type) + .and_then(|value| { + value.ok_or(Error::new( + ErrorKind::DataInvalid, + "Key of map cannot be null", + )) + })?, + Literal::try_from_json(value, &map.value_field.field_type)?, + )) + }) + .collect::, Error>>()? + .into_iter(), + )))) + } else { + Err(Error::new( + crate::ErrorKind::DataInvalid, + "The json value for a list type must be an array.", + )) + } + } else { + Err(Error::new( + crate::ErrorKind::DataInvalid, + "The json value for a list type must be an array.", + )) + } + } + } + } + + /// Get datatype of value + pub fn datatype(&self) -> Type { + match self { + Literal::Primitive(prim) => match prim { + PrimitiveLiteral::Boolean(_) => Type::Primitive(PrimitiveType::Boolean), + PrimitiveLiteral::Int(_) => Type::Primitive(PrimitiveType::Int), + PrimitiveLiteral::Long(_) => Type::Primitive(PrimitiveType::Long), + PrimitiveLiteral::Float(_) => Type::Primitive(PrimitiveType::Float), + PrimitiveLiteral::Double(_) => Type::Primitive(PrimitiveType::Double), + PrimitiveLiteral::Date(_) => Type::Primitive(PrimitiveType::Date), + PrimitiveLiteral::Time(_) => Type::Primitive(PrimitiveType::Time), + PrimitiveLiteral::Timestamp(_) => Type::Primitive(PrimitiveType::Timestamp), + PrimitiveLiteral::TimestampTZ(_) => Type::Primitive(PrimitiveType::Timestamptz), + PrimitiveLiteral::Fixed(vec) => { + Type::Primitive(PrimitiveType::Fixed(vec.len() as u64)) + } + PrimitiveLiteral::Binary(_) => Type::Primitive(PrimitiveType::Binary), + PrimitiveLiteral::String(_) => Type::Primitive(PrimitiveType::String), + PrimitiveLiteral::UUID(_) => Type::Primitive(PrimitiveType::Uuid), + PrimitiveLiteral::Decimal(dec) => Type::Primitive(PrimitiveType::Decimal { + precision: 38, + scale: dec.scale(), + }), + }, + _ => unimplemented!(), + } + } + + /// Convert Value to the any type + pub fn into_any(self) -> Box { + match self { + Literal::Primitive(prim) => match prim { + PrimitiveLiteral::Boolean(any) => Box::new(any), + PrimitiveLiteral::Int(any) => Box::new(any), + PrimitiveLiteral::Long(any) => Box::new(any), + PrimitiveLiteral::Float(any) => Box::new(any), + PrimitiveLiteral::Double(any) => Box::new(any), + PrimitiveLiteral::Date(any) => Box::new(any), + PrimitiveLiteral::Time(any) => Box::new(any), + PrimitiveLiteral::Timestamp(any) => Box::new(any), + PrimitiveLiteral::TimestampTZ(any) => Box::new(any), + PrimitiveLiteral::Fixed(any) => Box::new(any), + PrimitiveLiteral::Binary(any) => Box::new(any), + PrimitiveLiteral::String(any) => Box::new(any), + PrimitiveLiteral::UUID(any) => Box::new(any), + PrimitiveLiteral::Decimal(any) => Box::new(any), + }, + _ => unimplemented!(), + } + } +} + +mod date { + use chrono::{NaiveDate, NaiveDateTime}; + + pub(crate) fn date_to_days(date: &NaiveDate) -> i32 { + date.signed_duration_since( + // This is always the same and shouldn't fail + NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(), + ) + .num_days() as i32 + } + + pub(crate) fn days_to_date(days: i32) -> NaiveDate { + // This shouldn't fail until the year 262000 + NaiveDateTime::from_timestamp_opt(days as i64 * 86_400, 0) + .unwrap() + .date() + } +} + +mod time { + use chrono::NaiveTime; + + pub(crate) fn time_to_microseconds(time: &NaiveTime) -> i64 { + time.signed_duration_since( + // This is always the same and shouldn't fail + NaiveTime::from_num_seconds_from_midnight_opt(0, 0).unwrap(), + ) + .num_microseconds() + .unwrap() + } + + pub(crate) fn microseconds_to_time(micros: i64) -> NaiveTime { + let (secs, rem) = (micros / 1_000_000, micros % 1_000_000); + + NaiveTime::from_num_seconds_from_midnight_opt(secs as u32, rem as u32 * 1_000).unwrap() + } +} + +mod timestamp { + use chrono::NaiveDateTime; + + pub(crate) fn datetime_to_microseconds(time: &NaiveDateTime) -> i64 { + time.timestamp_micros() + } + + pub(crate) fn microseconds_to_datetime(micros: i64) -> NaiveDateTime { + let (secs, rem) = (micros / 1_000_000, micros % 1_000_000); + + // This shouldn't fail until the year 262000 + NaiveDateTime::from_timestamp_opt(secs, rem as u32 * 1_000).unwrap() + } +} + +mod timestamptz { + use chrono::{DateTime, NaiveDateTime, Utc}; + + pub(crate) fn datetimetz_to_microseconds(time: &DateTime) -> i64 { + time.timestamp_micros() + } + + pub(crate) fn microseconds_to_datetimetz(micros: i64) -> DateTime { + let (secs, rem) = (micros / 1_000_000, micros % 1_000_000); + + DateTime::::from_utc( + // This shouldn't fail until the year 262000 + NaiveDateTime::from_timestamp_opt(secs, rem as u32 * 1_000).unwrap(), + Utc, + ) + } +} + +#[cfg(test)] +mod tests { + + use crate::spec::datatypes::{ListType, MapType, NestedField, StructType}; + + use super::*; + + fn check_json_serde(json: &str, expected_literal: Literal, expected_type: &Type) { + let raw_json_value = serde_json::from_str::(json).unwrap(); + let desered_literal = + Literal::try_from_json(raw_json_value.clone(), expected_type).unwrap(); + assert_eq!(desered_literal, Some(expected_literal.clone())); + + let expected_json_value: JsonValue = (&expected_literal).into(); + let sered_json = serde_json::to_string(&expected_json_value).unwrap(); + let parsed_json_value = serde_json::from_str::(&sered_json).unwrap(); + + assert_eq!(parsed_json_value, raw_json_value); + } + + fn check_avro_bytes_serde(input: Vec, expected_literal: Literal, expected_type: &Type) { + let raw_schema = r#""bytes""#; + let schema = apache_avro::Schema::parse_str(raw_schema).unwrap(); + + let bytes = ByteBuf::from(input); + let literal = Literal::try_from_bytes(&bytes, expected_type).unwrap(); + assert_eq!(literal, expected_literal); + + let mut writer = apache_avro::Writer::new(&schema, Vec::new()); + writer.append_ser(bytes).unwrap(); + let encoded = writer.into_inner().unwrap(); + let reader = apache_avro::Reader::new(&*encoded).unwrap(); + + for record in reader { + let result = apache_avro::from_value::(&record.unwrap()).unwrap(); + let desered_literal = Literal::try_from_bytes(&result, expected_type).unwrap(); + assert_eq!(desered_literal, expected_literal); + } + } + + #[test] + fn json_boolean() { + let record = r#"true"#; + + check_json_serde( + record, + Literal::Primitive(PrimitiveLiteral::Boolean(true)), + &Type::Primitive(PrimitiveType::Boolean), + ); + } + + #[test] + fn json_int() { + let record = r#"32"#; + + check_json_serde( + record, + Literal::Primitive(PrimitiveLiteral::Int(32)), + &Type::Primitive(PrimitiveType::Int), + ); + } + + #[test] + fn json_long() { + let record = r#"32"#; + + check_json_serde( + record, + Literal::Primitive(PrimitiveLiteral::Long(32)), + &Type::Primitive(PrimitiveType::Long), + ); + } + + #[test] + fn json_float() { + let record = r#"1.0"#; + + check_json_serde( + record, + Literal::Primitive(PrimitiveLiteral::Float(OrderedFloat(1.0))), + &Type::Primitive(PrimitiveType::Float), + ); + } + + #[test] + fn json_double() { + let record = r#"1.0"#; + + check_json_serde( + record, + Literal::Primitive(PrimitiveLiteral::Double(OrderedFloat(1.0))), + &Type::Primitive(PrimitiveType::Double), + ); + } + + #[test] + fn json_date() { + let record = r#""2017-11-16""#; + + check_json_serde( + record, + Literal::Primitive(PrimitiveLiteral::Date(17486)), + &Type::Primitive(PrimitiveType::Date), + ); + } + + #[test] + fn json_time() { + let record = r#""22:31:08.123456""#; + + check_json_serde( + record, + Literal::Primitive(PrimitiveLiteral::Time(81068123456)), + &Type::Primitive(PrimitiveType::Time), + ); + } + + #[test] + fn json_timestamp() { + let record = r#""2017-11-16T22:31:08.123456""#; + + check_json_serde( + record, + Literal::Primitive(PrimitiveLiteral::Timestamp(1510871468123456)), + &Type::Primitive(PrimitiveType::Timestamp), + ); + } + + #[test] + fn json_timestamptz() { + let record = r#""2017-11-16T22:31:08.123456+00:00""#; + + check_json_serde( + record, + Literal::Primitive(PrimitiveLiteral::TimestampTZ(1510871468123456)), + &Type::Primitive(PrimitiveType::Timestamptz), + ); + } + + #[test] + fn json_string() { + let record = r#""iceberg""#; + + check_json_serde( + record, + Literal::Primitive(PrimitiveLiteral::String("iceberg".to_string())), + &Type::Primitive(PrimitiveType::String), + ); + } + + #[test] + fn json_uuid() { + let record = r#""f79c3e09-677c-4bbd-a479-3f349cb785e7""#; + + check_json_serde( + record, + Literal::Primitive(PrimitiveLiteral::UUID( + Uuid::parse_str("f79c3e09-677c-4bbd-a479-3f349cb785e7").unwrap(), + )), + &Type::Primitive(PrimitiveType::Uuid), + ); + } + + #[test] + fn json_struct() { + let record = r#"{"1": 1, "2": "bar", "3": null}"#; + + check_json_serde( + record, + Literal::Struct(Struct::from_iter(vec![ + ( + 1, + Some(Literal::Primitive(PrimitiveLiteral::Int(1))), + "id".to_string(), + ), + ( + 2, + Some(Literal::Primitive(PrimitiveLiteral::String( + "bar".to_string(), + ))), + "name".to_string(), + ), + (3, None, "address".to_string()), + ])), + &Type::Struct(StructType::new(vec![ + NestedField { + id: 1, + name: "id".to_string(), + required: true, + field_type: Box::new(Type::Primitive(PrimitiveType::Int)), + doc: None, + initial_default: None, + write_default: None, + }, + NestedField { + id: 2, + name: "name".to_string(), + required: false, + field_type: Box::new(Type::Primitive(PrimitiveType::String)), + doc: None, + initial_default: None, + write_default: None, + }, + NestedField { + id: 3, + name: "address".to_string(), + required: false, + field_type: Box::new(Type::Primitive(PrimitiveType::String)), + doc: None, + initial_default: None, + write_default: None, + }, + ])), + ); + } + + #[test] + fn json_list() { + let record = r#"[1, 2, 3, null]"#; + + check_json_serde( + record, + Literal::List(vec![ + Some(Literal::Primitive(PrimitiveLiteral::Int(1))), + Some(Literal::Primitive(PrimitiveLiteral::Int(2))), + Some(Literal::Primitive(PrimitiveLiteral::Int(3))), + None, + ]), + &Type::List(ListType { + element_field: NestedField { + id: 0, + name: "".to_string(), + required: true, + field_type: Box::new(Type::Primitive(PrimitiveType::Int)), + doc: None, + initial_default: None, + write_default: None, + }, + }), + ); + } + + #[test] + fn json_map() { + let record = r#"{ "keys": ["a", "b", "c"], "values": [1, 2, null] }"#; + + check_json_serde( + record, + Literal::Map(BTreeMap::from([ + ( + Literal::Primitive(PrimitiveLiteral::String("a".to_string())), + Some(Literal::Primitive(PrimitiveLiteral::Int(1))), + ), + ( + Literal::Primitive(PrimitiveLiteral::String("b".to_string())), + Some(Literal::Primitive(PrimitiveLiteral::Int(2))), + ), + ( + Literal::Primitive(PrimitiveLiteral::String("c".to_string())), + None, + ), + ])), + &Type::Map(MapType { + key_field: NestedField { + id: 0, + name: "key".to_string(), + required: true, + field_type: Box::new(Type::Primitive(PrimitiveType::String)), + doc: None, + initial_default: None, + write_default: None, + }, + value_field: NestedField { + id: 1, + name: "value".to_string(), + required: true, + field_type: Box::new(Type::Primitive(PrimitiveType::Int)), + doc: None, + initial_default: None, + write_default: None, + }, + }), + ); + } + + #[test] + fn avro_bytes_boolean() { + let bytes = vec![1u8]; + + check_avro_bytes_serde( + bytes, + Literal::Primitive(PrimitiveLiteral::Boolean(true)), + &Type::Primitive(PrimitiveType::Boolean), + ); + } + + #[test] + fn avro_bytes_int() { + let bytes = vec![32u8, 0u8, 0u8, 0u8]; + + check_avro_bytes_serde( + bytes, + Literal::Primitive(PrimitiveLiteral::Int(32)), + &Type::Primitive(PrimitiveType::Int), + ); + } + + #[test] + fn avro_bytes_long() { + let bytes = vec![32u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8]; + + check_avro_bytes_serde( + bytes, + Literal::Primitive(PrimitiveLiteral::Long(32)), + &Type::Primitive(PrimitiveType::Long), + ); + } + + #[test] + fn avro_bytes_float() { + let bytes = vec![0u8, 0u8, 128u8, 63u8]; + + check_avro_bytes_serde( + bytes, + Literal::Primitive(PrimitiveLiteral::Float(OrderedFloat(1.0))), + &Type::Primitive(PrimitiveType::Float), + ); + } + + #[test] + fn avro_bytes_double() { + let bytes = vec![0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 240u8, 63u8]; + + check_avro_bytes_serde( + bytes, + Literal::Primitive(PrimitiveLiteral::Double(OrderedFloat(1.0))), + &Type::Primitive(PrimitiveType::Double), + ); + } + + #[test] + fn avro_bytes_string() { + let bytes = vec![105u8, 99u8, 101u8, 98u8, 101u8, 114u8, 103u8]; + + check_avro_bytes_serde( + bytes, + Literal::Primitive(PrimitiveLiteral::String("iceberg".to_string())), + &Type::Primitive(PrimitiveType::String), + ); + } +}