diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 5df8e20ea606..436bce5952bd 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -630,6 +630,9 @@ pub struct ExecutionConfig { /// Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel /// using the provided `concurrency` level pub repartition_aggregations: bool, + /// Should DataFusion repartition data using the partition keys to execute window functions in + /// parallel using the provided `concurrency` level + pub repartition_windows: bool, } impl Default for ExecutionConfig { @@ -659,6 +662,7 @@ impl Default for ExecutionConfig { information_schema: false, repartition_joins: true, repartition_aggregations: true, + repartition_windows: true, } } } @@ -749,11 +753,18 @@ impl ExecutionConfig { self.repartition_joins = enabled; self } + /// Enables or disables the use of repartitioning for aggregations to improve parallelism pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self { self.repartition_aggregations = enabled; self } + + /// Enables or disables the use of repartitioning for window functions to improve parallelism + pub fn with_repartition_windows(mut self, enabled: bool) -> Self { + self.repartition_windows = enabled; + self + } } /// Holds per-execution properties and data (such as starting timestamps, etc). 
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index c3bb9a80136f..75f15653ba46 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -44,7 +44,7 @@ use crate::physical_plan::{ }; use crate::prelude::JoinType; use crate::scalar::ScalarValue; -use crate::sql::utils::generate_sort_key; +use crate::sql::utils::{generate_sort_key, window_expr_common_partition_keys}; use crate::variable::VarType; use crate::{ error::{DataFusionError, Result}, @@ -264,6 +264,38 @@ impl DefaultPhysicalPlanner { "Impossibly got empty window expression".to_owned(), )); } + + let input_exec = self.create_initial_plan(input, ctx_state)?; + + // at this point the logical planner guarantees + // that all the window_expr share an equal sort key + let partition_keys = window_expr_common_partition_keys(window_expr)?; + + let can_repartition = !partition_keys.is_empty() + && ctx_state.config.concurrency > 1 + && ctx_state.config.repartition_windows; + + let input_exec = if can_repartition { + let partition_keys = partition_keys + .iter() + .map(|e| { + self.create_physical_expr( + e, + input.schema(), + &input_exec.schema(), + ctx_state, + ) + }) + .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?; + Arc::new(RepartitionExec::try_new( + input_exec, + Partitioning::Hash(partition_keys, ctx_state.config.concurrency), + )?) 
+ } else { + input_exec + }; + + // add a sort phase let get_sort_keys = |expr: &Expr| match expr { Expr::WindowFunction { ref partition_by, @@ -272,7 +304,6 @@ impl DefaultPhysicalPlanner { } => generate_sort_key(partition_by, order_by), _ => unreachable!(), }; - let sort_keys = get_sort_keys(&window_expr[0]); if window_expr.len() > 1 { debug_assert!( @@ -283,7 +314,6 @@ impl DefaultPhysicalPlanner { ); } - let input_exec = self.create_initial_plan(input, ctx_state)?; let logical_input_schema = input.schema(); let input_exec = if sort_keys.is_empty() { @@ -310,7 +340,11 @@ impl DefaultPhysicalPlanner { _ => unreachable!(), }) .collect::<Result<Vec<_>>>()?; - Arc::new(SortExec::try_new(sort_keys, input_exec)?) + Arc::new(if can_repartition { + SortExec::new_with_partitioning(sort_keys, input_exec, true) + } else { + SortExec::try_new(sort_keys, input_exec)? + }) }; let physical_input_schema = input_exec.schema(); diff --git a/datafusion/src/physical_plan/windows.rs b/datafusion/src/physical_plan/windows.rs index 89263767c72a..cd603fd5134e 100644 --- a/datafusion/src/physical_plan/windows.rs +++ b/datafusion/src/physical_plan/windows.rs @@ -404,11 +404,22 @@ impl ExecutionPlan for WindowAggExec { /// Get the output partitioning of this plan fn output_partitioning(&self) -> Partitioning { - Partitioning::UnknownPartitioning(1) + // because we can have repartitioning using the partition keys + // this would be either 1 or more than 1 depending on the presence of + // repartitioning + self.input.output_partitioning() } fn required_child_distribution(&self) -> Distribution { - Distribution::SinglePartition + if self + .window_expr() + .iter() + .all(|expr| expr.partition_by().is_empty()) + { + Distribution::SinglePartition + } else { + Distribution::UnspecifiedDistribution + } } fn with_new_children( @@ -428,22 +439,7 @@ impl ExecutionPlan for WindowAggExec { } async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> { - if 0 != partition { - return Err(DataFusionError::Internal(format!( 
- "WindowAggExec invalid partition {}", - partition - ))); - } - - // window needs to operate on a single partition currently - if 1 != self.input.output_partitioning().partition_count() { - return Err(DataFusionError::Internal( - "WindowAggExec requires a single input partition".to_owned(), - )); - } - let input = self.input.execute(partition).await?; - let stream = Box::pin(WindowAggStream::new( self.schema.clone(), self.window_expr.clone(), @@ -580,38 +576,6 @@ mod tests { Ok((input, schema)) } - #[tokio::test] - async fn window_function_input_partition() -> Result<()> { - let (input, schema) = create_test_schema(4)?; - - let window_exec = Arc::new(WindowAggExec::try_new( - vec![create_window_expr( - &WindowFunction::AggregateFunction(AggregateFunction::Count), - "count".to_owned(), - &[col("c3", &schema)?], - &[], - &[], - Some(WindowFrame::default()), - schema.as_ref(), - )?], - input, - schema.clone(), - )?); - - let result = collect(window_exec).await; - - assert!(result.is_err()); - if let Some(DataFusionError::Internal(msg)) = result.err() { - assert_eq!( - msg, - "WindowAggExec requires a single input partition".to_owned() - ); - } else { - unreachable!("Expect an internal error to happen"); - } - Ok(()) - } - #[tokio::test] async fn window_function() -> Result<()> { let (input, schema) = create_test_schema(1)?; diff --git a/datafusion/src/sql/utils.rs b/datafusion/src/sql/utils.rs index 080f84ef10ed..28243360c412 100644 --- a/datafusion/src/sql/utils.rs +++ b/datafusion/src/sql/utils.rs @@ -462,6 +462,30 @@ pub(crate) fn generate_sort_key( sort_key } +/// given a slice of window expressions sharing the same sort key, find their common partition +/// keys. +pub(crate) fn window_expr_common_partition_keys( + window_exprs: &[Expr], +) -> Result<&[Expr]> { + let all_partition_keys = window_exprs + .iter() + .map(|expr| match expr { + Expr::WindowFunction { partition_by, .. 
} => Ok(partition_by), + expr => Err(DataFusionError::Execution(format!( + "Impossibly got non-window expr {:?}", + expr + ))), + }) + .collect::<Result<Vec<_>>>()?; + let result = all_partition_keys + .iter() + .min_by_key(|s| s.len()) + .ok_or_else(|| { + DataFusionError::Execution("No window expressions found".to_owned()) + })?; + Ok(result) +} + /// group a slice of window expression expr by their order by expressions pub(crate) fn group_window_expr_by_sort_keys( window_expr: &[Expr],