Skip to content

Commit

Permalink
Count distinct floats (#252)
Browse files Browse the repository at this point in the history
* Added test for boolean COUNT DISTINCT

* ran cargo fmt

* Fixed clippy warnings

* Added COUNT DISTINCT support for floating point numbers
  • Loading branch information
pjmore authored May 4, 2021
1 parent 5f6024d commit cb0a4a9
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 2 deletions.
85 changes: 83 additions & 2 deletions datafusion/src/physical_plan/distinct_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,9 @@ mod tests {
use super::*;

use arrow::array::{
ArrayRef, BooleanArray, Int16Array, Int32Array, Int64Array, Int8Array, ListArray,
UInt16Array, UInt32Array, UInt64Array, UInt8Array,
ArrayRef, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array,
Int64Array, Int8Array, ListArray, UInt16Array, UInt32Array, UInt64Array,
UInt8Array,
};
use arrow::array::{Int32Builder, ListBuilder, UInt64Builder};
use arrow::datatypes::DataType;
Expand Down Expand Up @@ -355,6 +356,76 @@ mod tests {
}};
}

//Used trait to create associated constant for f32 and f64
trait SubNormal: 'static {
const SUBNORMAL: Self;
}

impl SubNormal for f64 {
const SUBNORMAL: Self = 1.0e-308_f64;
}

impl SubNormal for f32 {
const SUBNORMAL: Self = 1.0e-38_f32;
}

macro_rules! test_count_distinct_update_batch_floating_point {
($ARRAY_TYPE:ident, $DATA_TYPE:ident, $PRIM_TYPE:ty) => {{
use ordered_float::OrderedFloat;
let values: Vec<Option<$PRIM_TYPE>> = vec![
Some(<$PRIM_TYPE>::INFINITY),
Some(<$PRIM_TYPE>::NAN),
Some(1.0),
Some(<$PRIM_TYPE as SubNormal>::SUBNORMAL),
Some(1.0),
Some(<$PRIM_TYPE>::INFINITY),
None,
Some(3.0),
Some(-4.5),
Some(2.0),
None,
Some(2.0),
Some(3.0),
Some(<$PRIM_TYPE>::NEG_INFINITY),
Some(1.0),
Some(<$PRIM_TYPE>::NAN),
Some(<$PRIM_TYPE>::NEG_INFINITY),
];

let arrays = vec![Arc::new($ARRAY_TYPE::from(values)) as ArrayRef];

let (states, result) = run_update_batch(&arrays)?;

let mut state_vec =
state_to_vec!(&states[0], $DATA_TYPE, $PRIM_TYPE).unwrap();
state_vec.sort_by(|a, b| match (a, b) {
(Some(lhs), Some(rhs)) => {
OrderedFloat::from(*lhs).cmp(&OrderedFloat::from(*rhs))
}
_ => a.partial_cmp(b).unwrap(),
});

let nan_idx = state_vec.len() - 1;
assert_eq!(states.len(), 1);
assert_eq!(
&state_vec[..nan_idx],
vec![
Some(<$PRIM_TYPE>::NEG_INFINITY),
Some(-4.5),
Some(<$PRIM_TYPE as SubNormal>::SUBNORMAL),
Some(1.0),
Some(2.0),
Some(3.0),
Some(<$PRIM_TYPE>::INFINITY)
]
);
assert!(state_vec[nan_idx].unwrap_or_default().is_nan());
assert_eq!(result, ScalarValue::UInt64(Some(8)));

Ok(())
}};
}

#[test]
fn count_distinct_update_batch_i8() -> Result<()> {
test_count_distinct_update_batch_numeric!(Int8Array, Int8, i8)
Expand Down Expand Up @@ -395,6 +466,16 @@ mod tests {
test_count_distinct_update_batch_numeric!(UInt64Array, UInt64, u64)
}

#[test]
fn count_distinct_update_batch_f32() -> Result<()> {
test_count_distinct_update_batch_floating_point!(Float32Array, Float32, f32)
}

#[test]
fn count_distinct_update_batch_f64() -> Result<()> {
test_count_distinct_update_batch_floating_point!(Float64Array, Float64, f64)
}

#[test]
fn count_distinct_update_batch_boolean() -> Result<()> {
let get_count = |data: BooleanArray| -> Result<(Vec<Option<bool>>, u64)> {
Expand Down
2 changes: 2 additions & 0 deletions datafusion/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,8 @@ impl ScalarValue {
DataType::UInt32 => build_list!(UInt32Builder, UInt32, values, size),
DataType::UInt64 => build_list!(UInt64Builder, UInt64, values, size),
DataType::Utf8 => build_list!(StringBuilder, Utf8, values, size),
DataType::Float32 => build_list!(Float32Builder, Float32, values, size),
DataType::Float64 => build_list!(Float64Builder, Float64, values, size),
DataType::LargeUtf8 => {
build_list!(LargeStringBuilder, LargeUtf8, values, size)
}
Expand Down

0 comments on commit cb0a4a9

Please sign in to comment.