Skip to content

Commit

Permalink
[MINOR]: Simplify enforce_distribution, minor changes (#7924)
Browse files Browse the repository at this point in the history
* Initial commit

* Simplifications

* Cleanup imports

* Review

---------

Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
  • Loading branch information
mustafasrepo and ozankabak authored Oct 25, 2023
1 parent 12a6316 commit 128d7c6
Show file tree
Hide file tree
Showing 10 changed files with 184 additions and 179 deletions.
196 changes: 78 additions & 118 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs

Large diffs are not rendered by default.

22 changes: 11 additions & 11 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

//! EnforceSorting optimizer rule inspects the physical plan with respect
//! to local sorting requirements and does the following:
//! - Adds a [SortExec] when a requirement is not met,
//! - Removes an already-existing [SortExec] if it is possible to prove
//! - Adds a [`SortExec`] when a requirement is not met,
//! - Removes an already-existing [`SortExec`] if it is possible to prove
//! that this sort is unnecessary
//! The rule can work on valid *and* invalid physical plans with respect to
//! sorting requirements, but always produces a valid physical plan in this sense.
Expand Down Expand Up @@ -496,9 +496,10 @@ fn ensure_sorting(
{
// This SortPreservingMergeExec is unnecessary, input already has a
// single partition.
sort_onwards.truncate(1);
return Ok(Transformed::Yes(PlanWithCorrespondingSort {
plan: children[0].clone(),
sort_onwards: vec![sort_onwards[0].clone()],
plan: children.swap_remove(0),
sort_onwards,
}));
}
Ok(Transformed::Yes(PlanWithCorrespondingSort {
Expand Down Expand Up @@ -649,7 +650,7 @@ fn remove_corresponding_coalesce_in_sub_plan(
&& is_repartition(&new_plan)
&& is_repartition(parent)
{
new_plan = new_plan.children()[0].clone()
new_plan = new_plan.children().swap_remove(0)
}
new_plan
} else {
Expand Down Expand Up @@ -689,7 +690,7 @@ fn remove_corresponding_sort_from_sub_plan(
) -> Result<Arc<dyn ExecutionPlan>> {
// A `SortExec` is always at the bottom of the tree.
let mut updated_plan = if is_sort(&sort_onwards.plan) {
sort_onwards.plan.children()[0].clone()
sort_onwards.plan.children().swap_remove(0)
} else {
let plan = &sort_onwards.plan;
let mut children = plan.children();
Expand All @@ -703,12 +704,12 @@ fn remove_corresponding_sort_from_sub_plan(
}
// Replace with variants that do not preserve order.
if is_sort_preserving_merge(plan) {
children[0].clone()
children.swap_remove(0)
} else if let Some(repartition) = plan.as_any().downcast_ref::<RepartitionExec>()
{
Arc::new(
RepartitionExec::try_new(
children[0].clone(),
children.swap_remove(0),
repartition.partitioning().clone(),
)?
.with_preserve_order(false),
Expand All @@ -730,7 +731,7 @@ fn remove_corresponding_sort_from_sub_plan(
updated_plan,
));
} else {
updated_plan = Arc::new(CoalescePartitionsExec::new(updated_plan.clone()));
updated_plan = Arc::new(CoalescePartitionsExec::new(updated_plan));
}
}
Ok(updated_plan)
Expand Down Expand Up @@ -777,8 +778,7 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::Result;
use datafusion_expr::JoinType;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::expressions::{col, NotExpr};
use datafusion_physical_expr::expressions::{col, Column, NotExpr};

fn create_test_schema() -> Result<SchemaRef> {
let nullable_column = Field::new("nullable_col", DataType::Int32, true);
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunction};
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use datafusion_physical_plan::windows::PartitionSearchMode;

use async_trait::async_trait;

Expand Down Expand Up @@ -239,7 +240,7 @@ pub fn bounded_window_exec(
.unwrap()],
input.clone(),
vec![],
crate::physical_plan::windows::PartitionSearchMode::Sorted,
PartitionSearchMode::Sorted,
)
.unwrap(),
)
Expand Down
17 changes: 9 additions & 8 deletions datafusion/core/tests/fuzz_cases/window_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,32 +22,33 @@ use arrow::compute::{concat_batches, SortOptions};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use hashbrown::HashMap;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::windows::{
create_window_expr, BoundedWindowAggExec, PartitionSearchMode, WindowAggExec,
};
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::type_coercion::aggregates::coerce_types;
use datafusion_expr::{
AggregateFunction, BuiltInWindowFunction, WindowFrame, WindowFrameBound,
WindowFrameUnits, WindowFunction,
};

use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::type_coercion::aggregates::coerce_types;
use datafusion_physical_expr::expressions::{cast, col, lit};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use test_utils::add_empty_batches;

use hashbrown::HashMap;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

#[cfg(test)]
mod tests {
use super::*;
use datafusion::physical_plan::windows::PartitionSearchMode::{

use datafusion_physical_plan::windows::PartitionSearchMode::{
Linear, PartiallySorted, Sorted,
};

Expand Down
7 changes: 2 additions & 5 deletions datafusion/physical-expr/src/aggregate/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,9 @@ use crate::{
reverse_order_bys, AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr,
};

use arrow::array::ArrayRef;
use arrow::compute;
use arrow::compute::{lexsort_to_indices, SortColumn};
use arrow::array::{Array, ArrayRef, AsArray, BooleanArray};
use arrow::compute::{self, lexsort_to_indices, SortColumn};
use arrow::datatypes::{DataType, Field};
use arrow_array::cast::AsArray;
use arrow_array::{Array, BooleanArray};
use arrow_schema::SortOptions;
use datafusion_common::utils::{compare_rows, get_arrayref_at_indices, get_row_at_idx};
use datafusion_common::{DataFusionError, Result, ScalarValue};
Expand Down
71 changes: 58 additions & 13 deletions datafusion/physical-expr/src/physical_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::fmt::{Debug, Display};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use crate::intervals::Interval;
use crate::sort_properties::SortProperties;
use crate::utils::scatter;
Expand All @@ -27,11 +32,6 @@ use datafusion_common::utils::DataPtr;
use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
use datafusion_expr::ColumnarValue;

use std::any::Any;
use std::fmt::{Debug, Display};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

/// Expression that can be evaluated against a RecordBatch
/// A Physical expression knows its type, nullability and how to evaluate itself.
pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq<dyn Any> {
Expand All @@ -54,13 +54,12 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq<dyn Any> {
let tmp_batch = filter_record_batch(batch, selection)?;

let tmp_result = self.evaluate(&tmp_batch)?;
// All values from the `selection` filter are true.

if batch.num_rows() == tmp_batch.num_rows() {
return Ok(tmp_result);
}
if let ColumnarValue::Array(a) = tmp_result {
let result = scatter(selection, a.as_ref())?;
Ok(ColumnarValue::Array(result))
// All values from the `selection` filter are true.
Ok(tmp_result)
} else if let ColumnarValue::Array(a) = tmp_result {
scatter(selection, a.as_ref()).map(ColumnarValue::Array)
} else {
Ok(tmp_result)
}
Expand Down Expand Up @@ -216,8 +215,8 @@ pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
}
}

/// It is similar to contains method of vector.
/// Finds whether `expr` is among `physical_exprs`.
/// This function is similar to the `contains` method of `Vec`. It finds
/// whether `expr` is among `physical_exprs`.
pub fn physical_exprs_contains(
physical_exprs: &[Arc<dyn PhysicalExpr>],
expr: &Arc<dyn PhysicalExpr>,
Expand All @@ -226,3 +225,49 @@ pub fn physical_exprs_contains(
.iter()
.any(|physical_expr| physical_expr.eq(expr))
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::expressions::{Column, Literal};
    use crate::physical_expr::{physical_exprs_contains, PhysicalExpr};

    use datafusion_common::{Result, ScalarValue};

    /// Verifies that `physical_exprs_contains` finds literals and columns that
    /// are in the slice and rejects ones that are not.
    #[test]
    fn test_physical_exprs_contains() -> Result<()> {
        // Shorthand constructors for the trait objects used below.
        let literal =
            |value: ScalarValue| Arc::new(Literal::new(value)) as Arc<dyn PhysicalExpr>;
        let column = |name: &str, index: usize| {
            Arc::new(Column::new(name, index)) as Arc<dyn PhysicalExpr>
        };

        // Haystack: lit(true), lit(false), lit(4), lit(2), Col(a), Col(b)
        let bool_true = literal(ScalarValue::Boolean(Some(true)));
        let int_two = literal(ScalarValue::Int32(Some(2)));
        let second_column = column("b", 1);
        let haystack: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::clone(&bool_true),
            literal(ScalarValue::Boolean(Some(false))),
            literal(ScalarValue::Int32(Some(4))),
            Arc::clone(&int_two),
            column("a", 0),
            Arc::clone(&second_column),
        ];

        // Every expression here is a member of the haystack.
        for present in [&bool_true, &int_two, &second_column] {
            assert!(physical_exprs_contains(&haystack, present));
        }

        // Neither of these was put into the haystack.
        let absent_column = column("c", 2);
        let absent_literal = literal(ScalarValue::Int32(Some(1)));
        for absent in [&absent_column, &absent_literal] {
            assert!(!physical_exprs_contains(&haystack, absent));
        }
        Ok(())
    }
}
21 changes: 11 additions & 10 deletions datafusion/physical-expr/src/scalar_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,25 @@
//! This module also has a set of coercion rules to improve user experience: if an argument i32 is passed
//! to a function that supports f64, it is coerced to f64.

use std::any::Any;
use std::fmt::Debug;
use std::fmt::{self, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use crate::functions::out_ordering;
use crate::physical_expr::down_cast_any_ref;
use crate::sort_properties::SortProperties;
use crate::utils::expr_list_eq_strict_order;
use crate::PhysicalExpr;

use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_expr::expr_vec_fmt;
use datafusion_expr::BuiltinScalarFunction;
use datafusion_expr::ColumnarValue;
use datafusion_expr::FuncMonotonicity;
use datafusion_expr::ScalarFunctionImplementation;
use std::any::Any;
use std::fmt::Debug;
use std::fmt::{self, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use datafusion_expr::{
expr_vec_fmt, BuiltinScalarFunction, ColumnarValue, FuncMonotonicity,
ScalarFunctionImplementation,
};

/// Physical expression of a scalar function
pub struct ScalarFunctionExpr {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ impl PhysicalGroupBy {
}

/// Return grouping expressions as they occur in the output schema.
fn output_exprs(&self) -> Vec<Arc<dyn PhysicalExpr>> {
pub fn output_exprs(&self) -> Vec<Arc<dyn PhysicalExpr>> {
self.expr
.iter()
.enumerate()
Expand Down
12 changes: 1 addition & 11 deletions datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::expressions::PhysicalSortExpr;
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use crate::windows::{
calc_requirements, get_ordered_partition_by_indices, window_ordering_equivalence,
PartitionSearchMode,
};
use crate::{
ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
Expand Down Expand Up @@ -68,17 +69,6 @@ use hashbrown::raw::RawTable;
use indexmap::IndexMap;
use log::debug;

#[derive(Debug, Clone, PartialEq)]
/// Specifies how the partition columns relate to the ordering of the input.
pub enum PartitionSearchMode {
/// None of the partition columns is ordered.
Linear,
/// Some, but not all, of the partition columns are ordered.
/// NOTE(review): the `Vec<usize>` payload presumably holds the indices of
/// the ordered partition columns; confirm against the code that builds it.
PartiallySorted(Vec<usize>),
/// All partition columns are ordered. This variant also covers the empty
/// case (presumably: there are no partition columns at all).
Sorted,
}

/// Window execution plan
#[derive(Debug)]
pub struct BoundedWindowAggExec {
Expand Down
12 changes: 11 additions & 1 deletion datafusion/physical-plan/src/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,23 @@ mod bounded_window_agg_exec;
mod window_agg_exec;

pub use bounded_window_agg_exec::BoundedWindowAggExec;
pub use bounded_window_agg_exec::PartitionSearchMode;
pub use window_agg_exec::WindowAggExec;

pub use datafusion_physical_expr::window::{
BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
};

#[derive(Debug, Clone, PartialEq)]
/// Specifies how the partition columns relate to the ordering of the input.
pub enum PartitionSearchMode {
/// None of the partition columns is ordered.
Linear,
/// Some, but not all, of the partition columns are ordered.
/// NOTE(review): the `Vec<usize>` payload presumably holds the indices of
/// the ordered partition columns; confirm against the code that builds it.
PartiallySorted(Vec<usize>),
/// All partition columns are ordered. This variant also covers the empty
/// case (presumably: there are no partition columns at all).
Sorted,
}

/// Create a physical expression for window function
pub fn create_window_expr(
fun: &WindowFunction,
Expand Down

0 comments on commit 128d7c6

Please sign in to comment.