From c92a38fe5b2a17b8f9a1cd29d181b99793bd26fc Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Tue, 1 Nov 2022 20:49:39 +1300 Subject: [PATCH] Cleanup hash_utils adding support for decimal256 and f16 (#4053) * Cleanup hash_utils * Lint * Clippy --- datafusion-cli/Cargo.lock | 3 + datafusion/core/Cargo.toml | 3 + .../core/src/physical_plan/hash_utils.rs | 548 +++--------------- 3 files changed, 87 insertions(+), 467 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 7c8a7139f73b..db724780e65a 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -547,6 +547,8 @@ version = "13.0.0" dependencies = [ "ahash 0.8.0", "arrow", + "arrow-buffer", + "arrow-schema", "async-compression", "async-trait", "bytes", @@ -561,6 +563,7 @@ dependencies = [ "flate2", "futures", "glob", + "half", "hashbrown", "itertools", "lazy_static", diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index f5fbf997412d..2764273ce9a6 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -57,6 +57,8 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] } apache-avro = { version = "0.14", optional = true } arrow = { version = "25.0.0", features = ["prettyprint"] } +arrow-buffer = "25.0.0" +arrow-schema = "25.0.0" async-compression = { version = "0.3.14", features = ["bzip2", "gzip", "futures-io", "tokio"] } async-trait = "0.1.41" bytes = "1.1" @@ -72,6 +74,7 @@ datafusion-sql = { path = "../sql", version = "13.0.0" } flate2 = "1.0.24" futures = "0.3" glob = "0.3.0" +half = { version = "2.1", default-features = false } hashbrown = { version = "0.12", features = ["raw"] } itertools = "0.10" lazy_static = { version = "^1.4.0" } diff --git a/datafusion/core/src/physical_plan/hash_utils.rs b/datafusion/core/src/physical_plan/hash_utils.rs index 899008aa44da..75f5bab46f1e 100644 --- a/datafusion/core/src/physical_plan/hash_utils.rs +++ b/datafusion/core/src/physical_plan/hash_utils.rs @@ -19,17 +19,10 @@ use crate::error::{DataFusionError, Result}; use ahash::RandomState; -use arrow::array::{ - Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array, - DictionaryArray, FixedSizeBinaryArray, Float32Array, Float64Array, Int16Array, - Int32Array, Int64Array, Int8Array, LargeStringArray, StringArray, - TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, - TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, -}; -use arrow::datatypes::{ - ArrowDictionaryKeyType, ArrowNativeType, DataType, Int16Type, Int32Type, Int64Type, - Int8Type, TimeUnit, UInt16Type, UInt32Type, UInt64Type, UInt8Type, -}; +use arrow::array::*; +use arrow::datatypes::*; +use arrow::{downcast_dictionary_array, downcast_primitive_array}; +use arrow_buffer::i256; use std::sync::Arc; // Combines two hashes into one hash @@ -52,199 +45,98 @@ fn hash_null(random_state: &RandomState, hashes_buffer: &'_ mut [u64], mul_col: } } -fn hash_decimal128<'a>( - array: &ArrayRef, +trait HashValue { + fn hash_one(&self, state: &RandomState) -> u64; +} + +impl<'a, T: HashValue + ?Sized> HashValue for &'a T { + fn hash_one(&self, state: &RandomState) -> u64 { + T::hash_one(self, state) + } +} + +macro_rules! 
hash_value { + ($($t:ty),+) => { + $(impl HashValue for $t { + fn hash_one(&self, state: &RandomState) -> u64 { + state.hash_one(self) + } + })+ + }; +} +hash_value!(i8, i16, i32, i64, i128, i256, u8, u16, u32, u64); +hash_value!(bool, str, [u8]); + +macro_rules! hash_float_value { + ($($t:ty),+) => { + $(impl HashValue for $t { + fn hash_one(&self, state: &RandomState) -> u64 { + state.hash_one(self.to_le_bytes()) + } + })+ + }; +} +hash_float_value!(half::f16, f32, f64); + +fn hash_array( + array: T, random_state: &RandomState, - hashes_buffer: &'a mut [u64], - mul_col: bool, -) { - let array = array.as_any().downcast_ref::().unwrap(); + hashes_buffer: &mut [u64], + multi_col: bool, +) where + T: ArrayAccessor, + T::Item: HashValue, +{ if array.null_count() == 0 { - if mul_col { + if multi_col { for (i, hash) in hashes_buffer.iter_mut().enumerate() { - *hash = combine_hashes(random_state.hash_one(&array.value(i)), *hash); + *hash = combine_hashes(array.value(i).hash_one(random_state), *hash); } } else { for (i, hash) in hashes_buffer.iter_mut().enumerate() { - *hash = random_state.hash_one(&array.value(i)); + *hash = array.value(i).hash_one(random_state); } } - } else if mul_col { + } else if multi_col { for (i, hash) in hashes_buffer.iter_mut().enumerate() { if !array.is_null(i) { - *hash = combine_hashes(random_state.hash_one(&array.value(i)), *hash); + *hash = combine_hashes(array.value(i).hash_one(random_state), *hash); } } } else { for (i, hash) in hashes_buffer.iter_mut().enumerate() { if !array.is_null(i) { - *hash = random_state.hash_one(&array.value(i)); + *hash = array.value(i).hash_one(random_state); } } } } -macro_rules! hash_array { - ($array_type:ident, $column: ident, $ty: ty, $hashes: ident, $random_state: ident, $multi_col: ident) => { - let array = $column.as_any().downcast_ref::<$array_type>().unwrap(); - if array.null_count() == 0 { - if $multi_col { - for (i, hash) in $hashes.iter_mut().enumerate() { - *hash = - combine_hashes($random_state.hash_one(&array.value(i)), *hash); - } - } else { - for (i, hash) in $hashes.iter_mut().enumerate() { - *hash = $random_state.hash_one(&array.value(i)); - } - } - } else { - if $multi_col { - for (i, hash) in $hashes.iter_mut().enumerate() { - if !array.is_null(i) { - *hash = combine_hashes( - $random_state.hash_one(&array.value(i)), - *hash, - ); - } - } - } else { - for (i, hash) in $hashes.iter_mut().enumerate() { - if !array.is_null(i) { - *hash = $random_state.hash_one(&array.value(i)); - } - } - } - } - }; -} - -macro_rules! hash_array_primitive { - ($array_type:ident, $column: ident, $ty: ident, $hashes: ident, $random_state: ident, $multi_col: ident) => { - let array = $column.as_any().downcast_ref::<$array_type>().unwrap(); - let values = array.values(); - - if array.null_count() == 0 { - if $multi_col { - for (hash, value) in $hashes.iter_mut().zip(values.iter()) { - *hash = combine_hashes($random_state.hash_one(value), *hash); - } - } else { - for (hash, value) in $hashes.iter_mut().zip(values.iter()) { - *hash = $random_state.hash_one(value) - } - } - } else { - if $multi_col { - for (i, (hash, value)) in - $hashes.iter_mut().zip(values.iter()).enumerate() - { - if !array.is_null(i) { - *hash = combine_hashes($random_state.hash_one(value), *hash); - } - } - } else { - for (i, (hash, value)) in - $hashes.iter_mut().zip(values.iter()).enumerate() - { - if !array.is_null(i) { - *hash = $random_state.hash_one(value); - } - } - } - } - }; -} - -macro_rules! 
-    ($array_type:ident, $column: ident, $ty: ident, $hashes: ident, $random_state: ident, $multi_col: ident) => {
-        let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
-        let values = array.values();
-
-        if array.null_count() == 0 {
-            if $multi_col {
-                for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
-                    *hash = combine_hashes(
-                        $random_state.hash_one(&$ty::from_le_bytes(value.to_le_bytes())),
-                        *hash,
-                    );
-                }
-            } else {
-                for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
-                    *hash =
-                        $random_state.hash_one(&$ty::from_le_bytes(value.to_le_bytes()))
-                }
-            }
-        } else {
-            if $multi_col {
-                for (i, (hash, value)) in
-                    $hashes.iter_mut().zip(values.iter()).enumerate()
-                {
-                    if !array.is_null(i) {
-                        *hash = combine_hashes(
-                            $random_state
-                                .hash_one(&$ty::from_le_bytes(value.to_le_bytes())),
-                            *hash,
-                        );
-                    }
-                }
-            } else {
-                for (i, (hash, value)) in
-                    $hashes.iter_mut().zip(values.iter()).enumerate()
-                {
-                    if !array.is_null(i) {
-                        *hash = $random_state
-                            .hash_one(&$ty::from_le_bytes(value.to_le_bytes()));
-                    }
-                }
-            }
-        }
-    };
-}
-
 /// Hash the values in a dictionary array
-fn create_hashes_dictionary<K: ArrowDictionaryKeyType>(
-    array: &ArrayRef,
+fn hash_dictionary<K: ArrowDictionaryKeyType>(
+    array: &DictionaryArray<K>,
     random_state: &RandomState,
     hashes_buffer: &mut [u64],
     multi_col: bool,
 ) -> Result<()> {
-    let dict_array = array.as_any().downcast_ref::<DictionaryArray<K>>().unwrap();
-
     // Hash each dictionary value once, and then use that computed
     // hash for each key value to avoid a potentially expensive
     // redundant hashing for large dictionary elements (e.g. strings)
-    let dict_values = Arc::clone(dict_array.values());
-    let mut dict_hashes = vec![0; dict_values.len()];
-    create_hashes(&[dict_values], random_state, &mut dict_hashes)?;
+    let values = Arc::clone(array.values());
+    let mut dict_hashes = vec![0; values.len()];
+    create_hashes(&[values], random_state, &mut dict_hashes)?;
 
     // combine hash for each index in values
     if multi_col {
-        for (hash, key) in hashes_buffer.iter_mut().zip(dict_array.keys().iter()) {
+        for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().iter()) {
             if let Some(key) = key {
-                let idx = key
-                    .to_usize()
-                    .ok_or_else(|| {
-                        DataFusionError::Internal(format!(
-                            "Can not convert key value {:?} to usize in dictionary of type {:?}",
-                            key, dict_array.data_type()
-                        ))
-                    })?;
-                *hash = combine_hashes(dict_hashes[idx], *hash)
+                *hash = combine_hashes(dict_hashes[key.as_usize()], *hash)
             } // no update for Null, consistent with other hashes
         }
     } else {
-        for (hash, key) in hashes_buffer.iter_mut().zip(dict_array.keys().iter()) {
+        for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().iter()) {
             if let Some(key) = key {
-                let idx = key
-                    .to_usize()
-                    .ok_or_else(|| {
-                        DataFusionError::Internal(format!(
-                            "Can not convert key value {:?} to usize in dictionary of type {:?}",
-                            key, dict_array.data_type()
-                        ))
-                    })?;
-                *hash = dict_hashes[idx]
+                *hash = dict_hashes[key.as_usize()]
             } // no update for Null, consistent with other hashes
         }
     }
@@ -312,309 +204,34 @@ pub fn create_hashes<'a>(
 ) -> Result<&'a mut Vec<u64>> {
     // combine hashes with `combine_hashes` if we have more than 1 column
-    use arrow::array::{BinaryArray, LargeBinaryArray};
     let multi_col = arrays.len() > 1;
 
     for col in arrays {
-        match col.data_type() {
-            DataType::Null => {
-                hash_null(random_state, hashes_buffer, multi_col);
+        let array = col.as_ref();
+        downcast_primitive_array! {
+            array => hash_array(array, random_state, hashes_buffer, multi_col),
+            DataType::Null => hash_null(random_state, hashes_buffer, multi_col),
+            DataType::Boolean => hash_array(as_boolean_array(array), random_state, hashes_buffer, multi_col),
+            DataType::Utf8 => hash_array(as_string_array(array), random_state, hashes_buffer, multi_col),
+            DataType::LargeUtf8 => hash_array(as_largestring_array(array), random_state, hashes_buffer, multi_col),
+            DataType::Binary => hash_array(as_generic_binary_array::<i32>(array), random_state, hashes_buffer, multi_col),
+            DataType::LargeBinary => hash_array(as_generic_binary_array::<i64>(array), random_state, hashes_buffer, multi_col),
+            DataType::FixedSizeBinary(_) => {
+                let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap();
+                hash_array(array, random_state, hashes_buffer, multi_col)
             }
             DataType::Decimal128(_, _) => {
-                hash_decimal128(col, random_state, hashes_buffer, multi_col);
-            }
-            DataType::UInt8 => {
-                hash_array_primitive!(
-                    UInt8Array,
-                    col,
-                    u8,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::UInt16 => {
-                hash_array_primitive!(
-                    UInt16Array,
-                    col,
-                    u16,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::UInt32 => {
-                hash_array_primitive!(
-                    UInt32Array,
-                    col,
-                    u32,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::UInt64 => {
-                hash_array_primitive!(
-                    UInt64Array,
-                    col,
-                    u64,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Int8 => {
-                hash_array_primitive!(
-                    Int8Array,
-                    col,
-                    i8,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Int16 => {
-                hash_array_primitive!(
-                    Int16Array,
-                    col,
-                    i16,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Int32 => {
-                hash_array_primitive!(
-                    Int32Array,
-                    col,
-                    i32,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
+                let array = as_primitive_array::<Decimal128Type>(array);
+                hash_array(array, random_state, hashes_buffer, multi_col)
             }
-            DataType::Int64 => {
-                hash_array_primitive!(
-                    Int64Array,
-                    col,
-                    i64,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
+            DataType::Decimal256(_, _) => {
+                let array = as_primitive_array::<Decimal256Type>(array);
+                hash_array(array, random_state, hashes_buffer, multi_col)
             }
-            DataType::Float32 => {
-                hash_array_float!(
-                    Float32Array,
-                    col,
-                    u32,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
+            DataType::Dictionary(_, _) => downcast_dictionary_array! {
+                array => hash_dictionary(array, random_state, hashes_buffer, multi_col)?,
+                _ => unreachable!()
             }
-            DataType::Float64 => {
-                hash_array_float!(
-                    Float64Array,
-                    col,
-                    u64,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Timestamp(TimeUnit::Second, None) => {
-                hash_array_primitive!(
-                    TimestampSecondArray,
-                    col,
-                    i64,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Timestamp(TimeUnit::Millisecond, None) => {
-                hash_array_primitive!(
-                    TimestampMillisecondArray,
-                    col,
-                    i64,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Timestamp(TimeUnit::Microsecond, None) => {
-                hash_array_primitive!(
-                    TimestampMicrosecondArray,
-                    col,
-                    i64,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
-                hash_array_primitive!(
-                    TimestampNanosecondArray,
-                    col,
-                    i64,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Date32 => {
-                hash_array_primitive!(
-                    Date32Array,
-                    col,
-                    i32,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Date64 => {
-                hash_array_primitive!(
-                    Date64Array,
-                    col,
-                    i64,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Boolean => {
-                hash_array!(
-                    BooleanArray,
-                    col,
-                    u8,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Utf8 => {
-                hash_array!(
-                    StringArray,
-                    col,
-                    str,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::LargeUtf8 => {
-                hash_array!(
-                    LargeStringArray,
-                    col,
-                    str,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Binary => {
-                hash_array!(
-                    BinaryArray,
-                    col,
-                    &[u8],
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::FixedSizeBinary(_) => {
-                hash_array!(
-                    FixedSizeBinaryArray,
-                    col,
-                    &[u8],
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::LargeBinary => {
-                hash_array!(
-                    LargeBinaryArray,
-                    col,
-                    &[u8],
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Dictionary(index_type, _) => match **index_type {
-                DataType::Int8 => {
-                    create_hashes_dictionary::<Int8Type>(
-                        col,
-                        random_state,
-                        hashes_buffer,
-                        multi_col,
-                    )?;
-                }
-                DataType::Int16 => {
-                    create_hashes_dictionary::<Int16Type>(
-                        col,
-                        random_state,
-                        hashes_buffer,
-                        multi_col,
-                    )?;
-                }
-                DataType::Int32 => {
-                    create_hashes_dictionary::<Int32Type>(
-                        col,
-                        random_state,
-                        hashes_buffer,
-                        multi_col,
-                    )?;
-                }
-                DataType::Int64 => {
-                    create_hashes_dictionary::<Int64Type>(
-                        col,
-                        random_state,
-                        hashes_buffer,
-                        multi_col,
-                    )?;
-                }
-                DataType::UInt8 => {
-                    create_hashes_dictionary::<UInt8Type>(
-                        col,
-                        random_state,
-                        hashes_buffer,
-                        multi_col,
-                    )?;
-                }
-                DataType::UInt16 => {
-                    create_hashes_dictionary::<UInt16Type>(
-                        col,
-                        random_state,
-                        hashes_buffer,
-                        multi_col,
-                    )?;
-                }
-                DataType::UInt32 => {
-                    create_hashes_dictionary::<UInt32Type>(
-                        col,
-                        random_state,
-                        hashes_buffer,
-                        multi_col,
-                    )?;
-                }
-                DataType::UInt64 => {
-                    create_hashes_dictionary::<UInt64Type>(
-                        col,
-                        random_state,
-                        hashes_buffer,
-                        multi_col,
-                    )?;
-                }
-                _ => {
-                    return Err(DataFusionError::Internal(format!(
-                        "Unsupported dictionary type in hasher hashing: {}",
-                        col.data_type(),
-                    )))
-                }
-            },
             _ => {
                 // This is internal because we should have caught this before.
                 return Err(DataFusionError::Internal(format!(
@@ -630,10 +247,7 @@ pub fn create_hashes<'a>(
 #[cfg(test)]
 mod tests {
     use crate::from_slice::FromSlice;
-    use arrow::{
-        array::{BinaryArray, DictionaryArray, FixedSizeBinaryArray},
-        datatypes::Int8Type,
-    };
+    use arrow::{array::*, datatypes::*};
     use std::sync::Arc;
 
     use super::*;
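
Note (illustrative addendum, not part of the patch): the heart of this cleanup is replacing the per-type hashing macros with a small HashValue trait plus one generic loop over a nullable column. The sketch below mirrors that shape in self-contained Rust; it is a minimal sketch under stated assumptions: std's RandomState stands in for ahash::RandomState, a hypothetical hash_column over a slice of Option<T> stands in for arrow's ArrayAccessor-based hash_array, and combine_hashes here is a placeholder fold rather than the exact combiner in hash_utils.rs.

    use std::collections::hash_map::RandomState;
    use std::hash::BuildHasher;

    // Placeholder combiner; the real combine_hashes in hash_utils.rs is not shown in this hunk.
    fn combine_hashes(l: u64, r: u64) -> u64 {
        l.wrapping_mul(37).wrapping_add(r)
    }

    // One impl per hashable value type, generated by a macro in the same spirit as hash_value!.
    trait HashValue {
        fn hash_one(&self, state: &RandomState) -> u64;
    }

    macro_rules! hash_value {
        ($($t:ty),+) => {
            $(impl HashValue for $t {
                fn hash_one(&self, state: &RandomState) -> u64 {
                    state.hash_one(self)
                }
            })+
        };
    }
    hash_value!(i8, i16, i32, i64, u8, u16, u32, u64, bool);

    // Floats hash their little-endian bytes, mirroring hash_float_value! in the patch.
    macro_rules! hash_float_value {
        ($($t:ty),+) => {
            $(impl HashValue for $t {
                fn hash_one(&self, state: &RandomState) -> u64 {
                    state.hash_one(self.to_le_bytes())
                }
            })+
        };
    }
    hash_float_value!(f32, f64);

    // Stand-in for arrow's ArrayAccessor: a slice of Option<T> plays the role of a nullable column.
    fn hash_column<T: HashValue>(
        column: &[Option<T>],
        state: &RandomState,
        hashes: &mut [u64],
        multi_col: bool,
    ) {
        for (hash, value) in hashes.iter_mut().zip(column) {
            if let Some(v) = value {
                *hash = if multi_col {
                    combine_hashes(v.hash_one(state), *hash)
                } else {
                    v.hash_one(state)
                };
            } // null slots keep their previous hash, as in the patch
        }
    }

    fn main() {
        let state = RandomState::new();
        let ints = [Some(1i64), None, Some(3)];
        let floats = [Some(1.5f64), Some(2.5), None];
        let mut hashes = vec![0u64; 3];
        hash_column(&ints, &state, &mut hashes, true);
        hash_column(&floats, &state, &mut hashes, true);
        println!("{hashes:?}");
    }

The design point this illustrates: supporting another hashable type (such as i256 for Decimal256 or half::f16 in the patch) only needs one more HashValue impl, instead of another hand-written match arm and macro invocation.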
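
Note (illustrative addendum, not part of the patch): hash_dictionary keeps the pre-existing optimization of hashing each distinct dictionary value once and reusing that hash for every key, rather than re-hashing potentially large values (e.g. strings) per row. A self-contained sketch of that idea, with a hypothetical DictColumn type in place of arrow's DictionaryArray and std's RandomState in place of ahash:

    use std::collections::hash_map::RandomState;
    use std::hash::BuildHasher;

    // Hypothetical dictionary-encoded column: distinct values plus one optional key per row.
    struct DictColumn<'a> {
        values: Vec<&'a str>,
        keys: Vec<Option<usize>>,
    }

    fn hash_dict_column(dict: &DictColumn<'_>, state: &RandomState, hashes: &mut [u64]) {
        // Hash each distinct dictionary value exactly once ...
        let value_hashes: Vec<u64> = dict.values.iter().map(|v| state.hash_one(v)).collect();

        // ... then reuse the precomputed hash for every row that references it.
        for (hash, key) in hashes.iter_mut().zip(&dict.keys) {
            if let Some(k) = key {
                *hash = value_hashes[*k];
            } // null keys leave the running hash untouched, matching the patch
        }
    }

    fn main() {
        let dict = DictColumn {
            values: vec!["apple", "banana"],
            keys: vec![Some(0), Some(1), None, Some(0)],
        };
        let state = RandomState::new();
        let mut hashes = vec![0u64; dict.keys.len()];
        hash_dict_column(&dict, &state, &mut hashes);
        println!("{hashes:?}");
    }

As in the patch, rows with a null key are simply skipped, which keeps null handling consistent with the non-dictionary hashing paths.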