Resolve contradictory requirements by conversion of ordering sensitive aggregators #6482

Merged
merged 98 commits into from
Jun 3, 2023

Contributor

@mustafasrepo mustafasrepo commented May 30, 2023

Which issue does this PR close?

Closes #.

Rationale for this change

Some seemingly conflicting ordering requirements can be resolved by reversing the aggregate function, when the function supports reversal.
For instance, consider the query below:

SELECT country, FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
  LAST_VALUE(amount ORDER BY amount DESC) AS fv2,
  ARRAY_AGG(amount ORDER BY amount DESC) AS amounts
  FROM sales_global
  GROUP BY country

The requirement amount ASC of FIRST_VALUE and the requirement amount DESC of LAST_VALUE conflict. However, FIRST_VALUE(amount ORDER BY amount ASC) AS fv1 behaves the same as LAST_VALUE(amount ORDER BY amount DESC) AS fv1. Hence, by converting the query above to its equivalent form below

SELECT country, LAST_VALUE(amount ORDER BY amount DESC) AS fv1,
  LAST_VALUE(amount ORDER BY amount DESC) AS fv2,
  ARRAY_AGG(amount ORDER BY amount DESC) AS amounts
  FROM sales_global
  GROUP BY country

we can resolve the conflicting requirements and successfully run the equivalent version.
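
The equivalence can be checked on a toy example. The sketch below is not DataFusion code: `Direction`, `sorted`, `first_value`, and `last_value` are hypothetical helpers that sort one group's `amount` values and take the first or last element.

```rust
/// Sort direction of an ORDER BY clause inside an aggregate (toy model).
#[derive(Clone, Copy, Debug, PartialEq)]
enum Direction {
    Asc,
    Desc,
}

/// Returns a copy of `values` sorted in the requested direction.
fn sorted(values: &[i64], dir: Direction) -> Vec<i64> {
    let mut v = values.to_vec();
    v.sort();
    if dir == Direction::Desc {
        v.reverse();
    }
    v
}

/// FIRST_VALUE(x ORDER BY x <dir>) over one group; `None` for an empty group.
fn first_value(values: &[i64], dir: Direction) -> Option<i64> {
    sorted(values, dir).first().copied()
}

/// LAST_VALUE(x ORDER BY x <dir>) over one group; `None` for an empty group.
fn last_value(values: &[i64], dir: Direction) -> Option<i64> {
    sorted(values, dir).last().copied()
}
```

For a group with amounts 30, 10, 20, both `first_value(&amounts, Direction::Asc)` and `last_value(&amounts, Direction::Desc)` return `Some(10)`.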

As another use case, assume that the sales_global table is ordered by ts ASC. Consider the query below:

SELECT country, LAST_VALUE(amount ORDER BY ts DESC) AS lv1
  FROM sales_global
  GROUP BY country

It produces the following plan:

ProjectionExec: expr=[country@0 as country, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as lv1]
  AggregateExec: mode=Single, gby=[country@0 as country], aggr=[LAST_VALUE(sales_global.amount)]
    SortExec: expr=[ts@1 DESC]
      MemoryExec: partitions=1, partition_sizes=[1], output_ordering=[ts ASC]

However, if we were to treat the query above in its equivalent form, like below

SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS lv1
  FROM sales_global
  GROUP BY country

we can produce the following plan:

ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as lv1]
  AggregateExec: mode=Single, gby=[country@0 as country], aggr=[LAST_VALUE(sales_global.amount)]
    MemoryExec: partitions=1, partition_sizes=[1], output_ordering=[ts ASC]
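
The choice between the two plans can be pictured as a small decision procedure. This is a minimal sketch under simplifying assumptions, not the planner code in this PR: `SortKey`, `Decision`, and `plan` are hypothetical names, an ordering is reduced to a single column, and null ordering is ignored.

```rust
/// Ordering on a single column (toy model; real plans use lists of sort expressions).
#[derive(Clone, Copy, Debug, PartialEq)]
struct SortKey {
    column: &'static str,
    descending: bool,
}

impl SortKey {
    /// The same column sorted in the opposite direction.
    fn reversed(self) -> SortKey {
        SortKey { column: self.column, descending: !self.descending }
    }
}

/// How the ordering requirement of an aggregate gets satisfied.
#[derive(Debug, PartialEq)]
enum Decision {
    /// The input already satisfies the requirement.
    UseAsIs,
    /// Swap FIRST_VALUE <-> LAST_VALUE and consume the input unchanged.
    ReverseAggregate,
    /// Neither works: a sort is needed below the aggregate.
    AddSort,
}

/// Picks a strategy from the input ordering (if any), the required ordering,
/// and whether the aggregate has a reverse form.
fn plan(input: Option<SortKey>, required: SortKey, reversible: bool) -> Decision {
    match input {
        Some(existing) if existing == required => Decision::UseAsIs,
        Some(existing) if reversible && existing == required.reversed() => {
            Decision::ReverseAggregate
        }
        _ => Decision::AddSort,
    }
}
```

With an input ordered by ts ASC and a requirement of ts DESC, `plan` returns `Decision::ReverseAggregate` for a reversible aggregate and `Decision::AddSort` otherwise, which mirrors the two plans above.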

What changes are included in this PR?

This PR analyzes reverse ordering requirements, both to resolve conflicting ordering requirements and to remove SortExecs from the physical plan where possible.

Are these changes tested?

Yes, new tests are added to the groupby.slt file.

Are there any user-facing changes?

mustafasrepo and others added 30 commits May 3, 2023 14:46
# Conflicts:
#	datafusion/core/tests/sqllogictests/test_files/aggregate.slt
# Conflicts:
#	datafusion/core/src/physical_plan/planner.rs
#	datafusion/core/tests/sqllogictests/test_files/aggregate.slt
#	datafusion/expr/src/expr.rs
#	datafusion/expr/src/tree_node/expr.rs
#	datafusion/expr/src/udaf.rs
#	datafusion/optimizer/src/analyzer/type_coercion.rs
#	datafusion/optimizer/src/common_subexpr_eliminate.rs
#	datafusion/proto/src/logical_plan/from_proto.rs
#	datafusion/proto/src/logical_plan/mod.rs
#	datafusion/proto/src/logical_plan/to_proto.rs
#	datafusion/sql/src/expr/function.rs
#	datafusion/sql/src/utils.rs
…rst_last_aggregate

# Conflicts:
#	datafusion/expr/Cargo.toml
mustafasrepo and others added 7 commits May 27, 2023 17:47
# Conflicts:
#	datafusion/core/src/physical_plan/aggregates/mod.rs
#	datafusion/core/tests/sqllogictests/test_files/groupby.slt
#	datafusion/core/tests/sqllogictests/test_files/window.slt
#	datafusion/expr/src/aggregate_function.rs
#	datafusion/expr/src/window_function.rs
#	datafusion/physical-expr/src/lib.rs
@github-actions github-actions bot added core Core DataFusion crate physical-expr Physical Expressions sqllogictest SQL Logic Tests (.slt) labels May 30, 2023
0 1 25
1 2 50
1 3 75
0 0 24
Contributor Author

Since the ordering requirement column a is among the GROUP BY expressions, each group contains a single value for column a. Hence the previous result and this result are both correct. However, with the changes in this PR, the result changes.

0 1 49
1 2 74
1 3 99
0 0 0
Contributor Author

The same reasoning as in the test above applies to this test case.

/// Reverses the ORDER BY expression, which is useful during equivalent window
/// expression construction. For instance, 'ORDER BY a ASC, NULLS LAST' turns into
/// 'ORDER BY a DESC, NULLS FIRST'.
pub fn reverse_order_bys(order_bys: &[PhysicalSortExpr]) -> Vec<PhysicalSortExpr> {
Contributor Author

reverse_order_bys is moved to sort_expr.rs so that it can be used in src/physical_plan/aggregates/mod.rs. This change widens the scope of the function.
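
For readers without the diff at hand, the reversal described in the doc comment can be sketched as follows. This is a simplified stand-in, not the body of the real function: `SortOptions` and `SortExpr` here are hypothetical local types playing the role of the options carried by a `PhysicalSortExpr`, and `reverse_order_bys_sketch` is an illustrative name.

```rust
/// Direction and null placement of one sort expression (toy model).
#[derive(Clone, Copy, Debug, PartialEq)]
struct SortOptions {
    descending: bool,
    nulls_first: bool,
}

/// One ORDER BY entry: a column name plus its options (toy model).
#[derive(Clone, Debug, PartialEq)]
struct SortExpr {
    column: String,
    options: SortOptions,
}

/// Flips both the direction and the null placement of every entry,
/// so 'a ASC NULLS LAST' becomes 'a DESC NULLS FIRST'.
fn reverse_order_bys_sketch(order_bys: &[SortExpr]) -> Vec<SortExpr> {
    order_bys
        .iter()
        .map(|e| SortExpr {
            column: e.column.clone(),
            options: SortOptions {
                descending: !e.options.descending,
                nulls_first: !e.options.nulls_first,
            },
        })
        .collect()
}
```

Reversing twice gives back the original ordering, which is what makes the FIRST_VALUE/LAST_VALUE conversion safe to apply in either direction.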

Contributor

alamb commented May 30, 2023

I plan to review this PR carefully tomorrow

mustafasrepo and others added 2 commits May 31, 2023 14:08
# Conflicts:
#	datafusion/physical-expr/src/lib.rs
#	datafusion/physical-expr/src/sort_expr.rs
Contributor

alamb commented May 31, 2023

in terms of behavior. Hence by converting query above to its equivalent form below

SELECT country, LAST_VALUE(amount ORDER BY amount DESC) AS fv1,
  LAST_VALUE(amount ORDER BY amount DESC) AS fv2,
  ARRAY_AGG(amount ORDER BY amount ASC) AS amounts
  FROM sales_global
  GROUP BY country

Did you mean like this (change LAST_VALUE to FIRST_VALUE and switch the order):

SELECT country, LAST_VALUE(amount ORDER BY amount DESC) AS fv1,
  FIRST_VALUE(amount ORDER BY amount ASC) AS fv2,
  ARRAY_AGG(amount ORDER BY amount ASC) AS amounts
  FROM sales_global
  GROUP BY country

Contributor

@alamb alamb left a comment

Thanks @mustafasrepo -- I have a question about the plans but I think it is just my own misunderstanding

datafusion/core/src/physical_plan/aggregates/mod.rs Outdated
----TableScan: sales_global projection=[country, amount]
physical_plan
ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2]
--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
Contributor

I don't understand this plan -- if the sort is by amount1 DESC then isn't

I am expecting that these are the same:

  FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
  LAST_VALUE(amount ORDER BY amount DESC) AS fv2

So when the orderby is rewritten to be ORDER BY amount1 DESC I expect to see that

  LAST_VALUE(amount ORDER BY amount DESC) AS fv1, -- this got switched
  LAST_VALUE(amount ORDER BY amount DESC) AS fv2

However, the query seems to get the right answer, so I must be missing something

Contributor Author

AggregateExpr has a fn name(&self) -> &str method, which returns the string representation of the expression. This name is supplied from outside during initialization (it is not computed from the function and its arguments). During reversal we use the name as is. As you observed, this is misleading (even though the appropriate version runs behind the scenes). However, I couldn't find an easy fix for this problem.

Contributor

🤔 I wonder if during rewrite we could rewrite the value of name() somehow

Contributor Author

We can replace FIRST_VALUE with LAST_VALUE, and vice versa, inside the string. This is not a nice solution, but it would solve our problem. What do you think?

Contributor Author

I think we can rewrite the name in the same way it is built during initialization. This way we do not overfit to a string convention. I will experiment with rewriting the value of name() and let you know what I observe.

Contributor Author

@alamb I have changed the name implementation. It is now calculated inside the aggregator rather than given from outside during initialization. The display of the AggregateExec changed from
--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
to
--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(amount@1), LAST_VALUE(amount@1)]

Since argument expressions are Arc<dyn PhysicalExpr>, the displayed argument changes from sales_global.amount to amount@1 for FIRST_VALUE and LAST_VALUE.

Fortunately, this change led me to recognize a bug in order-sensitive aggregation when the partition number is greater than 1.
In multi-partition cases, the Final or FinalPartitioned stages of the AggregateExec are not guaranteed to receive data in order.
Hence I constrained order-sensitive aggregation to single-partition cases.
I will add support for order-sensitive aggregation across multiple partitions in follow-up PRs.
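
A toy model may help show why the arrival order matters. The sketch below is only an illustration and does not reflect how DataFusion's partial and final stages are implemented: it assumes rows are `(ts, amount)` pairs, that each partition is already ordered by ts, and that a hypothetical `naive_final` stage simply keeps the last partial state it receives.

```rust
/// Partial LAST_VALUE for one partition whose rows are already ordered by ts.
/// Rows are `(ts, amount)` pairs; returns `None` for an empty partition.
fn partial_last(partition: &[(i64, i64)]) -> Option<(i64, i64)> {
    partition.last().copied()
}

/// Naive final stage: keeps the amount of whichever partial state arrives last.
/// This is only correct if partial states arrive in ts order.
fn naive_final(partials: &[(i64, i64)]) -> Option<i64> {
    partials.last().map(|&(_ts, amount)| amount)
}
```

Take partitions `[(1, 100), (3, 300)]` and `[(2, 200), (4, 400)]`. The correct LAST_VALUE(amount ORDER BY ts ASC) is 400, and `naive_final` returns it only when the second partition's partial state arrives last. If the states arrive in the other order, it returns 300.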

Contributor

So we have two options:

  1. Simple string replacement of FIRST_VALUE to LAST_VALUE (and vice versa) and order direction when reversal happens.
  2. Generating name with the new mechanism @mustafasrepo experimented with.

The advantage of (1) is that it preserves a uniform display format. Its disadvantage is that this is a specialized solution to FIRST_VALUE/LAST_VALUE and may not generalize to other functions in the future.

The advantage of (2) is that it generalizes, but it has the drawback that we end up with an intermixed format; i.e. we have entries like ARRAY_AGG(sales_global.amount) along with entries like LAST_VALUE(amount@1).

Let's choose one for now and create an issue to fix this in a more general fashion in the future. I do not have a preference at this point -- @alamb, which one do you prefer?
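
To make the trade-off concrete, here is a rough sketch of both options. None of these names come from the PR: `swap_first_last`, `Kind`, and `Aggregate` are hypothetical, the string swap covers only the function name (not the order direction), and the generated name uses a simplified `FUNC(arg)` format.

```rust
/// Option 1: textual swap of FIRST_VALUE and LAST_VALUE in a display name.
fn swap_first_last(name: &str) -> String {
    // A placeholder keeps the two replacements from undoing each other.
    const PLACEHOLDER: &str = "\u{0}";
    name.replace("FIRST_VALUE", PLACEHOLDER)
        .replace("LAST_VALUE", "FIRST_VALUE")
        .replace(PLACEHOLDER, "LAST_VALUE")
}

/// Option 2: the aggregate knows its own kind and derives its name from it.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Kind {
    FirstValue,
    LastValue,
}

struct Aggregate {
    kind: Kind,
    /// Display form of the argument expression, e.g. "amount@1".
    arg: String,
}

impl Aggregate {
    /// Name computed from the current state, so it stays correct after reversal.
    fn name(&self) -> String {
        let func = match self.kind {
            Kind::FirstValue => "FIRST_VALUE",
            Kind::LastValue => "LAST_VALUE",
        };
        format!("{}({})", func, self.arg)
    }

    /// The equivalent aggregate over the reversed ordering.
    fn reversed(&self) -> Aggregate {
        let kind = match self.kind {
            Kind::FirstValue => Kind::LastValue,
            Kind::LastValue => Kind::FirstValue,
        };
        Aggregate { kind, arg: self.arg.clone() }
    }
}
```

Option 1 needs no knowledge of the aggregate but depends on the naming convention of the string. Option 2 needs a name per aggregate kind but cannot drift out of sync with the function that actually runs.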

Contributor

I don't have any preference -- they both sound fine to me

datafusion/core/tests/sqllogictests/test_files/groupby.slt Outdated
datafusion/physical-expr/src/sort_expr.rs
@ozankabak
Contributor

This LGTM, feel free to merge

@mustafasrepo mustafasrepo merged commit 5ddcbc4 into apache:main Jun 3, 2023
@berkaysynnada berkaysynnada deleted the feature/first_last_conversion branch June 20, 2023 07:12
@berkaysynnada berkaysynnada restored the feature/first_last_conversion branch June 20, 2023 07:12
@berkaysynnada berkaysynnada deleted the feature/first_last_conversion branch June 20, 2023 07:14