Skip to content

Commit

Permalink
Remove ExecutionProps dependency from OptimizerRule (apache#2666)
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove authored and gandronchik committed Aug 31, 2022
1 parent a3216b7 commit 09f5323
Show file tree
Hide file tree
Showing 13 changed files with 253 additions and 144 deletions.
10 changes: 6 additions & 4 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use crate::logical_plan::{
use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
use crate::optimizer::filter_push_down::FilterPushDown;
use crate::optimizer::limit_push_down::LimitPushDown;
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule};
use crate::optimizer::projection_push_down::ProjectionPushDown;
use crate::optimizer::simplify_expressions::SimplifyExpressions;
use crate::optimizer::single_distinct_to_groupby::SingleDistinctToGroupBy;
Expand Down Expand Up @@ -1258,15 +1258,17 @@ impl SessionState {

/// Optimizes the logical plan by applying optimizer rules.
pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
let execution_props = &mut self.execution_props.clone();
let mut optimizer_config = OptimizerConfig::new();
optimizer_config.query_execution_start_time =
self.execution_props.query_execution_start_time;

if let LogicalPlan::Explain(e) = plan {
let mut stringified_plans = e.stringified_plans.clone();

// optimize the child plan, capturing the output of each optimizer
let plan = self.optimizer.optimize(
e.plan.as_ref(),
execution_props,
&optimizer_config,
|optimized_plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
Expand All @@ -1281,7 +1283,7 @@ impl SessionState {
schema: e.schema.clone(),
}))
} else {
self.optimizer.optimize(plan, execution_props, |_, _| {})
self.optimizer.optimize(plan, &optimizer_config, |_, _| {})
}
}

Expand Down
31 changes: 17 additions & 14 deletions datafusion/core/src/optimizer/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
//! Eliminate common sub-expression.

