Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combine Equivalence and Ordering equivalence to simplify state #8006

Merged
merged 140 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
140 commits
Select commit Hold shift + click to select a range
44ee85c
combine equivalence and ordering equivalence
mustafasrepo Sep 20, 2023
56b92e1
Remove EquivalenceProperties struct
mustafasrepo Sep 22, 2023
620dcce
Minor changes
mustafasrepo Sep 22, 2023
944029b
all tests pass
mustafasrepo Sep 25, 2023
98f491b
Refactor oeq
mustafasrepo Sep 26, 2023
f2a4600
Simplifications
mustafasrepo Oct 2, 2023
346e64e
Resolve linter errors
mustafasrepo Oct 2, 2023
9ca970b
Minor changes
mustafasrepo Oct 2, 2023
722c2c2
Minor changes
mustafasrepo Oct 2, 2023
a243d35
Merge branch 'apache_main' into refactor/oeq_eq_properties
mustafasrepo Oct 2, 2023
f3f76d6
Add new tests
mustafasrepo Oct 2, 2023
9707b35
Simplifications window mode selection
mustafasrepo Oct 2, 2023
c4d7c99
Simplifications
mustafasrepo Oct 2, 2023
aa7de00
Use set_satisfy api
mustafasrepo Oct 2, 2023
6dbc85b
Use utils for aggregate
mustafasrepo Oct 3, 2023
c1656c7
Minor changes
mustafasrepo Oct 3, 2023
d77a6e0
Minor changes
mustafasrepo Oct 3, 2023
fb08cce
Minor changes
mustafasrepo Oct 3, 2023
8ce33be
All tests pass
mustafasrepo Oct 3, 2023
e1b1488
Simplifications
mustafasrepo Oct 3, 2023
aa019b3
Simplifications
mustafasrepo Oct 3, 2023
055dc91
Minor changes
mustafasrepo Oct 3, 2023
2f63ac7
Simplifications
mustafasrepo Oct 3, 2023
e383e46
All tests pass, fix bug
mustafasrepo Oct 3, 2023
85890c1
Remove unnecessary code
mustafasrepo Oct 3, 2023
9c703bd
Simplifications
mustafasrepo Oct 3, 2023
93b7dc2
Minor changes
mustafasrepo Oct 3, 2023
79b72b9
Simplifications
mustafasrepo Oct 4, 2023
c70ac05
Move oeq join to methods
mustafasrepo Oct 4, 2023
f6f9d47
Simplifications
mustafasrepo Oct 4, 2023
7bd3d23
Remove redundant code
mustafasrepo Oct 4, 2023
a6f18a6
Minor changes
mustafasrepo Oct 4, 2023
8dcc6cd
Minor changes
mustafasrepo Oct 4, 2023
68ff6ba
Simplifications
mustafasrepo Oct 4, 2023
552d447
Simplifications
mustafasrepo Oct 4, 2023
38f3101
Simplifications
mustafasrepo Oct 4, 2023
062823f
Move window to util from method, simplifications
mustafasrepo Oct 5, 2023
4a6ecd9
Simplifications
mustafasrepo Oct 5, 2023
795bf5b
Propagate meet in the union
mustafasrepo Oct 5, 2023
7d3d4f2
Simplifications
mustafasrepo Oct 5, 2023
a2f989a
Minor changes, rename
mustafasrepo Oct 5, 2023
597ce82
Address berkay reviews
mustafasrepo Oct 6, 2023
0d80709
Merge branch 'apache_main' into refactor/oeq_eq_properties
mustafasrepo Oct 9, 2023
1cb14c5
Simplifications
mustafasrepo Oct 11, 2023
8dafda2
Add new buggy test
mustafasrepo Oct 11, 2023
e8089c2
Add data test for sort requirement
mustafasrepo Oct 11, 2023
091bf80
Add experimental check
mustafasrepo Oct 11, 2023
85e9624
Add random test
mustafasrepo Oct 11, 2023
a94f2c7
Minor changes
mustafasrepo Oct 11, 2023
36c4835
Random test gives error
mustafasrepo Oct 11, 2023
aef6dc1
Fix missing test case
mustafasrepo Oct 11, 2023
b5022d1
Minor changes
mustafasrepo Oct 12, 2023
7687642
Minor changes
mustafasrepo Oct 12, 2023
988522f
Simplifications
mustafasrepo Oct 12, 2023
3cdf545
Merge branch 'apache_main' into refactor/oeq_eq_properties
mustafasrepo Oct 12, 2023
8c17c05
Minor changes
mustafasrepo Oct 12, 2023
62a1a25
Add new test case
mustafasrepo Oct 12, 2023
8cd941d
Minor changes
mustafasrepo Oct 12, 2023
4b2d6c1
Address reviews
mustafasrepo Oct 13, 2023
3309567
Minor changes
mustafasrepo Oct 13, 2023
535e1e8
Increase coverage of random tests
mustafasrepo Oct 13, 2023
eb60b5a
Remove redundant code
mustafasrepo Oct 13, 2023
b6d5077
Simplifications
mustafasrepo Oct 13, 2023
c454b35
Merge branch 'apache_main' into refactor/oeq_eq_properties
mustafasrepo Oct 13, 2023
39240b0
Simplifications
mustafasrepo Oct 13, 2023
a370104
Refactor on tests
metesynnada Oct 13, 2023
b1d8261
Solving clippy errors
metesynnada Oct 13, 2023
4122f6e
prune_lex improvements
metesynnada Oct 13, 2023
d1887b9
Merge branch 'apache_main' into refactor/oeq_eq_properties
mustafasrepo Oct 16, 2023
7b10397
Fix failing tests
mustafasrepo Oct 16, 2023
f34c724
Update get_finer and get_meet
mustafasrepo Oct 16, 2023
745e7cc
Fix window lex ordering implementation
mustafasrepo Oct 16, 2023
e007b98
Buggy state
mustafasrepo Oct 16, 2023
acc6c15
Do not use output ordering in the aggregate
mustafasrepo Oct 17, 2023
8692dce
Add union test
mustafasrepo Oct 17, 2023
1d53900
Update comment
mustafasrepo Oct 17, 2023
6f1b242
Merge branch 'apache_main' into refactor/oeq_eq_properties
mustafasrepo Oct 17, 2023
28f5066
Merge branch 'apache_main' into refactor/oeq_eq_properties
mustafasrepo Oct 17, 2023
9630081
Fix bug, when batch_size is small
mustafasrepo Oct 18, 2023
f165ace
Review Part 1
ozankabak Oct 19, 2023
edd42e3
Review Part 2
ozankabak Oct 20, 2023
94626ee
Change union meet implementation
mustafasrepo Oct 20, 2023
d8f9d46
Merge branch 'refactor/oeq_eq_properties' of https://github.com/synna…
mustafasrepo Oct 20, 2023
18d1d6e
Update comments
mustafasrepo Oct 20, 2023
6a6136d
Remove redundant check
mustafasrepo Oct 20, 2023
3d50b68
Simplify project out_expr function
mustafasrepo Oct 20, 2023
e6ec769
Remove Option<Vec<_>> API.
mustafasrepo Oct 20, 2023
c8eecad
Do not use project_out_expr
mustafasrepo Oct 23, 2023
76f4921
Simplifications
mustafasrepo Oct 23, 2023
36c65b8
Review Part 3
ozankabak Oct 23, 2023
1d30d89
Review Part 4
ozankabak Oct 23, 2023
f5d3d98
Review Part 5
ozankabak Oct 23, 2023
9aea383
Merge remote-tracking branch 'origin/apache_main' into refactor/oeq_e…
ozankabak Oct 23, 2023
a1087a1
Review Part 6
ozankabak Oct 23, 2023
111bb15
Review Part 7
ozankabak Oct 23, 2023
ac34dcc
Review Part 8
ozankabak Oct 23, 2023
4ad1006
Merge branch 'apache_main' into refactor/oeq_eq_properties
mustafasrepo Oct 24, 2023
3721f74
Update comments
mustafasrepo Oct 24, 2023
c1f9e17
Add new unit tests, simplifications
mustafasrepo Oct 24, 2023
a78a918
Resolve linter errors
mustafasrepo Oct 24, 2023
db3b4b5
Merge remote-tracking branch 'origin/apache_main' into refactor/oeq_e…
ozankabak Oct 24, 2023
f86e1f7
Merge remote-tracking branch 'origin/apache_main' into refactor/oeq_e…
ozankabak Oct 24, 2023
18e4e11
Simplify test codes
mustafasrepo Oct 25, 2023
f0b1052
Review Part 9
ozankabak Oct 25, 2023
4e845e4
Add unit tests for remove_redundant entries
mustafasrepo Oct 25, 2023
3979371
Simplifications
mustafasrepo Oct 25, 2023
fef62d6
Review Part 10
ozankabak Oct 25, 2023
bf01154
Fix test
mustafasrepo Oct 25, 2023
dfd2060
Add new test case, fix implementation
mustafasrepo Oct 25, 2023
29c8ae2
Review Part 11
ozankabak Oct 25, 2023
da18b45
Review Part 12
ozankabak Oct 25, 2023
378d5ae
Merge remote-tracking branch 'origin/apache_main' into refactor/oeq_e…
ozankabak Oct 25, 2023
155736a
Update comments
mustafasrepo Oct 26, 2023
4bd7a8d
Review Part 13
ozankabak Oct 26, 2023
299b758
Review Part 14
ozankabak Oct 27, 2023
e18ed11
Review Part 15
ozankabak Oct 27, 2023
3581703
Merge branch 'apache_main' into refactor/oeq_eq_properties
ozankabak Oct 27, 2023
9c47b4f
Review Part 16
ozankabak Oct 28, 2023
d111db3
Review Part 17
ozankabak Oct 28, 2023
1206af1
Review Part 18
ozankabak Oct 29, 2023
36ba8a9
Review Part 19
ozankabak Oct 29, 2023
5b47a2b
Review Part 20
ozankabak Oct 29, 2023
ace5beb
Review Part 21
ozankabak Oct 29, 2023
fa9850c
Review Part 22
ozankabak Oct 29, 2023
3134ee4
Review Part 23
ozankabak Oct 29, 2023
174c973
Review Part 24
ozankabak Oct 29, 2023
2c562c3
Do not construct idx and sort_expr unnecessarily, Update comments, Un…
mustafasrepo Oct 30, 2023
6ab2763
Review Part 25
ozankabak Oct 30, 2023
1a8fb60
Review Part 26
ozankabak Oct 30, 2023
47b42b8
Merge branch 'apache_main' into refactor/oeq_eq_properties
mustafasrepo Oct 31, 2023
ed80652
Name Changes, comment updates
mustafasrepo Oct 31, 2023
2a37ed8
Review Part 27
ozankabak Oct 31, 2023
53e44ac
Merge branch 'apache_main' into refactor/oeq_eq_properties
mustafasrepo Nov 2, 2023
9553168
Add issue links
mustafasrepo Nov 2, 2023
2e52d40
Merge branch 'apache_main' into refactor/oeq_eq_properties
mustafasrepo Nov 3, 2023
25f698d
Address reviews
mustafasrepo Nov 3, 2023
4de5540
Fix failing test
mustafasrepo Nov 3, 2023
f10c5c1
Merge branch 'main' into refactor/oeq_eq_properties
ozankabak Nov 3, 2023
b582cda
Update comments
mustafasrepo Nov 3, 2023
c7f0206
SortPreservingMerge, SortPreservingRepartition only preserves given e…
mustafasrepo Nov 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/pull_request_template.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ If there are user-facing changes then we may require documentation to be updated

