From 5ddcbc42c1d601359a621590862605eac07470fc Mon Sep 17 00:00:00 2001 From: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com> Date: Sat, 3 Jun 2023 09:34:00 +0300 Subject: [PATCH] Resolve contradictory requirements by conversion of ordering sensitive aggregators (#6482) * Naive test pass i * Add new tests and simplifications * move tests to the .slt file * update requirement * update tests * Add support for partiallyOrdered aggregation sensitive. * Resolve linter errors * update comments * minor changes * retract changes in generated * update proto files * Simplifications * Make types consistent in schema, and data * Update todos * Convert API to vector * Convert get_finest to handle Vector inputs * simplifications, update comment * initial commit, add test * Add support for FIRST Aggregate function. * Add support for last aggregate * Update cargo.lock * Remove distinct, and limit from First and last aggregate. * Add reverse for First and Last Aggregator * Update cargo lock * Minor code simplifications * Update comment * Update documents * Fix projection pushdown bug * fix projection push down failure bug * combine first_agg and last_agg parsers * Update documentation * Update subproject * initial commit * Add test code * initial version * simplify prints * minor changes * sqllogictests pass * All tests pass * update proto function names * Minor changes * do not consider ordering requirement in ordering insensitive aggregators * Reject aggregate order by for window functions. 
* initial commit * Add new tests * Add reverse for sort removal * simplifications * simplifications * Bug fix, do not consider reverse requirement if it is not possible * Fix cargo lock file * Change reverse_order_by function place * Move required input ordering calculation logic to its own function * Add new tests * simplifications * Update comment * Rename aggregator first and last * minor change * Comment improvements * Remove count from First,Last accumulators * Remove clone * Simplifications * Simplifications, update comment * Improve comments * Move LexOrdering requirement to common place * Update comment, refactor implementation * bug fix: * Use naive requirement when reverse requirement is not helpful by convention. * Update test * Update comments * Change function place * Move get_finer to utils * change name of last, first impl, * Fix error message * change display of first and last --------- Co-authored-by: Mehmet Ozan Kabak Co-authored-by: berkaysynnada --- datafusion/core/src/execution/context.rs | 5 +- .../core/src/physical_plan/aggregates/mod.rs | 338 +++++++++++++----- .../sqllogictests/test_files/aggregate.slt | 4 +- .../sqllogictests/test_files/explain.slt | 2 +- .../sqllogictests/test_files/groupby.slt | 205 +++++++++++ .../physical-expr/src/aggregate/first_last.rs | 14 +- datafusion/physical-expr/src/aggregate/mod.rs | 11 + datafusion/physical-expr/src/lib.rs | 6 +- datafusion/physical-expr/src/sort_expr.rs | 5 +- datafusion/physical-expr/src/utils.rs | 38 ++ .../physical-expr/src/window/aggregate.rs | 6 +- .../physical-expr/src/window/built_in.rs | 4 +- .../src/window/sliding_aggregate.rs | 6 +- .../physical-expr/src/window/window_expr.rs | 13 - 14 files changed, 531 insertions(+), 126 deletions(-) diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index f43d7c87c4c6..69f621bb4452 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -1460,14 +1460,13 
@@ impl SessionState { // The EnforceDistribution rule is for adding essential repartition to satisfy the required // distribution. Please make sure that the whole plan tree is determined before this rule. Arc::new(EnforceDistribution::new()), + // The CombinePartialFinalAggregate rule should be applied after the EnforceDistribution rule + Arc::new(CombinePartialFinalAggregate::new()), // The EnforceSorting rule is for adding essential local sorting to satisfy the required // ordering. Please make sure that the whole plan tree is determined before this rule. // Note that one should always run this rule after running the EnforceDistribution rule // as the latter may break local sorting requirements. Arc::new(EnforceSorting::new()), - // The CombinePartialFinalAggregate rule should be applied after the EnforceDistribution - // and EnforceSorting rules - Arc::new(CombinePartialFinalAggregate::new()), // The CoalesceBatches rule will not influence the distribution and ordering of the // whole plan tree. Therefore, to avoid influencing other rules, it should run last. 
Arc::new(CoalesceBatches::new()), diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index 9eba86929062..455a86660e1c 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -37,10 +37,10 @@ use datafusion_physical_expr::{ aggregate::row_accumulator::RowAccumulator, equivalence::project_equivalence_properties, expressions::{Avg, CastExpr, Column, Sum}, - normalize_out_expr_with_columns_map, - utils::{convert_to_expr, get_indices_of_matching_exprs, ordering_satisfy_concrete}, - AggregateExpr, OrderingEquivalenceProperties, PhysicalExpr, PhysicalSortExpr, - PhysicalSortRequirement, + normalize_out_expr_with_columns_map, reverse_order_bys, + utils::{convert_to_expr, get_indices_of_matching_exprs}, + AggregateExpr, LexOrdering, LexOrderingReq, OrderingEquivalenceProperties, + PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, }; use std::any::Any; use std::collections::HashMap; @@ -52,8 +52,11 @@ mod row_hash; mod utils; pub use datafusion_expr::AggregateFunction; +use datafusion_physical_expr::aggregate::is_order_sensitive; pub use datafusion_physical_expr::expressions::create_aggregate_expr; -use datafusion_physical_expr::expressions::{ArrayAgg, FirstValue, LastValue}; +use datafusion_physical_expr::utils::{ + get_finer_ordering, ordering_satisfy_requirement_concrete, +}; /// Hash aggregate modes #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -213,7 +216,7 @@ pub(crate) struct AggregationOrdering { /// expressions match input ordering. order_indices: Vec, /// Actual ordering information of the GROUP BY columns. 
- ordering: Vec, + ordering: LexOrdering, } /// Hash aggregate execution plan @@ -228,7 +231,7 @@ pub struct AggregateExec { /// FILTER (WHERE clause) expression for each aggregate expression pub(crate) filter_expr: Vec>>, /// (ORDER BY clause) expression for each aggregate expression - pub(crate) order_by_expr: Vec>>, + pub(crate) order_by_expr: Vec>, /// Input plan, could be a partial aggregate or the input to the aggregate pub(crate) input: Arc, /// Schema after the aggregate is applied @@ -244,7 +247,7 @@ pub struct AggregateExec { metrics: ExecutionPlanMetricsSet, /// Stores mode and output ordering information for the `AggregateExec`. aggregation_ordering: Option, - required_input_ordering: Option>, + required_input_ordering: Option, } /// Calculates the working mode for `GROUP BY` queries. @@ -339,6 +342,29 @@ fn output_group_expr_helper(group_by: &PhysicalGroupBy) -> Vec], + order_by_expr: &[Option], +) -> Option { + for (aggr_expr, fn_reqs) in aggr_expr.iter().zip(order_by_expr.iter()) { + // If the aggregation function is a non-reversible order-sensitive function + // and there is a hard requirement, choose first such requirement: + if is_order_sensitive(aggr_expr) + && aggr_expr.reverse_expr().is_none() + && fn_reqs.is_some() + { + return fn_reqs.clone(); + } + } + None +} + /// This function gets the finest ordering requirement among all the aggregation /// functions. If requirements are conflicting, (i.e. we can not compute the /// aggregations in a single [`AggregateExec`]), the function returns an error. 
@@ -346,33 +372,45 @@ fn get_finest_requirement< F: Fn() -> EquivalenceProperties, F2: Fn() -> OrderingEquivalenceProperties, >( - order_by_expr: &[Option>], + aggr_expr: &mut [Arc], + order_by_expr: &mut [Option], eq_properties: F, ordering_eq_properties: F2, -) -> Result>> { - let mut result: Option> = None; - for fn_reqs in order_by_expr.iter().flatten() { - if let Some(result) = &mut result { - if ordering_satisfy_concrete( - result, - fn_reqs, +) -> Result> { + let mut finest_req = get_init_req(aggr_expr, order_by_expr); + for (aggr_expr, fn_req) in aggr_expr.iter_mut().zip(order_by_expr.iter_mut()) { + let fn_req = if let Some(fn_req) = fn_req { + fn_req + } else { + continue; + }; + if let Some(finest_req) = &mut finest_req { + if let Some(finer) = get_finer_ordering( + finest_req, + fn_req, &eq_properties, &ordering_eq_properties, ) { - // Do not update the result as it already satisfies current - // function's requirement: + *finest_req = finer.to_vec(); continue; } - if ordering_satisfy_concrete( - fn_reqs, - result, - &eq_properties, - &ordering_eq_properties, - ) { - // Update result with current function's requirements, as it is - // a finer requirement than what we currently have. - *result = fn_reqs.clone(); - continue; + // If an aggregate function is reversible, analyze whether its reverse + // direction is compatible with existing requirements: + if let Some(reverse) = aggr_expr.reverse_expr() { + let fn_req_reverse = reverse_order_bys(fn_req); + if let Some(finer) = get_finer_ordering( + finest_req, + &fn_req_reverse, + &eq_properties, + &ordering_eq_properties, + ) { + // We need to update `aggr_expr` with its reverse, since only its + // reverse requirement is compatible with existing requirements: + *aggr_expr = reverse; + *finest_req = finer.to_vec(); + *fn_req = fn_req_reverse; + continue; + } } // If neither of the requirements satisfy the other, this means // requirements are conflicting. 
Currently, we do not support @@ -381,20 +419,107 @@ fn get_finest_requirement< "Conflicting ordering requirements in aggregate functions is not supported".to_string(), )); } else { - result = Some(fn_reqs.clone()); + finest_req = Some(fn_req.clone()); } } - Ok(result) + Ok(finest_req) } -/// Checks whether the given aggregate expression is order-sensitive. -/// For instance, a `SUM` aggregation doesn't depend on the order of its inputs. -/// However, a `FirstAgg` depends on the input ordering (if the order changes, -/// the first value in the list would change). -fn is_order_sensitive(aggr_expr: &Arc) -> bool { - aggr_expr.as_any().is::() - || aggr_expr.as_any().is::() - || aggr_expr.as_any().is::() +/// Calculate the required input ordering for the [`AggregateExec`] by considering +/// ordering requirements of order-sensitive aggregation functions. +fn calc_required_input_ordering( + input: &Arc, + aggr_expr: &mut [Arc], + aggregator_reqs: LexOrderingReq, + aggregator_reverse_reqs: Option, + aggregation_ordering: &Option, +) -> Result> { + let mut required_input_ordering = vec![]; + // Boolean shows that whether `required_input_ordering` stored comes from + // `aggregator_reqs` or `aggregator_reverse_reqs` + let mut reverse_req = false; + // If reverse aggregator is None, there is no way to run aggregators in reverse mode. Hence ignore it during analysis + let aggregator_requirements = + if let Some(aggregator_reverse_reqs) = aggregator_reverse_reqs { + // If existing ordering doesn't satisfy requirement, we should do calculations + // on naive requirement (by convention, otherwise the final plan will be unintuitive), + // even if reverse ordering is possible. + // Hence, while iterating consider naive requirement last, by this way + // we prioritize naive requirement over reverse requirement, when + // reverse requirement is not helpful with removing SortExec from the plan. 
+ vec![(true, aggregator_reverse_reqs), (false, aggregator_reqs)] + } else { + vec![(false, aggregator_reqs)] + }; + for (is_reverse, aggregator_requirement) in aggregator_requirements.into_iter() { + if let Some(AggregationOrdering { + ordering, + // If the mode is FullyOrdered or PartiallyOrdered (i.e. we are + // running with bounded memory, without breaking the pipeline), + // then we append the aggregator ordering requirement to the existing + // ordering. This way, we can still run with bounded memory. + mode: GroupByOrderMode::FullyOrdered | GroupByOrderMode::PartiallyOrdered, + .. + }) = aggregation_ordering + { + // Get the section of the input ordering that enables us to run in + // FullyOrdered or PartiallyOrdered modes: + let requirement_prefix = + if let Some(existing_ordering) = input.output_ordering() { + &existing_ordering[0..ordering.len()] + } else { + &[] + }; + let mut requirement = + PhysicalSortRequirement::from_sort_exprs(requirement_prefix.iter()); + for req in aggregator_requirement { + if requirement.iter().all(|item| req.expr.ne(&item.expr)) { + requirement.push(req); + } + } + required_input_ordering = requirement; + } else { + required_input_ordering = aggregator_requirement; + } + // keep track of from which direction required_input_ordering is constructed + reverse_req = is_reverse; + // If all of the order-sensitive aggregate functions are reversible (such as all of the order-sensitive aggregators are + // either FIRST_VALUE or LAST_VALUE). We can run aggregate expressions both in the direction of naive required ordering + // (e.g finest requirement that satisfy each aggregate function requirement) and in its reversed (opposite) direction. + // We analyze these two possibilities, and use the version that satisfies existing ordering (This saves us adding + // unnecessary SortExec to the final plan). If none of the versions satisfy existing ordering, we use naive required ordering. 
+ // In short, if running aggregators in reverse order helps us remove a `SortExec`, we do so. Otherwise, we use aggregators as is. + let existing_ordering = input.output_ordering().unwrap_or(&[]); + if ordering_satisfy_requirement_concrete( + existing_ordering, + &required_input_ordering, + || input.equivalence_properties(), + || input.ordering_equivalence_properties(), + ) { + break; + } + } + // If `required_input_ordering` is constructed using reverse requirement, we should reverse + // each `aggr_expr` to be able to correctly calculate their result in reverse order. + if reverse_req { + aggr_expr + .iter_mut() + .map(|elem| { + if is_order_sensitive(elem) { + if let Some(reverse) = elem.reverse_expr() { + *elem = reverse; + } else { + return Err(DataFusionError::Execution( + "Aggregate expression should have a reverse expression" + .to_string(), + )); + } + } + Ok(()) + }) + .collect::>>()?; + } + Ok((!required_input_ordering.is_empty()).then_some(required_input_ordering)) } impl AggregateExec { @@ -402,9 +527,9 @@ impl AggregateExec { pub fn try_new( mode: AggregateMode, group_by: PhysicalGroupBy, - aggr_expr: Vec>, + mut aggr_expr: Vec>, filter_expr: Vec>>, - mut order_by_expr: Vec>>, + mut order_by_expr: Vec>, input: Arc, input_schema: SchemaRef, ) -> Result { @@ -417,30 +542,53 @@ impl AggregateExec { )?; let schema = Arc::new(schema); - let mut aggregator_requirement = None; - // Ordering requirement makes sense only in Partial and Single modes. - // In other modes, all groups are collapsed, therefore their input schema - // can not contain expressions in the requirement. 
- if mode == AggregateMode::Partial || mode == AggregateMode::Single { - order_by_expr = aggr_expr - .iter() - .zip(order_by_expr.into_iter()) - .map(|(aggr_expr, fn_reqs)| { - // If aggregation function is ordering sensitive, keep ordering requirement as is; otherwise ignore requirement - if is_order_sensitive(aggr_expr) { - fn_reqs - } else { - None - } - }) - .collect::>(); + // Reset ordering requirement to `None` if aggregator is not order-sensitive + order_by_expr = aggr_expr + .iter() + .zip(order_by_expr.into_iter()) + .map(|(aggr_expr, fn_reqs)| { + // If aggregation function is ordering sensitive, keep ordering requirement as is; otherwise ignore requirement + if is_order_sensitive(aggr_expr) { + fn_reqs + } else { + None + } + }) + .collect::>(); + + let mut aggregator_reqs = vec![]; + let mut aggregator_reverse_reqs = None; + // Currently we support order-sensitive aggregation only in `Single` mode. + // For `Final` and `FinalPartitioned` modes, we cannot guarantee they will receive + // data according to ordering requirements. As long as we cannot produce correct result + // in `Final` mode, it is not important to produce correct result in `Partial` mode. + // We only support `Single` mode, where we are sure that output produced is final, and it + // is produced in a single step. + if mode == AggregateMode::Single { let requirement = get_finest_requirement( - &order_by_expr, + &mut aggr_expr, + &mut order_by_expr, || input.equivalence_properties(), || input.ordering_equivalence_properties(), )?; - aggregator_requirement = requirement + let aggregator_requirement = requirement + .as_ref() .map(|exprs| PhysicalSortRequirement::from_sort_exprs(exprs.iter())); + aggregator_reqs = aggregator_requirement.unwrap_or(vec![]); + // If all aggregate expressions are reversible, also consider reverse + // requirement(s). The reason is that existing ordering may satisfy the + // given requirement or its reverse. By considering both, we can generate better plans. 
+ if aggr_expr + .iter() + .all(|expr| !is_order_sensitive(expr) || expr.reverse_expr().is_some()) + { + let reverse_agg_requirement = requirement.map(|reqs| { + PhysicalSortRequirement::from_sort_exprs( + reverse_order_bys(&reqs).iter(), + ) + }); + aggregator_reverse_reqs = reverse_agg_requirement; + } } // construct a map from the input columns to the output columns of the Aggregation @@ -455,37 +603,22 @@ impl AggregateExec { let aggregation_ordering = calc_aggregation_ordering(&input, &group_by); - let mut required_input_ordering = None; - if let Some(AggregationOrdering { - ordering, - // If the mode is FullyOrdered or PartiallyOrdered (i.e. we are - // running with bounded memory, without breaking pipeline), then - // we append aggregator ordering requirement to the existing - // ordering. This way, we can still run with bounded memory. - mode: GroupByOrderMode::FullyOrdered | GroupByOrderMode::PartiallyOrdered, - .. - }) = &aggregation_ordering + let required_input_ordering = calc_required_input_ordering( + &input, + &mut aggr_expr, + aggregator_reqs, + aggregator_reverse_reqs, + &aggregation_ordering, + )?; + + // If aggregator is working on multiple partitions and there is an order-sensitive aggregator with a requirement return error. 
+ if input.output_partitioning().partition_count() > 1 + && order_by_expr.iter().any(|req| req.is_some()) { - if let Some(aggregator_requirement) = aggregator_requirement { - // Get the section of the input ordering that enables us to run in the - // FullyOrdered or PartiallyOrdered mode: - let requirement_prefix = - if let Some(existing_ordering) = input.output_ordering() { - existing_ordering[0..ordering.len()].to_vec() - } else { - vec![] - }; - let mut requirement = - PhysicalSortRequirement::from_sort_exprs(requirement_prefix.iter()); - for req in aggregator_requirement { - if requirement.iter().all(|item| req.expr.ne(&item.expr)) { - requirement.push(req); - } - } - required_input_ordering = Some(requirement); - } - } else { - required_input_ordering = aggregator_requirement; + return Err(DataFusionError::NotImplemented( + "Order-sensitive aggregators is not supported on multiple partitions" + .to_string(), + )); } Ok(AggregateExec { @@ -530,7 +663,7 @@ impl AggregateExec { } /// ORDER BY clause expression for each aggregate expression - pub fn order_by_expr(&self) -> &[Option>] { + pub fn order_by_expr(&self) -> &[Option] { &self.order_by_expr } @@ -642,7 +775,7 @@ impl ExecutionPlan for AggregateExec { } } - fn required_input_ordering(&self) -> Vec>> { + fn required_input_ordering(&self) -> Vec> { vec![self.required_input_ordering.clone()] } @@ -1065,7 +1198,7 @@ mod tests { use datafusion_common::{DataFusionError, Result, ScalarValue}; use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion_physical_expr::expressions::{ - lit, ApproxDistinct, Column, Count, Median, + lit, ApproxDistinct, Column, Count, FirstValue, Median, }; use datafusion_physical_expr::{ AggregateExpr, EquivalenceProperties, OrderedColumn, @@ -1714,6 +1847,7 @@ mod tests { descending: false, nulls_first: false, }; + // This is the reverse requirement of options1 let options2 = SortOptions { descending: true, nulls_first: true, @@ -1729,8 +1863,7 @@ mod tests { 
&vec![OrderedColumn::new(col_a.clone(), options1)], &vec![OrderedColumn::new(col_c.clone(), options2)], )); - - let order_by_exprs = vec![ + let mut order_by_exprs = vec![ None, Some(vec![PhysicalSortExpr { expr: Arc::new(col_a.clone()), @@ -1746,7 +1879,7 @@ mod tests { }]), Some(vec![ PhysicalSortExpr { - expr: Arc::new(col_a), + expr: Arc::new(col_a.clone()), options: options1, }, PhysicalSortExpr { @@ -1754,9 +1887,22 @@ mod tests { options: options1, }, ]), + // Since aggregate expression is reversible (FirstValue), we should be able to resolve below + // contradictory requirement by reversing it. + Some(vec![PhysicalSortExpr { + expr: Arc::new(col_b.clone()), + options: options2, + }]), ]; + let aggr_expr = Arc::new(FirstValue::new( + Arc::new(col_a.clone()), + "first1", + DataType::Int32, + )) as _; + let mut aggr_exprs = vec![aggr_expr; order_by_exprs.len()]; let res = get_finest_requirement( - &order_by_exprs, + &mut aggr_exprs, + &mut order_by_exprs, || eq_properties.clone(), || ordering_eq_properties.clone(), )?; diff --git a/datafusion/core/tests/sqllogictests/test_files/aggregate.slt b/datafusion/core/tests/sqllogictests/test_files/aggregate.slt index ab3516e9e55b..fea1b6cb8ed9 100644 --- a/datafusion/core/tests/sqllogictests/test_files/aggregate.slt +++ b/datafusion/core/tests/sqllogictests/test_files/aggregate.slt @@ -65,7 +65,7 @@ statement error Error during planning: The percentile sample points count for Ap SELECT approx_percentile_cont(c3, 0.95, 111.1) FROM aggregate_test_100 # csv_query_array_agg_unsupported -statement error This feature is not implemented: ORDER BY not supported in ARRAY_AGG: c1 +statement error This feature is not implemented: Order-sensitive aggregators is not supported on multiple partitions SELECT array_agg(c13 ORDER BY c1) FROM aggregate_test_100 statement error This feature is not implemented: LIMIT not supported in ARRAY_AGG: 1 @@ -1169,7 +1169,7 @@ select c2, sum(c3) sum_c3, avg(c3) avg_c3, max(c3) max_c3, min(c3) 
min_c3, count 5 -194 -13.857142857143 118 -101 14 # csv_query_array_agg_unsupported -statement error This feature is not implemented: ORDER BY not supported in ARRAY_AGG: c1 +statement error This feature is not implemented: Order-sensitive aggregators is not supported on multiple partitions SELECT array_agg(c13 ORDER BY c1) FROM aggregate_test_100; # csv_query_array_cube_agg_with_overflow diff --git a/datafusion/core/tests/sqllogictests/test_files/explain.slt b/datafusion/core/tests/sqllogictests/test_files/explain.slt index 75002fecb1a2..6a9d07aba7ad 100644 --- a/datafusion/core/tests/sqllogictests/test_files/explain.slt +++ b/datafusion/core/tests/sqllogictests/test_files/explain.slt @@ -208,8 +208,8 @@ physical_plan after PipelineFixer SAME TEXT AS ABOVE physical_plan after repartition SAME TEXT AS ABOVE physical_plan after global_sort_selection SAME TEXT AS ABOVE physical_plan after EnforceDistribution SAME TEXT AS ABOVE -physical_plan after EnforceSorting SAME TEXT AS ABOVE physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE +physical_plan after EnforceSorting SAME TEXT AS ABOVE physical_plan after coalesce_batches SAME TEXT AS ABOVE physical_plan after PipelineChecker SAME TEXT AS ABOVE physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true diff --git a/datafusion/core/tests/sqllogictests/test_files/groupby.slt b/datafusion/core/tests/sqllogictests/test_files/groupby.slt index 08523648dce0..9868e684ca10 100644 --- a/datafusion/core/tests/sqllogictests/test_files/groupby.slt +++ b/datafusion/core/tests/sqllogictests/test_files/groupby.slt @@ -2384,3 +2384,208 @@ SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.country DESC, s.amount DESC) AS FRA [200.0, 50.0] 250 GRC [80.0, 30.0] 110 TUR [100.0, 75.0] 175 + +# test_reverse_aggregate_expr +# Some of the Aggregators can be reversed, by this way we can still run aggregators without re-ordering +# that have 
contradictory requirements at first glance. +query TT +EXPLAIN SELECT country, ARRAY_AGG(amount ORDER BY amount DESC) AS amounts, + FIRST_VALUE(amount ORDER BY amount ASC) AS fv1, + LAST_VALUE(amount ORDER BY amount DESC) AS fv2 + FROM sales_global + GROUP BY country +---- +logical_plan +Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST] AS amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST] AS fv2 +--Aggregate: groupBy=[[sales_global.country]], aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST], FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]]] +----TableScan: sales_global projection=[country, amount] +physical_plan +ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2] +--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] +----SortExec: expr=[amount@1 DESC] +------MemoryExec: partitions=1, partition_sizes=[1] + +query T?RR +SELECT country, ARRAY_AGG(amount ORDER BY amount DESC) AS amounts, + FIRST_VALUE(amount ORDER BY amount ASC) AS fv1, + LAST_VALUE(amount ORDER BY amount DESC) AS fv2 + FROM sales_global + GROUP BY country +---- +FRA [200.0, 50.0] 50 50 +TUR [100.0, 75.0] 75 75 +GRC [80.0, 30.0] 30 30 + +# test_reverse_aggregate_expr2 +# Some of the Aggregators can be reversed, by this way we can still run aggregators without re-ordering +# that have contradictory 
requirements at first glance. +query TT +EXPLAIN SELECT country, ARRAY_AGG(amount ORDER BY amount ASC) AS amounts, + FIRST_VALUE(amount ORDER BY amount ASC) AS fv1, + LAST_VALUE(amount ORDER BY amount DESC) AS fv2 + FROM sales_global + GROUP BY country +---- +logical_plan +Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST] AS fv2 +--Aggregate: groupBy=[[sales_global.country]], aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]]] +----TableScan: sales_global projection=[country, amount] +physical_plan +ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2] +--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)] +----SortExec: expr=[amount@1 ASC NULLS LAST] +------MemoryExec: partitions=1, partition_sizes=[1] + +query T?RR +SELECT country, ARRAY_AGG(amount ORDER BY amount ASC) AS amounts, + FIRST_VALUE(amount ORDER BY amount ASC) AS fv1, + LAST_VALUE(amount ORDER BY amount DESC) AS fv2 + FROM sales_global + GROUP BY country +---- +GRC [30.0, 80.0] 30 30 +FRA [50.0, 200.0] 50 50 +TUR [75.0, 100.0] 75 75 + +# test_reverse_aggregate_expr3 +# Some of the Aggregators can be reversed, by this way we can still run aggregators without re-ordering +# that have contradictory requirements at 
first glance. This algorithm shouldn't depend +# on the order of the aggregation expressions. +query TT +EXPLAIN SELECT country, FIRST_VALUE(amount ORDER BY amount ASC) AS fv1, + LAST_VALUE(amount ORDER BY amount DESC) AS fv2, + ARRAY_AGG(amount ORDER BY amount ASC) AS amounts + FROM sales_global + GROUP BY country +---- +logical_plan +Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST] AS fv2, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS amounts +--Aggregate: groupBy=[[sales_global.country]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST], ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]]] +----TableScan: sales_global projection=[country, amount] +physical_plan +ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@2 as fv2, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@3 as amounts] +--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount), ARRAY_AGG(sales_global.amount)] +----SortExec: expr=[amount@1 ASC NULLS LAST] +------MemoryExec: partitions=1, partition_sizes=[1] + +query TRR? 
+SELECT country, FIRST_VALUE(amount ORDER BY amount ASC) AS fv1, + LAST_VALUE(amount ORDER BY amount DESC) AS fv2, + ARRAY_AGG(amount ORDER BY amount ASC) AS amounts + FROM sales_global + GROUP BY country +---- +GRC 30 30 [30.0, 80.0] +FRA 50 50 [50.0, 200.0] +TUR 75 75 [75.0, 100.0] + +# test_reverse_aggregate_expr4 +# Ordering requirement by the ordering insensitive aggregators shouldn't have effect on +# final plan. Hence seemingly conflicting requirements by SUM and ARRAY_AGG shouldn't raise error. +query TT +EXPLAIN SELECT country, SUM(amount ORDER BY ts DESC) AS sum1, + ARRAY_AGG(amount ORDER BY amount ASC) AS amounts + FROM sales_global + GROUP BY country +---- +logical_plan +Projection: sales_global.country, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS sum1, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS amounts +--Aggregate: groupBy=[[sales_global.country]], aggr=[[SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]]] +----TableScan: sales_global projection=[country, ts, amount] +physical_plan +ProjectionExec: expr=[country@0 as country, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as sum1, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as amounts] +--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[SUM(sales_global.amount), ARRAY_AGG(sales_global.amount)] +----SortExec: expr=[amount@2 ASC NULLS LAST] +------MemoryExec: partitions=1, partition_sizes=[1] + +query TR? 
+SELECT country, SUM(amount ORDER BY ts DESC) AS sum1, + ARRAY_AGG(amount ORDER BY amount ASC) AS amounts + FROM sales_global + GROUP BY country +---- +GRC 110 [30.0, 80.0] +FRA 250 [50.0, 200.0] +TUR 175 [75.0, 100.0] + +# test_reverse_aggregate_expr5 +# If all of the ordering sensitive aggregation functions are reversible +# we should be able to reverse requirements, if this helps to remove a SortExec. +# Hence in query below, FIRST_VALUE, and LAST_VALUE should be reversed to calculate its result according to `ts ASC` ordering. +# Please note that the input is already ordered by `ts ASC` because of the inner query, so no additional SortExec is added to the final plan. +query TT +EXPLAIN SELECT country, FIRST_VALUE(amount ORDER BY ts DESC) as fv1, + LAST_VALUE(amount ORDER BY ts DESC) as lv1, + SUM(amount ORDER BY ts DESC) as sum1 + FROM (SELECT * + FROM sales_global + ORDER BY ts ASC) + GROUP BY country +---- +logical_plan +Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS sum1 +--Aggregate: groupBy=[[sales_global.country]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]]] +----Sort: sales_global.ts ASC NULLS LAST +------TableScan: sales_global projection=[country, ts, amount] +physical_plan +ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 as sum1] +--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[LAST_VALUE(sales_global.amount), 
FIRST_VALUE(sales_global.amount), SUM(sales_global.amount)] +----SortExec: expr=[ts@1 ASC NULLS LAST] +------MemoryExec: partitions=1, partition_sizes=[1] + +query TRRR +SELECT country, FIRST_VALUE(amount ORDER BY ts DESC) as fv1, + LAST_VALUE(amount ORDER BY ts DESC) as lv1, + SUM(amount ORDER BY ts DESC) as sum1 + FROM (SELECT * + FROM sales_global + ORDER BY ts ASC) + GROUP BY country +---- +GRC 80 30 110 +FRA 200 50 250 +TUR 100 75 175 + +# If existing ordering doesn't satisfy requirement, we should do calculations +# on naive requirement (by convention, otherwise the final plan will be unintuitive), +# even if reverse ordering is possible. +# hence, below query should add `SortExec(ts DESC)` to the final plan. +query TT +EXPLAIN SELECT country, FIRST_VALUE(amount ORDER BY ts DESC) as fv1, + LAST_VALUE(amount ORDER BY ts DESC) as lv1, + SUM(amount ORDER BY ts DESC) as sum1 + FROM sales_global + GROUP BY country +---- +logical_plan +Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS sum1 +--Aggregate: groupBy=[[sales_global.country]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]]] +----TableScan: sales_global projection=[country, ts, amount] +physical_plan +ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 as sum1] +--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), 
LAST_VALUE(sales_global.amount), SUM(sales_global.amount)] +----SortExec: expr=[ts@1 DESC] +------MemoryExec: partitions=1, partition_sizes=[1] + +query TRRR +SELECT country, FIRST_VALUE(amount ORDER BY ts DESC) as fv1, + LAST_VALUE(amount ORDER BY ts DESC) as lv1, + SUM(amount ORDER BY ts DESC) as sum1 + FROM sales_global + GROUP BY country +---- +TUR 100 75 175 +GRC 80 30 110 +FRA 200 50 250 + +# Run order-sensitive aggregators in multiple partitions +statement ok +set datafusion.execution.target_partitions = 2; + +# Currently, we do not support running order-sensitive aggregators in multiple partitions. +statement error This feature is not implemented: Order-sensitive aggregators is not supported on multiple partitions +SELECT country, ARRAY_AGG(amount ORDER BY amount DESC) AS amounts, + FIRST_VALUE(amount ORDER BY amount ASC) AS fv1, + LAST_VALUE(amount ORDER BY amount DESC) AS fv2 + FROM sales_global + GROUP BY country diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs index 5dd9620ce0a6..a350637c4882 100644 --- a/datafusion/physical-expr/src/aggregate/first_last.rs +++ b/datafusion/physical-expr/src/aggregate/first_last.rs @@ -84,9 +84,14 @@ impl AggregateExpr for FirstValue { } fn reverse_expr(&self) -> Option> { + let name = if self.name.starts_with("FIRST") { + format!("LAST{}", &self.name[5..]) + } else { + format!("LAST_VALUE({})", self.expr) + }; Some(Arc::new(LastValue::new( self.expr.clone(), - self.name.clone(), + name, self.data_type.clone(), ))) } @@ -214,9 +219,14 @@ impl AggregateExpr for LastValue { } fn reverse_expr(&self) -> Option> { + let name = if self.name.starts_with("LAST") { + format!("FIRST{}", &self.name[4..]) + } else { + format!("FIRST_VALUE({})", self.expr) + }; Some(Arc::new(FirstValue::new( self.expr.clone(), - self.name.clone(), + name, self.data_type.clone(), ))) } diff --git a/datafusion/physical-expr/src/aggregate/mod.rs 
b/datafusion/physical-expr/src/aggregate/mod.rs index 8da635cfb2ea..09fd9bcfc524 100644 --- a/datafusion/physical-expr/src/aggregate/mod.rs +++ b/datafusion/physical-expr/src/aggregate/mod.rs @@ -16,6 +16,7 @@ // under the License. use crate::aggregate::row_accumulator::RowAccumulator; +use crate::expressions::{ArrayAgg, FirstValue, LastValue}; use crate::PhysicalExpr; use arrow::datatypes::Field; use datafusion_common::{DataFusionError, Result}; @@ -130,3 +131,13 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq { ))) } } + +/// Checks whether the given aggregate expression is order-sensitive. +/// For instance, a `SUM` aggregation doesn't depend on the order of its inputs. +/// However, a `FirstValue` depends on the input ordering (if the order changes, +/// the first value in the list would change). +pub fn is_order_sensitive(aggr_expr: &Arc) -> bool { + aggr_expr.as_any().is::() + || aggr_expr.as_any().is::() + || aggr_expr.as_any().is::() +} diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index b54bcda601c7..494f35566dc2 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -56,9 +56,11 @@ pub use equivalence::{ pub use physical_expr::{AnalysisContext, ExprBoundaries, PhysicalExpr, PhysicalExprRef}; pub use planner::create_physical_expr; pub use scalar_function::ScalarFunctionExpr; -pub use sort_expr::{LexOrdering, PhysicalSortExpr, PhysicalSortRequirement}; +pub use sort_expr::{ + LexOrdering, LexOrderingReq, PhysicalSortExpr, PhysicalSortRequirement, +}; pub use utils::{ expr_list_eq_any_order, expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, normalize_out_expr_with_columns_map, - split_conjunction, + reverse_order_bys, split_conjunction, }; diff --git a/datafusion/physical-expr/src/sort_expr.rs b/datafusion/physical-expr/src/sort_expr.rs index 665a47e586e2..dc93b67fa655 100644 --- a/datafusion/physical-expr/src/sort_expr.rs +++ 
b/datafusion/physical-expr/src/sort_expr.rs @@ -214,5 +214,8 @@ fn to_str(options: &SortOptions) -> &str { } } -/// `LexOrdering` is a type alias for lexicographical ordering definition `Vec` +///`LexOrdering` is a type alias for lexicographical ordering definition`Vec` pub type LexOrdering = Vec; + +///`LexOrderingReq` is a type alias for lexicographical ordering requirement definition`Vec` +pub type LexOrderingReq = Vec; diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs index d9009ca31ec2..9c28243bedba 100644 --- a/datafusion/physical-expr/src/utils.rs +++ b/datafusion/physical-expr/src/utils.rs @@ -679,6 +679,44 @@ pub fn reassign_predicate_columns( }) } +/// Reverses the ORDER BY expression, which is useful during equivalent window +/// expression construction. For instance, 'ORDER BY a ASC, NULLS LAST' turns into +/// 'ORDER BY a DESC, NULLS FIRST'. +pub fn reverse_order_bys(order_bys: &[PhysicalSortExpr]) -> Vec { + order_bys + .iter() + .map(|e| PhysicalSortExpr { + expr: e.expr.clone(), + options: !e.options, + }) + .collect() +} + +/// Finds the finer requirement among `req1` and `req2`. +/// If the result is `None`, `req1` and `req2` are not compatible, +/// i.e. there is no requirement that satisfies both. +pub fn get_finer_ordering< + 'a, + F: Fn() -> EquivalenceProperties, + F2: Fn() -> OrderingEquivalenceProperties, +>( + req1: &'a [PhysicalSortExpr], + req2: &'a [PhysicalSortExpr], + eq_properties: F, + ordering_eq_properties: F2, +) -> Option<&'a [PhysicalSortExpr]> { + if ordering_satisfy_concrete(req1, req2, &eq_properties, &ordering_eq_properties) { + // Finer requirement is `req1`, since it satisfies the other: + return Some(req1); + } + if ordering_satisfy_concrete(req2, req1, &eq_properties, &ordering_eq_properties) { + // Finer requirement is `req2`, since it satisfies the other: + return Some(req2); + } + // Neither `req1` nor `req2` satisfies the other; they are incompatible.
+ None +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs index 95fd86148ac2..d0173949dd17 100644 --- a/datafusion/physical-expr/src/window/aggregate.rs +++ b/datafusion/physical-expr/src/window/aggregate.rs @@ -29,11 +29,13 @@ use datafusion_common::ScalarValue; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::{Accumulator, WindowFrame}; -use crate::window::window_expr::{reverse_order_bys, AggregateWindowExpr}; +use crate::window::window_expr::AggregateWindowExpr; use crate::window::{ PartitionBatches, PartitionWindowAggStates, SlidingAggregateWindowExpr, WindowExpr, }; -use crate::{expressions::PhysicalSortExpr, AggregateExpr, PhysicalExpr}; +use crate::{ + expressions::PhysicalSortExpr, reverse_order_bys, AggregateExpr, PhysicalExpr, +}; /// A window expr that takes the form of an aggregate function /// Aggregate Window Expressions that have the form diff --git a/datafusion/physical-expr/src/window/built_in.rs b/datafusion/physical-expr/src/window/built_in.rs index 1576e8e9c403..59674257edf5 100644 --- a/datafusion/physical-expr/src/window/built_in.rs +++ b/datafusion/physical-expr/src/window/built_in.rs @@ -25,12 +25,12 @@ use super::window_frame_state::WindowFrameContext; use super::BuiltInWindowFunctionExpr; use super::WindowExpr; use crate::window::window_expr::{ - reverse_order_bys, BuiltinWindowState, NthValueKind, NthValueState, WindowFn, + BuiltinWindowState, NthValueKind, NthValueState, WindowFn, }; use crate::window::{ PartitionBatches, PartitionWindowAggStates, WindowAggState, WindowState, }; -use crate::{expressions::PhysicalSortExpr, PhysicalExpr}; +use crate::{expressions::PhysicalSortExpr, reverse_order_bys, PhysicalExpr}; use arrow::array::{new_empty_array, Array, ArrayRef}; use arrow::compute::SortOptions; use arrow::datatypes::Field; diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs 
b/datafusion/physical-expr/src/window/sliding_aggregate.rs index 7fa33d71ca44..8ce3f42bea60 100644 --- a/datafusion/physical-expr/src/window/sliding_aggregate.rs +++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs @@ -28,11 +28,13 @@ use arrow::record_batch::RecordBatch; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::{Accumulator, WindowFrame}; -use crate::window::window_expr::{reverse_order_bys, AggregateWindowExpr}; +use crate::window::window_expr::AggregateWindowExpr; use crate::window::{ PartitionBatches, PartitionWindowAggStates, PlainAggregateWindowExpr, WindowExpr, }; -use crate::{expressions::PhysicalSortExpr, AggregateExpr, PhysicalExpr}; +use crate::{ + expressions::PhysicalSortExpr, reverse_order_bys, AggregateExpr, PhysicalExpr, +}; /// A window expr that takes the form of an aggregate function /// Aggregate Window Expressions that have the form diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs index 140ce67f4a03..ec0d929b7dd4 100644 --- a/datafusion/physical-expr/src/window/window_expr.rs +++ b/datafusion/physical-expr/src/window/window_expr.rs @@ -253,19 +253,6 @@ pub trait AggregateWindowExpr: WindowExpr { } } -/// Reverses the ORDER BY expression, which is useful during equivalent window -/// expression construction. For instance, 'ORDER BY a ASC, NULLS LAST' turns into -/// 'ORDER BY a DESC, NULLS FIRST'. -pub fn reverse_order_bys(order_bys: &[PhysicalSortExpr]) -> Vec { - order_bys - .iter() - .map(|e| PhysicalSortExpr { - expr: e.expr.clone(), - options: !e.options, - }) - .collect() -} - #[derive(Debug)] pub enum WindowFn { Builtin(Box),