Skip to content

Commit

Permalink
Calculate ordering equivalence for expressions (rather than just colu…
Browse files Browse the repository at this point in the history
…mns) (#8281)

* Complex exprs requirement support (#215)

* Discover ordering of complex expressions in group by and window partition by

* Remove unnecessary tests

* Update comments

* Minor changes

* Better projection support complex expression support

* Fix failing test

* Simplifications

* Simplifications

* Add is end flag

* Simplifications

* Simplifications

* Simplifications

* Minor changes

* Minor changes

* Minor changes

* All tests pass

* Change implementation of find_longest_permutation

* Minor changes

* Minor changes

* Remove projection section

* Remove projection implementation

* Fix linter errors

* Remove projection sections

* Minor changes

* Add docstring comments

* Add comments

* Minor changes

* Minor changes

* Add comments

* simplifications

* Minor changes

* Review Part 1

* Add new tests

* Review Part 2

* Address review feedback

* Remove error message check in the test

---------

Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
  • Loading branch information
mustafasrepo and ozankabak authored Nov 23, 2023
1 parent 98f1bc0 commit 9619f02
Show file tree
Hide file tree
Showing 6 changed files with 1,155 additions and 305 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3787,7 +3787,7 @@ pub(crate) mod tests {
fn repartition_transitively_past_sort_with_projection_and_filter() -> Result<()> {
let schema = schema();
let sort_key = vec![PhysicalSortExpr {
expr: col("c", &schema).unwrap(),
expr: col("a", &schema).unwrap(),
options: SortOptions::default(),
}];
let plan = sort_exec(
Expand All @@ -3804,9 +3804,9 @@ pub(crate) mod tests {
);

let expected = &[
"SortPreservingMergeExec: [c@2 ASC]",
"SortPreservingMergeExec: [a@0 ASC]",
// Expect repartition on the input to the sort (as it can benefit from additional parallelism)
"SortExec: expr=[c@2 ASC]",
"SortExec: expr=[a@0 ASC]",
"ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
"FilterExec: c@2 = 0",
// repartition is lowest down
Expand All @@ -3817,7 +3817,7 @@ pub(crate) mod tests {
assert_optimized!(expected, plan.clone(), true);

let expected_first_sort_enforcement = &[
"SortExec: expr=[c@2 ASC]",
"SortExec: expr=[a@0 ASC]",
"CoalescePartitionsExec",
"ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
"FilterExec: c@2 = 0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,15 +357,19 @@ mod tests {
let physical_plan =
sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);

let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
let expected_input = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
}
Expand Down Expand Up @@ -434,19 +438,20 @@ mod tests {
let physical_plan =
sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);

let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
let expected_input = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",

];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
Expand All @@ -466,19 +471,23 @@ mod tests {
let physical_plan =
sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);

let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
let expected_input = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
" SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
}
Expand All @@ -499,21 +508,25 @@ mod tests {
let physical_plan =
sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);

let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
let expected_input = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
" SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
}
Expand All @@ -531,18 +544,22 @@ mod tests {
let physical_plan: Arc<dyn ExecutionPlan> =
coalesce_partitions_exec(coalesce_batches_exec);

let expected_input = ["CoalescePartitionsExec",
let expected_input = [
"CoalescePartitionsExec",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
let expected_optimized = ["CoalescePartitionsExec",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized = [
"CoalescePartitionsExec",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
}
Expand Down Expand Up @@ -570,7 +587,7 @@ mod tests {
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
Expand Down Expand Up @@ -603,16 +620,20 @@ mod tests {
sort,
);

let expected_input = ["SortPreservingMergeExec: [c@1 ASC]",
let expected_input = [
"SortPreservingMergeExec: [c@1 ASC]",
" SortExec: expr=[c@1 ASC]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
let expected_optimized = ["SortPreservingMergeExec: [c@1 ASC]",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized = [
"SortPreservingMergeExec: [c@1 ASC]",
" SortExec: expr=[c@1 ASC]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
}
Expand All @@ -628,15 +649,19 @@ mod tests {
let physical_plan =
sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions, false);

let expected_input = ["SortExec: expr=[a@0 ASC NULLS LAST]",
let expected_input = [
"SortExec: expr=[a@0 ASC NULLS LAST]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
}
Expand Down Expand Up @@ -766,15 +791,19 @@ mod tests {
let physical_plan =
sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);

let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
let expected_input = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
assert_optimized!(expected_input, expected_optimized, physical_plan, true);
Ok(())
}
Expand Down
Loading

0 comments on commit 9619f02

Please sign in to comment.