-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enhance Enforce Dist capabilities to fix, sub optimal bad plans #7671
Conversation
# Conflicts: # datafusion/core/src/physical_optimizer/enforce_distribution.rs # datafusion/sqllogictest/test_files/groupby.slt
# Conflicts: # datafusion/core/src/datasource/listing/table.rs # datafusion/core/src/physical_optimizer/enforce_distribution.rs
# Conflicts: # datafusion/core/src/datasource/listing/table.rs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This addition is quite powerful, as well as the code LGTM. Thanks @mustafasrepo
# Conflicts: # datafusion/core/src/physical_optimizer/enforce_distribution.rs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @mustafasrepo -- I think this is a great change. I had some small comments about the tests but they could be fixed as a follow on PR if needed
The name GlobalRequirements
initially implied to me that it tracked something global to the entire plan, but upon reading more I realize it is modeling the requirements of the output / root of the plan. Maybe a name like OutputRequirements
or RootRequirements
might better capture the notion.
cc @devinjdangelo I think this might be a way to allow an insert / write plan to specify how it wants the input to be ordered and distributed .
This is somewhat related to a discussion we had on how a TableProvider
could signal to the rest of the plan how it wanted its input partitioned / sorted: #6339 and #424
/// Helper function that adds an ancillary `GlobalRequirementExec` to the given plan. | ||
/// First entry in the tuple is resulting plan, second entry indicates whether any | ||
/// `GlobalRequirementExec` is added to the plan. | ||
fn require_top_ordering_helper( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see this is basically trying to reverse engineer the top level requirements based on the pattern of the plan.
Given GlobalRequirementExec
can specify plan output requirements, I wonder if it would make more sense to directly create GlobalRequirementExec
as pat of the initial physical plan, for example in https://github.com/apache/arrow-datafusion/blob/7b12666ec87ea741c3f5b56ddf1647f6d794f9e3/datafusion/core/src/physical_planner.rs#L536
That would also offer a nice way to control how inputs to write plans might look 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an option also. I think we can add this during initial plan, then in a single pass remove this operator at the end of the optimizer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought about this design. I think there are a couple of reason why current design is better
- Currently
GlobalRequirementExec
is not necessarily the top executor in the physical plan. It is the executor above that defines output ordering.
Such as plan below
ProjectionExec
OutputRequirementExec
SortExec (specifies output ordering)
The reason for this strategy is that putting OutputRequirementExec
at the top has couple of problems.
Consider alternative strategy where plan above turns the following
OutputRequirementExec
ProjectionExec
SortExec (specifies output ordering)
In this new version OutputRequirementExec
may not have the necessary column to satisfy ordering desired at the output. Ordering should be satisfied before projection(These column may get lost during projection).
Consider another plan
WindowExec(OVER() sum())
Source (have an ordering)
If we were to require output ordering for it above plan would turn to
OutputRequirementExec
WindowExec(OVER() sum())
Source (have an ordering)
However, output ordering in the plan above is accidental. It is not a requirement by the query. And planner should be free to mess this output ordering, if if is helpful.
In short output ordering requirement only when query contains ORDER BY
clause at the end.
In this light, another strategy might be during plan creation we can insert OutputRequirementExec
when we see LogicalPlan::Sort
.
However, in following kind of queries
SELECT a, b
FROM (
SELECT a, b
FROM table1
ORDER BY a
)
ORDER BY b
This would introduce unnecessary requirement for the subquery.
Hence to require only appropriate absolutely necessary output ordering, we need to traverse physical plan from top to bottom, as long as ordering is maintained. Then put the OutputRequirementExec
on top of it (this is the current approach). For this we need to have a top down pass on the physical plan. This can be done in create_initial_plan
stage also. After initial physical plan is created. However, current implementation generally insert corresponding operator(s) for the logicalPlan
node. Hence it is not easy to integrate top-down traversal code here.
However, I think we can move OutputRequirementExec insertion code on top of here.
https://github.com/apache/arrow-datafusion/blob/7b12666ec87ea741c3f5b56ddf1647f6d794f9e3/datafusion/core/src/physical_planner.rs#L450
After initial plan is created. What do you think, about this place?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the writeup and explanation @mustafasrepo -- I agree with your examples. Maybe the core problem is trying to figure out what the "required ordering" from the ExecutionPlan
where it could be simply an accident of the output of the plan, as in your example
WindowExec(OVER() sum())
Source (have an ordering)
I think the required output order can probably be calculated from the logical plan, so maybe we should move it there. 🤔
However, I think we can move OutputRequirementExec insertion code on top of here.
That makes sense to me. Shall I try it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the required output order can probably be calculated from the logical plan, so maybe we should move it there. 🤔
If we can reliably get the requirements at that stage, this should work too. Feel free to try and we can see if it helps simplify the code
@@ -3302,6 +3278,12 @@ mod tests { | |||
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]", | |||
]; | |||
assert_optimized!(expected, exec, true); | |||
let expected = &[ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like a regression -- the comments in the code say "the optimizer should not add a new sort exec" but then in this code the optimizer has added the new sort exec.
Maybe we need to update the test to add a GlobalRequirements
node so that the sorts are not added?
the same comments apply to the other tests in this file
# Conflicts: # datafusion/core/src/physical_optimizer/enforce_distribution.rs # datafusion/core/src/physical_optimizer/utils.rs
Update comments Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
I agree that, current name is a bit vague. Changed name to |
# Please enter a commit message to explain why this merge is necessary, # especially if it merges an updated upstream into a topic branch. # # Lines starting with '#' will be ignored, and an empty message aborts # the commit.
@@ -3541,6 +3595,12 @@ mod tests { | |||
]; | |||
|
|||
assert_optimized!(expected, plan.clone(), true); | |||
|
|||
let expected = &[ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This behavior change is a regression from IOx's perspective as the plan is now resorting data that is already sorted. I will work on creating a reproducer / fix next week
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This plan is generated when the bounded_order_preserving_variants
configuration flag is false
. When this flag is true
, we get the sort-free result. The prior behavior was basically ignoring/overriding the flag.
For some detailed context: The flag basically lets the user choose whether they want SortPreservingMerge
s, or Repartition
/Coalesce
+Sort
cascades. We ran some benchmarks and there is no clearly dominating strategy, each alternative comes out ahead in certain cases. In non-streaming cases, the first alternative typically came out ahead, so we let the default flag value to be false
.
Since we are a stream-first platform, we set the flag to true
at Synnada. Maybe IOx also wants to set this flag to true
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, thank you for the hint. I will give it a try
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update here is that this suggested worked well -- thank you. I find the naming of bounded_order_preserving_variants
to be confusing so I have proposed a new name here #7723
…he#7671) * Extend capabilities of enforcedist * Simplifications * Fix test * Do not use hard coded partition number * Add comments, Fix with_new_children of CustomPlan * Use sub-rule as separate rule. * Add unbounded method * Final review * Move util code to exectree file * Update variables and comments * Apply suggestions from code review Update comments Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> * Address reviews * Add new tests, do not satisfy requirement if not absolutely necessary enforce dist --------- Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com> Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Which issue does this PR close?
Closes #.
Rationale for this change
Current EnforceDistribution rule produces optimal plans, when physical plan at its input is single partitioned plan (E.g physical plan doesn't contain any distribution changing operator). This assumption is valid for datafusion physical optimizer.
However, others may want to use this rule with their existing plans, that is already distributed.
In these cases,
EnforceDistribution
used to produce valid plans, generated plans sometimes were not optimal.In this PR we extend capabilities of the
EnforceDistribution
rule, so that even if existing plan is already multi-partitioned, physical plan produced byEnforceDistribution
is still optimal.What changes are included in this PR?
The approach to accomplish this as follows.
a ASC
ordering etc.).GlobalRequiringExec
) so that requirement of the query is not lost across rules.EnforceDistribution
rule, ignore-remove any distribution changing operator at its input.Are these changes tested?
Yes, new tests are added. To show that intentionally bad, multi-partitioned physical plans can be fixed by
EnforceDistribution
rule.Are there any user-facing changes?