use crate::error::Result;
use crate::execution::context::ExecutionProps;
use crate::logical_plan::plan::{Filter, Projection, TableUDFs, Window};
use crate::logical_plan::{
col,
plan::{Aggregate, Sort},
DFField, DFSchema, Expr, ExprRewritable, ExprRewriter, ExprSchemable, ExprVisitable,
ExpressionVisitor, LogicalPlan, Recursion, RewriteRecursion,
};
use crate::optimizer::optimizer::OptimizerConfig;
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
use arrow::datatypes::DataType;
Expand Down Expand Up @@ -59,9 +59,9 @@ impl OptimizerRule for CommonSubexprEliminate {
fn optimize(
&self,
plan: &LogicalPlan,
execution_props: &ExecutionProps,
optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
optimize(plan, execution_props)
optimize(plan, optimizer_config)
}

fn name(&self) -> &str {
Expand All @@ -82,7 +82,10 @@ impl CommonSubexprEliminate {
}
}

fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<LogicalPlan> {
fn optimize(
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
let mut expr_set = ExprSet::new();

match plan {
Expand All @@ -100,7 +103,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
input,
&mut expr_set,
schema,
execution_props,
optimizer_config,
)?;

Ok(LogicalPlan::Projection(Projection {
Expand All @@ -123,7 +126,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
input,
&mut expr_set,
schema,
execution_props,
optimizer_config,
)?;

Ok(LogicalPlan::TableUDFs(TableUDFs {
Expand Down Expand Up @@ -156,7 +159,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
input,
&mut expr_set,
input.schema(),
execution_props,
optimizer_config,
)?;

Ok(LogicalPlan::Filter(Filter {
Expand All @@ -177,7 +180,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
input,
&mut expr_set,
schema,
execution_props,
optimizer_config,
)?;

Ok(LogicalPlan::Window(Window {
Expand All @@ -201,7 +204,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
input,
&mut expr_set,
schema,
execution_props,
optimizer_config,
)?;
// note the reversed pop order.
let new_aggr_expr = new_expr.pop().unwrap();
Expand All @@ -223,7 +226,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
input,
&mut expr_set,
input.schema(),
execution_props,
optimizer_config,
)?;

Ok(LogicalPlan::Sort(Sort {
Expand Down Expand Up @@ -253,7 +256,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
.map(|input_plan| optimize(input_plan, execution_props))
.map(|input_plan| optimize(input_plan, optimizer_config))
.collect::<Result<Vec<_>>>()?;

utils::from_plan(plan, &expr, &new_inputs)
Expand Down Expand Up @@ -320,7 +323,7 @@ fn rewrite_expr(
input: &LogicalPlan,
expr_set: &mut ExprSet,
schema: &DFSchema,
execution_props: &ExecutionProps,
optimizer_config: &OptimizerConfig,
) -> Result<(Vec<Vec<Expr>>, LogicalPlan)> {
let mut affected_id = HashSet::<Identifier>::new();

Expand All @@ -345,7 +348,7 @@ fn rewrite_expr(
})
.collect::<Result<Vec<_>>>()?;

let mut new_input = optimize(input, execution_props)?;
let mut new_input = optimize(input, optimizer_config)?;
if !affected_id.is_empty() {
new_input = build_project_plan(new_input, affected_id, expr_set)?;
}
Expand Down Expand Up @@ -694,7 +697,7 @@ mod test {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let optimizer = CommonSubexprEliminate {};
let optimized_plan = optimizer
.optimize(plan, &ExecutionProps::new())
.optimize(plan, &OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/optimizer/eliminate_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::logical_plan::{EmptyRelation, LogicalPlan};
use crate::optimizer::optimizer::OptimizerRule;

use super::utils;
use crate::execution::context::ExecutionProps;
use crate::optimizer::optimizer::OptimizerConfig;

/// Optimization rule that elimanate the scalar value (true/false) filter with an [LogicalPlan::EmptyRelation]
#[derive(Default)]
Expand All @@ -44,7 +44,7 @@ impl OptimizerRule for EliminateFilter {
fn optimize(
&self,
plan: &LogicalPlan,
execution_props: &ExecutionProps,
optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::Filter(Filter {
Expand All @@ -65,7 +65,7 @@ impl OptimizerRule for EliminateFilter {
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
.map(|plan| self.optimize(plan, execution_props))
.map(|plan| self.optimize(plan, optimizer_config))
.collect::<Result<Vec<_>>>()?;

utils::from_plan(plan, &plan.expressions(), &new_inputs)
Expand All @@ -88,7 +88,7 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = EliminateFilter::new();
let optimized_plan = rule
.optimize(plan, &ExecutionProps::new())
.optimize(plan, &OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/optimizer/eliminate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::logical_plan::{EmptyRelation, Limit, LogicalPlan};
use crate::optimizer::optimizer::OptimizerRule;

use super::utils;
use crate::execution::context::ExecutionProps;
use crate::optimizer::optimizer::OptimizerConfig;

/// Optimization rule that replaces LIMIT 0 with an [LogicalPlan::EmptyRelation]
#[derive(Default)]
Expand All @@ -39,7 +39,7 @@ impl OptimizerRule for EliminateLimit {
fn optimize(
&self,
plan: &LogicalPlan,
execution_props: &ExecutionProps,
optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::Limit(Limit { n, input }) if *n == 0 => {
Expand All @@ -56,7 +56,7 @@ impl OptimizerRule for EliminateLimit {
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
.map(|plan| self.optimize(plan, execution_props))
.map(|plan| self.optimize(plan, optimizer_config))
.collect::<Result<Vec<_>>>()?;

utils::from_plan(plan, &expr, &new_inputs)
Expand All @@ -79,7 +79,7 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = EliminateLimit::new();
let optimized_plan = rule
.optimize(plan, &ExecutionProps::new())
.optimize(plan, &OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down
9 changes: 4 additions & 5 deletions datafusion/core/src/optimizer/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@
//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan

use crate::datasource::datasource::TableProviderFilterPushDown;
use crate::execution::context::ExecutionProps;
use crate::logical_plan::plan::{Aggregate, Filter, Join, Projection, Union};
use crate::logical_plan::{
and, col, replace_col, Column, CrossJoin, JoinType, Limit, LogicalPlan, TableScan,
};
use crate::logical_plan::{DFSchema, Expr};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
use crate::optimizer::optimizer::OptimizerConfig;
use crate::optimizer::{optimizer::OptimizerRule, utils};
use crate::{error::Result, logical_plan::Operator};
use std::{
collections::{HashMap, HashSet},
Expand Down Expand Up @@ -560,7 +559,7 @@ impl OptimizerRule for FilterPushDown {
"filter_push_down"
}

fn optimize(&self, plan: &LogicalPlan, _: &ExecutionProps) -> Result<LogicalPlan> {
fn optimize(&self, plan: &LogicalPlan, _: &OptimizerConfig) -> Result<LogicalPlan> {
optimize(plan, State::default())
}
}
Expand Down Expand Up @@ -605,7 +604,7 @@ mod tests {

fn optimize_plan(plan: &LogicalPlan) -> LogicalPlan {
let rule = FilterPushDown::new();
rule.optimize(plan, &ExecutionProps::new())
rule.optimize(plan, &OptimizerConfig::new())
.expect("failed to optimize plan")
}

Expand Down
Loading

0 comments on commit 09f5323

Please sign in to comment.