
Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations #5171

Merged: 35 commits from feature/union_exec_handling into apache:master, Feb 8, 2023

Conversation

@mustafasrepo (Contributor)

Which issue does this PR close?

Closes #5100.

Rationale for this change

We want the ability to leverage parallelism in sorts when appropriate, which requires EnforceSorting to be partition aware. We also want to further decouple the EnforceSorting and EnforceDistribution rules, with the end goal being complete orthogonality. Currently, there is coupling in both directions: one needs to apply the EnforceDistribution rule before EnforceSorting in order to produce valid plans, and EnforceDistribution can break sorting requirements and produce invalid plans on its own.

With this PR, coupling is broken in one direction (EnforceSorting does not require EnforceDistribution to work correctly anymore), but not the other way yet (EnforceDistribution may still invalidate sort correctness, so one still needs to call EnforceSorting after that). Note that it is on our TODO list to fix the other direction too.
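
For context, a sketch of the pass ordering this implies. The rule names come from this discussion; the module paths and registration code below are assumptions for this era of the codebase, not the crate's actual defaults:

```rust
use std::sync::Arc;
// NOTE: these module paths are an assumption; they have moved between releases.
use datafusion::physical_optimizer::dist_enforcement::EnforceDistribution;
use datafusion::physical_optimizer::sort_enforcement::EnforceSorting;
use datafusion::physical_optimizer::PhysicalOptimizerRule;

// EnforceSorting no longer depends on EnforceDistribution for its own
// correctness, but it must still run afterwards to repair any orderings
// that EnforceDistribution breaks.
fn enforcement_passes() -> Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> {
    vec![
        Arc::new(EnforceDistribution::new()),
        Arc::new(EnforceSorting::new()),
    ]
}
```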

What changes are included in this PR?

This PR adds several capabilities to the EnforceSorting rule.

  • Adds a parallelize option to the EnforceSorting rule. When this flag is enabled, physical plans of the form

    "SortExec: [t1_id@0 ASC NULLS LAST]",
    "  CoalescePartitionsExec",
    "    RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",

    will turn into the plan below:

    "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
    "  SortExec: [t1_id@0 ASC NULLS LAST]",
    "    RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",

    This increases speed in multi-threaded environments; a toy model of this rewrite appears after this list. Time comparisons are below:

    | n_row | distinct | batch_size | Repartition Mode(8) | Rule On (mean) | Rule On (median) | Rule Off (mean) | Rule Off (median) |
    |-------|----------|------------|---------------------|----------------|------------------|-----------------|------------------|
    | 1000 | 100 | 100 | RoundRobin | 3.015032ms | 2.988125ms | 5.213412ms | 4.515166ms |
    | 1000 | 100 | 100 | Hash | 6.00907ms | 5.895166ms | 10.636287ms | 10.3025ms |
    | 10_000 | 100 | 100 | RoundRobin | 24.429487ms | 24.042291ms | 84.282599ms | 84.117083ms |
    | 10_000 | 100 | 100 | Hash | 48.779278ms | 48.970708ms | 517.195637ms | 520.449625ms |
    | 20_000 | 100 | 100 | RoundRobin | 47.297341ms | 47.141041ms | 289.02502ms | 290.58625ms |
    | 20_000 | 100 | 100 | Hash | 114.230287ms | 107.842291ms | 1.992879062s | 2.002725333s |
    | 50_000 | 100 | 100 | RoundRobin | 117.185937ms | 115.300583ms | 1.604551912s | 1.609479958s |
    | 50_000 | 100 | 100 | Hash | 425.755595ms | 430.346625ms | 12.683566324s | 12.502681458s |
  • Closes issue #5100 by fixing the limit bug.

  • The EnforceSorting rule now also considers SortPreservingMergeExecs when performing optimizations. This enables us to optimize more queries.

  • We handle cases where a UnionExec partially maintains the ordering of some of its children, such as the case below:

    "  UnionExec",
    "    SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
    "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
    "    ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",

    In this case, the output ordering of the UnionExec is nullable_col@0 ASC. It fully maintains the ordering of its second child and partially maintains the ordering of its first child. The EnforceSorting rule now considers such partially-order-maintaining paths during optimization.

  • To handle multi-child operators like UnionExec properly, the assumption that ordering only comes from a single path in the physical plan tree is removed. Ordering can now come from multiple paths.
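
To make the parallelize rewrite in the first item concrete, here is a self-contained toy model (not the PR's actual code; DataFusion's real rule operates on Arc<dyn ExecutionPlan> trees and handles many more cases):

```rust
// Toy plan tree, just enough to express the rewrite.
#[derive(Debug)]
enum Plan {
    Sort { expr: String, input: Box<Plan> },
    SortPreservingMerge { expr: String, input: Box<Plan> },
    CoalescePartitions(Box<Plan>),
    Repartition { partitions: usize },
}

// Rewrite Sort(CoalescePartitions(x)) into SortPreservingMerge(Sort(x)):
// each partition of x is sorted in parallel, then the sorted streams are
// merged while preserving order, instead of coalescing first and doing
// one big single-threaded sort.
fn parallelize_sort(plan: Plan) -> Plan {
    match plan {
        Plan::Sort { expr, input } => match *input {
            Plan::CoalescePartitions(inner) => Plan::SortPreservingMerge {
                expr: expr.clone(),
                input: Box::new(Plan::Sort { expr, input: inner }),
            },
            other => Plan::Sort { expr, input: Box::new(other) },
        },
        other => other,
    }
}

fn main() {
    let before = Plan::Sort {
        expr: "t1_id@0 ASC NULLS LAST".to_string(),
        input: Box::new(Plan::CoalescePartitions(Box::new(
            Plan::Repartition { partitions: 2 },
        ))),
    };
    println!("{:#?}", parallelize_sort(before));
}
```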

Are these changes tested?

We have added multiple tests covering the functionality described above. Approximately 550 lines of the changes come from tests or test utils.

Are there any user-facing changes?

No.

@alamb (Contributor) commented Feb 3, 2023

I hope to review this PR carefully today

@alamb (Contributor) commented Feb 5, 2023

I plan to review this PR carefully tomorrow

@alamb (Contributor) commented Feb 6, 2023

I am now actually reviewing this PR

@mingmwang (Contributor):

> @mingmwang, I don't follow why you think there is a bug in this example. The query does not specify a top-level ordering, so limiting to either the top or the bottom 5 rows is fair game from a query perspective. Looking at it from a logical vs. physical plan perspective, I still do not see a problem. Order annotations in logical plan aggregations are local to them; they have no bearing on output ordering AFAIK. These annotations specify the frame, but that is where their scope ends. If they had any bearing on output ordering, they would have the same effect at the query level too, but they don't.

The query does not specify a top-level ordering explicitly, but the final result ordering is impacted by the ordering expressions in the WindowExecs. I think this SQL will give different results with and without the (reverse window expr) optimization, which is something we should avoid.

@ozankabak (Contributor) commented Feb 7, 2023

I don't think this is a bug. Let's think about the converse scenario: The non-optimized query could have produced the other order (which would be valid), or the user could have changed the order of columns, and in that case we would have the illusion of preserving "the order" during optimization. In general, whenever there are multiple possibilities for what constitutes a valid query, there will always be some configurations where non-optimized plans and optimized plans differ (or agree) in under-constrained aspects.

At the end of the day, the optimizer's prime job is to end up with more efficient plans that obey the specification, not to conform to arbitrary behaviors of the non-optimized plan. In this case, there is simply no order in the specification, so I don't see a bug. The result is indeed correct.

This being said, I think I understand the general suggestion you are making. I would put it this way: when there are multiple equivalent optimizations, it is a good idea to choose the one that resembles the non-optimized query the most. I agree with this, and making progress towards this goal in follow-ons, refactors, etc. would be very nice.

@alamb (Contributor) commented Feb 7, 2023

SELECT
    c9,
    SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
    SUM(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
FROM aggregate_test_100
LIMIT 5

I think the output of this query is "implementation defined" according to the SQL spec -- it is correct to produce any arbitrary 5 rows out of the window calculation because there is no ORDER BY on the outer query. If the user wants a specific set of rows they need to add an explicit ORDER BY.

SELECT
    c9,
    SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
    SUM(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
FROM aggregate_test_100
ORDER BY c9 ASC -- NEEDED to make this query deterministic, assuming c9 has sufficient distinct values
LIMIT 5

This is a pretty bad UX choice on the part of SQL in my opinion, and causes all sorts of confusion (e.g. when query output changes from run to run as noted by @mingmwang) but that is my understanding of what the SQL spec says.

> I don't think this is a bug.

I agree. I think it is acceptable for the optimizer to rewrite the plan so that the output happens to be different.

@alamb left a review comment:

Looks good to me -- thank you @mustafasrepo and @ozankabak

I think we should wait for an OK from @mingmwang prior to merging this but I think it is ready to go from my perspective

@@ -290,6 +290,17 @@ config_namespace! {
/// functions in parallel using the provided `target_partitions` level"
pub repartition_windows: bool, default = true

/// Should DataFusion parallelize Sort during physical plan creation.
Review comment: 👍

Review comment:

Calling this option repartition_sorts is probably more consistent with repartition_windows, repartition_joins, etc. -- however, I think we can rename it in a follow-on PR as well.

Reply:

I agree, just sent a commit with this renaming. There seems to be an intermittent CI failure, so if you can kick it off again when you get the chance that'd be great.
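
For readers following along, a hedged sketch of toggling the renamed option; the setter name (with_repartition_sorts) is an assumption based on the repartition_joins/repartition_windows naming pattern discussed above, so verify it against your DataFusion version:

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn context_with_parallel_sorts() -> SessionContext {
    // Assumed setter following the existing repartition_* naming pattern;
    // enables the SortExec -> SortPreservingMergeExec parallelization.
    let config = SessionConfig::new().with_repartition_sorts(true);
    SessionContext::with_config(config)
}
```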

"SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
" ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as rn1]",
" BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
" SortExec: [c1@0 ASC NULLS LAST]",
Review comment:

this is a pretty clever plan (to repartition first, then do the sort in parallel, and then merge the results at the end) 👍

" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]",
" SortExec: [c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
Review comment:

this double repartition still looks strange to me, but I understand it was not introduced by this PR

@ozankabak replied on Feb 7, 2023:

Yes, this is an unrelated behavior which only makes sense when the cost of hashing is significant. It is on our roadmap to make EnforceDistribution smarter; maybe we can touch on this within that scope and build a single multi-threaded hash repartitioner that achieves the same purpose.

Comment on lines +2527 to +2529
" GlobalLimitExec: skip=0, fetch=1",
" SortExec: [c13@0 ASC NULLS LAST]",
" ProjectionExec: expr=[c13@0 as c13]",
Review comment:

We don't need to fix it in this PR, but this plan could be a lot better (I think the sort should be using the limit).

Reply:

I agree, let's do this in a follow-on PR!
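
For illustration, the improved plan alluded to here would push the limit into the sort; a hypothetical shape (the fetch annotation on SortExec is an assumption, not output produced by this PR):

    "GlobalLimitExec: skip=0, fetch=1",
    "  SortExec: fetch=1, expr=[c13@0 ASC NULLS LAST]",
    "    ProjectionExec: expr=[c13@0 as c13]",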

@alamb (Contributor) commented Feb 7, 2023

> Regarding rule application ordering: since the DataFusion optimization framework is still a traditional heuristic-style framework, the rule application order always matters; we cannot assume one rule can work independently of the others.
>
> Specifically, the EnforceDistribution rule is responsible for handling the global distribution requirements, and the EnforceSorting rule is responsible for handling the local sort requirements. It is also responsible for removing unnecessary global and local sorts. The global distribution requirements need to be handled first; after that, we can handle the local (intra-partition) sort requirements.

Thank you @mingmwang -- I think part of what is confusing here is that two different things are happening as "optimization" passes.

  1. "Fixing up the plan for correctness" (aka "EnforceSorting"), which I think is a very similar at a high level to what the TypeCoercion logical optimizer rule does (coerces types in expressions so they are compatible even if that was not the case in the input plan)
  2. "Keep the same semantics of the plan, but rewrite it for better performance" (aka GlobalSortSelection / OptimizeSorts)

I think @liukun4515 helped the logical optimizer greatly by identifying this difference and pulling all the type coercion to the beginning of the optimizer passes (source link). We probably could have gone further and made it clear that the TypeCoercion pass is not an optimizer but rather required for correctness.

Maybe such clarity in this case could help too

@ozankabak (Contributor) commented Feb 7, 2023

I think it makes sense to explain the evolution of this so that everybody has a richer context. We actually started with two rules, one enforcement rule and one optimization rule (remember optimize_sorts?). As we worked on that design over time, the optimization rule grew to carry out the following tasks:

  • Optimization rule sometimes removes sorts (only).
  • Optimization rule sometimes moves a sort somewhere else.
  • Optimization rule sometimes ends up removing multiple sorts but doing so requires adding fewer sorts elsewhere.

Here, you see examples of a general pattern where there could be N additions and M removals. Actually, simple enforcement corresponds to the case M = 0, and the optimization rule can end up with any of the remaining cases. Since doing the latter requires almost 95% of the code of a general any N/any M solution, we ended up merging the rules. This greatly reduced code duplication and IMO made the rule easier to use.

All in all, I would agree with @alamb from a separation-of-concern standpoint in general, but in this case, this experience/pattern seems to whisper to us that a joint design is a better fit (for the specifics of this case).

Obviously, this is only a deduction from the experience so far. A refactor PR that simplifies the code using a non-joint approach while still passing all the unit tests can easily prove me wrong 🤣

@alamb (Contributor) commented Feb 7, 2023

> Obviously, this is only a deduction from the experience so far. A refactor PR that simplifies the code using a non-joint approach while still passing all the unit tests can easily prove me wrong 🤣

Yes I agree that having an existing implementation and test suite is ideal -- if / when we find better ways to express the algorithm that still does the same behavior, we can always update the code to be more beautiful

@yahoNanJing (Contributor):

Thanks @mustafasrepo for this impressive improvement. I'm also very interested in this part and will review this PR carefully today.

@yahoNanJing (Contributor):

> I think this is because the global sort + CoalescePartitionsExec were added later by the two enforcement rules. An easy way to get rid of this is to run the GlobalSortSelection rule again after the two enforcement rules. I would still prefer to let the GlobalSortSelection rule handle this optimization; it would need to be enhanced to handle the SortExec + CoalescePartitionsExec combination.

If we end up handling this combination there and running the rule twice, it really diminishes the value of that approach. Maybe there is a way to do it elegantly; I will think about it in detail. If we (or you) can figure out a way to do this elegantly, we can go back to this approach, but for now it doesn't look too good to me.

> Another approach I can think of: we could add specific handling to the EnforceDistribution rule. If the plan's distribution requirement is Distribution::SinglePartition, the plan also has some sorting requirements, and the prefer-parallel-sort configuration is on, add SortPreservingMergeExec + SortExec. If the SortExec is unnecessary, it will be removed later by the EnforceSorting rule.

I think this is interesting, and it sounds more promising to me. I will think about this today; maybe we can do this in a follow-on PR.

I also prefer the second approach to handle the choice between "global sort + CoalescePartitionsExec" and "SortPreservingMergeExec + local SortExec" in EnforceDistribution.

@yahoNanJing (Contributor) commented Feb 8, 2023

> I don't think this is a bug.

I also agree. However, we may need to refine the UT to handle this non-deterministic ordering.

@mingmwang (Contributor):

> > I don't think this is a bug.
>
> I also agree. However, we may need to refine the UT to handle this non-deterministic ordering.

If you guys all agree this is not a bug, I accept this.

Comment on lines +506 to +523
if !sort_exec.preserve_partitioning()
    && sort_input.output_partitioning().partition_count() > 1
{
    // Replace the sort with a sort-preserving merge:
    let new_plan: Arc<dyn ExecutionPlan> =
        Arc::new(SortPreservingMergeExec::new(
            sort_exec.expr().to_vec(),
            sort_input,
        ));
    let new_tree = ExecTree {
        idx: 0,
        plan: new_plan.clone(),
        children: sort_onwards.iter().flat_map(|e| e.clone()).collect(),
    };
    PlanWithCorrespondingSort {
        plan: new_plan,
        sort_onwards: vec![Some(new_tree)],
    }
Review comment:

Could you please explain the purpose of these lines of code? The original code just removes the SortExec. It looks like the new change tries to handle a case where the current SortExec is a global sort and sort_input is actually a local sort: instead of removing the global sort, it replaces it with a SortPreservingMergeExec. But I think we should not see such a physical plan tree, because for a global sort, after the EnforceDistribution rule a CoalescePartitionsExec will be added as the input of that global sort, and since CoalescePartitionsExec cannot propagate any sort properties, the ordering_satisfy check will become false. So I think we do not need specific handling for global sorts here. Please correct me if I am wrong.

Reply (PR author):

You are right. However, we want to decrease rule dependencies as much as possible. With this check, the EnforceSorting rule doesn't require the EnforceDistribution rule to be called before it.

Comment on lines +668 to +679
let coalesce_input = coalesce_onwards.plan.children()[0].clone();
if let Some(sort_exec) = sort_exec {
    let sort_expr = sort_exec.expr();
    // If the input to the CoalescePartitionsExec does not already satisfy
    // the required ordering, sort each partition before merging:
    if !ordering_satisfy(
        coalesce_input.output_ordering(),
        Some(sort_expr),
        || coalesce_input.equivalence_properties(),
    ) {
        return add_sort_above_child(&coalesce_input, sort_expr.to_vec());
    }
}
coalesce_input
@mingmwang commented Feb 8, 2023:

I believe most of the time the global SortExec and the CoalescePartitionsExec are adjacent nodes. But is it possible that there are some Projections between the SortExec and the CoalescePartitionsExec, which would give the SortExec totally different exprs/columns from the CoalescePartitionsExec's input plan? If that is the case, ordering_satisfy will definitely return false, and adding a SortExec on top of the coalesce_input will generate an invalid plan.
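
A hypothetical plan shape for this concern (column names invented for illustration): the SortExec refers to the projection's output column, which does not exist in the CoalescePartitionsExec's input schema, so a sort added directly on that input would reference a missing column.

    "SortExec: [c1_alias@0 ASC]",
    "  ProjectionExec: expr=[c1@0 as c1_alias]",
    "    CoalescePartitionsExec",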

@mingmwang (Contributor):

Overall the PR LGTM and I think it is good to merge. I will need more time to take a closer look, but I think it is good to go since there are enough UTs to verify all the changes. Some review comments can be addressed in follow-up PRs for the global-sort-related enhancements.

@mustafasrepo @ozankabak Thanks a lot!

@ozankabak (Contributor) commented Feb 8, 2023

Thanks for all the detailed reviews, everybody -- we really appreciate it! The code has improved substantially thanks to you all. Rest assured, we will maintain this and fix any bugs we discover in the future.

We will also take care of the follow-ons identified during the review.

@alamb, from our perspective, this is ready to go. Feel free to merge when you feel appropriate.

Edit: Couldn't resist sending one last minor commit to remove two unnecessary clones 🙂 , can merge after CI.

@alamb alamb merged commit dee9fd7 into apache:master Feb 8, 2023
@alamb (Contributor) commented Feb 8, 2023

Thanks all -- this was a great and epic collaboration. I love it. Onwards and upwards

@ursabot commented Feb 8, 2023

Benchmark runs are scheduled for baseline = e166500 and contender = dee9fd7. dee9fd7 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
All Conbench benchmark runs were skipped: benchmarking of arrow-datafusion-commits is not supported on any of the available runners (ec2-t3-xlarge-us-east-2, test-mac-arm, ursa-i9-9960x, ursa-thinkcentre-m75q).

@mustafasrepo mustafasrepo deleted the feature/union_exec_handling branch February 10, 2023 06:51
Labels: core (Core DataFusion crate), sqllogictest (SQL Logic Tests (.slt))

Successfully merging this pull request may close these issues.

Sort operator disappear in physical_plan
6 participants