Simplify GroupByHash implementation (to prepare for more work) #4972
Conversation
Force-pushed from 4ab7e5f to addd71d
Reviewing this PR with whitespace blind diff I think makes it easier to see what changed: https://github.com/apache/arrow-datafusion/pull/4972/files?w=1
```rust
/// Actual implementation of [`GroupedHashAggregateStream`].
///
/// This is wrapped into yet another struct because we need to interact with the async memory management subsystem
```
This comment about another struct for memory management is out of date, so I folded GroupedHashAggregateStreamInner directly into GroupedHashAggregateStream.
```rust
@@ -115,6 +106,14 @@ struct GroupedHashAggregateStreamInner {
    indices: [Vec<Range<usize>>; 2],
}

#[derive(Debug)]
/// tracks what phase the aggregation is in
enum ExecutionState {
```
This used to be tracked using several multi-level `match` statements and a fused inner stream. Now it is represented explicitly in this stream.
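To illustrate the idea, here is a minimal standalone sketch (not the DataFusion code; the state names mirror the enum above, everything else is hypothetical) of driving work with an explicit state enum instead of nested matches over a fused stream:

```rust
// Sketch: an explicit state machine for an aggregation-style loop.
#[derive(Debug)]
enum ExecutionState {
    ReadingInput,
    ProducingOutput,
    Done,
}

// Aggregates all "batches" (plain i32s here) and emits one result.
fn drive(batches: Vec<i32>) -> Vec<i32> {
    let mut state = ExecutionState::ReadingInput;
    let mut input = batches.into_iter();
    let mut acc = 0;
    let mut out = Vec::new();
    loop {
        match state {
            ExecutionState::ReadingInput => match input.next() {
                Some(v) => acc += v, // aggregate the batch
                None => state = ExecutionState::ProducingOutput,
            },
            ExecutionState::ProducingOutput => {
                out.push(acc); // emit the final result
                state = ExecutionState::Done;
            }
            // once Done, we simply stop; no implicit fusing needed
            ExecutionState::Done => return out,
        }
    }
}

fn main() {
    assert_eq!(drive(vec![1, 2, 3]), vec![6]);
    println!("ok");
}
```

Each `poll_next`-style step can then match on a single flat enum, which is easier to follow than state that is implicit in the shape of nested matches.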
```rust
///
/// If successful, this returns the additional number of bytes that were allocated during this process.
///
/// TODO: Make this a member function of [`GroupedHashAggregateStream`]
```
DONE!
```rust
@@ -576,138 +568,131 @@ impl std::fmt::Debug for RowAggregationState {
    }
}

/// Create a RecordBatch with all group keys and accumulators' states or values.
#[allow(clippy::too_many_arguments)]
```
Likewise here: moved from a free function to a member function on GroupedHashAggregateStream.
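The shape of that refactor can be sketched as follows (a toy example, not the DataFusion code; `GroupedStream` and `create_batch` are hypothetical stand-ins): a free function taking many arguments becomes a method reading those values from `self`, which also silences `clippy::too_many_arguments` once the real argument list grows.

```rust
struct GroupedStream {
    group_count: usize,
    name: String,
}

// Before: a free function that must be handed every piece of state.
fn create_batch(group_count: usize, name: &str) -> String {
    format!("{name}: {group_count} groups")
}

impl GroupedStream {
    // After: a member function with the state already on `self`.
    fn create_batch(&self) -> String {
        format!("{}: {} groups", self.name, self.group_count)
    }
}

fn main() {
    let s = GroupedStream { group_count: 2, name: "agg".into() };
    // Both forms produce the same result; only the call site changes.
    assert_eq!(create_batch(s.group_count, &s.name), s.create_batch());
    println!("{}", s.create_batch());
}
```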
```rust
// seems like some consumers call this stream even after it returned `None`, so let's fuse the stream.
let stream = stream.fuse();
```
We used to fuse the stream implicitly -- but it is now handled via ExecutionState::Done
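For readers unfamiliar with fusing: async streams follow the same contract as std iterators here, and `fuse()` guarantees that once `next()` returns `None` it keeps returning `None`. This sketch (my own toy `Flaky` type, not DataFusion code) shows the contract using std's `Iterator::fuse`, which the stream version is analogous to:

```rust
// An iterator that misbehaves after exhaustion: it returns None on the
// second call, then starts yielding Some again.
struct Flaky {
    calls: u32,
}

impl Iterator for Flaky {
    type Item = u32;
    fn next(&mut self) -> Option<u32> {
        self.calls += 1;
        match self.calls {
            2 => None,
            n => Some(n),
        }
    }
}

fn main() {
    let mut raw = Flaky { calls: 0 };
    let _ = raw.next(); // Some(1)
    let _ = raw.next(); // None
    // Un-fused: the iterator "resurrects" after returning None.
    assert_eq!(raw.next(), Some(3));

    let mut fused = (Flaky { calls: 0 }).fuse();
    let _ = fused.next();
    let _ = fused.next();
    // Fused: once exhausted, it stays exhausted.
    assert_eq!(fused.next(), None);
    println!("ok");
}
```

With the explicit `ExecutionState::Done` state, the stream itself now upholds this guarantee, so the wrapper call is no longer needed.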
LGTM, only one minor inline comment. This part of the code is really getting tidied up 🙂
```diff
 match result.and_then(|allocated| {
-    this.row_aggr_state.reservation.try_grow(allocated)
+    self.row_aggr_state.reservation.try_grow(allocated)
 }) {
-    Ok(_) => continue,
-    Err(e) => Err(ArrowError::ExternalError(Box::new(e))),
+    Ok(_) => {}
+    Err(e) => {
+        return Poll::Ready(Some(Err(
+            ArrowError::ExternalError(Box::new(e)),
+        )))
+    }
 }
```
Since the `Ok` case is a no-op, an `if let Err(e) = ...` seems to be more idiomatic here.
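As a minimal illustration of that suggestion (a toy sketch; `try_grow` here is a hypothetical stand-in, not the DataFusion memory reservation API):

```rust
// A stand-in for a fallible reservation call.
fn try_grow(allocated: usize, budget: usize) -> Result<(), String> {
    if allocated > budget {
        Err(format!("cannot reserve {allocated} bytes"))
    } else {
        Ok(())
    }
}

fn main() {
    // Before: a match whose Ok arm does nothing.
    match try_grow(10, 100) {
        Ok(_) => {}
        Err(e) => panic!("unexpected: {e}"),
    }

    // After: only the error path is spelled out.
    if let Err(e) = try_grow(1000, 100) {
        println!("reservation failed: {e}");
    }
}
```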
I agree -- changed in 06847b5. Thank you for the suggestion
I plan to merge this tomorrow unless anyone would like more time to review or comment.
Benchmark runs are scheduled for baseline = f5439c8 and contender = 350cb47. 350cb47 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Draft as it builds on #4924

Which issue does this PR close?
re #4973
Rationale for this change
Follow on to #4924 work from @mustafasrepo and @ozankabak
As we prepare to improve group by performance even more, we will be working on this code going forward.
There are several TODOs in the group by hash code, as well as some out of date comments, that make it harder to work with. Given the plans to improve this code, it is important that it remains relatively easy to work with.
Since I had all the code paged in anyway while reviewing #4924, I figured I would add my comments here.
What changes are included in this PR?

- Fold GroupedHashAggregateStreamInner into GroupedHashAggregateStream
- Make group_aggregate_batch and create_batch_from_map member functions rather than free functions (and remove clippy warnings)

Are these changes tested?
Existing tests cover these cases (this is a refactor)
Are there any user-facing changes?
No
Benchmark results
I think the benchmarks show no significant changes (other than noise)