Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add transform_literal #287

Merged
merged 3 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions crates/iceberg/src/spec/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,16 @@ impl Datum {
)),
}
}

/// Get the primitive literal from datum.
///
/// Returns a shared borrow of the datum's `literal` field; nothing is cloned.
pub fn literal(&self) -> &PrimitiveLiteral {
&self.literal
}

/// Get the primitive type from datum.
///
/// Returns a shared borrow of the datum's `type` field; nothing is cloned.
pub fn data_type(&self) -> &PrimitiveType {
&self.r#type
}
}

/// Values present in iceberg type
Expand Down
226 changes: 214 additions & 12 deletions crates/iceberg/src/transform/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use std::sync::Arc;
use arrow_array::ArrayRef;
use arrow_schema::{DataType, TimeUnit};

use crate::spec::{Datum, PrimitiveLiteral};

use super::TransformFunction;

#[derive(Debug)]
Expand All @@ -35,39 +37,47 @@ impl Bucket {

impl Bucket {
/// Hashes a byte slice with 32-bit murmur3 (seed 0) and casts the result to `i32`.
///
/// Every other `hash_*` helper in this impl funnels into this function, so
/// switching the hash function only requires changing this one place.
#[inline]
fn hash_bytes(mut v: &[u8]) -> i32 {
    const SEED: u32 = 0;
    // `murmur3_32` consumes a reader; `&mut v` lets the slice itself act as one.
    let hashed = murmur3::murmur3_32(&mut v, SEED).unwrap();
    hashed as i32
}

/// Hashes an `i32` by widening it to `i64` and hashing it as a long.
#[inline]
fn hash_int(v: i32) -> i32 {
    let widened = i64::from(v);
    Self::hash_long(widened)
}

/// Hashes an `i64` via its 8-byte little-endian representation.
#[inline]
fn hash_long(v: i64) -> i32 {
    let le_bytes = v.to_le_bytes();
    Self::hash_bytes(&le_bytes)
}

/// Hashes a date value by delegating to `hash_int`.
///
/// `v` is days from unix epoch.
#[inline]
fn hash_date(v: i32) -> i32 {
Self::hash_int(v)
}

/// Hashes a time-of-day value by delegating to `hash_long`.
///
/// `v` is microseconds from midnight.
#[inline]
fn hash_time(v: i64) -> i32 {
Self::hash_long(v)
}

/// Hashes a timestamp value by delegating to `hash_long`.
///
/// `v` is microseconds from unix epoch.
#[inline]
fn hash_timestamp(v: i64) -> i32 {
Self::hash_long(v)
}

/// Hashes a string via its UTF-8 bytes.
#[inline]
fn hash_str(s: &str) -> i32 {
    let utf8 = s.as_bytes();
    Self::hash_bytes(utf8)
}

/// Decimal values are hashed using the minimum number of bytes required to hold the unscaled value as a two’s complement big-endian
/// ref: https://iceberg.apache.org/spec/#appendix-b-32-bit-hash-requirements
#[inline]
fn hash_decimal(v: i128) -> i32 {
let bytes = v.to_be_bytes();
if let Some(start) = bytes.iter().position(|&x| x != 0) {
Expand All @@ -79,9 +89,50 @@ impl Bucket {

/// Reduces a 32-bit hash to a bucket index.
///
/// def bucket_N(x) = (murmur3_x86_32_hash(x) & Integer.MAX_VALUE) % N
/// ref: https://iceberg.apache.org/spec/#partitioning
#[inline]
fn bucket_n(&self, v: i32) -> i32 {
    // Masking with `i32::MAX` clears the sign bit so the remainder is never negative.
    let non_negative = v & i32::MAX;
    // NOTE(review): assumes `mod_n` is non-zero and fits in `i32` — confirm where `Bucket` is built.
    let bucket_count = self.mod_n as i32;
    non_negative % bucket_count
}

/// Hashes an `i32` value and maps the hash to a bucket index.
#[inline]
fn bucket_int(&self, v: i32) -> i32 {
    let hash = Self::hash_int(v);
    self.bucket_n(hash)
}

/// Hashes an `i64` value and maps the hash to a bucket index.
#[inline]
fn bucket_long(&self, v: i64) -> i32 {
    let hash = Self::hash_long(v);
    self.bucket_n(hash)
}

/// Hashes a decimal's `i128` value with `hash_decimal` and maps the hash to a bucket index.
#[inline]
fn bucket_decimal(&self, v: i128) -> i32 {
    let hash = Self::hash_decimal(v);
    self.bucket_n(hash)
}

/// Hashes a date value with `hash_date` and maps the hash to a bucket index.
#[inline]
fn bucket_date(&self, v: i32) -> i32 {
    let hash = Self::hash_date(v);
    self.bucket_n(hash)
}

/// Hashes a time value with `hash_time` and maps the hash to a bucket index.
#[inline]
fn bucket_time(&self, v: i64) -> i32 {
    let hash = Self::hash_time(v);
    self.bucket_n(hash)
}

/// Hashes a timestamp value with `hash_timestamp` and maps the hash to a bucket index.
#[inline]
fn bucket_timestamp(&self, v: i64) -> i32 {
    let hash = Self::hash_timestamp(v);
    self.bucket_n(hash)
}

/// Hashes a string slice and maps the hash to a bucket index.
#[inline]
fn bucket_str(&self, v: &str) -> i32 {
    let hash = Self::hash_str(v);
    self.bucket_n(hash)
}

/// Hashes a byte slice and maps the hash to a bucket index.
#[inline]
fn bucket_bytes(&self, v: &[u8]) -> i32 {
    let hash = Self::hash_bytes(v);
    self.bucket_n(hash)
}
}

impl TransformFunction for Bucket {
Expand All @@ -91,82 +142,117 @@ impl TransformFunction for Bucket {
.as_any()
.downcast_ref::<arrow_array::Int32Array>()
.unwrap()
.unary(|v| self.bucket_n(Self::hash_int(v))),
.unary(|v| self.bucket_int(v)),
DataType::Int64 => input
.as_any()
.downcast_ref::<arrow_array::Int64Array>()
.unwrap()
.unary(|v| self.bucket_n(Self::hash_long(v))),
.unary(|v| self.bucket_long(v)),
DataType::Decimal128(_, _) => input
.as_any()
.downcast_ref::<arrow_array::Decimal128Array>()
.unwrap()
.unary(|v| self.bucket_n(Self::hash_decimal(v))),
.unary(|v| self.bucket_decimal(v)),
DataType::Date32 => input
.as_any()
.downcast_ref::<arrow_array::Date32Array>()
.unwrap()
.unary(|v| self.bucket_n(Self::hash_date(v))),
.unary(|v| self.bucket_date(v)),
DataType::Time64(TimeUnit::Microsecond) => input
.as_any()
.downcast_ref::<arrow_array::Time64MicrosecondArray>()
.unwrap()
.unary(|v| self.bucket_n(Self::hash_time(v))),
.unary(|v| self.bucket_time(v)),
DataType::Timestamp(TimeUnit::Microsecond, _) => input
.as_any()
.downcast_ref::<arrow_array::TimestampMicrosecondArray>()
.unwrap()
.unary(|v| self.bucket_n(Self::hash_timestamp(v))),
.unary(|v| self.bucket_timestamp(v)),
DataType::Utf8 => arrow_array::Int32Array::from_iter(
input
.as_any()
.downcast_ref::<arrow_array::StringArray>()
.unwrap()
.iter()
.map(|v| self.bucket_n(Self::hash_str(v.unwrap()))),
.map(|v| v.map(|v| self.bucket_str(v))),
),
DataType::LargeUtf8 => arrow_array::Int32Array::from_iter(
input
.as_any()
.downcast_ref::<arrow_array::LargeStringArray>()
.unwrap()
.iter()
.map(|v| self.bucket_n(Self::hash_str(v.unwrap()))),
.map(|v| v.map(|v| self.bucket_str(v))),
),
DataType::Binary => arrow_array::Int32Array::from_iter(
input
.as_any()
.downcast_ref::<arrow_array::BinaryArray>()
.unwrap()
.iter()
.map(|v| self.bucket_n(Self::hash_bytes(v.unwrap()))),
.map(|v| v.map(|v| self.bucket_bytes(v))),
),
DataType::LargeBinary => arrow_array::Int32Array::from_iter(
input
.as_any()
.downcast_ref::<arrow_array::LargeBinaryArray>()
.unwrap()
.iter()
.map(|v| self.bucket_n(Self::hash_bytes(v.unwrap()))),
.map(|v| v.map(|v| self.bucket_bytes(v))),
),
DataType::FixedSizeBinary(_) => arrow_array::Int32Array::from_iter(
input
.as_any()
.downcast_ref::<arrow_array::FixedSizeBinaryArray>()
.unwrap()
.iter()
.map(|v| self.bucket_n(Self::hash_bytes(v.unwrap()))),
.map(|v| v.map(|v| self.bucket_bytes(v))),
),
_ => unreachable!("Unsupported data type: {:?}", input.data_type()),
_ => {
return Err(crate::Error::new(
crate::ErrorKind::FeatureUnsupported,
format!(
"Unsupported data type for bucket transform: {:?}",
input.data_type()
),
))
}
};
Ok(Arc::new(res))
}

fn transform_literal(&self, input: &Datum) -> crate::Result<Option<Datum>> {
let val = match input.literal() {
PrimitiveLiteral::Int(v) => self.bucket_int(*v),
PrimitiveLiteral::Long(v) => self.bucket_long(*v),
PrimitiveLiteral::Decimal(v) => self.bucket_decimal(*v),
PrimitiveLiteral::Date(v) => self.bucket_date(*v),
PrimitiveLiteral::Time(v) => self.bucket_time(*v),
PrimitiveLiteral::Timestamp(v) => self.bucket_timestamp(*v),
PrimitiveLiteral::String(v) => self.bucket_str(v.as_str()),
PrimitiveLiteral::UUID(v) => self.bucket_bytes(v.as_ref()),
PrimitiveLiteral::Binary(v) => self.bucket_bytes(v.as_ref()),
PrimitiveLiteral::Fixed(v) => self.bucket_bytes(v.as_ref()),
_ => {
return Err(crate::Error::new(
crate::ErrorKind::FeatureUnsupported,
format!(
"Unsupported data type for bucket transform: {:?}",
input.data_type()
),
))
}
};
Ok(Some(Datum::int(val)))
}
}

#[cfg(test)]
mod test {
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime};

use crate::{spec::Datum, transform::TransformFunction};

use super::Bucket;
#[test]
fn test_hash() {
Expand Down Expand Up @@ -242,4 +328,120 @@ mod test {
-188683207
);
}

/// An int literal of 34 must land in bucket 9 when there are 10 buckets.
#[test]
fn test_int_literal() {
    let input = Datum::int(34);
    let bucketed = Bucket::new(10)
        .transform_literal(&input)
        .unwrap()
        .unwrap();
    assert_eq!(bucketed, Datum::int(9));
}

/// A long literal of 34 must land in bucket 9 when there are 10 buckets.
#[test]
fn test_long_literal() {
    let input = Datum::long(34);
    let bucketed = Bucket::new(10)
        .transform_literal(&input)
        .unwrap()
        .unwrap();
    assert_eq!(bucketed, Datum::int(9));
}

/// A decimal literal built from 1420 must land in bucket 9 when there are 10 buckets.
#[test]
fn test_decimal_literal() {
    let input = Datum::decimal(1420).unwrap();
    let bucketed = Bucket::new(10)
        .transform_literal(&input)
        .unwrap()
        .unwrap();
    assert_eq!(bucketed, Datum::int(9));
}

/// A date literal of 17486 must land in bucket 26 when there are 100 buckets.
#[test]
fn test_date_literal() {
    let input = Datum::date(17486);
    let bucketed = Bucket::new(100)
        .transform_literal(&input)
        .unwrap()
        .unwrap();
    assert_eq!(bucketed, Datum::int(26));
}

/// A time literal of 81068000000 micros must land in bucket 59 when there are 100 buckets.
#[test]
fn test_time_literal() {
    let input = Datum::time_micros(81068000000).unwrap();
    let bucketed = Bucket::new(100)
        .transform_literal(&input)
        .unwrap()
        .unwrap();
    assert_eq!(bucketed, Datum::int(59));
}

/// A timestamp literal of 1510871468000000 micros must land in bucket 7 when there are 100 buckets.
#[test]
fn test_timestamp_literal() {
    let input = Datum::timestamp_micros(1510871468000000);
    let bucketed = Bucket::new(100)
        .transform_literal(&input)
        .unwrap()
        .unwrap();
    assert_eq!(bucketed, Datum::int(7));
}

/// The string literal "iceberg" must land in bucket 89 when there are 100 buckets.
#[test]
fn test_str_literal() {
    let input = Datum::string("iceberg");
    let bucketed = Bucket::new(100)
        .transform_literal(&input)
        .unwrap()
        .unwrap();
    assert_eq!(bucketed, Datum::int(89));
}

/// The UUID literal below must land in bucket 40 when there are 100 buckets.
#[test]
fn test_uuid_literal() {
    let input = Datum::uuid("F79C3E09-677C-4BBD-A479-3F349CB785E7".parse().unwrap());
    let bucketed = Bucket::new(100)
        .transform_literal(&input)
        .unwrap()
        .unwrap();
    assert_eq!(bucketed, Datum::int(40));
}

/// The binary literal `00 01 02 03` must land in bucket 57 when there are 128 buckets.
#[test]
fn test_binary_literal() {
    let input = Datum::binary(b"\x00\x01\x02\x03".to_vec());
    let bucketed = Bucket::new(128)
        .transform_literal(&input)
        .unwrap()
        .unwrap();
    assert_eq!(bucketed, Datum::int(57));
}

/// The fixed literal `b"foo"` must land in bucket 32 when there are 128 buckets.
#[test]
fn test_fixed_literal() {
    let input = Datum::fixed(b"foo".to_vec());
    let bucketed = Bucket::new(128)
        .transform_literal(&input)
        .unwrap()
        .unwrap();
    assert_eq!(bucketed, Datum::int(32));
}
}
4 changes: 4 additions & 0 deletions crates/iceberg/src/transform/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,8 @@ impl TransformFunction for Identity {
/// Identity transform for arrays: returns `input` unchanged, wrapped in `Ok`.
fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
Ok(input)
}

/// Identity transform for a single literal: returns a clone of `input`.
///
/// Always yields `Ok(Some(..))`.
fn transform_literal(&self, input: &crate::spec::Datum) -> Result<Option<crate::spec::Datum>> {
    let unchanged = input.clone();
    Ok(Some(unchanged))
}
}
Loading
Loading