Commit: Speed up create_batch_from_map (#339)

Dandandan authored May 27, 2021
1 parent 3f7736c commit 9e7bd2d

Showing 2 changed files with 182 additions and 114 deletions.
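In short: the old `create_batch_from_map` built a one-row array per group key for every output column and then concatenated all of those tiny arrays, while the new version builds each output column in a single pass over the accumulator map via `ScalarValue::iter_to_array`. Below is a minimal, dependency-free Rust sketch of that shape change; the type and function names are illustrative stand-ins, not the DataFusion API, where the columns are Arrow arrays and the per-key allocations plus `compute::concat` are what made the old path slow.

use std::collections::HashMap;

// Toy stand-ins for a group key and an accumulated value.
type GroupKey = u64;
type AccState = i64;

// Old shape: one single-row "array" per (key, column), then a concat pass.
fn per_key_then_concat(groups: &HashMap<GroupKey, AccState>) -> Vec<Vec<i64>> {
    let per_key: Vec<Vec<i64>> = groups
        .iter()
        .map(|(k, v)| vec![*k as i64, *v]) // allocates a tiny Vec per key
        .collect();
    // Concatenate column j across all keys.
    (0..2)
        .map(|j| per_key.iter().map(|row| row[j]).collect())
        .collect()
}

// New shape: build each output column directly in one pass over the map.
fn columnar(groups: &HashMap<GroupKey, AccState>) -> Vec<Vec<i64>> {
    let keys = groups.keys().map(|k| *k as i64).collect();
    let vals = groups.values().copied().collect();
    vec![keys, vals]
}

fn main() {
    let groups: HashMap<GroupKey, AccState> =
        (0..5u64).map(|k| (k, (k * 10) as i64)).collect();
    // Both shapes produce the same two columns (up to hash-map order).
    assert_eq!(per_key_then_concat(&groups).len(), columnar(&groups).len());
}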
156 changes: 67 additions & 89 deletions datafusion/src/physical_plan/hash_aggregate.rs
@@ -20,6 +20,7 @@
 use std::any::Any;
 use std::sync::Arc;
 use std::task::{Context, Poll};
+use std::vec;

 use ahash::RandomState;
 use futures::{
@@ -32,6 +33,7 @@ use crate::physical_plan::{
     Accumulator, AggregateExpr, DisplayFormatType, Distribution, ExecutionPlan,
     Partitioning, PhysicalExpr, SQLMetric,
 };
+use crate::scalar::ScalarValue;

 use arrow::{
     array::{Array, UInt32Builder},
@@ -623,10 +625,12 @@ fn create_key_for_col(col: &ArrayRef, row: usize, vec: &mut Vec<u8>) -> Result<(
            DataType::UInt64 => {
                dictionary_create_key_for_col::<UInt64Type>(col, row, vec)?;
            }
-            _ => return Err(DataFusionError::Internal(format!(
+            _ => {
+                return Err(DataFusionError::Internal(format!(
                "Unsupported GROUP BY type (dictionary index type not supported creating key) {}",
                col.data_type(),
-            ))),
+            )))
+            }
        },
        _ => {
            // This is internal because we should have caught this before.
@@ -957,105 +961,79 @@ impl RecordBatchStream for HashAggregateStream {
     }
 }

-/// Given Vec<Vec<ArrayRef>>, concatenates the inners `Vec<ArrayRef>` into `ArrayRef`, returning `Vec<ArrayRef>`
-/// This assumes that `arrays` is not empty.
-fn concatenate(arrays: Vec<Vec<ArrayRef>>) -> ArrowResult<Vec<ArrayRef>> {
-    (0..arrays[0].len())
-        .map(|column| {
-            let array_list = arrays
-                .iter()
-                .map(|a| a[column].as_ref())
-                .collect::<Vec<_>>();
-            compute::concat(&array_list)
-        })
-        .collect::<ArrowResult<Vec<_>>>()
-}
-
 /// Create a RecordBatch with all group keys and accumulator' states or values.
 fn create_batch_from_map(
     mode: &AggregateMode,
     accumulators: &Accumulators,
     num_group_expr: usize,
     output_schema: &Schema,
 ) -> ArrowResult<RecordBatch> {
-    // 1. for each key
-    // 2. create single-row ArrayRef with all group expressions
-    // 3. create single-row ArrayRef with all aggregate states or values
-    // 4. collect all in a vector per key of vec<ArrayRef>, vec[i][j]
-    // 5. concatenate the arrays over the second index [j] into a single vec<ArrayRef>.
-    let arrays = accumulators
-        .iter()
-        .map(|(_, (group_by_values, accumulator_set, _))| {
-            // 2.
-            let mut groups = (0..num_group_expr)
-                .map(|i| match &group_by_values[i] {
-                    GroupByScalar::Float32(n) => {
-                        Arc::new(Float32Array::from(vec![(*n).into()] as Vec<f32>))
-                            as ArrayRef
-                    }
-                    GroupByScalar::Float64(n) => {
-                        Arc::new(Float64Array::from(vec![(*n).into()] as Vec<f64>))
-                            as ArrayRef
-                    }
-                    GroupByScalar::Int8(n) => {
-                        Arc::new(Int8Array::from(vec![*n])) as ArrayRef
-                    }
-                    GroupByScalar::Int16(n) => Arc::new(Int16Array::from(vec![*n])),
-                    GroupByScalar::Int32(n) => Arc::new(Int32Array::from(vec![*n])),
-                    GroupByScalar::Int64(n) => Arc::new(Int64Array::from(vec![*n])),
-                    GroupByScalar::UInt8(n) => Arc::new(UInt8Array::from(vec![*n])),
-                    GroupByScalar::UInt16(n) => Arc::new(UInt16Array::from(vec![*n])),
-                    GroupByScalar::UInt32(n) => Arc::new(UInt32Array::from(vec![*n])),
-                    GroupByScalar::UInt64(n) => Arc::new(UInt64Array::from(vec![*n])),
-                    GroupByScalar::Utf8(str) => {
-                        Arc::new(StringArray::from(vec![&***str]))
-                    }
-                    GroupByScalar::LargeUtf8(str) => {
-                        Arc::new(LargeStringArray::from(vec![&***str]))
-                    }
-                    GroupByScalar::Boolean(b) => Arc::new(BooleanArray::from(vec![*b])),
-                    GroupByScalar::TimeMillisecond(n) => {
-                        Arc::new(TimestampMillisecondArray::from(vec![*n]))
-                    }
-                    GroupByScalar::TimeMicrosecond(n) => {
-                        Arc::new(TimestampMicrosecondArray::from(vec![*n]))
-                    }
-                    GroupByScalar::TimeNanosecond(n) => {
-                        Arc::new(TimestampNanosecondArray::from_vec(vec![*n], None))
-                    }
-                    GroupByScalar::Date32(n) => Arc::new(Date32Array::from(vec![*n])),
-                })
-                .collect::<Vec<ArrayRef>>();
+    if accumulators.is_empty() {
+        return Ok(RecordBatch::new_empty(Arc::new(output_schema.to_owned())));
+    }
+    let (_, (_, accs, _)) = accumulators.iter().next().unwrap();
+    let mut acc_data_types: Vec<usize> = vec![];

-            // 3.
-            groups.extend(
-                finalize_aggregation(accumulator_set, mode)
-                    .map_err(DataFusionError::into_arrow_external_error)?,
-            );
+    // Calculate number/shape of state arrays
+    match mode {
+        AggregateMode::Partial => {
+            for acc in accs.iter() {
+                let state = acc
+                    .state()
+                    .map_err(DataFusionError::into_arrow_external_error)?;
+                acc_data_types.push(state.len());
+            }
+        }
+        AggregateMode::Final | AggregateMode::FinalPartitioned => {
+            acc_data_types = vec![1; accs.len()];
+        }
+    }

-            Ok(groups)
+    let mut columns = (0..num_group_expr)
+        .map(|i| {
+            ScalarValue::iter_to_array(accumulators.into_iter().map(
+                |(_, (group_by_values, _, _))| ScalarValue::from(&group_by_values[i]),
+            ))
         })
-        // 4.
-        .collect::<ArrowResult<Vec<Vec<ArrayRef>>>>()?;
+        .collect::<Result<Vec<_>>>()
+        .map_err(|x| x.into_arrow_external_error())?;

+    // add state / evaluated arrays
+    for (x, &state_len) in acc_data_types.iter().enumerate() {
+        for y in 0..state_len {
+            match mode {
+                AggregateMode::Partial => {
+                    let res = ScalarValue::iter_to_array(accumulators.into_iter().map(
+                        |(_, (_, accumulator, _))| {
+                            let x = accumulator[x].state().unwrap();
+                            x[y].clone()
+                        },
+                    ))
+                    .map_err(DataFusionError::into_arrow_external_error)?;
+
+                    columns.push(res);
+                }
+                AggregateMode::Final | AggregateMode::FinalPartitioned => {
+                    let res = ScalarValue::iter_to_array(accumulators.into_iter().map(
+                        |(_, (_, accumulator, _))| accumulator[x].evaluate().unwrap(),
+                    ))
+                    .map_err(DataFusionError::into_arrow_external_error)?;
+                    columns.push(res);
+                }
+            }
+        }
+    }
+
-    let batch = if !arrays.is_empty() {
-        // 5.
-        let columns = concatenate(arrays)?;
+    // cast output if needed (e.g. for types like Dictionary where
+    // the intermediate GroupByScalar type was not the same as the
+    // output
+    let columns = columns
+        .iter()
+        .zip(output_schema.fields().iter())
+        .map(|(col, desired_field)| cast(col, desired_field.data_type()))
+        .collect::<ArrowResult<Vec<_>>>()?;

-        // cast output if needed (e.g. for types like Dictionary where
-        // the intermediate GroupByScalar type was not the same as the
-        // output
-        let columns = columns
-            .iter()
-            .zip(output_schema.fields().iter())
-            .map(|(col, desired_field)| cast(col, desired_field.data_type()))
-            .collect::<ArrowResult<Vec<_>>>()?;
-
-        RecordBatch::try_new(Arc::new(output_schema.to_owned()), columns)?
-    } else {
-        RecordBatch::new_empty(Arc::new(output_schema.to_owned()))
-    };
-    Ok(batch)
+    RecordBatch::try_new(Arc::new(output_schema.to_owned()), columns)
 }

 fn create_accumulators(
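A note on the `acc_data_types` bookkeeping in the new code above: in `AggregateMode::Partial` each accumulator emits one column per field of its intermediate state, while `Final`/`FinalPartitioned` emit exactly one evaluated column per accumulator. A toy sketch of why the shapes differ, using a hypothetical `ToyAccumulator` trait rather than DataFusion's `Accumulator` (which works in `ScalarValue` and `Result`):

// Hypothetical trait; it only mirrors the column-count shapes
// that `acc_data_types` records in the diff above.
trait ToyAccumulator {
    fn state(&self) -> Vec<f64>; // intermediate state: possibly several fields
    fn evaluate(&self) -> f64; // final result: always a single value
}

// An average needs two state fields (sum, count) but one final value.
struct Avg {
    sum: f64,
    count: u64,
}

impl ToyAccumulator for Avg {
    fn state(&self) -> Vec<f64> {
        vec![self.sum, self.count as f64] // Partial mode: two state columns
    }
    fn evaluate(&self) -> f64 {
        self.sum / self.count as f64 // Final mode: one output column
    }
}

fn main() {
    let acc = Avg { sum: 10.0, count: 4 };
    assert_eq!(acc.state().len(), 2); // its acc_data_types entry would be 2
    assert_eq!(acc.evaluate(), 2.5);
}

For AVG, for instance, a partial aggregate ships two columns (sum, count) downstream, and the final stage combines them into a single output column.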