Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement DISTINCT ON from Postgres #7981

Merged
merged 23 commits into from
Nov 13, 2023
Merged

Conversation

gruuya
Copy link
Contributor

@gruuya gruuya commented Oct 30, 2023

Which issue does this PR close?

Closes #7827.
Closes #8008.

Rationale for this change

While in principle the output of DISTINCT ON can be emulated by combining several aggregation, projection and sorting queries/operators, in practice this is very verbose and impractical for users, unlike DISTINCT ON which is quite succinct. On the other hand that does mean that DISTINCT ON can be optimized into the equivalent representation under the hood during logical planning, while at the same time providing the compact syntactical interface.

What changes are included in this PR?

  • A new logical plan sub-variety for handling the DISTINCT ON from Postgres.
  • Parsing and planning for it
  • Optimization which converts it into the equivalent representation using the more common logical plan primitives
  • Proto serialization
  • Tests

Are these changes tested?

Yes, there are also new SLTs.

Are there any user-facing changes?

SQL support extends to encompass DISTINCT ON queries.

@github-actions github-actions bot added sql SQL Planner logical-expr Logical plan and expressions optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt) substrait labels Oct 30, 2023
gruuya

This comment was marked as resolved.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All tests here were compared for accuracy against Postgres.

Note that, while the docs state that in the absence of the ORDER BY clause the output is unpredictable, in practice I've noticed it is stable on Postgres for the table below, whereas that isn't the case for DataFusion, hence why I omitted those tests for now.

Any ideas on what else can be tested here are welcome (cc @universalmind303).

