[Enhancement] Don't repartition ProjectionExec when it does not compute anything

ProjectionExec can perform two kinds of computation:
1. reorder/rename of columns
2. other computations, such as col1 + col2
A projection that only reorders or renames columns gains nothing from
repartitioning, so repartitioning should be disabled when every expression
is a plain column reference.

This PR introduces a `would_benefit` flag on ProjectionExec. When it is true,
the projection benefits from additional partitions and
benefits_from_input_partitioning returns true; otherwise it returns false.
would_benefit is false if and only if every expression is a column expression.
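
For context, a repartitioning pass could consult the new method roughly as in the
sketch below. This is a hypothetical, simplified illustration, not the optimizer
rule touched by this PR; `maybe_repartition` and `target_partitions` are made-up
names, while RepartitionExec, Partitioning, and benefits_from_input_partitioning
are existing DataFusion APIs.

// Hypothetical sketch: only add a RepartitionExec when a node reports that it
// benefits from additional input partitions.
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};

fn maybe_repartition(
    plan: Arc<dyn ExecutionPlan>,
    target_partitions: usize,
) -> Result<Arc<dyn ExecutionPlan>> {
    // A pure reorder/rename ProjectionExec now returns false here, so this
    // pass leaves its input untouched.
    if !plan.benefits_from_input_partitioning() || plan.children().is_empty() {
        return Ok(plan);
    }
    // Otherwise round-robin repartition every child to the target partition count.
    let children = plan
        .children()
        .into_iter()
        .map(|child| {
            Ok(Arc::new(RepartitionExec::try_new(
                child,
                Partitioning::RoundRobinBatch(target_partitions),
            )?) as Arc<dyn ExecutionPlan>)
        })
        .collect::<Result<Vec<_>>>()?;
    plan.with_new_children(children)
}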

Signed-off-by: xyz <a997647204@gmail.com>
xiaoyong-z committed Jan 26, 2023
1 parent 2aa1490 commit f1b6a48
Showing 3 changed files with 70 additions and 0 deletions.
60 changes: 60 additions & 0 deletions datafusion/core/src/physical_plan/projection.rs
@@ -61,6 +61,9 @@ pub struct ProjectionExec {
alias_map: HashMap<Column, Vec<Column>>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// If `would_benefit` is true, ProjectionExec would benefit
/// from additional partitions and repartitioning is therefore considered.
would_benefit: bool,
}

impl ProjectionExec {
@@ -69,6 +72,13 @@ impl ProjectionExec {
expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
input: Arc<dyn ExecutionPlan>,
) -> Result<Self> {
let mut all_column_expr = true;
// If every expression is a column expression, the projection only reorders
// or renames columns and would not benefit from repartitioning.
for (e, _) in expr.iter() {
all_column_expr &= e.is_column_expr();
}
let input_schema = input.schema();

let fields: Result<Vec<Field>> = expr
@@ -134,6 +144,7 @@ impl ProjectionExec {
output_ordering,
alias_map,
metrics: ExecutionPlanMetricsSet::new(),
would_benefit: !all_column_expr,
})
}

@@ -221,6 +232,10 @@ impl ExecutionPlan for ProjectionExec {
)?))
}

fn benefits_from_input_partitioning(&self) -> bool {
self.would_benefit
}

fn execute(
&self,
partition: usize,
@@ -384,8 +399,21 @@ mod tests {
use crate::scalar::ScalarValue;
use crate::test::{self};
use crate::test_util;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::binary;
use futures::future;

// Create a binary expression without type coercion. Used here when we do not want to coerce
// the expressions to valid types. Using the result can cause an error at execution time (after planning).
fn binary_simple(
l: Arc<dyn PhysicalExpr>,
op: Operator,
r: Arc<dyn PhysicalExpr>,
input_schema: &Schema,
) -> Arc<dyn PhysicalExpr> {
binary(l, op, r, input_schema).unwrap()
}

#[tokio::test]
async fn project_first_column() -> Result<()> {
let session_ctx = SessionContext::new();
@@ -425,6 +453,38 @@
Ok(())
}

#[tokio::test]
async fn project_input_not_partitioning() -> Result<()> {
let schema = test_util::aggr_test_schema();

let partitions = 4;
let csv = test::scan_partitioned_csv(partitions)?;

// pick column c1 and name it column c1 in the output schema
let projection =
ProjectionExec::try_new(vec![(col("c1", &schema)?, "c1".to_string())], csv)?;
assert!(!projection.would_benefit);
Ok(())
}

#[tokio::test]
async fn project_input_partitioning() -> Result<()> {
let schema = test_util::aggr_test_schema();

let partitions = 4;
let csv = test::scan_partitioned_csv(partitions)?;

let c1 = col("c2", &schema).unwrap();
let c2 = col("c9", &schema).unwrap();
let c1_plus_c2 = binary_simple(c1, Operator::Plus, c2, &schema);

let projection =
ProjectionExec::try_new(vec![(c1_plus_c2, "c2 + c9".to_string())], csv)?;

assert!(projection.would_benefit);
Ok(())
}

#[tokio::test]
async fn project_no_column() -> Result<()> {
let session_ctx = SessionContext::new();
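
As a side note on the try_new change above: the all_column_expr loop could also be
written with Iterator::all, which short-circuits on the first non-column expression.
A minimal sketch under that assumption (the helper name is made up; PhysicalExpr and
is_column_expr come from this PR's datafusion-physical-expr changes):

use std::sync::Arc;

use datafusion_physical_expr::PhysicalExpr;

/// Returns true when every projection expression is a bare column reference,
/// i.e. the projection only reorders or renames columns.
fn is_pure_reorder_or_rename(exprs: &[(Arc<dyn PhysicalExpr>, String)]) -> bool {
    exprs.iter().all(|(e, _)| e.is_column_expr())
}
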
5 changes: 5 additions & 0 deletions datafusion/physical-expr/src/expressions/column.rs
@@ -109,6 +109,11 @@ impl PhysicalExpr for Column {
let col_bounds = context.column_boundaries[self.index].clone();
context.with_boundaries(col_bounds)
}

/// Indicates whether this expression is a column expression
fn is_column_expr(&self) -> bool {
true
}
}

impl PartialEq<dyn Any> for Column {
5 changes: 5 additions & 0 deletions datafusion/physical-expr/src/physical_expr.rs
@@ -81,6 +81,11 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq<dyn Any> {
fn analyze(&self, context: AnalysisContext) -> AnalysisContext {
context
}

/// Indicates whether this expression is a column expression
fn is_column_expr(&self) -> bool {
false
}
}

/// The shared context used during the analysis of an expression. Includes
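
To see the default method and the Column override together, a small standalone check
(hypothetical main function and schema; col, binary, and Operator are the same helpers
used in the tests above):

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{binary, col};
use datafusion_physical_expr::PhysicalExpr;

fn main() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]);

    // A bare column reference overrides the default and reports true.
    let a = col("a", &schema)?;
    assert!(a.is_column_expr());

    // A computed expression (a + b) keeps the default and reports false.
    let a_plus_b = binary(col("a", &schema)?, Operator::Plus, col("b", &schema)?, &schema)?;
    assert!(!a_plus_b.is_column_expr());

    Ok(())
}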
