
feat: Implement quantile_cont()/quantile_disc() aggregate functions #7337

Closed
Wanted to merge 3 commits.

Conversation

@2010YOUY01 (Contributor) commented Aug 18, 2023

Which issue does this PR close?

N/A

Rationale for this change

This PR adds support for the quantile_cont() and quantile_disc() statistical aggregate functions.

select quantile_cont(column1, 0.3) from (values (0), (50), (100));
+-------------------------------------+
| QUANTILE_CONT(column1,Float64(0.3)) |
+-------------------------------------+
| 30                                  |
+-------------------------------------+
1 row in set. Query took 0.010 seconds.

It sorts column1 in ascending order and computes its 30th percentile.
DuckDB already implements these functions: https://duckdb.org/docs/sql/aggregates.html#statistical-aggregates

The more common equivalent syntax is:

SELECT percentile_cont(0.3) WITHIN GROUP (ORDER BY column1)
FROM test_scores;

This WITHIN GROUP syntax is available in PostgreSQL, Spark, etc.:
https://www.postgresql.org/docs/9.5/functions-aggregate.html
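For illustration, the linear-interpolation rule behind quantile_cont can be sketched in plain Rust. This is a minimal sketch on sorted f64 values; the quantile_cont helper below is hypothetical, not this PR's actual code:

```rust
/// Continuous quantile by linear interpolation over already-sorted values.
/// Hypothetical helper for illustration; not this PR's actual implementation.
fn quantile_cont(sorted: &[f64], q: f64) -> f64 {
    assert!(!sorted.is_empty() && (0.0..=1.0).contains(&q));
    // Fractional position of the target percentile: q * (n - 1)
    let pos = q * (sorted.len() - 1) as f64;
    let (lo, hi) = (pos.floor() as usize, pos.ceil() as usize);
    // Interpolate between the two bracketing data points
    sorted[lo] + (sorted[hi] - sorted[lo]) * (pos - lo as f64)
}

fn main() {
    // Matches the example above: the 30% quantile of (0, 50, 100) is 30
    println!("{}", quantile_cont(&[0.0, 50.0, 100.0], 0.3));
}
```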

What changes are included in this PR?

Reuse the original Accumulator from the median() aggregate function (moved from median.rs to percentile.rs) to implement the quantile_*() functions.
The PercentileAccumulator is now shared by the median/quantile_disc/quantile_cont aggregate functions.

I think this is one of the trickier aggregate functions: regular aggregate functions maintain only O(1) internal state inside their Accumulator (e.g. AvgAccumulator only has to keep a sum and a count), while PercentileAccumulator has to keep O(n) internal state.
It caches all data during aggregation, then sorts the data and fetches the target percentile at final evaluation.
This PR only extends the functionality to support the new aggregate functions, but the underlying Accumulator is worth revisiting for optimizations in the future.
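That cache-then-sort design can be sketched roughly as follows. This is an illustrative struct under assumed names, not DataFusion's actual Accumulator trait: the update path only appends, and all the expensive work happens once at evaluation:

```rust
/// Illustrative stand-in for the PR's PercentileAccumulator: it keeps O(n)
/// state (every input value) and defers sorting to final evaluation.
struct PercentileAccumulator {
    values: Vec<f64>,
    percentile: f64, // target in [0, 1]
}

impl PercentileAccumulator {
    fn new(percentile: f64) -> Self {
        Self { values: Vec::new(), percentile }
    }

    /// Called per input batch: just cache the values, O(batch) work.
    fn update_batch(&mut self, batch: &[f64]) {
        self.values.extend_from_slice(batch);
    }

    /// Final step: sort once, then interpolate to the target percentile.
    fn evaluate(&mut self) -> f64 {
        self.values.sort_by(|a, b| a.partial_cmp(b).unwrap());
        let pos = self.percentile * (self.values.len() - 1) as f64;
        let (lo, hi) = (pos.floor() as usize, pos.ceil() as usize);
        self.values[lo] + (self.values[hi] - self.values[lo]) * (pos - lo as f64)
    }
}

fn main() {
    let mut acc = PercentileAccumulator::new(0.3);
    acc.update_batch(&[100.0, 0.0]);
    acc.update_batch(&[50.0]);
    println!("{}", acc.evaluate());
}
```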

Are these changes tested?

sqllogictests

Are there any user-facing changes?

No

@github-actions github-actions bot added the logical-expr (Logical plan and expressions), physical-expr (Physical Expressions), and sqllogictest (SQL Logic Tests (.slt)) labels Aug 18, 2023
// = 20
// `quantile_disc()` choose the closer dp (pick one with lower percentile if equally close)
// `target_percentile` is closer to dp1's percentile, result = 0
macro_rules! interpolate_logic {
@2010YOUY01 (Contributor, Author) commented:

This complex macro is used to (1) reduce float arithmetic errors and (2) avoid potential overflows.
It looks like it could be avoided if multiply and divide operations on ScalarValues, like this one, were available:
https://github.com/apache/arrow-datafusion/blob/672f5bdfe4df2270f5ad7b1a49bb1135acd5f9e7/datafusion/common/src/scalar.rs#L2057-L2060C4
They seem to be available in arrow-rs, but would need to be wired into DataFusion.
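Setting aside the ScalarValue arithmetic the macro exists to handle, the quantile_disc rule described in the quoted comment (pick the closer data point, preferring the lower percentile on ties) might look like this on plain f64 values. The quantile_disc helper here is hypothetical, for illustration only:

```rust
/// Discrete quantile: return the data point whose own percentile is closest
/// to the target, preferring the lower one on ties. Hypothetical sketch.
fn quantile_disc(sorted: &[f64], q: f64) -> f64 {
    assert!(!sorted.is_empty() && (0.0..=1.0).contains(&q));
    let pos = q * (sorted.len() - 1) as f64;
    let (lo, hi) = (pos.floor() as usize, pos.ceil() as usize);
    // Distance to the lower neighbor vs. the upper neighbor; `<=` makes
    // equally-close targets resolve to the lower-percentile data point.
    if pos - lo as f64 <= hi as f64 - pos {
        sorted[lo]
    } else {
        sorted[hi]
    }
}

fn main() {
    // 0.3 is closer to the 50% data point (distance 0.2) than to the
    // 0% data point (distance 0.3), so the result is 50
    println!("{}", quantile_disc(&[0.0, 50.0, 100.0], 0.3));
}
```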

@2010YOUY01 2010YOUY01 changed the title Implement quantile_cont()/quantile_disc() aggregate functions feat: Implement quantile_cont()/quantile_disc() aggregate functions Aug 18, 2023
alamb commented Aug 22, 2023

I plan to review this PR today

@alamb (Contributor) left a comment:

Thank you @2010YOUY01. This PR, like all your others, is well written, documented, and tested, and is easy to read and understand. Thank you so much.

Sorting

I started looking at this implementation, and I agree with you that trying to store all the values for the group is unlikely to work well once the data gets to a certain size.

However, I think you can probably have DataFusion do the sorting for you (and reuse all the existing fast multi-core sorting and sort-based optimizations) by specifying an ORDER BY in the aggregate argument:

❯ create table foo(x integer) as values (0), (50), (100);
0 rows in set. Query took 0.007 seconds.

❯ select * from foo;
+-----+
| x   |
+-----+
| 0   |
| 50  |
| 100 |
+-----+
3 rows in set. Query took 0.002 seconds.

❯ select first_value(x order by x) from foo;
+--------------------+
| FIRST_VALUE(foo.x) |
+--------------------+
| 0                  |
+--------------------+
1 row in set. Query took 0.005 seconds.

❯ select first_value(x order by x DESC) from foo;
+--------------------+
| FIRST_VALUE(foo.x) |
+--------------------+
| 100                |
+--------------------+
1 row in set. Query took 0.005 seconds.

It does not yet support the WITHIN GROUP syntax but I think that would be a relatively straightforward extension.

❯ select first_value(x) WITHIN GROUP (ORDER BY x DESC) FROM foo;
🤔 Invalid statement: sql parser error: Expected end of statement, found: GROUP

Maybe instead of having to sort the values explicitly, you could rewrite the query to put an ORDER BY in automatically:

select quantile_cont(c2, 0.3) from ...

to

select quantile_cont(c2 ORDER BY c2, 0.3) from ...

and then implement the aggregate assuming the input is already sorted.
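Under that rewrite, the accumulator could trust the planner's ordering and skip its own sort entirely; a hypothetical sketch under that assumption (again not DataFusion's actual Accumulator trait):

```rust
/// Hypothetical accumulator for the rewritten plan: input batches arrive
/// already sorted (thanks to the injected ORDER BY), so evaluation is
/// constant-time indexing plus interpolation instead of an O(n log n) sort.
struct SortedQuantileAccumulator {
    values: Vec<f64>, // ascending; guaranteed by the planner in this sketch
    q: f64,
}

impl SortedQuantileAccumulator {
    fn update_batch(&mut self, batch: &[f64]) {
        // Cheap sanity check of the sortedness assumption (debug builds only)
        debug_assert!(batch.windows(2).all(|w| w[0] <= w[1]));
        self.values.extend_from_slice(batch);
    }

    fn evaluate(&self) -> f64 {
        let pos = self.q * (self.values.len() - 1) as f64;
        let (lo, hi) = (pos.floor() as usize, pos.ceil() as usize);
        self.values[lo] + (self.values[hi] - self.values[lo]) * (pos - lo as f64)
    }
}

fn main() {
    let mut acc = SortedQuantileAccumulator { values: Vec::new(), q: 0.3 };
    acc.update_batch(&[0.0, 50.0, 100.0]);
    println!("{}", acc.evaluate());
}
```

Note this sketch still buffers the values; merging sorted runs across partitions is a separate concern it does not address.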

Does this belong in DataFusion core? Or does it belong as an add-on?

With this level of specialization required, I wonder where we should stop adding built-in aggregate functions and begin adding add-on packages. I worry there is currently no agreed-upon boundary yet.

Do you have time to think about breaking these more specialized aggregates into a separate crate (it could be in the datafusion repo), something like #7110?

@2010YOUY01 (Contributor, Author) replied:

> Thank you @2010YOUY01. This PR, like all your others, is well written, documented, and tested, and is easy to read and understand. Thank you so much.
>
> Sorting
>
> I started looking at this implementation, and I agree with you that trying to store all the values for the group is unlikely to work well once the data gets to a certain size.

Thank you for this very detailed review feedback!

This ORDER BY approach is what I was thinking about; I will try it.
I'm trying to figure out how to do this query rewrite. Do you remember any reference PR that did a query rewrite similar to this ORDER BY rewrite?

@2010YOUY01 (Contributor, Author) replied:

> Does this belong in DataFusion core? Or does it belong as an add-on?
>
> With this level of specialization required, I wonder where we should stop adding built-in aggregate functions and begin adding add-on packages. I worry there is currently no agreed-upon boundary yet.
>
> Do you have time to think about breaking these more specialized aggregates into a separate crate (it could be in the datafusion repo), something like #7110?

Though how to separate the existing functions requires further discussion, I think it is necessary to build an interface for putting a set of functions into a separate crate. Besides reducing binary size, it can also make extension management for UDFs more organized and user-friendly. I will think about this issue later and see if there is a good way to do it.

@2010YOUY01 2010YOUY01 marked this pull request as draft August 23, 2023 02:30

alamb commented Aug 23, 2023

> I'm trying to figure out how to do this query rewrite. Do you remember any reference PR that did a query rewrite similar to this ORDER BY rewrite?

Perhaps https://github.com/apache/arrow-datafusion/blob/main/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs or https://github.com/apache/arrow-datafusion/blob/main/datafusion/optimizer/src/unwrap_cast_in_comparison.rs would serve as good inspiration.

> Though how to separate the existing functions requires further discussion, I think it is necessary to build an interface for putting a set of functions into a separate crate. Besides reducing binary size, it can also make extension management for UDFs more organized and user-friendly. I will think about this issue later and see if there is a good way to do it.

Thank you

@2010YOUY01 (Contributor, Author) commented:

#7376 made several smart optimizations for median(), for example an O(n) quick select in the final evaluate step of the aggregation.

For select median(l_partkey) from lineitem on sf10 Parquet TPC-H data:
Before: ~20s
After: ~4s
With multi-core sorting: estimated ~2s

The multi-core sorting approach now seems unnecessary: after that optimization, the above query spends only ~1% of its time doing quick select, and most time is spent on data type conversion/copying.
I'll experiment to see if there is any way to make median() faster before finishing this PR.
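The O(n) quick-select step mentioned above can be sketched with the standard library's select_nth_unstable (an introselect); this is only an illustration of the idea, not #7376's actual code:

```rust
/// Median via partial selection: O(n) average instead of an O(n log n) sort.
/// Illustrative sketch, not the actual #7376 implementation.
fn median(values: &mut [i64]) -> f64 {
    let n = values.len();
    assert!(n > 0);
    // Partition so that values[n / 2] holds the element of sorted rank n / 2
    let (_, &mut upper_mid, _) = values.select_nth_unstable(n / 2);
    if n % 2 == 1 {
        upper_mid as f64
    } else {
        // Even length: the other middle element is the max of the lower half,
        // which now contains exactly the n / 2 smallest values
        let lower_mid = *values[..n / 2].iter().max().unwrap();
        (lower_mid as f64 + upper_mid as f64) / 2.0
    }
}

fn main() {
    println!("{}", median(&mut [5, 1, 3, 2, 4])); // odd length
    println!("{}", median(&mut [4, 1, 3, 2]));    // even length
}
```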

github-actions bot commented May 4, 2024

Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.

@github-actions github-actions bot added the Stale (PR has not had any activity for some time) label May 4, 2024
@github-actions github-actions bot closed this May 11, 2024