Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move PartitionSearchMode into datafusion_physical_plan, rename to InputOrderMode #8364

Merged
merged 6 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,15 @@ use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::windows::{
get_best_fitting_window, BoundedWindowAggExec, WindowAggExec,
};
use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
use crate::physical_plan::{
with_new_children_if_necessary, Distribution, ExecutionPlan, PartitionSearchMode,
};

use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::{plan_err, DataFusionError};
use datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};

use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::windows::PartitionSearchMode;
use itertools::izip;

/// This rule inspects [`SortExec`]'s in the given physical plan and removes the
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/src/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::windows::create_window_expr;
use crate::physical_plan::{ExecutionPlan, Partitioning};
use crate::physical_plan::{ExecutionPlan, PartitionSearchMode, Partitioning};
use crate::prelude::{CsvReadOptions, SessionContext};

use arrow_schema::{Schema, SchemaRef, SortOptions};
Expand All @@ -44,7 +44,6 @@ use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunction};
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use datafusion_physical_plan::windows::PartitionSearchMode;

use async_trait::async_trait;

Expand Down
8 changes: 3 additions & 5 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,10 @@ use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::unnest::UnnestExec;
use crate::physical_plan::values::ValuesExec;
use crate::physical_plan::windows::{
BoundedWindowAggExec, PartitionSearchMode, WindowAggExec,
};
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{
aggregates, displayable, udaf, windows, AggregateExpr, ExecutionPlan, Partitioning,
PhysicalExpr, WindowExpr,
aggregates, displayable, udaf, windows, AggregateExpr, ExecutionPlan,
PartitionSearchMode, Partitioning, PhysicalExpr, WindowExpr,
};

use arrow::compute::SortOptions;
Expand Down
8 changes: 3 additions & 5 deletions datafusion/core/tests/fuzz_cases/window_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ use arrow::util::pretty::pretty_format_batches;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::windows::{
create_window_expr, BoundedWindowAggExec, PartitionSearchMode, WindowAggExec,
create_window_expr, BoundedWindowAggExec, WindowAggExec,
};
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::physical_plan::{collect, ExecutionPlan, PartitionSearchMode};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::type_coercion::aggregates::coerce_types;
Expand All @@ -43,9 +43,7 @@ use hashbrown::HashMap;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

use datafusion_physical_plan::windows::PartitionSearchMode::{
Linear, PartiallySorted, Sorted,
};
use datafusion_physical_plan::PartitionSearchMode::{Linear, PartiallySorted, Sorted};

