From a61d9ee255ef1b7fac8fcff4a193e09d8da4db49 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Wed, 16 Jun 2021 21:23:05 +0800 Subject: [PATCH] allow window aggr to be parallelizable --- datafusion/src/execution/context.rs | 11 +++++++++++ datafusion/src/logical_plan/builder.rs | 8 -------- datafusion/src/physical_plan/planner.rs | 4 ++-- datafusion/src/physical_plan/windows.rs | 22 +++++----------------- datafusion/src/sql/utils.rs | 13 ++++++++----- 5 files changed, 26 insertions(+), 32 deletions(-) diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index b42695b0c4c64..fffc6e728bb75 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -640,6 +640,9 @@ pub struct ExecutionConfig { /// Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel /// using the provided `concurrency` level pub repartition_aggregations: bool, + /// Should DataFusion repartition data using the partition keys to execute window functions in + /// parallel using the provided `concurrency` level + pub repartition_windows: bool, } impl Default for ExecutionConfig { @@ -668,6 +671,7 @@ impl Default for ExecutionConfig { information_schema: false, repartition_joins: true, repartition_aggregations: true, + repartition_windows: true, } } } @@ -758,11 +762,18 @@ impl ExecutionConfig { self.repartition_joins = enabled; self } + /// Enables or disables the use of repartitioning for aggregations to improve parallelism pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self { self.repartition_aggregations = enabled; self } + + /// Enables or disables the use of repartitioning for window functions to improve parallelism + pub fn with_repartition_windows(mut self, enabled: bool) -> Self { + self.repartition_windows = enabled; + self + } } /// Holds per-execution properties and data (such as starting timestamps, etc). diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 6bd5181050fd6..c92121aa195d0 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -289,14 +289,6 @@ impl LogicalPlanBuilder { } /// Apply a window - /// - /// NOTE: this feature is under development and this API will be changing - /// - /// - https://github.com/apache/arrow-datafusion/issues/359 basic structure - /// - https://github.com/apache/arrow-datafusion/issues/298 empty over clause - /// - https://github.com/apache/arrow-datafusion/issues/299 with partition clause - /// - https://github.com/apache/arrow-datafusion/issues/360 with order by - /// - https://github.com/apache/arrow-datafusion/issues/361 with window frame pub fn window(&self, window_expr: Vec) -> Result { let all_expr = window_expr.iter(); validate_unique_names("Windows", all_expr.clone(), self.plan.schema())?; diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index fa6598c5488f6..59f09d06a039f 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -158,7 +158,7 @@ impl DefaultPhysicalPlanner { let can_repartition = !partition_keys.is_empty() && ctx_state.config.concurrency > 1 - && ctx_state.config.repartition_aggregations; + && ctx_state.config.repartition_windows; let input_exec = if can_repartition { let partition_keys = partition_keys @@ -221,7 +221,7 @@ impl DefaultPhysicalPlanner { .map(|e| { self.create_window_expr( e, - &logical_input_schema, + logical_input_schema, &physical_input_schema, ctx_state, ) diff --git a/datafusion/src/physical_plan/windows.rs b/datafusion/src/physical_plan/windows.rs index 466cc51b447d0..ead66b024ca41 100644 --- a/datafusion/src/physical_plan/windows.rs +++ b/datafusion/src/physical_plan/windows.rs @@ -412,11 +412,14 @@ impl ExecutionPlan for WindowAggExec { /// Get the output partitioning of this plan fn output_partitioning(&self) -> Partitioning { - Partitioning::UnknownPartitioning(1) + // because we can have repartitioning using the partition keys + // this would be either 1 or more than 1 depending on the presense of + // repartitioning + self.input.output_partitioning() } fn required_child_distribution(&self) -> Distribution { - Distribution::SinglePartition + Distribution::UnspecifiedDistribution } fn with_new_children( @@ -436,22 +439,7 @@ impl ExecutionPlan for WindowAggExec { } async fn execute(&self, partition: usize) -> Result { - if 0 != partition { - return Err(DataFusionError::Internal(format!( - "WindowAggExec invalid partition {}", - partition - ))); - } - - // window needs to operate on a single partition currently - if 1 != self.input.output_partitioning().partition_count() { - return Err(DataFusionError::Internal( - "WindowAggExec requires a single input partition".to_owned(), - )); - } - let input = self.input.execute(partition).await?; - let stream = Box::pin(WindowAggStream::new( self.schema.clone(), self.window_expr.clone(), diff --git a/datafusion/src/sql/utils.rs b/datafusion/src/sql/utils.rs index 74ad69576f0f9..838a8ce8f2382 100644 --- a/datafusion/src/sql/utils.rs +++ b/datafusion/src/sql/utils.rs @@ -463,7 +463,7 @@ pub(crate) fn generate_sort_key( /// keys. pub(crate) fn window_expr_common_partition_keys( window_exprs: &[Expr], -) -> Result> { +) -> Result<&[Expr]> { let all_partition_keys = window_exprs .iter() .map(|expr| match expr { @@ -474,10 +474,13 @@ pub(crate) fn window_expr_common_partition_keys( ))), }) .collect::>>()?; - let result = all_partition_keys.iter().min_by_key(|s| s.len()).ok_or( - DataFusionError::Execution("No window expressions found".to_owned()), - )?; - Ok(result.to_vec()) + let result = all_partition_keys + .iter() + .min_by_key(|s| s.len()) + .ok_or_else(|| { + DataFusionError::Execution("No window expressions found".to_owned()) + })?; + Ok(result) } /// group a slice of window expression expr by their order by expressions