EnforceSorting resorts the input of UnionExec unnecessarily #4943

Closed
alamb opened this issue Jan 17, 2023 · 6 comments · Fixed by #4946
Labels
bug Something isn't working

Comments

Contributor

alamb commented Jan 17, 2023

Describe the bug

Given the following input plan (which I see by enabling trace logging via RUST_LOG=trace):

SortExec: [tag@2 ASC NULLS LAST]
  ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time]
    DeduplicateExec: [tag@2 ASC,time@3 ASC]
      SortPreservingMergeExec: [tag@2 ASC,time@3 ASC]
        UnionExec
          ParquetExec: limit=None, partitions={1 group: [[d.parquet]]}, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time]
          SortExec: [tag@2 ASC,time@3 ASC]
            RecordBatchesExec: batches_groups=1 batches=1

Here is the input to enforce sorting:

Optimized physical plan by EnforceDistribution:
SortExec: [tag@2 ASC NULLS LAST]
  CoalescePartitionsExec
    ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time]
      RepartitionExec: partitioning=RoundRobinBatch(4)
        DeduplicateExec: [tag@2 ASC,time@3 ASC]
          SortPreservingMergeExec: [tag@2 ASC,time@3 ASC]
            UnionExec                                 <-- ** Note that the ParquetExec is already sorted correctly!
              ParquetExec: limit=None, partitions={1 group: [[d.parquet]]}, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time]
              SortExec: [tag@2 ASC,time@3 ASC]
                RecordBatchesExec: batches_groups=1 batches=1

And here is the output from EnforceSorting, where it has moved the SortExec up to the top of the union:

Optimized physical plan by EnforceSorting:
SortExec: [tag@2 ASC NULLS LAST]
  CoalescePartitionsExec
    ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time]
      RepartitionExec: partitioning=RoundRobinBatch(4)
        DeduplicateExec: [tag@2 ASC,time@3 ASC]
          SortPreservingMergeExec: [tag@2 ASC,time@3 ASC]
            SortExec: [tag@2 ASC,time@3 ASC]        <-- ** SortExec is moved to the output of Union, *resorting* the parquet file
              UnionExec
                ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/57d6a92a-314a-4a32-a633-33bc3e1fe7a3.parquet]]}, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time]
                RecordBatchesExec: batches_groups=1 batches=1

To Reproduce
I have a reproducer from IOx -- see https://github.com/influxdata/influxdb_iox/pull/6528#discussion_r1070632410

Expected behavior
I expect the SortExec to be left where it is (at the input to the UnionExec).

Additional context
I found this in the context of upgrading DataFusion in IOx: https://github.com/influxdata/influxdb_iox/pull/6528

alamb added the bug label Jan 17, 2023
Contributor Author

alamb commented Jan 17, 2023

cc @mustafasrepo and @mingmwang who I think have expertise in this area

Contributor

mustafasrepo commented Jan 17, 2023

I think the reason is that UnionExec doesn't implement fn maintains_input_order(&self) -> bool, even though it sometimes does maintain its input order. If we implement it as below (taken from the self.output_ordering implementation), I think the problem will be solved.

fn maintains_input_order(&self) -> bool {
    let first_input_ordering = self.inputs[0].output_ordering();
    // If the Union is not partition aware and every input's ordering spec is strictly equal to
    // first_input_ordering, `UnionExec` maintains input order.
    //
    // This might be too strict when the input orderings are compatible but not exactly the same.
    // For example, if one input has the ordering spec SortExpr('a','b','c') and the other has the
    // spec SortExpr('a'), it is safe to derive the output ordering with the spec SortExpr('a').
    !self.partition_aware
        && first_input_ordering.is_some()
        && self
            .inputs
            .iter()
            .map(|plan| plan.output_ordering())
            .all(|ordering| {
                ordering.is_some()
                    && sort_expr_list_eq_strict_order(
                        ordering.unwrap(),
                        first_input_ordering.unwrap(),
                    )
            })
}
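
To make the rule above easier to experiment with outside DataFusion, here is a minimal, self-contained sketch. It is not DataFusion code: `SortKey`, `ToyUnion`, and `common_ordering_prefix` are hypothetical stand-ins invented for illustration, and plain equality on `SortKey` takes the place of `sort_expr_list_eq_strict_order`. The first method mirrors the strict check proposed above; the second sketches the relaxed idea mentioned in the code comment (deriving the ordering shared by all inputs), under the assumption that "compatible" means "shares a leading prefix of sort keys".

```rust
// Simplified stand-ins for DataFusion types; every name here is hypothetical.
#[derive(Debug, Clone, PartialEq, Eq)]
struct SortKey {
    column: String,
    descending: bool,
    nulls_first: bool,
}

impl SortKey {
    /// Ascending key with nulls last, like `tag@2 ASC NULLS LAST` in the plans above.
    fn asc(column: &str) -> Self {
        SortKey { column: column.to_string(), descending: false, nulls_first: false }
    }
}

/// Toy union node: one entry per input, holding that input's declared ordering (if any).
struct ToyUnion {
    input_orderings: Vec<Option<Vec<SortKey>>>,
    partition_aware: bool,
}

impl ToyUnion {
    /// Strict rule: true only if the union is not partition aware and every
    /// input declares exactly the same ordering as the first input.
    fn maintains_input_order(&self) -> bool {
        if self.partition_aware {
            return false;
        }
        match self.input_orderings.first() {
            Some(Some(first)) => self
                .input_orderings
                .iter()
                .all(|ordering| ordering.as_ref() == Some(first)),
            // No inputs, or the first input is unordered.
            _ => false,
        }
    }

    /// Relaxed variant (an assumption, not the proposed fix): the longest
    /// leading run of sort keys shared by all inputs. Empty if any input is unordered.
    fn common_ordering_prefix(&self) -> Vec<SortKey> {
        let mut iter = self.input_orderings.iter();
        let mut prefix: Vec<SortKey> = match iter.next() {
            Some(Some(first)) => first.clone(),
            _ => return Vec::new(),
        };
        for ordering in iter {
            match ordering {
                Some(keys) => {
                    let shared = prefix
                        .iter()
                        .zip(keys.iter())
                        .take_while(|(a, b)| a == b)
                        .count();
                    prefix.truncate(shared);
                }
                None => return Vec::new(),
            }
        }
        prefix
    }
}
```

In the plan from this issue, both union inputs (the ParquetExec and the SortExec over RecordBatchesExec) declare the ordering tag ASC, time ASC, so the strict check in this sketch returns true for that case. With orderings [tag, time] and [tag], the strict check returns false while the relaxed variant yields [tag].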

Contributor Author

alamb commented Jan 17, 2023

Thank you @mustafasrepo -- I will attempt to write a reproducer for this and try out your proposed fix

Contributor Author

alamb commented Jan 17, 2023

proposed PR: #4946

@mingmwang
Contributor

Nice !!

Contributor Author

alamb commented Jan 18, 2023

Yes I am quite pleased with how sophisticated the sorting based optimizations are becoming