#[tokio::test(flavor = "multi_thread", worker_threads = 16)]
async fn window_bounded_window_random_comparison() -> Result<()> {
Expand Down
8 changes: 4 additions & 4 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,9 @@ use crate::aggregates::{
};

use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::windows::{
get_ordered_partition_by_indices, get_window_mode, PartitionSearchMode,
};
use crate::windows::{get_ordered_partition_by_indices, get_window_mode};
use crate::{
DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
DisplayFormatType, Distribution, ExecutionPlan, PartitionSearchMode, Partitioning,
SendableRecordBatchStream, Statistics,
};

Expand Down Expand Up @@ -300,7 +298,9 @@ pub struct AggregateExec {
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
required_input_ordering: Option<LexRequirement>,
/// Describes how the input is ordered relative to the group by columns
partition_search_mode: PartitionSearchMode,
/// Describes how the output is ordered
output_ordering: Option<LexOrdering>,
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/aggregates/order/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use datafusion_physical_expr::{EmitTo, PhysicalSortExpr};
mod full;
mod partial;

use crate::windows::PartitionSearchMode;
use crate::PartitionSearchMode;
pub(crate) use full::GroupOrderingFull;
pub(crate) use partial::GroupOrderingPartial;

Expand Down
2 changes: 2 additions & 0 deletions datafusion/physical-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub mod joins;
pub mod limit;
pub mod memory;
pub mod metrics;
mod ordering;
pub mod projection;
pub mod repartition;
pub mod sorts;
Expand All @@ -72,6 +73,7 @@ pub mod windows;

pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
pub use crate::metrics::Metric;
pub use crate::ordering::PartitionSearchMode;
pub use crate::topk::TopK;
pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor};

Expand Down
51 changes: 51 additions & 0 deletions datafusion/physical-plan/src/ordering.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

/// Specifies how the input to an aggregation or window operator is ordered
/// relative to its `GROUP BY` or `PARTITION BY` expressions.
///
/// For example, if the existing ordering is `[a ASC, b ASC, c ASC]`:
///
/// ## Window Functions
/// - A `PARTITION BY b` clause can use `Linear` mode.
/// - A `PARTITION BY a, c` or a `PARTITION BY c, a` can use
/// `PartiallySorted([0])` or `PartiallySorted([1])` modes, respectively.
/// (The vector stores the index of `a` in the respective PARTITION BY expression.)
/// - A `PARTITION BY a, b` or a `PARTITION BY b, a` can use `Sorted` mode.
///
/// ## Aggregations
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought making the GROUP BY correspondence explicit made this example clearer, even though there is non-trivial redundancy.

/// - A `GROUP BY b` clause can use `Linear` mode.
/// - A `GROUP BY a, c` or a `GROUP BY c, a` can use
/// `PartiallySorted([0])` or `PartiallySorted([1])` modes, respectively.
/// (The vector stores the index of `a` in the respective GROUP BY expression.)
/// - A `GROUP BY a, b` or a `GROUP BY b, a` can use `Sorted` mode.
///
/// Note these are the same examples as above, but with `GROUP BY` instead of
/// `PARTITION BY` to make the examples easier to read.
#[derive(Debug, Clone, PartialEq)]
pub enum PartitionSearchMode {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to rename this to something related to ordering rather than Partitioning if possible. Maybe InputOrder or InputOrderMode 🤔

Maybe @ozankabak has some thoughts

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like InputOrderMode 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed in 4f4120e

/// There is no partial permutation of the expressions satisfying the
/// existing ordering.
Linear,
/// There is a partial permutation of the expressions satisfying the
/// existing ordering. Indices describing the longest partial permutation
/// are stored in the vector.
PartiallySorted(Vec<usize>),
/// There is a (full) permutation of the expressions satisfying the
/// existing ordering.
Sorted,
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ use crate::expressions::PhysicalSortExpr;
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use crate::windows::{
calc_requirements, get_ordered_partition_by_indices, get_partition_by_sort_exprs,
window_equivalence_properties, PartitionSearchMode,
window_equivalence_properties,
};
use crate::{
ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr,
PartitionSearchMode, Partitioning, RecordBatchStream, SendableRecordBatchStream,
Statistics, WindowExpr,
};

use arrow::{
Expand Down
26 changes: 1 addition & 25 deletions datafusion/physical-plan/src/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
cume_dist, dense_rank, lag, lead, percent_rank, rank, Literal, NthValue, Ntile,
PhysicalSortExpr, RowNumber,
},
udaf, unbounded_output, ExecutionPlan, PhysicalExpr,
udaf, unbounded_output, ExecutionPlan, PartitionSearchMode, PhysicalExpr,
};

use arrow::datatypes::Schema;
Expand All @@ -54,30 +54,6 @@ pub use datafusion_physical_expr::window::{
BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
};

/// Describes how the expressions of a window `PARTITION BY` clause (or of an
/// aggregation `GROUP BY` clause) relate to the ordering that already exists.
///
/// For example, given an existing ordering of `[a ASC, b ASC, c ASC]`:
/// - `PARTITION BY b` results in `Linear` mode.
/// - `PARTITION BY a, c` results in `PartiallySorted([0])` and
///   `PARTITION BY c, a` results in `PartiallySorted([1])`; the vector stores
///   the index of `a` in the respective `PARTITION BY` expression.
/// - `PARTITION BY a, b` or `PARTITION BY b, a` results in `Sorted` mode.
///
/// The examples above apply to `GROUP BY` clauses as well.
#[derive(Debug, Clone, PartialEq)]
pub enum PartitionSearchMode {
    /// No partial permutation of the expressions satisfies the existing
    /// ordering.
    Linear,
    /// Some partial permutation of the expressions satisfies the existing
    /// ordering; the vector stores the indices describing the longest such
    /// partial permutation.
    PartiallySorted(Vec<usize>),
    /// A (full) permutation of the expressions satisfies the existing
    /// ordering.
    Sorted,
}

/// Create a physical expression for window function
pub fn create_window_expr(
fun: &WindowFunction,
Expand Down
7 changes: 3 additions & 4 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,10 @@ use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::union::UnionExec;
use datafusion::physical_plan::windows::{
BoundedWindowAggExec, PartitionSearchMode, WindowAggExec,
};
use datafusion::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use datafusion::physical_plan::{
udaf, AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, WindowExpr,
udaf, AggregateExpr, ExecutionPlan, PartitionSearchMode, Partitioning, PhysicalExpr,
WindowExpr,
};
use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
use prost::bytes::BufMut;
Expand Down