Skip to content

Commit

Permalink
Show the result of all optimizer passes in EXPLAIN VERBOSE (#759)
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored Jul 20, 2021
1 parent 3a24113 commit 30693df
Show file tree
Hide file tree
Showing 14 changed files with 252 additions and 239 deletions.
106 changes: 94 additions & 12 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::{
catalog::{CatalogList, MemoryCatalogList},
information_schema::CatalogWithInformationSchema,
},
logical_plan::{PlanType, ToStringifiedPlan},
optimizer::{
aggregate_statistics::AggregateStatistics, eliminate_limit::EliminateLimit,
hash_build_probe_order::HashBuildProbeOrder,
Expand Down Expand Up @@ -446,19 +447,31 @@ impl ExecutionContext {

/// Optimizes the logical plan by applying optimizer rules.
pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
let state = &mut self.state.lock().unwrap();
let execution_props = &mut state.execution_props.clone();
let optimizers = &state.config.optimizers;

let execution_props = execution_props.start_execution();

let mut new_plan = plan.clone();
debug!("Logical plan:\n {:?}", plan);
for optimizer in optimizers {
new_plan = optimizer.optimize(&new_plan, execution_props)?;
if let LogicalPlan::Explain {
verbose,
plan,
stringified_plans,
schema,
} = plan
{
let mut stringified_plans = stringified_plans.clone();

// optimize the child plan, capturing the output of each optimizer
let plan = self.optimize_internal(plan, |optimized_plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
stringified_plans.push(optimized_plan.to_stringified(plan_type));
})?;

Ok(LogicalPlan::Explain {
verbose: *verbose,
plan: Arc::new(plan),
stringified_plans,
schema: schema.clone(),
})
} else {
self.optimize_internal(plan, |_, _| {})
}
debug!("Optimized logical plan:\n {:?}", new_plan);
Ok(new_plan)
}

/// Creates a physical plan from a logical plan.
Expand Down Expand Up @@ -556,6 +569,32 @@ impl ExecutionContext {
))),
}
}

/// Applies every configured optimizer rule to `plan`, in order, feeding the
/// output of each rule into the next one.
///
/// After each rule succeeds, `observer` is called with the plan that rule
/// produced and a reference to the rule itself. If a rule returns an error,
/// that error is returned immediately and the remaining rules are not run.
fn optimize_internal<F>(
    &self,
    plan: &LogicalPlan,
    mut observer: F,
) -> Result<LogicalPlan>
where
    F: FnMut(&LogicalPlan, &dyn OptimizerRule),
{
    // The guard is a named local, so the state lock stays held until this
    // function returns (including while `observer` is being called).
    let guard = self.state.lock().unwrap();

    // `start_execution` is invoked on a clone of the stored props, not on
    // the value kept inside the shared state.
    let mut props = guard.execution_props.clone();
    let props = props.start_execution();

    let initial = plan.clone();
    debug!("Logical plan:\n {:?}", plan);

    let optimized = guard.config.optimizers.iter().try_fold(
        initial,
        |current, rule| -> Result<LogicalPlan> {
            let next = rule.optimize(&current, props)?;
            observer(&next, rule.as_ref());
            Ok(next)
        },
    )?;

    debug!("Optimized logical plan:\n {:?}", optimized);
    Ok(optimized)
}
}

