Skip to content

Commit

Permalink
Remove ExecutionProps dependency from OptimizerRule (#2666)
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove authored Jun 1, 2022
1 parent 340bfb1 commit 9135251
Show file tree
Hide file tree
Showing 13 changed files with 145 additions and 109 deletions.
10 changes: 6 additions & 4 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use crate::logical_plan::{
use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
use crate::optimizer::filter_push_down::FilterPushDown;
use crate::optimizer::limit_push_down::LimitPushDown;
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule};
use crate::optimizer::projection_push_down::ProjectionPushDown;
use crate::optimizer::simplify_expressions::SimplifyExpressions;
use crate::optimizer::single_distinct_to_groupby::SingleDistinctToGroupBy;
Expand Down Expand Up @@ -1379,15 +1379,17 @@ impl SessionState {

/// Optimizes the logical plan by applying optimizer rules.
pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
let execution_props = &mut self.execution_props.clone();
let mut optimizer_config = OptimizerConfig::new();
optimizer_config.query_execution_start_time =
self.execution_props.query_execution_start_time;

if let LogicalPlan::Explain(e) = plan {
let mut stringified_plans = e.stringified_plans.clone();

// optimize the child plan, capturing the output of each optimizer
let plan = self.optimizer.optimize(
e.plan.as_ref(),
execution_props,
&optimizer_config,
|optimized_plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
Expand All @@ -1402,7 +1404,7 @@ impl SessionState {
schema: e.schema.clone(),
}))
} else {
self.optimizer.optimize(plan, execution_props, |_, _| {})
self.optimizer.optimize(plan, &optimizer_config, |_, _| {})
}
}

Expand Down
29 changes: 16 additions & 13 deletions datafusion/core/src/optimizer/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
//! Eliminate common sub-expression.

