Skip partial aggregation based on the cardinality of hash value instead of group values #12697

Closed · wants to merge 18 commits
6 changes: 1 addition & 5 deletions datafusion/common/src/config.rs
@@ -325,11 +325,7 @@ config_namespace! {
/// Aggregation ratio (number of distinct groups / number of input rows)
/// threshold for skipping partial aggregation. If the value is greater
/// then partial aggregation will skip aggregation for further input
pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8

/// Number of input rows partial aggregation partition should process, before
/// aggregation ratio check and trying to switch to skipping aggregation mode
pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000
Contributor:
Why has this setting been deleted? Checking whether aggregation should be skipped right at the beginning of execution may lead to a skipping decision based on an insufficient amount of data.

Contributor Author:
The reason is that, in my strategy, I make the decision per batch, since I assume the data is distributed evenly.
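
For illustration, a minimal sketch of what such a per-batch probe could look like (hypothetical helper, not the PR's exact code): the cardinality of the batch's hash values stands in for the number of distinct groups, and its ratio to the batch's row count is compared against the configured threshold.

```rust
use std::collections::HashSet;

/// Hypothetical per-batch probe: decide whether partial aggregation should be
/// skipped, based on the cardinality of the precomputed hash values.
fn should_skip_partial(batch_hashes: &[u64], ratio_threshold: f64) -> bool {
    if batch_hashes.is_empty() {
        return false;
    }
    // Distinct hash values approximate the number of distinct groups.
    let distinct: HashSet<u64> = batch_hashes.iter().copied().collect();
    let ratio = distinct.len() as f64 / batch_hashes.len() as f64;
    // A high ratio means almost every row is its own group, so partial
    // aggregation does little useful work for this batch.
    ratio > ratio_threshold
}

fn main() {
    // 8 rows hashing to 7 distinct values: a high-cardinality batch.
    let hashes = [1u64, 2, 3, 4, 5, 6, 7, 1];
    assert!(should_skip_partial(&hashes, 0.8));
    assert!(!should_skip_partial(&hashes, 0.9));
}
```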

Contributor Author:
Config with ratio threshold 0.1 and rows threshold 100_000:

┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       main ┃  threshold ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │     0.41ms │     0.45ms │  1.10x slower │
│ QQuery 1     │    42.61ms │    42.55ms │     no change │
│ QQuery 2     │    75.89ms │    74.88ms │     no change │
│ QQuery 3     │    72.24ms │    61.81ms │ +1.17x faster │
│ QQuery 4     │   399.04ms │   359.82ms │ +1.11x faster │
│ QQuery 5     │   663.20ms │   699.03ms │  1.05x slower │
│ QQuery 6     │    37.91ms │    38.01ms │     no change │
│ QQuery 7     │    42.71ms │    40.05ms │ +1.07x faster │
│ QQuery 8     │   638.09ms │   699.07ms │  1.10x slower │
│ QQuery 9     │   678.82ms │   742.56ms │  1.09x slower │
│ QQuery 10    │   200.65ms │   209.83ms │     no change │
│ QQuery 11    │   233.85ms │   220.56ms │ +1.06x faster │
│ QQuery 12    │   718.91ms │   660.93ms │ +1.09x faster │
│ QQuery 13    │   926.86ms │   937.19ms │     no change │
│ QQuery 14    │   873.60ms │   753.51ms │ +1.16x faster │
│ QQuery 15    │   498.70ms │   483.92ms │     no change │
│ QQuery 16    │  1305.01ms │  1238.38ms │ +1.05x faster │
│ QQuery 17    │  1190.26ms │  1111.67ms │ +1.07x faster │
│ QQuery 18    │  3347.18ms │  2903.17ms │ +1.15x faster │
│ QQuery 19    │    55.77ms │    61.39ms │  1.10x slower │
│ QQuery 20    │   943.13ms │   919.37ms │     no change │
│ QQuery 21    │  1206.45ms │  1230.95ms │     no change │
│ QQuery 22    │  3230.71ms │  3479.20ms │  1.08x slower │
│ QQuery 23    │  8186.73ms │  7989.35ms │     no change │
│ QQuery 24    │   500.91ms │   507.13ms │     no change │
│ QQuery 25    │   502.13ms │   551.57ms │  1.10x slower │
│ QQuery 26    │   561.53ms │   568.74ms │     no change │
│ QQuery 27    │  1338.06ms │  1382.16ms │     no change │
│ QQuery 28    │ 10507.08ms │ 11253.66ms │  1.07x slower │
│ QQuery 29    │   398.16ms │   429.90ms │  1.08x slower │
│ QQuery 30    │   760.72ms │   664.19ms │ +1.15x faster │
│ QQuery 31    │   712.91ms │   772.65ms │  1.08x slower │
│ QQuery 32    │  3567.36ms │  4167.37ms │  1.17x slower │
│ QQuery 33    │  4863.25ms │  4311.27ms │ +1.13x faster │
│ QQuery 34    │  4115.97ms │  3564.32ms │ +1.15x faster │
│ QQuery 35    │   998.40ms │   979.18ms │     no change │
│ QQuery 36    │   147.99ms │   136.97ms │ +1.08x faster │
│ QQuery 37    │   101.82ms │   102.79ms │     no change │
│ QQuery 38    │   108.77ms │   107.66ms │     no change │
│ QQuery 39    │   327.29ms │   272.91ms │ +1.20x faster │
│ QQuery 40    │    33.56ms │    34.88ms │     no change │
│ QQuery 41    │    34.09ms │    32.18ms │ +1.06x faster │
│ QQuery 42    │    39.32ms │    39.75ms │     no change │
└──────────────┴────────────┴────────────┴───────────────┘

Config with ratio threshold 0.1 and rows threshold 0:

┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       main ┃  threshold ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │     0.41ms │     0.49ms │  1.21x slower │
│ QQuery 1     │    42.61ms │    42.34ms │     no change │
│ QQuery 2     │    75.89ms │    81.91ms │  1.08x slower │
│ QQuery 3     │    72.24ms │    69.94ms │     no change │
│ QQuery 4     │   399.04ms │   365.30ms │ +1.09x faster │
│ QQuery 5     │   663.20ms │   656.21ms │     no change │
│ QQuery 6     │    37.91ms │    38.24ms │     no change │
│ QQuery 7     │    42.71ms │    40.27ms │ +1.06x faster │
│ QQuery 8     │   638.09ms │   590.64ms │ +1.08x faster │
│ QQuery 9     │   678.82ms │   666.57ms │     no change │
│ QQuery 10    │   200.65ms │   196.94ms │     no change │
│ QQuery 11    │   233.85ms │   220.90ms │ +1.06x faster │
│ QQuery 12    │   718.91ms │   652.58ms │ +1.10x faster │
│ QQuery 13    │   926.86ms │   941.22ms │     no change │
│ QQuery 14    │   873.60ms │   749.50ms │ +1.17x faster │
│ QQuery 15    │   498.70ms │   495.09ms │     no change │
│ QQuery 16    │  1305.01ms │  1247.99ms │     no change │
│ QQuery 17    │  1190.26ms │  1158.71ms │     no change │
│ QQuery 18    │  3347.18ms │  2008.84ms │ +1.67x faster │
│ QQuery 19    │    55.77ms │    54.94ms │     no change │
│ QQuery 20    │   943.13ms │   913.04ms │     no change │
│ QQuery 21    │  1206.45ms │  1183.49ms │     no change │
│ QQuery 22    │  3230.71ms │  3181.47ms │     no change │
│ QQuery 23    │  8186.73ms │  7900.35ms │     no change │
│ QQuery 24    │   500.91ms │   501.30ms │     no change │
│ QQuery 25    │   502.13ms │   481.08ms │     no change │
│ QQuery 26    │   561.53ms │   571.92ms │     no change │
│ QQuery 27    │  1338.06ms │  1451.63ms │  1.08x slower │
│ QQuery 28    │ 10507.08ms │ 11396.72ms │  1.08x slower │
│ QQuery 29    │   398.16ms │   392.46ms │     no change │
│ QQuery 30    │   760.72ms │   658.78ms │ +1.15x faster │
│ QQuery 31    │   712.91ms │   702.45ms │     no change │
│ QQuery 32    │  3567.36ms │  3520.95ms │     no change │
│ QQuery 33    │  4863.25ms │  3527.36ms │ +1.38x faster │
│ QQuery 34    │  4115.97ms │  3731.86ms │ +1.10x faster │
│ QQuery 35    │   998.40ms │   964.65ms │     no change │
│ QQuery 36    │   147.99ms │   136.33ms │ +1.09x faster │
│ QQuery 37    │   101.82ms │   102.70ms │     no change │
│ QQuery 38    │   108.77ms │   105.97ms │     no change │
│ QQuery 39    │   327.29ms │   267.66ms │ +1.22x faster │
│ QQuery 40    │    33.56ms │    34.77ms │     no change │
│ QQuery 41    │    34.09ms │    31.76ms │ +1.07x faster │
│ QQuery 42    │    39.32ms │    40.56ms │     no change │
└──────────────┴────────────┴────────────┴───────────────┘

pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.1
Contributor:
It's interesting that the 0.1 value makes things faster 🤔

Is aggregation for simple cases (e.g. single integer) slower than repartitioning x10 rows?

Contributor Author:
> Is aggregation for simple cases (e.g. single integer) slower than repartitioning x10 rows?

Given the benchmark results, I think so, especially if we have millions of distinct integers.


/// Should DataFusion use row number estimates at the input to decide
/// whether increasing parallelism is beneficial or not. By default,
17 changes: 17 additions & 0 deletions datafusion/core/tests/data/aggregate_mixed_type.csv
@@ -0,0 +1,17 @@
c1,c2
1,'a'
2,'b'
3,'c'
4,'d'
1,'a'
2,'b'
3,'c'
4,'d'
4,'d'
3,'c'
3,'c'
5,'e'
6,'f'
7,'g'
8,'a'
9,'b'
23 changes: 16 additions & 7 deletions datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
@@ -44,6 +44,8 @@ use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use tokio::task::JoinSet;

const BATCH_SIZE: usize = 50;

/// Tests that streaming aggregate and batch (non streaming) aggregate produce
/// same results
#[tokio::test(flavor = "multi_thread")]
@@ -60,13 +62,14 @@ async fn streaming_aggregate_test() {
];
let n = 300;
let distincts = vec![10, 20];
let len = 1000;
for distinct in distincts {
let mut join_set = JoinSet::new();
for i in 0..n {
let test_idx = i % test_cases.len();
let group_by_columns = test_cases[test_idx].clone();
join_set.spawn(run_aggregate_test(
make_staggered_batches::<true>(1000, distinct, i as u64),
make_staggered_batches::<true>(len, distinct, i as u64),
group_by_columns,
));
}
@@ -77,13 +80,19 @@ async fn streaming_aggregate_test() {
}
}

fn new_ctx() -> SessionContext {
let session_config = SessionConfig::new()
.with_batch_size(BATCH_SIZE)
// Ensure most of the fuzzing test cases don't skip the partial aggregation
.with_skip_partial_aggregation_probe_ratio_threshold(1.0);
SessionContext::new_with_config(session_config)
}

/// Perform batch and streaming aggregation with same input
/// and verify outputs of `AggregateExec` with pipeline breaking stream `GroupedHashAggregateStream`
/// and non-pipeline breaking stream `BoundedAggregateStream` produces same result.
async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str>) {
let schema = input1[0].schema();
let session_config = SessionConfig::new().with_batch_size(50);
let ctx = SessionContext::new_with_config(session_config);
let mut sort_keys = vec![];
for ordering_col in ["a", "b", "c"] {
sort_keys.push(PhysicalSortExpr {
@@ -141,14 +150,15 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str
.unwrap(),
) as Arc<dyn ExecutionPlan>;

let task_ctx = ctx.task_ctx();
let task_ctx = new_ctx().task_ctx();
let collected_usual = collect(aggregate_exec_usual.clone(), task_ctx.clone())
.await
.unwrap();

let collected_running = collect(aggregate_exec_running.clone(), task_ctx.clone())
.await
.unwrap();

assert!(collected_running.len() > 2);
// Running should produce more chunks than the usual AggregateExec.
// Otherwise it means that we cannot generate results in running mode.
@@ -232,7 +242,7 @@ pub(crate) fn make_staggered_batches<const STREAM: bool>(
let mut batches = vec![];
if STREAM {
while remainder.num_rows() > 0 {
let batch_size = rng.gen_range(0..50);
let batch_size = rng.gen_range(0..BATCH_SIZE);
if remainder.num_rows() < batch_size {
break;
}
@@ -287,8 +297,7 @@ async fn group_by_string_test(
let expected = compute_counts(&input, column_name);

let schema = input[0].schema();
let session_config = SessionConfig::new().with_batch_size(50);
let ctx = SessionContext::new_with_config(session_config);
let ctx = new_ctx();

let provider = MemTable::try_new(schema.clone(), vec![input]).unwrap();
let provider = if sorted {
11 changes: 11 additions & 0 deletions datafusion/execution/src/config.rs
@@ -388,6 +388,17 @@ impl SessionConfig {
self
}

/// Set the threshold for skip partial aggregation ratio
pub fn with_skip_partial_aggregation_probe_ratio_threshold(
mut self,
threshold: f64,
) -> Self {
self.options
.execution
.skip_partial_aggregation_probe_ratio_threshold = threshold;
self
}

/// Returns true if record batches will be examined between each operator
/// and small batches will be coalesced into larger batches.
pub fn coalesce_batches(&self) -> bool {
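
For reference, a hedged usage sketch of the setter added above (the `datafusion::prelude` import path is assumed; the fuzz tests in this PR use the same builder calls): a ratio threshold of 1.0 effectively keeps the probe from ever switching to skip mode.

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

/// Build a context whose partial aggregation (almost) never switches to skip
/// mode: with a ratio threshold of 1.0 the observed distinct-hash ratio
/// cannot exceed the threshold.
fn ctx_without_skip_partial() -> SessionContext {
    let config = SessionConfig::new()
        .with_batch_size(50)
        .with_skip_partial_aggregation_probe_ratio_threshold(1.0);
    SessionContext::new_with_config(config)
}
```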
@@ -45,6 +45,7 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> {
&mut self,
cols: &[ArrayRef],
groups: &mut Vec<usize>,
_batch_hashes: &[u64],
) -> datafusion_common::Result<()> {
assert_eq!(cols.len(), 1);

@@ -108,7 +109,7 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> {

self.num_groups = 0;
let mut group_indexes = vec![];
self.intern(&[remaining_group_values], &mut group_indexes)?;
self.intern(&[remaining_group_values], &mut group_indexes, &[])?;

// Verify that the group indexes were assigned in the correct order
assert_eq!(0, group_indexes[0]);
@@ -46,6 +46,7 @@ impl GroupValues for GroupValuesBytesView {
&mut self,
cols: &[ArrayRef],
groups: &mut Vec<usize>,
_batch_hashes: &[u64],
) -> datafusion_common::Result<()> {
assert_eq!(cols.len(), 1);

@@ -109,7 +110,7 @@ impl GroupValues for GroupValuesBytesView {

self.num_groups = 0;
let mut group_indexes = vec![];
self.intern(&[remaining_group_values], &mut group_indexes)?;
self.intern(&[remaining_group_values], &mut group_indexes, &[])?;

// Verify that the group indexes were assigned in the correct order
assert_eq!(0, group_indexes[0]);
21 changes: 6 additions & 15 deletions datafusion/physical-plan/src/aggregates/group_values/column.rs
@@ -19,7 +19,6 @@ use crate::aggregates::group_values::group_column::{
ByteGroupValueBuilder, GroupColumn, PrimitiveGroupValueBuilder,
};
use crate::aggregates::group_values::GroupValues;
use ahash::RandomState;
use arrow::compute::cast;
use arrow::datatypes::{
Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type,
@@ -28,7 +27,6 @@ use arrow::datatypes::{
use arrow::record_batch::RecordBatch;
use arrow_array::{Array, ArrayRef};
use arrow_schema::{DataType, Schema, SchemaRef};
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::{not_impl_err, DataFusionError, Result};
use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
use datafusion_expr::EmitTo;
@@ -68,9 +66,6 @@ pub struct GroupValuesColumn {

/// reused buffer to store hashes
hashes_buffer: Vec<u64>,

/// Random state for creating hashes
random_state: RandomState,
}

impl GroupValuesColumn {
@@ -83,7 +78,6 @@ impl GroupValuesColumn {
map_size: 0,
group_values: vec![],
hashes_buffer: Default::default(),
random_state: Default::default(),
})
}

@@ -143,9 +137,12 @@ macro_rules! instantiate_primitive {
}

impl GroupValues for GroupValuesColumn {
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
let n_rows = cols[0].len();

fn intern(
&mut self,
cols: &[ArrayRef],
groups: &mut Vec<usize>,
batch_hashes: &[u64],
) -> Result<()> {
if self.group_values.is_empty() {
let mut v = Vec::with_capacity(cols.len());

@@ -195,12 +192,6 @@ impl GroupValues for GroupValuesColumn {
// tracks to which group each of the input rows belongs
groups.clear();

// 1.1 Calculate the group keys for the group values
let batch_hashes = &mut self.hashes_buffer;
batch_hashes.clear();
batch_hashes.resize(n_rows, 0);
create_hashes(cols, &self.random_state, batch_hashes)?;

for (row, &target_hash) in batch_hashes.iter().enumerate() {
let entry = self.map.get_mut(target_hash, |(exist_hash, group_idx)| {
// Somewhat surprisingly, this closure can be called even if the
7 changes: 6 additions & 1 deletion datafusion/physical-plan/src/aggregates/group_values/mod.rs
@@ -86,7 +86,12 @@ pub trait GroupValues: Send {
/// If a row has the same value as a previous row, the same group id is
/// assigned. If a row has a new value, the next available group id is
/// assigned.
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()>;
fn intern(
&mut self,
cols: &[ArrayRef],
groups: &mut Vec<usize>,
batch_hashes: &[u64],
) -> Result<()>;

/// Returns the number of bytes of memory used by this [`GroupValues`]
fn size(&self) -> usize;
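
To illustrate the new contract, here is a hypothetical caller, written as if inside the `datafusion-physical-plan` crate (the real call site is the grouped hash aggregate stream): the hashes are computed once with the existing `create_hashes` utility, previously done inside `GroupValuesColumn::intern`, and are then passed to `intern`, leaving the same buffer available for the hash-cardinality skip probe.

```rust
use ahash::RandomState;
use arrow_array::ArrayRef;
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::Result;

use crate::aggregates::group_values::GroupValues;

/// Hypothetical helper: hash the group-by columns once and reuse the buffer.
fn intern_with_precomputed_hashes(
    group_values: &mut dyn GroupValues,
    cols: &[ArrayRef],
    random_state: &RandomState,
) -> Result<(Vec<usize>, Vec<u64>)> {
    let n_rows = cols[0].len();

    // Hash the group-by columns once, outside of `intern`.
    let mut batch_hashes = vec![0; n_rows];
    create_hashes(cols, random_state, &mut batch_hashes)?;

    // Map each row to its group index, reusing the precomputed hashes.
    let mut group_indices = Vec::with_capacity(n_rows);
    group_values.intern(cols, &mut group_indices, &batch_hashes)?;

    // The hashes stay available to the caller, e.g. for the cardinality probe.
    Ok((group_indices, batch_hashes))
}
```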
@@ -111,7 +111,12 @@ impl<T: ArrowPrimitiveType> GroupValues for GroupValuesPrimitive<T>
where
T::Native: HashValue,
{
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
fn intern(
&mut self,
cols: &[ArrayRef],
groups: &mut Vec<usize>,
_batch_hashes: &[u64],
) -> Result<()> {
assert_eq!(cols.len(), 1);
groups.clear();

7 changes: 6 additions & 1 deletion datafusion/physical-plan/src/aggregates/group_values/row.rs
@@ -108,7 +108,12 @@ impl GroupValuesRows {
}

impl GroupValues for GroupValuesRows {
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
fn intern(
&mut self,
cols: &[ArrayRef],
groups: &mut Vec<usize>,
_batch_hashes: &[u64],
) -> Result<()> {
// Convert the group keys into the row format
let group_rows = &mut self.rows_buffer;
group_rows.clear();