<!--
If there are any breaking changes to public APIs, please add the `api change` label.
-->
-->
9 changes: 3 additions & 6 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,7 @@ use crate::physical_plan::{
use arrow_schema::SchemaRef;
use datafusion_common::Statistics;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{
ordering_equivalence_properties_helper, LexOrdering, OrderingEquivalenceProperties,
PhysicalSortExpr,
};
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr};

use futures::StreamExt;
use object_store::{GetResultPayload, ObjectStore};
Expand Down Expand Up @@ -106,8 +103,8 @@ impl ExecutionPlan for ArrowExec {
.map(|ordering| ordering.as_slice())
}

fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
ordering_equivalence_properties_helper(
fn equivalence_properties(&self) -> EquivalenceProperties {
EquivalenceProperties::new_with_orderings(
self.schema(),
&self.projected_output_ordering,
)
Expand Down
8 changes: 3 additions & 5 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ use crate::physical_plan::{

use arrow::datatypes::SchemaRef;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{
ordering_equivalence_properties_helper, LexOrdering, OrderingEquivalenceProperties,
};
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};

/// Execution plan for scanning Avro data source
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -101,8 +99,8 @@ impl ExecutionPlan for AvroExec {
.map(|ordering| ordering.as_slice())
}

fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
ordering_equivalence_properties_helper(
fn equivalence_properties(&self) -> EquivalenceProperties {
EquivalenceProperties::new_with_orderings(
self.schema(),
&self.projected_output_ordering,
)
Expand Down
8 changes: 3 additions & 5 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ use crate::physical_plan::{
use arrow::csv;
use arrow::datatypes::SchemaRef;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{
ordering_equivalence_properties_helper, LexOrdering, OrderingEquivalenceProperties,
};
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};

use bytes::{Buf, Bytes};
use datafusion_common::config::ConfigOptions;
Expand Down Expand Up @@ -159,8 +157,8 @@ impl ExecutionPlan for CsvExec {
.map(|ordering| ordering.as_slice())
}

fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
ordering_equivalence_properties_helper(
fn equivalence_properties(&self) -> EquivalenceProperties {
EquivalenceProperties::new_with_orderings(
self.schema(),
&self.projected_output_ordering,
)
Expand Down
8 changes: 3 additions & 5 deletions datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ use crate::physical_plan::{
use arrow::json::ReaderBuilder;
use arrow::{datatypes::SchemaRef, json};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{
ordering_equivalence_properties_helper, LexOrdering, OrderingEquivalenceProperties,
};
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};

use bytes::{Buf, Bytes};
use futures::{ready, stream, StreamExt, TryStreamExt};
Expand Down Expand Up @@ -122,8 +120,8 @@ impl ExecutionPlan for NdJsonExec {
.map(|ordering| ordering.as_slice())
}

fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
ordering_equivalence_properties_helper(
fn equivalence_properties(&self) -> EquivalenceProperties {
EquivalenceProperties::new_with_orderings(
self.schema(),
&self.projected_output_ordering,
)
Expand Down
7 changes: 3 additions & 4 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ use crate::{
use arrow::datatypes::{DataType, SchemaRef};
use arrow::error::ArrowError;
use datafusion_physical_expr::{
ordering_equivalence_properties_helper, LexOrdering, OrderingEquivalenceProperties,
PhysicalExpr, PhysicalSortExpr,
EquivalenceProperties, LexOrdering, PhysicalExpr, PhysicalSortExpr,
};

use bytes::Bytes;
Expand Down Expand Up @@ -315,8 +314,8 @@ impl ExecutionPlan for ParquetExec {
.map(|ordering| ordering.as_slice())
}

fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
ordering_equivalence_properties_helper(
fn equivalence_properties(&self) -> EquivalenceProperties {
EquivalenceProperties::new_with_orderings(
self.schema(),
&self.projected_output_ordering,
)
Expand Down
104 changes: 61 additions & 43 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,11 @@ use crate::physical_plan::{
use arrow::compute::SortOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_expr::logical_plan::JoinType;
use datafusion_physical_expr::equivalence::EquivalenceProperties;
use datafusion_physical_expr::expressions::{Column, NoOp};
use datafusion_physical_expr::utils::{
map_columns_before_projection, ordering_satisfy_requirement_concrete,
use datafusion_physical_expr::utils::map_columns_before_projection;
use datafusion_physical_expr::{
physical_exprs_equal, EquivalenceProperties, PhysicalExpr,
};
use datafusion_physical_expr::{expr_list_eq_strict_order, PhysicalExpr};
use datafusion_physical_plan::unbounded_output;
use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};

Expand Down Expand Up @@ -498,7 +497,7 @@ fn reorder_aggregate_keys(

if parent_required.len() != output_exprs.len()
|| !agg_exec.group_by().null_expr().is_empty()
|| expr_list_eq_strict_order(&output_exprs, parent_required)
|| physical_exprs_equal(&output_exprs, parent_required)
{
Ok(PlanWithKeyRequirements::new(agg_plan))
} else {
Expand Down Expand Up @@ -564,13 +563,11 @@ fn reorder_aggregate_keys(
Arc::new(Column::new(
name,
agg_schema.index_of(name).unwrap(),
))
as Arc<dyn PhysicalExpr>,
)) as _,
name.to_owned(),
)
})
.collect::<Vec<_>>();
let agg_schema = new_final_agg.schema();
let agg_fields = agg_schema.fields();
for (idx, field) in
agg_fields.iter().enumerate().skip(output_columns.len())
Expand Down Expand Up @@ -706,10 +703,9 @@ pub(crate) fn reorder_join_keys_to_inputs(
) {
if !new_positions.is_empty() {
let new_join_on = new_join_conditions(&left_keys, &right_keys);
let mut new_sort_options = vec![];
for idx in 0..sort_options.len() {
new_sort_options.push(sort_options[new_positions[idx]])
}
let new_sort_options = (0..sort_options.len())
.map(|idx| sort_options[new_positions[idx]])
.collect();
return Ok(Arc::new(SortMergeJoinExec::try_new(
left.clone(),
right.clone(),
Expand Down Expand Up @@ -757,39 +753,40 @@ fn try_reorder(
expected: &[Arc<dyn PhysicalExpr>],
equivalence_properties: &EquivalenceProperties,
) -> Option<(JoinKeyPairs, Vec<usize>)> {
let eq_groups = equivalence_properties.eq_group();
let mut normalized_expected = vec![];
let mut normalized_left_keys = vec![];
let mut normalized_right_keys = vec![];
if join_keys.left_keys.len() != expected.len() {
return None;
}
if expr_list_eq_strict_order(expected, &join_keys.left_keys)
|| expr_list_eq_strict_order(expected, &join_keys.right_keys)
if physical_exprs_equal(expected, &join_keys.left_keys)
|| physical_exprs_equal(expected, &join_keys.right_keys)
{
return Some((join_keys, vec![]));
} else if !equivalence_properties.classes().is_empty() {
} else if !equivalence_properties.eq_group().is_empty() {
normalized_expected = expected
.iter()
.map(|e| equivalence_properties.normalize_expr(e.clone()))
.map(|e| eq_groups.normalize_expr(e.clone()))
.collect::<Vec<_>>();
assert_eq!(normalized_expected.len(), expected.len());

normalized_left_keys = join_keys
.left_keys
.iter()
.map(|e| equivalence_properties.normalize_expr(e.clone()))
.map(|e| eq_groups.normalize_expr(e.clone()))
.collect::<Vec<_>>();
assert_eq!(join_keys.left_keys.len(), normalized_left_keys.len());

normalized_right_keys = join_keys
.right_keys
.iter()
.map(|e| equivalence_properties.normalize_expr(e.clone()))
.map(|e| eq_groups.normalize_expr(e.clone()))
.collect::<Vec<_>>();
assert_eq!(join_keys.right_keys.len(), normalized_right_keys.len());

if expr_list_eq_strict_order(&normalized_expected, &normalized_left_keys)
|| expr_list_eq_strict_order(&normalized_expected, &normalized_right_keys)
if physical_exprs_equal(&normalized_expected, &normalized_left_keys)
|| physical_exprs_equal(&normalized_expected, &normalized_right_keys)
{
return Some((join_keys, vec![]));
}
Expand Down Expand Up @@ -870,7 +867,7 @@ fn new_join_conditions(
r_key.as_any().downcast_ref::<Column>().unwrap().clone(),
)
})
.collect::<Vec<_>>()
.collect()
}

/// Updates `dist_onward` such that, to keep track of
Expand Down Expand Up @@ -935,9 +932,9 @@ fn add_roundrobin_on_top(
let should_preserve_ordering = input.output_ordering().is_some();

let partitioning = Partitioning::RoundRobinBatch(n_target);
let repartition = RepartitionExec::try_new(input, partitioning)?
.with_preserve_order(should_preserve_ordering);
let new_plan = Arc::new(repartition) as Arc<dyn ExecutionPlan>;
let repartition = RepartitionExec::try_new(input, partitioning)?;
let new_plan = Arc::new(repartition.with_preserve_order(should_preserve_ordering))
as Arc<dyn ExecutionPlan>;

// update distribution onward with new operator
update_distribution_onward(new_plan.clone(), dist_onward, input_idx);
Expand Down Expand Up @@ -1011,9 +1008,9 @@ fn add_hash_on_top(
input
};
let partitioning = Partitioning::Hash(hash_exprs, n_target);
let repartition = RepartitionExec::try_new(new_plan, partitioning)?
.with_preserve_order(should_preserve_ordering);
new_plan = Arc::new(repartition) as _;
let repartition = RepartitionExec::try_new(new_plan, partitioning)?;
new_plan =
Arc::new(repartition.with_preserve_order(should_preserve_ordering)) as _;

// update distribution onward with new operator
update_distribution_onward(new_plan.clone(), dist_onward, input_idx);
Expand Down Expand Up @@ -1302,16 +1299,12 @@ fn ensure_distribution(

// There is an ordering requirement of the operator:
if let Some(required_input_ordering) = required_input_ordering {
let existing_ordering = child.output_ordering().unwrap_or(&[]);
// Either:
// - Ordering requirement cannot be satisfied by preserving ordering through repartitions, or
// - using order preserving variant is not desirable.
let ordering_satisfied = ordering_satisfy_requirement_concrete(
existing_ordering,
required_input_ordering,
|| child.equivalence_properties(),
|| child.ordering_equivalence_properties(),
);
let ordering_satisfied = child
.equivalence_properties()
.ordering_satisfy_requirement(required_input_ordering);
if !ordering_satisfied || !order_preserving_variants_desirable {
replace_order_preserving_variants(&mut child, dist_onward)?;
// If ordering requirements were satisfied before repartitioning,
Expand Down Expand Up @@ -3763,14 +3756,14 @@ mod tests {
fn repartition_transitively_past_sort_with_filter() -> Result<()> {
let schema = schema();
let sort_key = vec![PhysicalSortExpr {
expr: col("c", &schema).unwrap(),
expr: col("a", &schema).unwrap(),
options: SortOptions::default(),
}];
let plan = sort_exec(sort_key, filter_exec(parquet_exec()), false);

let expected = &[
"SortPreservingMergeExec: [c@2 ASC]",
"SortExec: expr=[c@2 ASC]",
"SortPreservingMergeExec: [a@0 ASC]",
"SortExec: expr=[a@0 ASC]",
// Expect repartition on the input to the sort (as it can benefit from additional parallelism)
"FilterExec: c@2 = 0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
Expand All @@ -3780,7 +3773,7 @@ mod tests {
assert_optimized!(expected, plan.clone(), true);

let expected_first_sort_enforcement = &[
"SortExec: expr=[c@2 ASC]",
"SortExec: expr=[a@0 ASC]",
"CoalescePartitionsExec",
"FilterExec: c@2 = 0",
// Expect repartition on the input of the filter (as it can benefit from additional parallelism)
Expand Down Expand Up @@ -4357,29 +4350,54 @@ mod tests {
fn do_not_preserve_ordering_through_repartition() -> Result<()> {
let schema = schema();
let sort_key = vec![PhysicalSortExpr {
expr: col("c", &schema).unwrap(),
expr: col("a", &schema).unwrap(),
options: SortOptions::default(),
}];
let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
let physical_plan = sort_preserving_merge_exec(sort_key, filter_exec(input));

let expected = &[
"SortPreservingMergeExec: [c@2 ASC]",
"SortExec: expr=[c@2 ASC]",
"SortPreservingMergeExec: [a@0 ASC]",
"SortExec: expr=[a@0 ASC]",
"FilterExec: c@2 = 0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]",
];

assert_optimized!(expected, physical_plan.clone(), true);

let expected = &[
"SortExec: expr=[c@2 ASC]",
"SortExec: expr=[a@0 ASC]",
"CoalescePartitionsExec",
"FilterExec: c@2 = 0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]",
];
assert_optimized!(expected, physical_plan, false);

Ok(())
}

#[test]
fn no_need_for_sort_after_filter() -> Result<()> {
let schema = schema();
let sort_key = vec![PhysicalSortExpr {
expr: col("c", &schema).unwrap(),
options: SortOptions::default(),
}];
let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
let physical_plan = sort_preserving_merge_exec(sort_key, filter_exec(input));

let expected = &[
// After CoalescePartitionsExec c is still constant. Hence c@2 ASC ordering is already satisfied.
"CoalescePartitionsExec",
// Since after this stage c is constant. c@2 ASC ordering is already satisfied.
"FilterExec: c@2 = 0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
];

assert_optimized!(expected, physical_plan.clone(), true);
assert_optimized!(expected, physical_plan, false);

Ok(())
Expand Down
Loading