impl From<Arc<Mutex<ExecutionContextState>>> for ExecutionContext {
Expand Down Expand Up @@ -941,6 +980,49 @@ mod tests {
use tempfile::TempDir;
use test::*;

/// Checks that optimizing an `EXPLAIN VERBOSE` plan appends the output of
/// optimizer passes to the plan's `stringified_plans`.
#[test]
fn optimize_explain() {
    let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

    let plan = LogicalPlanBuilder::scan_empty(Some("employee"), &schema, None)
        .unwrap()
        .explain(true)
        .unwrap()
        .build()
        .unwrap();

    // before optimization the explain node holds exactly one stringified plan
    if let LogicalPlan::Explain {
        stringified_plans, ..
    } = &plan
    {
        assert_eq!(stringified_plans.len(), 1);
    } else {
        panic!("plan was not an explain: {:?}", plan);
    }

    // now optimize the plan and expect to see more plans
    let optimized_plan = ExecutionContext::new().optimize(&plan).unwrap();
    if let LogicalPlan::Explain {
        stringified_plans, ..
    } = &optimized_plan
    {
        // should have more than one plan
        assert!(
            stringified_plans.len() > 1,
            "plans: {:#?}",
            stringified_plans
        );
        // should have at least one optimized plan
        let opt = stringified_plans
            .iter()
            .any(|p| matches!(p.plan_type, PlanType::OptimizedLogicalPlan { .. }));

        assert!(opt, "plans: {:#?}", stringified_plans);
    } else {
        // report the plan that actually failed the match (the optimized one),
        // not the original input plan
        panic!("plan was not an explain: {:?}", optimized_plan);
    }
}

#[tokio::test]
async fn parallel_projection() -> Result<()> {
let partition_count = 4;
Expand Down
15 changes: 6 additions & 9 deletions datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,15 @@ use arrow::{
record_batch::RecordBatch,
};

use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
use crate::{datasource::TableProvider, logical_plan::plan::ToStringifiedPlan};
use crate::{
datasource::{empty::EmptyTable, parquet::ParquetTable, CsvFile, MemTable},
prelude::CsvReadOptions,
};

use super::dfschema::ToDFSchema;
use super::{
exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan, PlanType,
StringifiedPlan,
};
use super::{exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan, PlanType};
use crate::logical_plan::{
columnize_expr, normalize_col, normalize_cols, Column, DFField, DFSchema,
DFSchemaRef, Partitioning,
Expand Down Expand Up @@ -398,10 +395,8 @@ impl LogicalPlanBuilder {

/// Create an expression to represent the explanation of the plan
pub fn explain(&self, verbose: bool) -> Result<Self> {
let stringified_plans = vec![StringifiedPlan::new(
PlanType::InitialLogicalPlan,
format!("{:#?}", self.plan.clone()),
)];
let stringified_plans =
vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];

let schema = LogicalPlan::explain_schema();

Expand Down Expand Up @@ -553,6 +548,8 @@ pub(crate) fn expand_wildcard(
mod tests {
use arrow::datatypes::{DataType, Field};

use crate::logical_plan::StringifiedPlan;

use super::super::{col, lit, sum};
use super::*;

Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,6 @@ pub use extension::UserDefinedLogicalNode;
pub use operators::Operator;
pub use plan::{
JoinConstraint, JoinType, LogicalPlan, Partitioning, PlanType, PlanVisitor,
StringifiedPlan,
};
pub(crate) use plan::{StringifiedPlan, ToStringifiedPlan};
pub use registry::FunctionRegistry;
20 changes: 20 additions & 0 deletions datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,11 @@ pub enum PlanType {
FinalLogicalPlan,
/// The initial physical plan, prepared for execution
InitialPhysicalPlan,
/// The ExecutionPlan which results from applying an optimizer pass
OptimizedPhysicalPlan {
/// The name of the optimizer which produced this plan
optimizer_name: String,
},
/// The final, fully optimized physical plan which would be executed
FinalPhysicalPlan,
}
Expand All @@ -833,6 +838,9 @@ impl fmt::Display for PlanType {
}
PlanType::FinalLogicalPlan => write!(f, "logical_plan"),
PlanType::InitialPhysicalPlan => write!(f, "initial_physical_plan"),
PlanType::OptimizedPhysicalPlan { optimizer_name } => {
write!(f, "physical_plan after {}", optimizer_name)
}
PlanType::FinalPhysicalPlan => write!(f, "physical_plan"),
}
}
Expand Down Expand Up @@ -868,6 +876,18 @@ impl StringifiedPlan {
}
}

/// Trait for something that can be formatted as a [`StringifiedPlan`].
///
/// The implementor decides how its text is rendered; the caller only
/// supplies the [`PlanType`] that is attached to the result.
pub trait ToStringifiedPlan {
/// Create a stringified plan of `self` with the specified `plan_type`
fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan;
}

impl ToStringifiedPlan for LogicalPlan {
    /// Renders this plan via `display_indent` and pairs the resulting text
    /// with `plan_type` in a new `StringifiedPlan`.
    fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan {
        let rendered = self.display_indent().to_string();
        StringifiedPlan::new(plan_type, rendered)
    }
}

#[cfg(test)]
mod tests {
use super::super::{col, lit, LogicalPlanBuilder};
Expand Down
20 changes: 0 additions & 20 deletions datafusion/src/optimizer/limit_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use crate::execution::context::ExecutionProps;
use crate::logical_plan::LogicalPlan;
use crate::optimizer::optimizer::OptimizerRule;
use std::sync::Arc;
use utils::optimize_explain;

/// Optimization rule that tries to push down LIMIT n
/// where applicable to reduce the amount of scanned / processed data
Expand All @@ -43,25 +42,6 @@ fn limit_push_down(
execution_props: &ExecutionProps,
) -> Result<LogicalPlan> {
match (plan, upper_limit) {
(
LogicalPlan::Explain {
verbose,
schema,
plan,
stringified_plans,
},
_,
) => {
let schema = schema.as_ref().to_owned().into();
optimize_explain(
optimizer,
*verbose,
plan,
stringified_plans,
&schema,
execution_props,
)
}
(LogicalPlan::Limit { n, input }, upper_limit) => {
let smallest = upper_limit.map(|x| std::cmp::min(x, *n)).unwrap_or(*n);
Ok(LogicalPlan::Limit {
Expand Down
22 changes: 4 additions & 18 deletions datafusion/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Projection Push Down optimizer rule ensures that only referenced columns are
//! loaded into memory

use crate::error::Result;
use crate::error::{DataFusionError, Result};
use crate::execution::context::ExecutionProps;
use crate::logical_plan::{
build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan,
Expand All @@ -33,7 +33,6 @@ use std::{
collections::{BTreeSet, HashSet},
sync::Arc,
};
use utils::optimize_explain;

/// Optimizer that removes unused projections and aggregations from plans
/// This reduces both scans and
Expand Down Expand Up @@ -354,22 +353,9 @@ fn optimize_plan(
limit: *limit,
})
}
LogicalPlan::Explain {
verbose,
plan,
stringified_plans,
schema,
} => {
let schema = schema.as_ref().to_owned().into();
optimize_explain(
optimizer,
*verbose,
&*plan,
stringified_plans,
&schema,
execution_props,
)
}
LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
"Unsupported logical plan: Explain must be root of the plan".to_string(),
)),
LogicalPlan::Union {
inputs,
schema,
Expand Down
23 changes: 2 additions & 21 deletions datafusion/src/optimizer/simplify_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use crate::logical_plan::LogicalPlan;
use crate::logical_plan::{lit, Expr};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
use crate::optimizer::utils::optimize_explain;
use crate::scalar::ScalarValue;
use crate::{error::Result, logical_plan::Operator};

Expand Down Expand Up @@ -278,27 +277,9 @@ impl OptimizerRule for SimplifyExpressions {
fn optimize(
&self,
plan: &LogicalPlan,
execution_props: &ExecutionProps,
_execution_props: &ExecutionProps,
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::Explain {
verbose,
plan,
stringified_plans,
schema,
} => {
let schema = schema.as_ref().to_owned().into();
optimize_explain(
self,
*verbose,
&*plan,
stringified_plans,
&schema,
execution_props,
)
}
_ => optimize(plan),
}
optimize(plan)
}
}

Expand Down
Loading

0 comments on commit 30693df

Please sign in to comment.