Comment on lines 240 to 252
if affected_id.is_empty() {
// Alias aggregation epxressions if they have changed
// TODO: This should have really been identified above and handled in the `else` branch
let aggr_exprs = new_aggr_expr
.iter()
.zip(aggr_expr.iter())
.map(|(new_expr, old_expr)| {
new_expr.clone().alias_if_changed(old_expr.display_name()?)
})
.collect::<Result<Vec<Expr>>>()?;
// Since group_epxr changes, schema changes also. Use try_new method.
Aggregate::try_new(Arc::new(new_input), new_group_expr, new_aggr_expr)
Aggregate::try_new(Arc::new(new_input), new_group_expr, aggr_exprs)
.map(LogicalPlan::Aggregate)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't yet understand why this wasn't identified/aliased automatically, but I found this part to be necessary in order to get DISTINCT ON with complex expressions to work.

Copy link
Contributor Author

@gruuya gruuya Nov 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I think I understand what is going on:

  • in general the root aggregation expressions need not be rewritten/aliased above since they can only appear once (i.e. they're not common), so this is ok
  • however, some of their child expressions may get rewritten; an example is the query select distinct on (c1 + 1) c2 from test order by c1 + 1, c3, where the initial aggregation of the sole projected field gets rewritten from FIRST_VALUE(test.column2) ORDER BY [test.column1 + Int64(1) ASC NULLS LAST, test.column3 ASC NULLS LAST] to FIRST_VALUE(test.column2) ORDER BY [test.column1 + Int64(1)Int64(1)test.column1 AS test.column1 + Int64(1) ASC NULLS LAST, test.column3 ASC NULLS LAST] (i.e. c1 + 1 is a common expression of it and the ON expression so it gets rewritten)
  • consequently the new schema will be different from the original one

I think this explains the behavior I've encountered here, and that this behavior is not anomalous. The only question is whether there is a better way to align the new aggregation schema with the old one besides aliasing as above.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe @waynexia has some thoughts

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, I've now clarified this comment since it was misleading, i.e. the aliasing should really be done in build_recover_project_plan once qualified aliasing is enabled.

) -> Result<Self> {
let on_expr = normalize_cols(on_expr, input.as_ref())?;

// Create fields with any qualifier stuffed in the name itself
Copy link
Contributor Author

@gruuya gruuya Oct 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is sort of a hack, but the reason I was forced to do it is so that the plan schema remains unchanged after replace_distinct_aggregate. In particular, in that optimizer I alias the final projected expressions with the selection expression display name, but that always results in a schema without qualifiers (with any original qualifier crammed into the field name).

I feel like this case should be handled by Projection::try_new_with_schema over at replace_distinct_aggregate, but it seems that after #7919 the custom schema doesn't propagate through the optimization chain.

UPDATE: I've opened a related issue for this: #8008

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is sort of a hack, but the reason I was forced to do it is so that the plan schema remains unchanged after replace_distinct_aggregate.

Note #7997 may be related

@gruuya gruuya marked this pull request as ready for review October 31, 2023 12:04
@alamb
Copy link
Contributor

alamb commented Nov 1, 2023

I am sorry -- I have seen this PR but I am somewhat backed up on reviews

@waynexia waynexia added the api change Changes the API exposed to users of the crate label Nov 2, 2023
Comment on lines +166 to +167
LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add api-change label for this diff

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These cases are excellent 👍 . It might be better if we could include some negative cases. Here are two I got from PG:

SELECT DISTINCT ON () c1, c2 FROM aggregate_test_100 ORDER BY c1, c3;
ERROR:  syntax error at or near ")"
LINE 1: SELECT DISTINCT ON () c1, c2 FROM aggregate_test_100 ORDER B...

SELECT DISTINCT ON (c2) c1, c2 FROM aggregate_test_100 ORDER BY c1, c3;
ERROR:  SELECT DISTINCT ON expressions must match initial ORDER BY expressions
LINE 1: SELECT DISTINCT ON (c2) c1, c2 FROM aggregate_test_100 ORDER...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Also good catch, the first example was throwing a panic; added handling and a test case for it.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @gruuya -- this is looking pretty close.

pub on_expr: Vec<Expr>,
/// The selected projection expression list
pub select_expr: Vec<Expr>,
/// The `ORDER BY` clause, whose initial expressions must match those of the `ON` clause
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the same list of exprs is included in both the select_list and sort_exprs I wonder if it would be less error prone to have references rather than keep three parallel lists:

    /// The selected projection expression list
    pub select_expr: Vec<Expr>,
    /// The sort expressions
    pub sort_expr: Vec<Expr>,
    /// the number of prefix columns from `sort_expr` that form the `ON` clause used for deduplicating

Copy link
Contributor Author

@gruuya gruuya Nov 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, on_expr and sort_expr should have a (partial) overlap (select_expr and sort_expr are arbitrary and different in general).

Condensing on_expr and sort_expr into one is doable, though it seems to me it would introduce more complexity, and make the logic harder to grasp. For starters sort_expr is not an exact super-set of on_expr, but instead wraps them inside Expr::Sorts to capture asc and nulls_first, but it can also be omitted (unlike ON exprs). On the other hand, these two are parsed at different places: first the on_expr is parsed in select_to_plan, and then sort_expr is extracted in order_by.

So in order to keep them both in one vec we'd have to: a) add an additional field to DistinctOn to track the length of the ON expressions as you mention (e.g. on_expr_count), b) if there are no actual sort_expr provided keep the ON expressions in them, otherwise validate that the wrapped sorting expressions match the existing ON expressions and replace the vector with the sorting expressions, and then c) inside replace_distinct_aggregate deconstruct that. That would mean take on_expr_count first expressions from sort_expr, and unwrapping the underlying expressions from Expr::Sort in case sort expression length is > on_expr_count. In case sort expression length is equal to on_expr_count it could mean no ORDER BY was provided (no need for unwrapping), or the ORDER BY expressions match the ON expressions modulo the aforementioned sorting specifiers (asc and nulls_first).

This strikes me as less clear all in all, though if you'd prefer it I'll make that change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think your explanation makes sense. Maybe we can take some of this rationale (e.g. that sort_exprs are not a superset as they are wrapped by Expr::Sort) and put them into the doc comments. We can do this as a follow on PR

Comment on lines 240 to 252
if affected_id.is_empty() {
// Alias aggregation epxressions if they have changed
// TODO: This should have really been identified above and handled in the `else` branch
let aggr_exprs = new_aggr_expr
.iter()
.zip(aggr_expr.iter())
.map(|(new_expr, old_expr)| {
new_expr.clone().alias_if_changed(old_expr.display_name()?)
})
.collect::<Result<Vec<Expr>>>()?;
// Since group_epxr changes, schema changes also. Use try_new method.
Aggregate::try_new(Arc::new(new_input), new_group_expr, new_aggr_expr)
Aggregate::try_new(Arc::new(new_input), new_group_expr, aggr_exprs)
.map(LogicalPlan::Aggregate)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe @waynexia has some thoughts

Projection::try_new(col_exprs, Arc::new(input))
.expect("Cannot build projection plan from an invalid schema"),
)
Ok(LogicalPlan::Projection(Projection::try_new(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@@ -32,6 +36,22 @@ use datafusion_expr::{Aggregate, Distinct, LogicalPlan};
/// ```text
/// SELECT a, b FROM tab GROUP BY a, b
/// ```
///
/// On the other hand, for a `DISTINCT ON` query the replacement is
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you -- this is very helpful to understand what is going on

# order by the third
# TODO: Note that the value for 'a' is actually degenerate, and while Postgres
# returns the actual first match (4), DataFusion returns the last one, presumably
# because `first_value` aggregation function doesn't have a dedicated `GroupsAccumulator`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't make sense to me -- we should get the correct values, regardless of the presence of a groups accumulator

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will investigate in more depth and report back. (It's indicative that the equivalent query without using DISTINCT ON returns the same results.)

Copy link
Contributor Author

@gruuya gruuya Nov 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see what this is, it's another case of unstable sort, i.e. the sorted rows that have equal ordering expressions evaluations do not preserve the original order of those rows (but they're stable in the sense that output is always the same):

select c1, c2, c3 from test order by c1, c3;
+----+----+------+
| c1 | c2 | c3   |
+----+----+------+
| a  | 5  | -101 |
| a  | 4  | -101 |
| a  | 1  | -85  |
| a  | 3  | -72  |
| a  | 1  | -56  |
...

I've removed the misleading comment now.

@alamb
Copy link
Contributor

alamb commented Nov 2, 2023

I am a little worried about the TODO and the seemingly incorrect results,

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @gruuya -- I think this PR is looking very nice. My only hesitation I have is the schema rewriting code. My concern is that changing the schema in the LogicalPlan nodes when the node doesn't logically change the schema seems like be breaks an invariant and thus will be brittle (aka this feature may break with combined with other complex queries).

I read #8008 and I think we should figure out what is going on there before merging this

I actually tried merging this branch up from master to get the fix for #7997 and removing the workaround, and sadly it still errored

pub on_expr: Vec<Expr>,
/// The selected projection expression list
pub select_expr: Vec<Expr>,
/// The `ORDER BY` clause, whose initial expressions must match those of the `ON` clause
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think your explanation makes sense. Maybe we can take some of this rationale (e.g. that sort_exprs are not a superset as they are wrapped by Expr::Sort) and put them into the doc comments. We can do this as a follow on PR


# Basic example + omit ON column from selection
query I
SELECT DISTINCT ON (c1) c2 FROM aggregate_test_100 ORDER BY c1, c3;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you maybe also include an explain plan here so we can validate the plan?

Something like

EXPLAIN SELECT DISTINCT ON (c1) c3, c2 FROM aggregate_test_100 ORDER BY c1, c3;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, adding that test as well!

// this on it's own isn't enough to guarantee the proper output order of the grouping
// (`ON`) expression, so we need to sort those as well.
LogicalPlanBuilder::from(plan)
.sort(sort_expr[..on_expr.len()].to_vec())?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to sort the input prior to aggregating it? It seems like sorting to do the aggregate and then sorting again is not as efficient as being able to just sort once and reuse the output.

In theory the optimizer should handle this rewrite, but it might be better to just start with the sort under the aggregate

Copy link
Contributor Author

@gruuya gruuya Nov 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, my thinking was that the first_value aggregation actually doesn't do any real sorting and is more like top-1, i.e. it only works on the incoming stream and picks up one row per-grouping set (ON clause), updating it if it's a better match according to the sorting expressions:
https://github.com/apache/arrow-datafusion/blob/2af326a0bdbb60e7d7420089c7aa9ff22c7319ee/datafusion/physical-expr/src/aggregate/first_last.rs#L236-L244
The subsequent actual sort works on the grouped data, so it should have a much lower cardinality.

Whereas if we sorted prior to aggregation then, I think we would materialize all the data initially (whether in memory or by spilling) only to throw away most of it, but I could be wrong.

Granted I haven't done any perf testing between the two approaches for now, this is just from my reading of the code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see now that my assumptions were wrong, and that indeed sorting gets pulled down beneath aggregation during physical planning anyway. I do think that in the case of FIRST_VALUE with ORDER BY this is sub-optimal, since that problem reduces to a partitioned top-1 problem, which doesn't need to work on pre-sorted input I think.

Either way, even when I pull sorting beneath the aggregation, I still find I end up needing another post-aggregation sort to get the correct order for the ON clause expressions, since presumably AggregateExec doesn't preserve order:

❯ explain select distinct on (c1) c1, c2 from test order by c1, c3;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                            |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: FIRST_VALUE(test.c1) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST] AS test.c1, FIRST_VALUE(test.c2) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST] AS test.c2                |
|               |   Aggregate: groupBy=[[test.c1]], aggr=[[FIRST_VALUE(test.c1) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST], FIRST_VALUE(test.c2) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST]]]       |
|               |     Sort: test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST                                                                                                                                                        |
|               |       TableScan: test projection=[c1, c2, c3]                                                                                                                                                                   |
| physical_plan | ProjectionExec: expr=[FIRST_VALUE(test.c1) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST]@1 as test.c1, FIRST_VALUE(test.c2) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST]@2 as test.c2] |
|               |   AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[FIRST_VALUE(test.c1), FIRST_VALUE(test.c2)], ordering_mode=Sorted                                                                               |
|               |     SortExec: expr=[c1@0 ASC NULLS LAST]                                                                                                                                                                        |
|               |       CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                               |
|               |         RepartitionExec: partitioning=Hash([c1@0], 10), input_partitions=10                                                                                                                                     |
|               |           AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[FIRST_VALUE(test.c1), FIRST_VALUE(test.c2)], ordering_mode=Sorted                                                                                |
|               |             RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1                                                                                                                               |
|               |               SortExec: expr=[c1@0 ASC NULLS LAST,c3@2 ASC NULLS LAST]                                                                                                                                          |
|               |                 CsvExec: file_groups={1 group: [[Users/markogrujic/Splitgraph/arrow-datafusion/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], has_header=true                             |
|               |                                                                                                                                                                                                                 |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

compare that with the currently generated plans which do get the order correct:

❯ explain select distinct on (c1) c1, c2 from test order by c1, c3;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                            |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: FIRST_VALUE(test.c1) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST] AS test.c1, FIRST_VALUE(test.c2) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST] AS test.c2                |
|               |   Sort: test.c1 ASC NULLS LAST                                                                                                                                                                                  |
|               |     Aggregate: groupBy=[[test.c1]], aggr=[[FIRST_VALUE(test.c1) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST], FIRST_VALUE(test.c2) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST]]]     |
|               |       TableScan: test projection=[c1, c2, c3]                                                                                                                                                                   |
| physical_plan | ProjectionExec: expr=[FIRST_VALUE(test.c1) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST]@1 as test.c1, FIRST_VALUE(test.c2) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST]@2 as test.c2] |
|               |   SortPreservingMergeExec: [c1@0 ASC NULLS LAST]                                                                                                                                                                |
|               |     SortExec: expr=[c1@0 ASC NULLS LAST]                                                                                                                                                                        |
|               |       AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[FIRST_VALUE(test.c1), FIRST_VALUE(test.c2)]                                                                                                 |
|               |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                             |
|               |           RepartitionExec: partitioning=Hash([c1@0], 10), input_partitions=10                                                                                                                                   |
|               |             AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[FIRST_VALUE(test.c1), FIRST_VALUE(test.c2)], ordering_mode=Sorted                                                                              |
|               |               SortExec: expr=[c1@0 ASC NULLS LAST,c3@2 ASC NULLS LAST]                                                                                                                                          |
|               |                 RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1                                                                                                                           |
|               |                   CsvExec: file_groups={1 group: [[Users/markogrujic/Splitgraph/arrow-datafusion/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], has_header=true                           |
|               |                                                                                                                                                                                                                 |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

@gruuya
Copy link
Contributor Author

gruuya commented Nov 4, 2023

@alamb thanks for bearing with me here! Yeah, I agree, ideally #8008 gets some kind of a resolution prior to merging this.

@gruuya
Copy link
Contributor Author

gruuya commented Nov 10, 2023

Ok, the last commit (8ebdc13) resolves #8008, so I think this might actually be mergable now.

@alamb
Copy link
Contributor

alamb commented Nov 10, 2023

Thank you @gruuya -- this is on my review queue

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @gruuya -- this looks really nice to me. Thank you 🙏 Thanks for sticking with the qualified Alias stuff. I think that is a much nicer design now

The only thing I saw was a comment that might be out of date and there is now conflict with this PR.

Thanks again and sorry for the delay in review

@@ -238,6 +238,16 @@ impl CommonSubexprEliminate {
let rewritten = pop_expr(&mut rewritten)?;

if affected_id.is_empty() {
// Alias aggregation expressions if they have changed
// TODO: This should be handled in `build_recover_project_plan` once qualified aliases
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this comment is out of date

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, removed and merged latest main.

Thanks for your patience on this one! :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your patience on this one! :)

It is I who should be thanking you.

Thanks again @gruuya

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api change Changes the API exposed to users of the crate logical-expr Logical plan and expressions optimizer Optimizer rules sql SQL Planner sqllogictest SQL Logic Tests (.slt) substrait
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement qualified expression alias Add sql support for DISTINCT ON
4 participants