Skip to content

Commit

Permalink
Simplify update_skip_aggregation_probe method (#12332)
Browse files Browse the repository at this point in the history
* Simplify update_skip_aggregation_probe method

* Add assert & doc

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
lewiszlw and alamb authored Sep 26, 2024
1 parent 26c8004 commit 524e56d
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ impl GroupsAccumulatorAdapter {
opt_filter.as_ref().map(|f| f.as_boolean()),
offsets,
)?;
(f)(state.accumulator.as_mut(), &values_to_accumulate)?;
f(state.accumulator.as_mut(), &values_to_accumulate)?;

// clear out the state so they are empty for next
// iteration
Expand Down
22 changes: 4 additions & 18 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,6 @@ impl SkipAggregationProbe {
self.should_skip
}

/// Provides an ability to externally set `should_skip` flag
/// to `false` and prohibit further state updates
fn forbid_skipping(&mut self) {
self.should_skip = false;
self.is_locked = true;
}

/// Record the number of rows that were output directly without aggregation
fn record_skipped(&mut self, batch: &RecordBatch) {
self.skipped_aggregation_rows.add(batch.num_rows());
Expand Down Expand Up @@ -1009,19 +1002,12 @@ impl GroupedHashAggregateStream {
}

/// Updates skip aggregation probe state.
///
/// In case stream has any spills, the probe is forcefully set to
/// forbid aggregation skipping, and locked, since spilling resets
/// total number of unique groups.
///
/// Note: currently spilling is not supported for Partial aggregation
fn update_skip_aggregation_probe(&mut self, input_rows: usize) {
if let Some(probe) = self.skip_aggregation_probe.as_mut() {
if !self.spill_state.spills.is_empty() {
probe.forbid_skipping();
} else {
probe.update_state(input_rows, self.group_values.len());
}
// Skip aggregation probe is not supported if stream has any spills,
// currently spilling is not supported for Partial aggregation
assert!(self.spill_state.spills.is_empty());
probe.update_state(input_rows, self.group_values.len());
};
}

Expand Down

0 comments on commit 524e56d

Please sign in to comment.