Speed up create_batch_from_map #339

Merged

merged 6 commits into from May 27, 2021
156 changes: 67 additions & 89 deletions datafusion/src/physical_plan/hash_aggregate.rs
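For context before the diff: the old implementation materialized one single-row array per group key and then concatenated them per column, while the new one builds each output column in a single pass with `ScalarValue::iter_to_array`. A minimal sketch of the two patterns, assuming the DataFusion 4.x-era `ScalarValue::iter_to_array` and arrow `compute::concat` APIs; the values and the `main` scaffold are illustrative, not taken from the PR:

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int32Array};
use arrow::compute;
use datafusion::error::Result;
use datafusion::scalar::ScalarValue;

fn main() -> Result<()> {
    // Old pattern: one single-row array per group, then a concat per column.
    // With N groups this costs N tiny allocations plus a concatenation pass.
    let per_group: Vec<ArrayRef> = (0..3)
        .map(|n| Arc::new(Int32Array::from(vec![n])) as ArrayRef)
        .collect();
    let refs: Vec<&dyn Array> = per_group.iter().map(|a| a.as_ref()).collect();
    let old_way = compute::concat(&refs)?;

    // New pattern: build the whole column in one pass from scalar group keys.
    let new_way =
        ScalarValue::iter_to_array((0..3).map(|n| ScalarValue::Int32(Some(n))))?;

    assert_eq!(old_way.len(), new_way.len());
    Ok(())
}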
@@ -20,6 +20,7 @@
 use std::any::Any;
 use std::sync::Arc;
 use std::task::{Context, Poll};
+use std::vec;

 use ahash::RandomState;
 use futures::{
@@ -32,6 +33,7 @@ use crate::physical_plan::{
     Accumulator, AggregateExpr, DisplayFormatType, Distribution, ExecutionPlan,
     Partitioning, PhysicalExpr, SQLMetric,
 };
+use crate::scalar::ScalarValue;

 use arrow::{
     array::{Array, UInt32Builder},
@@ -623,10 +625,12 @@ fn create_key_for_col(col: &ArrayRef, row: usize, vec: &mut Vec<u8>) -> Result<(
             DataType::UInt64 => {
                 dictionary_create_key_for_col::<UInt64Type>(col, row, vec)?;
             }
-            _ => return Err(DataFusionError::Internal(format!(
+            _ => {
+                return Err(DataFusionError::Internal(format!(
                 "Unsupported GROUP BY type (dictionary index type not supported creating key) {}",
                 col.data_type(),
-            ))),
+                )))
+            }
         },
         _ => {
             // This is internal because we should have caught this before.
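An aside on the dictionary arms above: the match exists because a dictionary array stores typed integer keys plus a values dictionary, so each supported index type needs its own code path when building a group key. A hedged sketch of that lookup using arrow's `DictionaryArray`; the data and the manual downcast are illustrative and assume a recent arrow-rs, not this exact code path:

use arrow::array::{Array, DictionaryArray, StringArray};
use arrow::datatypes::Int32Type;

fn main() {
    // Keys are a typed integer array; resolving the group-by value at a row
    // means a key lookup followed by an index into the values dictionary.
    let dict: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
    let row = 2;
    let key = dict.keys().value(row); // typed index into the values array
    let values = dict
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    println!("group key for row {}: {}", row, values.value(key as usize));
}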
@@ -956,105 +960,79 @@ impl RecordBatchStream for HashAggregateStream {
     }
 }

-/// Given Vec<Vec<ArrayRef>>, concatenates the inners `Vec<ArrayRef>` into `ArrayRef`, returning `Vec<ArrayRef>`
-/// This assumes that `arrays` is not empty.
-fn concatenate(arrays: Vec<Vec<ArrayRef>>) -> ArrowResult<Vec<ArrayRef>> {
-    (0..arrays[0].len())
-        .map(|column| {
-            let array_list = arrays
-                .iter()
-                .map(|a| a[column].as_ref())
-                .collect::<Vec<_>>();
-            compute::concat(&array_list)
-        })
-        .collect::<ArrowResult<Vec<_>>>()
-}
-
 /// Create a RecordBatch with all group keys and accumulator' states or values.
 fn create_batch_from_map(
     mode: &AggregateMode,
     accumulators: &Accumulators,
     num_group_expr: usize,
     output_schema: &Schema,
 ) -> ArrowResult<RecordBatch> {
-    // 1. for each key
-    // 2. create single-row ArrayRef with all group expressions
-    // 3. create single-row ArrayRef with all aggregate states or values
-    // 4. collect all in a vector per key of vec<ArrayRef>, vec[i][j]
-    // 5. concatenate the arrays over the second index [j] into a single vec<ArrayRef>.
-    let arrays = accumulators
-        .iter()
-        .map(|(_, (group_by_values, accumulator_set, _))| {
-            // 2.
-            let mut groups = (0..num_group_expr)
-                .map(|i| match &group_by_values[i] {
-                    GroupByScalar::Float32(n) => {
-                        Arc::new(Float32Array::from(vec![(*n).into()] as Vec<f32>))
-                            as ArrayRef
-                    }
-                    GroupByScalar::Float64(n) => {
-                        Arc::new(Float64Array::from(vec![(*n).into()] as Vec<f64>))
-                            as ArrayRef
-                    }
-                    GroupByScalar::Int8(n) => {
-                        Arc::new(Int8Array::from(vec![*n])) as ArrayRef
-                    }
-                    GroupByScalar::Int16(n) => Arc::new(Int16Array::from(vec![*n])),
-                    GroupByScalar::Int32(n) => Arc::new(Int32Array::from(vec![*n])),
-                    GroupByScalar::Int64(n) => Arc::new(Int64Array::from(vec![*n])),
-                    GroupByScalar::UInt8(n) => Arc::new(UInt8Array::from(vec![*n])),
-                    GroupByScalar::UInt16(n) => Arc::new(UInt16Array::from(vec![*n])),
-                    GroupByScalar::UInt32(n) => Arc::new(UInt32Array::from(vec![*n])),
-                    GroupByScalar::UInt64(n) => Arc::new(UInt64Array::from(vec![*n])),
-                    GroupByScalar::Utf8(str) => {
-                        Arc::new(StringArray::from(vec![&***str]))
-                    }
-                    GroupByScalar::LargeUtf8(str) => {
-                        Arc::new(LargeStringArray::from(vec![&***str]))
-                    }
-                    GroupByScalar::Boolean(b) => Arc::new(BooleanArray::from(vec![*b])),
-                    GroupByScalar::TimeMillisecond(n) => {
-                        Arc::new(TimestampMillisecondArray::from(vec![*n]))
-                    }
-                    GroupByScalar::TimeMicrosecond(n) => {
-                        Arc::new(TimestampMicrosecondArray::from(vec![*n]))
-                    }
-                    GroupByScalar::TimeNanosecond(n) => {
-                        Arc::new(TimestampNanosecondArray::from_vec(vec![*n], None))
-                    }
-                    GroupByScalar::Date32(n) => Arc::new(Date32Array::from(vec![*n])),
-                })
-                .collect::<Vec<ArrayRef>>();
+    if accumulators.is_empty() {
+        return Ok(RecordBatch::new_empty(Arc::new(output_schema.to_owned())));
+    }
+    let (_, (_, accs, _)) = accumulators.iter().next().unwrap();
+    let mut acc_data_types: Vec<usize> = vec![];

-            // 3.
-            groups.extend(
-                finalize_aggregation(accumulator_set, mode)
-                    .map_err(DataFusionError::into_arrow_external_error)?,
-            );
+    // Calculate number/shape of state arrays
+    match mode {
+        AggregateMode::Partial => {
+            for acc in accs.iter() {
+                let state = acc
+                    .state()
+                    .map_err(DataFusionError::into_arrow_external_error)?;
+                acc_data_types.push(state.len());
+            }
+        }
+        AggregateMode::Final | AggregateMode::FinalPartitioned => {
+            acc_data_types = vec![1; accs.len()];
+        }
+    }

-            Ok(groups)
+    let mut columns = (0..num_group_expr)
+        .map(|i| {
+            ScalarValue::iter_to_array(accumulators.into_iter().map(
+                |(_, (group_by_values, _, _))| ScalarValue::from(&group_by_values[i]),
+            ))
         })
-        // 4.
-        .collect::<ArrowResult<Vec<Vec<ArrayRef>>>>()?;
+        .collect::<Result<Vec<_>>>()
+        .map_err(|x| x.into_arrow_external_error())?;

+    // add state / evaluated arrays
+    for (x, &state_len) in acc_data_types.iter().enumerate() {
+        for y in 0..state_len {
+            match mode {
+                AggregateMode::Partial => {
+                    let res = ScalarValue::iter_to_array(accumulators.into_iter().map(
+                        |(_, (_, accumulator, _))| {
+                            let x = accumulator[x].state().unwrap();
+                            x[y].clone()
+                        },
+                    ))
+                    .map_err(DataFusionError::into_arrow_external_error)?;
+
+                    columns.push(res);
+                }
+                AggregateMode::Final | AggregateMode::FinalPartitioned => {
+                    let res = ScalarValue::iter_to_array(accumulators.into_iter().map(
+                        |(_, (_, accumulator, _))| accumulator[x].evaluate().unwrap(),
+                    ))
+                    .map_err(DataFusionError::into_arrow_external_error)?;
+                    columns.push(res);
+                }
+            }
+        }
+    }

-    let batch = if !arrays.is_empty() {
-        // 5.
-        let columns = concatenate(arrays)?;
-        // cast output if needed (e.g. for types like Dictionary where
-        // the intermediate GroupByScalar type was not the same as the
-        // output
-        let columns = columns
-            .iter()
-            .zip(output_schema.fields().iter())
-            .map(|(col, desired_field)| cast(col, desired_field.data_type()))
-            .collect::<ArrowResult<Vec<_>>>()?;
-
+    // cast output if needed (e.g. for types like Dictionary where
+    // the intermediate GroupByScalar type was not the same as the
+    // output
+    let columns = columns
+        .iter()
+        .zip(output_schema.fields().iter())
+        .map(|(col, desired_field)| cast(col, desired_field.data_type()))
+        .collect::<ArrowResult<Vec<_>>>()?;
+
-        RecordBatch::try_new(Arc::new(output_schema.to_owned()), columns)?
-    } else {
-        RecordBatch::new_empty(Arc::new(output_schema.to_owned()))
-    };
-    Ok(batch)
+    RecordBatch::try_new(Arc::new(output_schema.to_owned()), columns)
 }

 fn create_accumulators(
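One detail worth calling out from the new code: the number of output columns each aggregate contributes depends on the mode, which is exactly what the `acc_data_types` bookkeeping captures. In `Partial` mode every accumulator emits one column per state field, while `Final` and `FinalPartitioned` emit a single evaluated column each. A small hedged illustration; the state shapes are the conventional ones (for example, an average's partial state being `[sum, count]`), not values read from this PR:

fn main() {
    // Hypothetical accumulators: AVG with partial state [sum, count] and
    // COUNT with partial state [count], mirroring acc_data_types above.
    let acc_data_types: Vec<usize> = vec![2, 1];

    // AggregateMode::Partial: one output column per state field.
    let partial_columns: usize = acc_data_types.iter().sum();

    // AggregateMode::Final / FinalPartitioned: one evaluated column per
    // accumulator, i.e. vec![1; accs.len()].
    let final_columns = acc_data_types.len();

    assert_eq!(partial_columns, 3);
    assert_eq!(final_columns, 2);
}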