diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index bd939cef7035..0cf8b3b6c276 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -21,6 +21,7 @@ use crate::{ catalog::{CatalogList, MemoryCatalogList}, information_schema::CatalogWithInformationSchema, }, + logical_plan::{PlanType, ToStringifiedPlan}, optimizer::{ aggregate_statistics::AggregateStatistics, eliminate_limit::EliminateLimit, hash_build_probe_order::HashBuildProbeOrder, @@ -446,19 +447,31 @@ impl ExecutionContext { /// Optimizes the logical plan by applying optimizer rules. pub fn optimize(&self, plan: &LogicalPlan) -> Result { - let state = &mut self.state.lock().unwrap(); - let execution_props = &mut state.execution_props.clone(); - let optimizers = &state.config.optimizers; - - let execution_props = execution_props.start_execution(); - - let mut new_plan = plan.clone(); - debug!("Logical plan:\n {:?}", plan); - for optimizer in optimizers { - new_plan = optimizer.optimize(&new_plan, execution_props)?; + if let LogicalPlan::Explain { + verbose, + plan, + stringified_plans, + schema, + } = plan + { + let mut stringified_plans = stringified_plans.clone(); + + // optimize the child plan, capturing the output of each optimizer + let plan = self.optimize_internal(plan, |optimized_plan, optimizer| { + let optimizer_name = optimizer.name().to_string(); + let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name }; + stringified_plans.push(optimized_plan.to_stringified(plan_type)); + })?; + + Ok(LogicalPlan::Explain { + verbose: *verbose, + plan: Arc::new(plan), + stringified_plans, + schema: schema.clone(), + }) + } else { + self.optimize_internal(plan, |_, _| {}) } - debug!("Optimized logical plan:\n {:?}", new_plan); - Ok(new_plan) } /// Creates a physical plan from a logical plan. @@ -556,6 +569,32 @@ impl ExecutionContext { ))), } } + + /// Optimizes the logical plan by applying optimizer rules, and + /// invoking observer function after each call + fn optimize_internal( + &self, + plan: &LogicalPlan, + mut observer: F, + ) -> Result + where + F: FnMut(&LogicalPlan, &dyn OptimizerRule), + { + let state = &mut self.state.lock().unwrap(); + let execution_props = &mut state.execution_props.clone(); + let optimizers = &state.config.optimizers; + + let execution_props = execution_props.start_execution(); + + let mut new_plan = plan.clone(); + debug!("Logical plan:\n {:?}", plan); + for optimizer in optimizers { + new_plan = optimizer.optimize(&new_plan, execution_props)?; + observer(&new_plan, optimizer.as_ref()); + } + debug!("Optimized logical plan:\n {:?}", new_plan); + Ok(new_plan) + } } impl From>> for ExecutionContext { @@ -941,6 +980,49 @@ mod tests { use tempfile::TempDir; use test::*; + #[test] + fn optimize_explain() { + let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]); + + let plan = LogicalPlanBuilder::scan_empty(Some("employee"), &schema, None) + .unwrap() + .explain(true) + .unwrap() + .build() + .unwrap(); + + if let LogicalPlan::Explain { + stringified_plans, .. + } = &plan + { + assert_eq!(stringified_plans.len(), 1); + } else { + panic!("plan was not an explain: {:?}", plan); + } + + // now optimize the plan and expect to see more plans + let optimized_plan = ExecutionContext::new().optimize(&plan).unwrap(); + if let LogicalPlan::Explain { + stringified_plans, .. + } = &optimized_plan + { + // should have more than one plan + assert!( + stringified_plans.len() > 1, + "plans: {:#?}", + stringified_plans + ); + // should have at least one optimized plan + let opt = stringified_plans + .iter() + .any(|p| matches!(p.plan_type, PlanType::OptimizedLogicalPlan { .. })); + + assert!(opt, "plans: {:#?}", stringified_plans); + } else { + panic!("plan was not an explain: {:?}", plan); + } + } + #[tokio::test] async fn parallel_projection() -> Result<()> { let partition_count = 4; diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 0335e29127ab..60e0ed3c0988 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -27,18 +27,15 @@ use arrow::{ record_batch::RecordBatch, }; -use crate::datasource::TableProvider; use crate::error::{DataFusionError, Result}; +use crate::{datasource::TableProvider, logical_plan::plan::ToStringifiedPlan}; use crate::{ datasource::{empty::EmptyTable, parquet::ParquetTable, CsvFile, MemTable}, prelude::CsvReadOptions, }; use super::dfschema::ToDFSchema; -use super::{ - exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan, PlanType, - StringifiedPlan, -}; +use super::{exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan, PlanType}; use crate::logical_plan::{ columnize_expr, normalize_col, normalize_cols, Column, DFField, DFSchema, DFSchemaRef, Partitioning, @@ -398,10 +395,8 @@ impl LogicalPlanBuilder { /// Create an expression to represent the explanation of the plan pub fn explain(&self, verbose: bool) -> Result { - let stringified_plans = vec![StringifiedPlan::new( - PlanType::InitialLogicalPlan, - format!("{:#?}", self.plan.clone()), - )]; + let stringified_plans = + vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)]; let schema = LogicalPlan::explain_schema(); @@ -553,6 +548,8 @@ pub(crate) fn expand_wildcard( mod tests { use arrow::datatypes::{DataType, Field}; + use crate::logical_plan::StringifiedPlan; + use super::super::{col, lit, sum}; use super::*; diff --git a/datafusion/src/logical_plan/mod.rs b/datafusion/src/logical_plan/mod.rs index f381e316669e..a021d06f0950 100644 --- a/datafusion/src/logical_plan/mod.rs +++ b/datafusion/src/logical_plan/mod.rs @@ -50,6 +50,6 @@ pub use extension::UserDefinedLogicalNode; pub use operators::Operator; pub use plan::{ JoinConstraint, JoinType, LogicalPlan, Partitioning, PlanType, PlanVisitor, - StringifiedPlan, }; +pub(crate) use plan::{StringifiedPlan, ToStringifiedPlan}; pub use registry::FunctionRegistry; diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index 9a4daae27ff5..28405fb6dfba 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -820,6 +820,11 @@ pub enum PlanType { FinalLogicalPlan, /// The initial physical plan, prepared for execution InitialPhysicalPlan, + /// The ExecutionPlan which results from applying an optimizer pass + OptimizedPhysicalPlan { + /// The name of the optimizer which produced this plan + optimizer_name: String, + }, /// The final, fully optimized physical which would be executed FinalPhysicalPlan, } @@ -833,6 +838,9 @@ impl fmt::Display for PlanType { } PlanType::FinalLogicalPlan => write!(f, "logical_plan"), PlanType::InitialPhysicalPlan => write!(f, "initial_physical_plan"), + PlanType::OptimizedPhysicalPlan { optimizer_name } => { + write!(f, "physical_plan after {}", optimizer_name) + } PlanType::FinalPhysicalPlan => write!(f, "physical_plan"), } } @@ -868,6 +876,18 @@ impl StringifiedPlan { } } +/// Trait for something that can be formatted as a stringified plan +pub trait ToStringifiedPlan { + /// Create a stringified plan with the specified type + fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan; +} + +impl ToStringifiedPlan for LogicalPlan { + fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan { + StringifiedPlan::new(plan_type, self.display_indent().to_string()) + } +} + #[cfg(test)] mod tests { use super::super::{col, lit, LogicalPlanBuilder}; diff --git a/datafusion/src/optimizer/limit_push_down.rs b/datafusion/src/optimizer/limit_push_down.rs index 37c95a443692..46738c557c63 100644 --- a/datafusion/src/optimizer/limit_push_down.rs +++ b/datafusion/src/optimizer/limit_push_down.rs @@ -23,7 +23,6 @@ use crate::execution::context::ExecutionProps; use crate::logical_plan::LogicalPlan; use crate::optimizer::optimizer::OptimizerRule; use std::sync::Arc; -use utils::optimize_explain; /// Optimization rule that tries pushes down LIMIT n /// where applicable to reduce the amount of scanned / processed data @@ -43,25 +42,6 @@ fn limit_push_down( execution_props: &ExecutionProps, ) -> Result { match (plan, upper_limit) { - ( - LogicalPlan::Explain { - verbose, - schema, - plan, - stringified_plans, - }, - _, - ) => { - let schema = schema.as_ref().to_owned().into(); - optimize_explain( - optimizer, - *verbose, - plan, - stringified_plans, - &schema, - execution_props, - ) - } (LogicalPlan::Limit { n, input }, upper_limit) => { let smallest = upper_limit.map(|x| std::cmp::min(x, *n)).unwrap_or(*n); Ok(LogicalPlan::Limit { diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs index 089dca2318c9..0de36f354206 100644 --- a/datafusion/src/optimizer/projection_push_down.rs +++ b/datafusion/src/optimizer/projection_push_down.rs @@ -18,7 +18,7 @@ //! Projection Push Down optimizer rule ensures that only referenced columns are //! loaded into memory -use crate::error::Result; +use crate::error::{DataFusionError, Result}; use crate::execution::context::ExecutionProps; use crate::logical_plan::{ build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan, @@ -33,7 +33,6 @@ use std::{ collections::{BTreeSet, HashSet}, sync::Arc, }; -use utils::optimize_explain; /// Optimizer that removes unused projections and aggregations from plans /// This reduces both scans and @@ -354,22 +353,9 @@ fn optimize_plan( limit: *limit, }) } - LogicalPlan::Explain { - verbose, - plan, - stringified_plans, - schema, - } => { - let schema = schema.as_ref().to_owned().into(); - optimize_explain( - optimizer, - *verbose, - &*plan, - stringified_plans, - &schema, - execution_props, - ) - } + LogicalPlan::Explain { .. } => Err(DataFusionError::Internal( + "Unsupported logical plan: Explain must be root of the plan".to_string(), + )), LogicalPlan::Union { inputs, schema, diff --git a/datafusion/src/optimizer/simplify_expressions.rs b/datafusion/src/optimizer/simplify_expressions.rs index 0e65de07305f..f629eaf95b5f 100644 --- a/datafusion/src/optimizer/simplify_expressions.rs +++ b/datafusion/src/optimizer/simplify_expressions.rs @@ -22,7 +22,6 @@ use crate::logical_plan::LogicalPlan; use crate::logical_plan::{lit, Expr}; use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::utils; -use crate::optimizer::utils::optimize_explain; use crate::scalar::ScalarValue; use crate::{error::Result, logical_plan::Operator}; @@ -278,27 +277,9 @@ impl OptimizerRule for SimplifyExpressions { fn optimize( &self, plan: &LogicalPlan, - execution_props: &ExecutionProps, + _execution_props: &ExecutionProps, ) -> Result { - match plan { - LogicalPlan::Explain { - verbose, - plan, - stringified_plans, - schema, - } => { - let schema = schema.as_ref().to_owned().into(); - optimize_explain( - self, - *verbose, - &*plan, - stringified_plans, - &schema, - execution_props, - ) - } - _ => optimize(plan), - } + optimize(plan) } } diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs index 88380ea17c87..615f0ccfceaf 100644 --- a/datafusion/src/optimizer/utils.rs +++ b/datafusion/src/optimizer/utils.rs @@ -17,15 +17,11 @@ //! Collection of utility functions that are leveraged by the query optimizer rules -use std::{collections::HashSet, sync::Arc}; - -use arrow::datatypes::Schema; - use super::optimizer::OptimizerRule; use crate::execution::context::ExecutionProps; use crate::logical_plan::{ build_join_schema, Column, DFSchemaRef, Expr, LogicalPlan, LogicalPlanBuilder, - Operator, Partitioning, PlanType, Recursion, StringifiedPlan, ToDFSchema, + Operator, Partitioning, Recursion, }; use crate::prelude::lit; use crate::scalar::ScalarValue; @@ -33,6 +29,7 @@ use crate::{ error::{DataFusionError, Result}, logical_plan::ExpressionVisitor, }; +use std::{collections::HashSet, sync::Arc}; const CASE_EXPR_MARKER: &str = "__DATAFUSION_CASE_EXPR__"; const CASE_ELSE_MARKER: &str = "__DATAFUSION_CASE_ELSE__"; @@ -94,34 +91,6 @@ pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet) -> Result<()> { Ok(()) } -/// Create a `LogicalPlan::Explain` node by running `optimizer` on the -/// input plan and capturing the resulting plan string -pub fn optimize_explain( - optimizer: &impl OptimizerRule, - verbose: bool, - plan: &LogicalPlan, - stringified_plans: &[StringifiedPlan], - schema: &Schema, - execution_props: &ExecutionProps, -) -> Result { - // These are the fields of LogicalPlan::Explain It might be nice - // to transform that enum Variant into its own struct and avoid - // passing the fields individually - let plan = Arc::new(optimizer.optimize(plan, execution_props)?); - let mut stringified_plans = stringified_plans.to_vec(); - let optimizer_name = optimizer.name().into(); - stringified_plans.push(StringifiedPlan::new( - PlanType::OptimizedLogicalPlan { optimizer_name }, - format!("{:#?}", plan), - )); - Ok(LogicalPlan::Explain { - verbose, - plan, - stringified_plans, - schema: schema.clone().to_dfschema_ref()?, - }) -} - /// Convenience rule for writing optimizers: recursively invoke /// optimize on plan's children and then return a node of the same /// type. Useful for optimizer rules which want to leave the type @@ -132,23 +101,6 @@ pub fn optimize_children( plan: &LogicalPlan, execution_props: &ExecutionProps, ) -> Result { - if let LogicalPlan::Explain { - verbose, - plan, - stringified_plans, - schema, - } = plan - { - return optimize_explain( - optimizer, - *verbose, - &*plan, - stringified_plans, - &schema.as_ref().to_owned().into(), - execution_props, - ); - } - let new_exprs = plan.expressions(); let new_inputs = plan .inputs() @@ -489,7 +441,7 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result { #[cfg(test)] mod tests { use super::*; - use crate::logical_plan::{col, LogicalPlanBuilder}; + use crate::logical_plan::col; use arrow::datatypes::DataType; use std::collections::HashSet; @@ -514,61 +466,4 @@ mod tests { assert!(accum.contains(&Column::from_name("a"))); Ok(()) } - - struct TestOptimizer {} - - impl OptimizerRule for TestOptimizer { - fn optimize( - &self, - plan: &LogicalPlan, - _: &ExecutionProps, - ) -> Result { - Ok(plan.clone()) - } - - fn name(&self) -> &str { - "test_optimizer" - } - } - - #[test] - fn test_optimize_explain() -> Result<()> { - let optimizer = TestOptimizer {}; - - let empty_plan = LogicalPlanBuilder::empty(false).build()?; - let schema = LogicalPlan::explain_schema(); - - let optimized_explain = optimize_explain( - &optimizer, - true, - &empty_plan, - &[StringifiedPlan::new(PlanType::InitialLogicalPlan, "...")], - schema.as_ref(), - &ExecutionProps::new(), - )?; - - match &optimized_explain { - LogicalPlan::Explain { - verbose, - stringified_plans, - .. - } => { - assert!(*verbose); - - let expected_stringified_plans = vec![ - StringifiedPlan::new(PlanType::InitialLogicalPlan, "..."), - StringifiedPlan::new( - PlanType::OptimizedLogicalPlan { - optimizer_name: "test_optimizer".into(), - }, - "EmptyRelation", - ), - ]; - assert_eq!(*stringified_plans, expected_stringified_plans); - } - _ => panic!("Expected explain plan but got {:?}", optimized_explain), - } - - Ok(()) - } } diff --git a/datafusion/src/physical_plan/display.rs b/datafusion/src/physical_plan/display.rs index 8498e02d50c8..e251e4ea53db 100644 --- a/datafusion/src/physical_plan/display.rs +++ b/datafusion/src/physical_plan/display.rs @@ -21,6 +21,8 @@ use std::fmt; +use crate::logical_plan::{StringifiedPlan, ToStringifiedPlan}; + use super::{accept, ExecutionPlan, ExecutionPlanVisitor}; /// Options for controlling how each [`ExecutionPlan`] should format itself @@ -131,3 +133,12 @@ impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> { Ok(true) } } + +impl<'a> ToStringifiedPlan for DisplayableExecutionPlan<'a> { + fn to_stringified( + &self, + plan_type: crate::logical_plan::PlanType, + ) -> StringifiedPlan { + StringifiedPlan::new(plan_type, self.indent().to_string()) + } +} diff --git a/datafusion/src/physical_plan/explain.rs b/datafusion/src/physical_plan/explain.rs index 195a7a518370..a6a34f5d0b0c 100644 --- a/datafusion/src/physical_plan/explain.rs +++ b/datafusion/src/physical_plan/explain.rs @@ -115,9 +115,20 @@ impl ExecutionPlan for ExplainExec { .iter() .filter(|s| s.should_display(self.verbose)); + // Identify plans that are not changed + let mut prev: Option<&StringifiedPlan> = None; + for p in plans_to_print { type_builder.append_value(p.plan_type.to_string())?; - plan_builder.append_value(&*p.plan)?; + match prev { + Some(prev) if !should_show(prev, p) => { + plan_builder.append_value("SAME TEXT AS ABOVE")?; + } + Some(_) | None => { + plan_builder.append_value(&*p.plan)?; + } + } + prev = Some(p); } let record_batch = RecordBatch::try_new( @@ -146,3 +157,14 @@ impl ExecutionPlan for ExplainExec { } } } + +/// If this plan should be shown, given the previous plan that was +/// displayed. +/// +/// This is meant to avoid repeating the same plan over and over again +/// in explain plans to make clear what is changing +fn should_show(previous_plan: &StringifiedPlan, this_plan: &StringifiedPlan) -> bool { + // if the plans are different, or if they would have been + // displayed in the normal explain (aka non verbose) plan + (previous_plan.plan != this_plan.plan) || this_plan.should_display(false) +} diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index 5163e4b425b4..e662821e4539 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -24,9 +24,10 @@ use super::{ use crate::execution::context::ExecutionContextState; use crate::logical_plan::{ unnormalize_cols, DFSchema, Expr, LogicalPlan, Operator, - Partitioning as LogicalPartitioning, PlanType, StringifiedPlan, + Partitioning as LogicalPartitioning, PlanType, ToStringifiedPlan, UserDefinedLogicalNode, }; +use crate::physical_optimizer::optimizer::PhysicalOptimizerRule; use crate::physical_plan::explain::ExplainExec; use crate::physical_plan::expressions; use crate::physical_plan::expressions::{CaseExpr, Column, Literal, PhysicalSortExpr}; @@ -244,7 +245,7 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { Some(plan) => Ok(plan), None => { let plan = self.create_initial_plan(logical_plan, ctx_state)?; - self.optimize_plan(plan, ctx_state) + self.optimize_internal(plan, ctx_state, |_, _| {}) } } } @@ -285,23 +286,6 @@ impl DefaultPhysicalPlanner { Self { extension_planners } } - /// Optimize a physical plan by applying each physical optimizer - fn optimize_plan( - &self, - plan: Arc, - ctx_state: &ExecutionContextState, - ) -> Result> { - let optimizers = &ctx_state.config.physical_optimizers; - debug!("Physical plan:\n{:?}", plan); - - let mut new_plan = plan; - for optimizer in optimizers { - new_plan = optimizer.optimize(new_plan, &ctx_state.config)?; - } - debug!("Optimized physical plan:\n{:?}", new_plan); - Ok(new_plan) - } - /// Create a physical plan from a logical plan fn create_initial_plan( &self, @@ -1315,32 +1299,24 @@ impl DefaultPhysicalPlanner { schema, } = logical_plan { - let final_logical_plan = StringifiedPlan::new( - PlanType::FinalLogicalPlan, - plan.display_indent().to_string(), - ); + use PlanType::*; + let mut stringified_plans = stringified_plans.clone(); - let input = self.create_initial_plan(plan, ctx_state)?; + stringified_plans.push(plan.to_stringified(FinalLogicalPlan)); - let initial_physical_plan = StringifiedPlan::new( - PlanType::InitialPhysicalPlan, - displayable(input.as_ref()).indent().to_string(), - ); + let input = self.create_initial_plan(plan, ctx_state)?; - let input = self.optimize_plan(input, ctx_state)?; + stringified_plans + .push(displayable(input.as_ref()).to_stringified(InitialPhysicalPlan)); - let final_physical_plan = StringifiedPlan::new( - PlanType::FinalPhysicalPlan, - displayable(input.as_ref()).indent().to_string(), - ); + let input = self.optimize_internal(input, ctx_state, |plan, optimizer| { + let optimizer_name = optimizer.name().to_string(); + let plan_type = OptimizedPhysicalPlan { optimizer_name }; + stringified_plans.push(displayable(plan).to_stringified(plan_type)); + })?; - let stringified_plans = stringified_plans - .iter() - .cloned() - .chain(std::iter::once(final_logical_plan)) - .chain(std::iter::once(initial_physical_plan)) - .chain(std::iter::once(final_physical_plan)) - .collect::>(); + stringified_plans + .push(displayable(input.as_ref()).to_stringified(FinalPhysicalPlan)); Ok(Some(Arc::new(ExplainExec::new( SchemaRef::new(schema.as_ref().to_owned().into()), @@ -1351,6 +1327,29 @@ impl DefaultPhysicalPlanner { Ok(None) } } + + /// Optimize a physical plan by applying each physical optimizer, + /// calling observer(plan, optimizer after each one) + fn optimize_internal( + &self, + plan: Arc, + ctx_state: &ExecutionContextState, + mut observer: F, + ) -> Result> + where + F: FnMut(&dyn ExecutionPlan, &dyn PhysicalOptimizerRule), + { + let optimizers = &ctx_state.config.physical_optimizers; + debug!("Physical plan:\n{:?}", plan); + + let mut new_plan = plan; + for optimizer in optimizers { + new_plan = optimizer.optimize(new_plan, &ctx_state.config)?; + observer(new_plan.as_ref(), optimizer.as_ref()) + } + debug!("Optimized physical plan:\n{:?}", new_plan); + Ok(new_plan) + } } fn tuple_err(value: (Result, Result)) -> Result<(T, R)> { @@ -1645,6 +1644,42 @@ mod tests { Ok(()) } + #[test] + fn test_explain() { + let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]); + + let logical_plan = + LogicalPlanBuilder::scan_empty(Some("employee"), &schema, None) + .unwrap() + .explain(true) + .unwrap() + .build() + .unwrap(); + + let plan = plan(&logical_plan).unwrap(); + if let Some(plan) = plan.as_any().downcast_ref::() { + let stringified_plans = plan.stringified_plans(); + assert!(stringified_plans.len() >= 4); + assert!(stringified_plans + .iter() + .any(|p| matches!(p.plan_type, PlanType::FinalLogicalPlan))); + assert!(stringified_plans + .iter() + .any(|p| matches!(p.plan_type, PlanType::InitialPhysicalPlan))); + assert!(stringified_plans + .iter() + .any(|p| matches!(p.plan_type, PlanType::OptimizedPhysicalPlan { .. }))); + assert!(stringified_plans + .iter() + .any(|p| matches!(p.plan_type, PlanType::FinalPhysicalPlan))); + } else { + panic!( + "Plan was not an explain plan: {}", + displayable(plan.as_ref()).indent() + ); + } + } + /// An example extension node that doesn't do anything struct NoOpExtensionNode { schema: DFSchemaRef, diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index a4bb02cf0f9a..fa2b035162a6 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -28,8 +28,8 @@ use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits}; use crate::logical_plan::Expr::Alias; use crate::logical_plan::{ and, builder::expand_wildcard, col, lit, normalize_col, union_with_alias, Column, - DFSchema, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, StringifiedPlan, - ToDFSchema, + DFSchema, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ToDFSchema, + ToStringifiedPlan, }; use crate::prelude::JoinType; use crate::scalar::ScalarValue; @@ -233,10 +233,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { ) -> Result { let plan = self.sql_statement_to_plan(statement)?; - let stringified_plans = vec![StringifiedPlan::new( - PlanType::InitialLogicalPlan, - plan.display_indent().to_string(), - )]; + let stringified_plans = vec![plan.to_stringified(PlanType::InitialLogicalPlan)]; let schema = LogicalPlan::explain_schema(); let plan = Arc::new(plan); diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index 95b5596eb9f1..0ef8b4ca3024 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -2205,6 +2205,13 @@ async fn csv_explain_verbose() { "Actual: '{}'", actual ); + + // ensure the "same text as above" optimization is working + assert!( + actual.contains("SAME TEXT AS ABOVE"), + "Actual 2: '{}'", + actual + ); } #[tokio::test] diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs index e1f8c767bd8d..c1269d9a217f 100644 --- a/datafusion/tests/user_defined_plan.rs +++ b/datafusion/tests/user_defined_plan.rs @@ -163,9 +163,9 @@ async fn topk_plan() -> Result<()> { let mut ctx = setup_table(make_topk_context()).await?; let expected = vec![ - "| logical_plan after topk | TopK: k=3 |", - "| | Projection: #sales.customer_id, #sales.revenue |", - "| | TableScan: sales projection=Some([0, 1]) |", + "| logical_plan after topk | TopK: k=3 |", + "| | Projection: #sales.customer_id, #sales.revenue |", + "| | TableScan: sales projection=Some([0, 1]) |", ].join("\n"); let explain_query = format!("EXPLAIN VERBOSE {}", QUERY);