use crate::error::Result;
use crate::execution::context::ExecutionProps;
use crate::logical_plan::plan::{Filter, Projection, Window};
use crate::logical_plan::{
col,
plan::{Aggregate, Sort},
DFField, DFSchema, Expr, ExprRewritable, ExprRewriter, ExprSchemable, ExprVisitable,
ExpressionVisitor, LogicalPlan, Recursion, RewriteRecursion,
};
use crate::optimizer::optimizer::OptimizerConfig;
use crate::optimizer::optimizer::OptimizerRule;
use arrow::datatypes::DataType;
use datafusion_expr::expr::GroupingSet;
Expand Down Expand Up @@ -60,9 +60,9 @@ impl OptimizerRule for CommonSubexprEliminate {
fn optimize(
&self,
plan: &LogicalPlan,
execution_props: &ExecutionProps,
optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
optimize(plan, execution_props)
optimize(plan, optimizer_config)
}

fn name(&self) -> &str {
Expand All @@ -83,7 +83,10 @@ impl CommonSubexprEliminate {
}
}

fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<LogicalPlan> {
fn optimize(
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
let mut expr_set = ExprSet::new();

match plan {
Expand All @@ -101,7 +104,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
input,
&mut expr_set,
schema,
execution_props,
optimizer_config,
)?;

Ok(LogicalPlan::Projection(Projection {
Expand Down Expand Up @@ -135,7 +138,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
input,
&mut expr_set,
input.schema(),
execution_props,
optimizer_config,
)?;

Ok(LogicalPlan::Filter(Filter {
Expand All @@ -156,7 +159,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
input,
&mut expr_set,
schema,
execution_props,
optimizer_config,
)?;

Ok(LogicalPlan::Window(Window {
Expand All @@ -180,7 +183,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
input,
&mut expr_set,
schema,
execution_props,
optimizer_config,
)?;
// note the reversed pop order.
let new_aggr_expr = new_expr.pop().unwrap();
Expand All @@ -202,7 +205,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
input,
&mut expr_set,
input.schema(),
execution_props,
optimizer_config,
)?;

Ok(LogicalPlan::Sort(Sort {
Expand Down Expand Up @@ -235,7 +238,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
.map(|input_plan| optimize(input_plan, execution_props))
.map(|input_plan| optimize(input_plan, optimizer_config))
.collect::<Result<Vec<_>>>()?;

from_plan(plan, &expr, &new_inputs)
Expand Down Expand Up @@ -302,7 +305,7 @@ fn rewrite_expr(
input: &LogicalPlan,
expr_set: &mut ExprSet,
schema: &DFSchema,
execution_props: &ExecutionProps,
optimizer_config: &OptimizerConfig,
) -> Result<(Vec<Vec<Expr>>, LogicalPlan)> {
let mut affected_id = HashSet::<Identifier>::new();

Expand All @@ -327,7 +330,7 @@ fn rewrite_expr(
})
.collect::<Result<Vec<_>>>()?;

let mut new_input = optimize(input, execution_props)?;
let mut new_input = optimize(input, optimizer_config)?;
if !affected_id.is_empty() {
new_input = build_project_plan(new_input, affected_id, expr_set)?;
}
Expand Down Expand Up @@ -702,7 +705,7 @@ mod test {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let optimizer = CommonSubexprEliminate {};
let optimized_plan = optimizer
.optimize(plan, &ExecutionProps::new())
.optimize(plan, &OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/optimizer/eliminate_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::logical_plan::plan::Filter;
use crate::logical_plan::{EmptyRelation, LogicalPlan};
use crate::optimizer::optimizer::OptimizerRule;

use crate::execution::context::ExecutionProps;
use crate::optimizer::optimizer::OptimizerConfig;

/// Optimization rule that elimanate the scalar value (true/false) filter with an [LogicalPlan::EmptyRelation]
#[derive(Default)]
Expand All @@ -44,7 +44,7 @@ impl OptimizerRule for EliminateFilter {
fn optimize(
&self,
plan: &LogicalPlan,
execution_props: &ExecutionProps,
optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::Filter(Filter {
Expand All @@ -65,7 +65,7 @@ impl OptimizerRule for EliminateFilter {
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
.map(|plan| self.optimize(plan, execution_props))
.map(|plan| self.optimize(plan, optimizer_config))
.collect::<Result<Vec<_>>>()?;

from_plan(plan, &plan.expressions(), &new_inputs)
Expand All @@ -88,7 +88,7 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = EliminateFilter::new();
let optimized_plan = rule
.optimize(plan, &ExecutionProps::new())
.optimize(plan, &OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/optimizer/eliminate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::logical_plan::{EmptyRelation, Limit, LogicalPlan};
use crate::optimizer::optimizer::OptimizerRule;
use datafusion_expr::utils::from_plan;

use crate::execution::context::ExecutionProps;
use crate::optimizer::optimizer::OptimizerConfig;

/// Optimization rule that replaces LIMIT 0 with an [LogicalPlan::EmptyRelation]
#[derive(Default)]
Expand All @@ -39,7 +39,7 @@ impl OptimizerRule for EliminateLimit {
fn optimize(
&self,
plan: &LogicalPlan,
execution_props: &ExecutionProps,
optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::Limit(Limit { n, input }) if *n == 0 => {
Expand All @@ -56,7 +56,7 @@ impl OptimizerRule for EliminateLimit {
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
.map(|plan| self.optimize(plan, execution_props))
.map(|plan| self.optimize(plan, optimizer_config))
.collect::<Result<Vec<_>>>()?;

from_plan(plan, &expr, &new_inputs)
Expand All @@ -79,7 +79,7 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = EliminateLimit::new();
let optimized_plan = rule
.optimize(plan, &ExecutionProps::new())
.optimize(plan, &OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down
10 changes: 4 additions & 6 deletions datafusion/core/src/optimizer/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@

//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan

use crate::{
execution::context::ExecutionProps,
optimizer::{optimizer::OptimizerRule, utils},
};
use crate::optimizer::optimizer::OptimizerConfig;
use crate::optimizer::{optimizer::OptimizerRule, utils};
use datafusion_common::{Column, DFSchema, Result};
use datafusion_expr::{
col,
Expand Down Expand Up @@ -530,7 +528,7 @@ impl OptimizerRule for FilterPushDown {
"filter_push_down"
}

fn optimize(&self, plan: &LogicalPlan, _: &ExecutionProps) -> Result<LogicalPlan> {
fn optimize(&self, plan: &LogicalPlan, _: &OptimizerConfig) -> Result<LogicalPlan> {
optimize(plan, State::default())
}
}
Expand Down Expand Up @@ -578,7 +576,7 @@ mod tests {

fn optimize_plan(plan: &LogicalPlan) -> LogicalPlan {
let rule = FilterPushDown::new();
rule.optimize(plan, &ExecutionProps::new())
rule.optimize(plan, &OptimizerConfig::new())
.expect("failed to optimize plan")
}

Expand Down
Loading

0 comments on commit 9135251

Please sign in to comment.