Add support for multiple partitions with SortExec (#362) #378

Merged: 2 commits into apache:master on May 25, 2021

Conversation

@tustvold tustvold commented May 21, 2021

Which issue does this PR close?

re #362

Rationale for this change

Once an order-preserving merge operator is added as part of #362, it will be possible to combine multiple sorted partitions into a single partition, effectively yielding a partitioned sort. Loosening the restriction of SortExec to a single partition allows it to form the sort part of this.

What changes are included in this PR?

SortExec is no longer restricted to a single partition; instead, it preserves the partitioning of its input.
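For illustration, here is a minimal, self-contained sketch of that behaviour using simplified stand-in types (the `Partitioning` and `SortNode` names here are not DataFusion's actual types): when partitioning is preserved, the operator simply reports its input's partitioning as its own output partitioning.

```rust
// Simplified illustration, not DataFusion's actual API: a sort node that
// either collapses its input to one sorted partition or sorts each input
// partition independently and keeps the input layout.
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    // A single output partition whose contents are globally sorted.
    Single,
    // N independently sorted output partitions, one per input partition.
    RoundRobin(usize),
}

struct SortNode {
    input_partitioning: Partitioning,
    preserve_partitioning: bool,
}

impl SortNode {
    fn output_partitioning(&self) -> Partitioning {
        if self.preserve_partitioning {
            // Sort each input partition independently; a later merge operator
            // (#362 / #379) can combine them into one sorted stream.
            self.input_partitioning.clone()
        } else {
            // Legacy behaviour: consume a single partition and produce a
            // single, globally sorted partition.
            Partitioning::Single
        }
    }
}

fn main() {
    let sort = SortNode {
        input_partitioning: Partitioning::RoundRobin(8),
        preserve_partitioning: true,
    };
    assert_eq!(sort.output_partitioning(), Partitioning::RoundRobin(8));
    println!("output partitioning: {:?}", sort.output_partitioning());
}
```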

@tustvold changed the title from "Add support for multiple partitions with SortExec" to "Add support for multiple partitions with SortExec (#362)" on May 21, 2021
Dandandan commented May 21, 2021

Currently, sort needs a single partition, as otherwise the result across partitions is not sorted. A MergeExec is currently added based on this requirement.

So I think this won't work until we have the implementation to merge the sorted partitions, which you are working on in #379.

@alamb alamb left a comment

As @Dandandan pointed out, at the moment there is an assumption in the planner that a SortExec produces a single sorted partition as its output. When SortExec reports that it requires a single stream for its input, part of the planner puts a MergeExec (confusing name, I know) before it to get a single stream. This is why the unit tests are failing on this PR (because the output order is wrong in some queries).

Perhaps one way to allow a SortExec to run on multiple partitions would be to pass the desired output partitioning to the SortExec constructor, and then use that request to decide what the input to the SortExec should be.
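Roughly, the mechanism described above looks like the following sketch, written with made-up plan types rather than DataFusion's planner code: the pass checks how many partitions the child produces and wraps it in a merge when the sort needs a single input stream.

```rust
// Made-up types to illustrate the mechanism; not DataFusion's planner code.
#[derive(Debug)]
enum Node {
    Scan { partitions: usize },
    // Merges N input partitions into a single output partition.
    Merge(Box<Node>),
    // Sorts its input; in the current planner it requires a single partition.
    Sort(Box<Node>),
}

fn output_partitions(node: &Node) -> usize {
    match node {
        Node::Scan { partitions } => *partitions,
        Node::Merge(_) => 1,
        Node::Sort(input) => output_partitions(input),
    }
}

/// What an AddMergeExec-style pass effectively does for a sort that requires
/// a single input partition: wrap the child in a merge only if needed.
fn add_merge_if_needed(input: Node) -> Node {
    if output_partitions(&input) > 1 {
        Node::Merge(Box::new(input))
    } else {
        input
    }
}

fn main() {
    let sort = Node::Sort(Box::new(add_merge_if_needed(Node::Scan { partitions: 4 })));
    assert_eq!(output_partitions(&sort), 1);
    println!("{:?}", sort);
}
```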

@tustvold

I've added a new constructor that allows opting into the new behaviour. I wasn't aware of the way that MergeExec is plumbed into the plans and that this would break it.

I do wonder if instead of relying on an AddMergeExec optimisation pass, the plan conversion from LogicalPlan::Sort should just inspect the input partitioning and add the Merge if necessary. After all, it already has to inspect the partitioning for operators such as LogicalPlan::Limit, and so not just generating a valid plan from the outset seems a touch surprising to me...
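As a rough sketch of what "opting in via a separate constructor" can look like (stand-in types throughout; the constructor names here are illustrative and may not match the ones actually used in this PR):

```rust
use std::sync::Arc;

// Stand-ins for the real physical-plan types, just to show the constructor shape.
#[allow(dead_code)]
struct PhysicalSortExpr;

#[allow(dead_code)]
struct InputPlan;

#[allow(dead_code)]
struct SortSketch {
    expr: Vec<PhysicalSortExpr>,
    input: Arc<InputPlan>,
    // false: the operator behaves as before and expects a single input
    // partition; true: it sorts each input partition independently.
    preserve_partitioning: bool,
}

impl SortSketch {
    /// Existing behaviour: sort a single partition into a single sorted output.
    fn new(expr: Vec<PhysicalSortExpr>, input: Arc<InputPlan>) -> Self {
        Self { expr, input, preserve_partitioning: false }
    }

    /// Opt-in behaviour: keep the input partitioning and sort each partition.
    fn new_preserving_partitioning(expr: Vec<PhysicalSortExpr>, input: Arc<InputPlan>) -> Self {
        Self { expr, input, preserve_partitioning: true }
    }
}

fn main() {
    let legacy = SortSketch::new(vec![], Arc::new(InputPlan));
    let per_partition = SortSketch::new_preserving_partitioning(vec![], Arc::new(InputPlan));
    assert!(!legacy.preserve_partitioning);
    assert!(per_partition.preserve_partitioning);
}
```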

Dandandan commented May 24, 2021

> I've added a new constructor that allows opting into the new behaviour. I wasn't aware of the way that MergeExec is plumbed into the plans and that this would break it.
>
> I do wonder if instead of relying on an AddMergeExec optimisation pass, the plan conversion from LogicalPlan::Sort should just inspect the input partitioning and add the Merge if necessary. After all, it already has to inspect the partitioning for operators such as LogicalPlan::Limit, and so not just generating a valid plan from the outset seems a touch surprising to me...

I think you are right about that; we should not rely on the optimizer to make the execution plan correct. I think it would be better if the planner adds the MergeExecs for the appropriate nodes.

tustvold commented May 24, 2021

I did some more digging into this and created #412 to track the fact that PhysicalPlanner currently creates plans that are incorrect.

However, I think the issue is actually a bit more subtle than I first realised. Currently, Repartition may insert RepartitionExec between an operator and its children, provided that operator doesn't require a single partition. It then relies on a later optimisation pass, AddMergeExec, to join the partitions back together if an operator further up the tree requires it.

This means that the operators inserted by the physical planner must somehow remember the partitioning they need to be correct, in order to prevent the optimiser from breaking them; simply adding MergeExec when generating the initial plan is insufficient.

There are a couple of ways this gets handled that I can see:

  • Limit has two separate operators - GlobalLimitExec and LocalLimitExec
  • HashAggregateExec has an AggregateMode enumeration

I therefore think the addition of a preserve_partitioning flag to SortExec is necessary and has precedent.

However, it is unfortunately insufficient, as nothing prevents Repartition from repartitioning a sorted partition (I think this might be an issue more generally). I need to think on this more; perhaps, as @alamb mentioned on #379, there needs to be a concept of sorted-ness introduced for operators that optimisation passes such as Repartition and AddMergeExec would respect.
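To make the concern concrete, here is a hypothetical sketch (none of these types are existing DataFusion code): a repartition rule would need ordering information attached to operators in order to know it must not split an already-sorted partition.

```rust
// Hypothetical sketch: a repartition rule that respects sorted-ness.
// These types only illustrate the idea that an optimisation pass needs
// ordering information to stay correct.
#[derive(Clone, Copy, PartialEq, Debug)]
enum OutputOrdering {
    Unordered,
    SortedWithinPartition, // each partition is sorted on the sort keys
}

/// Is it safe to insert a round-robin repartition above an operator with the
/// given output ordering? Interleaving rows from sorted partitions destroys
/// the per-partition order, so the pass must skip those operators.
fn safe_to_repartition(ordering: OutputOrdering) -> bool {
    ordering == OutputOrdering::Unordered
}

fn main() {
    assert!(safe_to_repartition(OutputOrdering::Unordered));
    assert!(!safe_to_repartition(OutputOrdering::SortedWithinPartition));
    println!("repartition rule respects sorted-ness");
}
```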

Going to mark this as a draft for now, as the above has implications for the best way forward here.

@tustvold tustvold marked this pull request as draft May 24, 2021 16:19
alamb commented May 24, 2021

> I think you are right about that; we should not rely on the optimizer to make the execution plan correct. I think it would be better if the planner adds the MergeExecs for the appropriate nodes.

I agree with @tustvold and @Dandandan on this -- I think the plan should generate correct results without requiring optimizer passes to be run. The optimizer passes should just (potentially) make the plans faster.

> I therefore think the addition of a preserve_partitioning flag to SortExec is necessary and has precedent.

I agree

> Currently, Repartition may insert RepartitionExec between an operator and its children, provided that operator doesn't require a single partition. It then relies on a later optimisation pass, AddMergeExec, to join the partitions back together if an operator further up the tree requires it.

Is there any reason we can't call AddMergeExec multiple times? Once (and always) as part of creating the physical plans and then potentially again as part of Repartition?

tustvold commented May 25, 2021

I think this is "as correct as current master" and am therefore marking this as ready for review. It is impacted by #423 (the issue alluded to above re: the Repartition pass); however, so is current master, so I think that is a separate issue that can be fixed independently of this PR.

@tustvold tustvold marked this pull request as ready for review May 25, 2021 15:36
@alamb alamb left a comment

The only thing I think would make this PR better is tests, but I believe tests are added in #362, so I think we should merge this PR as is.

@Dandandan any thoughts?

@Dandandan Dandandan left a comment

I think it's OK if the tests follow later 👍 thanks @tustvold

@alamb alamb merged commit 3593d1f into apache:master May 25, 2021
jimexist pushed a commit to jimexist/arrow-datafusion that referenced this pull request May 26, 2021
…e#378)

* Add support for multiple partitions with SortExec

* make SortExec partitioning optional
@houqp added the "datafusion" (Changes in the datafusion crate) and "enhancement" (New feature or request) labels on Jul 29, 2021