Skip to content

Commit

Permalink
Avoid copy in count / logical nulls (#221)
Browse files Browse the repository at this point in the history
* Avoid copy in count / logical nulls

* Update datafusion/physical-expr/src/aggregate/count.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

---------

Co-authored-by: Georgi Krastev <joro.kr.21@gmail.com>
  • Loading branch information
alamb and joroKr21 authored Dec 14, 2023
1 parent 23097f5 commit cecc493
Showing 1 changed file with 23 additions and 7 deletions.
30 changes: 23 additions & 7 deletions datafusion/physical-expr/src/aggregate/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! Defines physical expressions that can be evaluated at runtime during query execution

use std::any::Any;
use std::borrow::Cow;
use std::fmt::Debug;
use std::ops::BitAnd;
use std::sync::Arc;
Expand All @@ -31,7 +32,7 @@ use arrow::{array::ArrayRef, datatypes::Field};
use arrow_array::cast::AsArray;
use arrow_array::types::Int64Type;
use arrow_array::PrimitiveArray;
use arrow_buffer::BooleanBuffer;
use arrow_buffer::{BooleanBuffer, NullBuffer};
use datafusion_common::{downcast_value, ScalarValue};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::Accumulator;
Expand Down Expand Up @@ -117,13 +118,14 @@ impl GroupsAccumulator for CountGroupsAccumulator {
) -> Result<()> {
assert_eq!(values.len(), 1, "single argument to update_batch");
let values = &values[0];
let nulls = logical_nulls(values);

// Add one to each group's counter for each non null, non
// filtered value
self.counts.resize(total_num_groups, 0);
accumulate_indices(
group_indices,
values.logical_nulls().as_ref(),
nulls.as_ref().map(|nulls| nulls.as_ref()),
opt_filter,
|group_index| {
self.counts[group_index] += 1;
Expand Down Expand Up @@ -192,24 +194,38 @@ impl GroupsAccumulator for CountGroupsAccumulator {
}
}

/// Looks up the null mask of `array`, if it has one.
///
/// The mask is wrapped in a [`Cow`] so that an array which already stores its
/// `NullBuffer` (for example Primitive and StringArrays) can hand it back by
/// reference, and only the data types whose nulls have to be computed produce
/// an owned buffer.
fn logical_nulls(array: &dyn Array) -> Option<Cow<'_, NullBuffer>> {
    // Null and Dictionary arrays have computed null buffers, so they are
    // routed through `Array::logical_nulls`, which yields an owned buffer.
    // TODO remove when upstream is released
    // https://github.com/apache/arrow-rs/issues/5208
    let needs_computed_nulls = matches!(
        array.data_type(),
        DataType::Null | DataType::Dictionary(_, _)
    );

    if needs_computed_nulls {
        array.logical_nulls().map(Cow::Owned)
    } else {
        // Every other type exposes its stored null buffer directly.
        array.nulls().map(Cow::Borrowed)
    }
}

/// Counts the rows that contain a null in at least one of the given columns.
///
/// With more than one column, the columns' null buffers are AND-ed together and
/// every row whose combined bit is unset is counted once, no matter how many of
/// its columns are null. With a single column the result is that column's null
/// count.
///
/// NOTE(review): both branches index `values[0]`, so `values` is assumed to be
/// non-empty — confirm against callers.
fn null_count_for_multiple_cols(values: &[ArrayRef]) -> usize {
if values.len() > 1 {
// Combine the null buffers of all columns; a column without a null buffer
// leaves the accumulator untouched.
let result_bool_buf: Option<BooleanBuffer> = values
.iter()
.map(|a| a.logical_nulls())
.map(|a| logical_nulls(a))
.fold(None, |acc, b| match (acc, b) {
(Some(acc), Some(b)) => Some(acc.bitand(b.inner())),
(Some(acc), None) => Some(acc),
(None, Some(b)) => Some(b.into_inner()),
(None, Some(b)) => Some(b.into_owned().into_inner()),
_ => None,
});
// `None` means no column had a null buffer, hence zero rows with nulls.
result_bool_buf.map_or(0, |b| values[0].len() - b.count_set_bits())
} else {
values[0]
.logical_nulls()
.map_or(0, |nulls| nulls.null_count())
logical_nulls(&values[0]).map_or(0, |nulls| nulls.null_count())
}
}

Expand Down

0 comments on commit cecc493

Please sign in to comment.