Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations #5171
Conversation
I hope to review this PR carefully today
I plan to review this PR carefully tomorrow
I am now actually reviewing this PR
The query does not specify a top-level ordering explicitly, but the final result ordering is impacted by the ordering expressions in the WindowExecs. I think with/without the (reverse window expr) optimization, this SQL will give different results, which is something we should avoid.
I don't think this is a bug. Let's think about the converse scenario: The non-optimized query could have produced the other order (which would be valid), or the user could have changed the order of columns, and in that case we would have the illusion of preserving "the order" during optimization. In general, whenever there are multiple possibilities for what constitutes a valid query, there will always be some configurations where non-optimized plans and optimized plans differ (or agree) in under-constrained aspects. At the end of the day, the optimizer's prime job is to end up with more efficient plans that obey the specification, not to conform to arbitrary behaviors of the non-optimized plan. In this case, there is simply no order in the specification, so I don't see a bug. The result is indeed correct. This being said, I think I understand the general suggestion you are making; in my words, I would put it this way: When there are multiple equivalent optimizations, it is a good idea to choose the one that resembles the non-optimized query the most. I agree with this, and making progress towards this desideratum in follow-ons, refactors, etc. would be very nice.
```sql
SELECT
  c9,
  SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
  SUM(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
FROM aggregate_test_100
LIMIT 5
```
I think the output of this query is "implementation defined" according to the SQL spec -- it is correct to produce any arbitrary 5 rows out of the window calculation because there is no top-level ORDER BY. The following variant, by contrast, is deterministic:
```sql
SELECT
  c9,
  SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
  SUM(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
FROM aggregate_test_100
ORDER BY c9 ASC -- NEEDED to make this query deterministic, assuming c9 has sufficient distinct values
LIMIT 5
```
This is a pretty bad UX choice on the part of SQL in my opinion, and causes all sorts of confusion (e.g. when query output changes from run to run, as noted by @mingmwang), but that is my understanding of what the SQL spec says.
I agree. I think it is acceptable for the optimizer to rewrite the plan so the output happens to be different
Looks good to me -- thank you @mustafasrepo and @ozankabak
I think we should wait for an OK from @mingmwang prior to merging this but I think it is ready to go from my perspective
datafusion/common/src/config.rs
```
@@ -290,6 +290,17 @@ config_namespace! {
    /// functions in parallel using the provided `target_partitions` level"
    pub repartition_windows: bool, default = true

    /// Should DataFusion parallelize Sort during physical plan creation.
```
👍
Calling this option `repartition_sorts` is probably more consistent with `repartition_windows`, `repartition_joins`, etc. -- however, I think we can rename it in a follow-on PR as well
I agree, just sent a commit with this renaming. There seems to be an intermittent CI failure, so if you can kick it off again when you get the chance that'd be great.
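(For illustration, a minimal sketch of how an end user might turn the renamed option on via DataFusion's string-keyed session config; the exact key name and API calls are assumptions modeled on how the sibling `repartition_*` flags are exposed, not something confirmed in this thread:)
```rust
use datafusion::prelude::*;

fn main() {
    // Hypothetical sketch: enable parallel sorting through the optimizer
    // config. The key `datafusion.optimizer.repartition_sorts` is an
    // assumption based on the existing `repartition_windows`/`repartition_joins`
    // options living in the `datafusion.optimizer.*` namespace.
    let config = SessionConfig::new()
        .set_bool("datafusion.optimizer.repartition_sorts", true);
    let _ctx = SessionContext::with_config(config);
}
```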
"SortPreservingMergeExec: [c1@0 ASC NULLS LAST]", | ||
" ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as rn1]", | ||
" BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]", | ||
" SortExec: [c1@0 ASC NULLS LAST]", |
this is a pretty clever plan (to repartition first, then do the sort in parallel, and then merge the results at the end) 👍
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]", | ||
" SortExec: [c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]", | ||
" CoalesceBatchesExec: target_batch_size=8192", | ||
" RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2", |
this double repartition still looks strange to me, but I understand it was not introduced by this PR
Yes, this is an unrelated behavior which only makes sense when the cost of hashing is significant. It is on our roadmap to make `EnforceDistribution` smarter; maybe we can touch on this within that scope and make a single multi-threaded hash repartitioner that achieves the same purpose.
" GlobalLimitExec: skip=0, fetch=1", | ||
" SortExec: [c13@0 ASC NULLS LAST]", | ||
" ProjectionExec: expr=[c13@0 as c13]", |
We don't need to fix it in this PR, but this plan could be a lot better (I think it should be using a limit in the sort)
I agree, let's do this in a follow-on PR!
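(For context, a hypothetical sketch of the improved plan shape being suggested: the limit is pushed into the sort as a fetch, so the sort only keeps the top row instead of sorting everything. The exact rendering of the fetch is an assumption:)
```
"GlobalLimitExec: skip=0, fetch=1",
"  SortExec: fetch=1, [c13@0 ASC NULLS LAST]",
"    ProjectionExec: expr=[c13@0 as c13]",
```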
Thank you @mingmwang -- I think part of what is confusing here is that two different things are happening as "optimization" passes.
I think @liukun4515 helped the logical optimizer greatly by identifying this difference, and pulling all the type coercion to the beginning of the optimizer passes (source link). We probably could have gone farther and made it clear that the TypeCoercion pass is not an optimizer but rather required for correctness. Maybe such clarity in this case could help too
I think it makes sense to explain the evolution of this so that everybody has a richer context. We actually started with two rules, one enforcement rule and one optimization rule (remember …).
Here, you see examples of a general pattern where there could be … All in all, I would agree with @alamb from a separation-of-concerns standpoint in general, but in this case, this experience/pattern seems to whisper to us that a joint design is a better fit (for the specifics of this case). Obviously, this is only a deduction from the experience so far. A refactor PR that simplifies the code using a non-joint approach while still passing all the unit tests can easily prove me wrong 🤣
Yes I agree that having an existing implementation and test suite is ideal -- if/when we find better ways to express the algorithm that still does the same behavior, we can always update the code to be more beautiful
Thanks @mustafasrepo for this impressive improvement. I'm also very interested in this part and will review this PR carefully today.
I also prefer the second approach to handle the choice between "global sort + CoalescePartitionsExec" and "SortPreservingMergeExec + local SortExec" in EnforceDistribution.
I also agree. However, we may need to refine the UT to handle this non-deterministic ordering.
If you guys all agree this is not a bug, I accept this.
```rust
if !sort_exec.preserve_partitioning()
    && sort_input.output_partitioning().partition_count() > 1
{
    // Replace the sort with a sort-preserving merge:
    let new_plan: Arc<dyn ExecutionPlan> =
        Arc::new(SortPreservingMergeExec::new(
            sort_exec.expr().to_vec(),
            sort_input,
        ));
    let new_tree = ExecTree {
        idx: 0,
        plan: new_plan.clone(),
        children: sort_onwards.iter().flat_map(|e| e.clone()).collect(),
    };
    PlanWithCorrespondingSort {
        plan: new_plan,
        sort_onwards: vec![Some(new_tree)],
    }
```
Could you please explain the purpose of these lines of code? The original code just removed the `SortExec`. Looks like the new change tries to handle a case where the current `SortExec` is a global sort and the `sort_input` is actually a local sort; instead of removing the global sort, it replaces it with a `SortPreservingMergeExec`. But I think we should not see such a physical plan tree. This is because for a global sort, after the `EnforceDistribution` rule, a `CoalescePartitionsExec` will be added as the input of that global sort, and `CoalescePartitionsExec` cannot propagate any sort properties, so the `ordering_satisfy` check will become false. So I think we do not need specific handling for global sorts here. Please correct me if I am wrong.
You are right. However, we want to decrease rule dependencies as much as possible. With this check, the `EnforceSorting` rule doesn't require the `EnforceDistribution` rule to be called before it.
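(To make the case concrete, a hypothetical sketch of the plan shape being discussed -- a global sort whose multi-partition input is already sorted within each partition, which the quoted code replaces with a merge; the column name and partition count are made up:)
```
Before:
SortExec: [a@0 ASC]                  -- preserve_partitioning=false (global sort)
  SortExec: [a@0 ASC]                -- local sort, 2 output partitions

After:
SortPreservingMergeExec: [a@0 ASC]   -- just merge the already-sorted partitions
  SortExec: [a@0 ASC]
```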
```rust
let coalesce_input = coalesce_onwards.plan.children()[0].clone();
if let Some(sort_exec) = sort_exec {
    let sort_expr = sort_exec.expr();
    // If the input of the CoalescePartitionsExec does not already satisfy
    // the sort requirement, add a sort above it:
    if !ordering_satisfy(
        coalesce_input.output_ordering(),
        Some(sort_expr),
        || coalesce_input.equivalence_properties(),
    ) {
        return add_sort_above_child(&coalesce_input, sort_expr.to_vec());
    }
}
coalesce_input
```
I believe most of the time the global `SortExec` + `CoalescePartitionsExec` are adjacent nodes. But is it possible that there will be some projections between the `SortExec` and the `CoalescePartitionsExec`, which would make the `SortExec` have totally different exprs/columns than the `CoalescePartitionsExec`'s input plan? If that is the case, `ordering_satisfy` will definitely return false, and adding a `SortExec` on top of `coalesce_input` will generate an invalid plan.
Overall the PR LGTM and I think it is good to merge. I will need more time to take a closer look, but I think it is good to go since there are enough UTs to verify all the changes. Some review comments can be addressed in following PRs for the global-sort-related enhancements. @mustafasrepo @ozankabak Thanks a lot!
Thanks for all the detailed reviews everybody, we really appreciate it! The code has substantially improved thanks to you all. Rest assured we will be maintaining this and fix any bugs should we discover them in the future. We will also take care of the follow-ons identified during the review. @alamb, from our perspective, this is ready to go. Feel free to merge when you feel appropriate. Edit: Couldn't resist sending one last minor commit to remove two unnecessary clones 🙂, can merge after CI.
Thanks all -- this was a great and epic collaboration. I love it. Onwards and upwards
Benchmark runs are scheduled for baseline = e166500 and contender = dee9fd7. dee9fd7 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Which issue does this PR close?
Closes #5100.
Rationale for this change
We want to have the ability to leverage parallelism in sorts when appropriate, which requires `EnforceSorting` to be partition aware. We also want to further decouple the `EnforceSorting` and `EnforceDistribution` rules, with the end goal being complete orthogonality. Currently, there is coupling in both directions: in order to produce valid plans, one needs to apply the `EnforceDistribution` rule before applying `EnforceSorting`, and the `EnforceDistribution` rule breaks sorting requirements and produces invalid plans on its own.
With this PR, coupling is broken in one direction (`EnforceSorting` no longer requires `EnforceDistribution` to work correctly), but not yet in the other direction (`EnforceDistribution` may still invalidate sort correctness, so one still needs to call `EnforceSorting` after it). Note that it is on our TODO list to fix the other direction too.
What changes are included in this PR?
This PR adds a couple of functionalities to the `EnforceSorting` rule.
Adds a parallelize option to the `EnforceSorting` rule. If this flag is enabled, physical plans that perform a global sort on multi-partition input are rewritten to sort each partition in parallel and then merge the sorted partitions, as sketched below.
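(The original before/after plans are not preserved here; the following is a hypothetical sketch of the transformation, with the column name and inputs made up:)
```
Before:
SortExec: [a@0 ASC]
  CoalescePartitionsExec             -- all partitions funneled into one, then sorted
    ... (multi-partition input)

After:
SortPreservingMergeExec: [a@0 ASC]
  SortExec: [a@0 ASC]                -- preserve_partitioning=true, one sort per partition
    ... (multi-partition input)
```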
This increases speed in multi-threaded environments, as shown by the time comparisons collected for this PR.
Issue #5100 is closed, fixing the limit bug.
The `EnforceSorting` rule now also considers `SortPreservingMergeExec`s when performing optimizations. This enables us to optimize more queries.
We handle cases where a `UnionExec` partially maintains the ordering of some of its children, such as the case sketched after this paragraph. In this case, the output ordering of the `UnionExec` is `nullable_col@0 ASC`: it fully maintains the ordering of its second child and partially maintains the ordering of its first child. The `EnforceSorting` rule now considers such partially-order-maintaining paths during optimization.
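(The original plan is not preserved here; the following hypothetical sketch illustrates the shape being described, with the child inputs made up:)
```
UnionExec                                                  -- output ordering: [nullable_col@0 ASC]
  SortExec: [nullable_col@0 ASC, non_nullable_col@1 ASC]   -- first child: ordering partially maintained
    ...
  SortExec: [nullable_col@0 ASC]                           -- second child: ordering fully maintained
    ...
```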
In order to handle multi-children operators like `UnionExec`s properly, the assumption that ordering only comes from a single path in the physical plan tree is removed. Now ordering can come from multiple paths.
Are these changes tested?
We have added multiple tests to cover the above-mentioned functionalities. Approximately 550 lines of the changes come from tests or test utils.
Are there any user-facing changes?
No.