Implement trait based API for define AggregateUDF #8733
This looks amazing @guojidan -- thank you 🙏 -- let me know if I can help with moving this PR along
The CI test failure is unrelated to this PR -- and @jayzhan211 fixed it in #8775. The CI should be clean with a merge up from main https://github.com/apache/arrow-datafusion/actions/runs/7429132424/job/20217318586?pr=8733 I'll try and review this PR tomorrow. Thank you again @guojidan
Thank you so much @guojidan -- this is really well done. Thank you. I had a few minor suggestions about comments, but I don't think they are necessary to merge this PR.
I am also quite excited about this PR as it allows us to expose the `GroupsAccumulator` trait. I will file a ticket about doing so shortly.
}

/// This is the accumulator factory; DataFusion uses it to create new accumulators.
fn accumulator(&self, _arg: &DataType) -> Result<Box<dyn Accumulator>> {
👍
While this "advanced" usage isn't much more advanced than the current "simple" UDAF, I think this PR now provides a home / plausible way to implement the full `GroupsAccumulator` API for UDAFs (which is the powerful, very performant API used by built-in aggregate functions in DataFusion).
Filed #8793 to track
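For readers following along, here is a minimal sketch of the shape this thread discusses: a trait that describes the UDAF and whose `accumulator` method acts as the factory. It is not DataFusion's actual API. `DataType`, `Result`, `Accumulator`, `AggregateUdfImpl`, `AvgAccumulator`, and `MyAvg` are simplified stand-ins defined locally so the example compiles with the standard library alone, and the method set is an assumption.

```rust
use std::fmt::Debug;

// Simplified local stand-ins; these are NOT the real DataFusion types.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int64,
    Float64,
}

type Result<T> = std::result::Result<T, String>;

/// Per-group aggregation state (stand-in for an accumulator trait).
trait Accumulator: Debug {
    /// Fold a batch of input values into the running state.
    fn update_batch(&mut self, values: &[f64]) -> Result<()>;
    /// Produce the final aggregate value, or `None` if no rows were seen.
    fn evaluate(&self) -> Result<Option<f64>>;
}

/// Hypothetical trait-based UDAF definition: one type bundles the name,
/// the return type, and the accumulator factory.
trait AggregateUdfImpl: Debug {
    fn name(&self) -> &str;
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType>;
    /// Accumulator factory: called to create a fresh accumulator.
    fn accumulator(&self, arg: &DataType) -> Result<Box<dyn Accumulator>>;
}

#[derive(Debug, Default)]
struct AvgAccumulator {
    sum: f64,
    count: u64,
}

impl Accumulator for AvgAccumulator {
    fn update_batch(&mut self, values: &[f64]) -> Result<()> {
        for v in values {
            self.sum += v;
            self.count += 1;
        }
        Ok(())
    }

    fn evaluate(&self) -> Result<Option<f64>> {
        if self.count == 0 {
            Ok(None)
        } else {
            Ok(Some(self.sum / self.count as f64))
        }
    }
}

#[derive(Debug)]
struct MyAvg;

impl AggregateUdfImpl for MyAvg {
    fn name(&self) -> &str {
        "my_avg"
    }

    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Float64)
    }

    fn accumulator(&self, _arg: &DataType) -> Result<Box<dyn Accumulator>> {
        Ok(Box::new(AvgAccumulator::default()))
    }
}
```

Because the factory lives on the trait, a later extension such as a groups-oriented accumulator could plausibly be added as another method with a default implementation, which is the appeal raised above.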
})
}

// Optimization hint: this trait also supports `update_batch` and `merge_batch`,
This optimization hint seems out of place -- I think this comment should say something more like:
// Merge the output of `Self::state()` from other instances of this accumulator
// into this accumulator's state
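To illustrate the wording suggested above, here is a small sketch of how `state()` and `merge_batch` relate during partial aggregation. `PartialAvg` is a hypothetical, simplified type written for this note; the real `Accumulator` trait works on Arrow arrays and scalar values rather than plain `i64` vectors.

```rust
/// Hypothetical, simplified average accumulator (not DataFusion's trait).
#[derive(Debug, Default)]
struct PartialAvg {
    sum: i64,
    count: i64,
}

impl PartialAvg {
    /// Fold raw input values into this accumulator.
    fn update_batch(&mut self, values: &[i64]) {
        for v in values {
            self.sum += v;
            self.count += 1;
        }
    }

    /// Serialize the intermediate state as `[sum, count]`.
    fn state(&self) -> Vec<i64> {
        vec![self.sum, self.count]
    }

    /// Merge the output of `state()` from other instances of this
    /// accumulator into this accumulator's state.
    fn merge_batch(&mut self, states: &[Vec<i64>]) {
        for s in states {
            self.sum += s[0];
            self.count += s[1];
        }
    }

    /// Final result, or `None` if no rows were aggregated.
    fn evaluate(&self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.sum as f64 / self.count as f64)
        }
    }
}
```

Two accumulators that each saw part of the input can emit `state()`, and a third can `merge_batch` those states to get the same answer as a single pass over all rows. The layout of the state vector is what the declared state type has to match.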
);
let state_type = vec![DataType::Int64];

// let udaf = AggregateUDF::new(
Did you mean to leave this commented out?
I forgot to delete this old code 😅
datafusion/expr/src/udaf.rs
Outdated
/// This is the accumulator factory [`AccumulatorFactoryFunction`];
/// DataFusion uses it to create new accumulators.
fn accumulator(&self, arg: &DataType) -> Result<Box<dyn Accumulator>>;
Suggested change:
- /// This is the accumulator factory [`AccumulatorFactoryFunction`];
- /// DataFusion uses it to create new accumulators.
- fn accumulator(&self, arg: &DataType) -> Result<Box<dyn Accumulator>>;
+ /// Return a new [`Accumulator`] that aggregates values for a specific
+ /// group during query execution.
+ fn accumulator(&self, arg: &DataType) -> Result<Box<dyn Accumulator>>;
datafusion/expr/src/udaf.rs
Outdated
/// This is the description of the state.
/// accumulator's state() must match the types here.
Suggested change:
- /// This is the description of the state.
- /// accumulator's state() must match the types here.
+ /// Return the type used to serialize the [`Accumulator`]'s intermediate state.
+ /// See [`Accumulator::state()`] for more details
&state_type,
);
let my_avg = AggregateUDF::from(SimpleAggregateUDF::new_with_signature(
"MY_AVG".to_string(),
Since `new_with_signature` takes `impl Into<String>`, I think this would also work:
Suggested change:
- "MY_AVG".to_string(),
+ "MY_AVG",
@@ -899,23 +898,57 @@ mod test {

#[test]
fn aggregate() -> Result<()> {
#[derive(Debug, Clone)]
Maybe we could use SimpleAggregateUDF here as well. Not sure if that is any better / worse though
Thanks again @guojidan
Which issue does this PR close?
Closes #8710.
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?