diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 5c41ed26eea43..e246356075e07 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -3365,6 +3365,16 @@ mod tests { "query not supported".to_string(), )) } + + fn create_physical_expr( + &self, + _e: &Expr, + _input_dfschema: &crate::logical_plan::DFSchema, + _input_schema: &Schema, + _ctx_state: &ExecutionContextState, + ) -> Result> { + unimplemented!() + } } struct MyQueryPlanner {} diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index 2122751abb604..307fff619478e 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -20,8 +20,6 @@ use self::{ coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan, }; -use crate::execution::context::ExecutionContextState; -use crate::logical_plan::LogicalPlan; use crate::physical_plan::expressions::PhysicalSortExpr; use crate::{ error::{DataFusionError, Result}, @@ -122,16 +120,8 @@ impl SQLMetric { } } -/// Physical query planner that converts a `LogicalPlan` to an -/// `ExecutionPlan` suitable for execution. -pub trait PhysicalPlanner { - /// Create a physical plan from a logical plan - fn create_physical_plan( - &self, - logical_plan: &LogicalPlan, - ctx_state: &ExecutionContextState, - ) -> Result>; -} +/// Physical planner interface +pub use self::planner::PhysicalPlanner; /// `ExecutionPlan` represent nodes in the DataFusion Physical Plan. /// diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index c3bb9a80136f1..d56bc9485e913 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -39,9 +39,7 @@ use crate::physical_plan::sort::SortExec; use crate::physical_plan::udf; use crate::physical_plan::windows::WindowAggExec; use crate::physical_plan::{hash_utils, Partitioning}; -use crate::physical_plan::{ - AggregateExpr, ExecutionPlan, PhysicalExpr, PhysicalPlanner, WindowExpr, -}; +use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, WindowExpr}; use crate::prelude::JoinType; use crate::scalar::ScalarValue; use crate::sql::utils::generate_sort_key; @@ -172,16 +170,51 @@ fn physical_name(e: &Expr, input_schema: &DFSchema) -> Result { } } +/// Physical query planner that converts a `LogicalPlan` to an +/// `ExecutionPlan` suitable for execution. +pub trait PhysicalPlanner { + /// Create a physical plan from a logical plan + fn create_physical_plan( + &self, + logical_plan: &LogicalPlan, + ctx_state: &ExecutionContextState, + ) -> Result>; + + /// Create a physical expression from a logical expression + /// suitable for evaluation + /// + /// `e`: the expression to convert + /// + /// `input_dfschema`: the logical plan schema for evaluating `e` + /// + /// `input_schema`: the physical schema for evaluating `e` + fn create_physical_expr( + &self, + e: &Expr, + input_dfschema: &DFSchema, + input_schema: &Schema, + ctx_state: &ExecutionContextState, + ) -> Result>; +} + /// This trait exposes the ability to plan an [`ExecutionPlan`] out of a [`LogicalPlan`]. pub trait ExtensionPlanner { /// Create a physical plan for a [`UserDefinedLogicalNode`]. - /// This errors when the planner knows how to plan the concrete implementation of `node` - /// but errors while doing so, and `None` when the planner does not know how to plan the `node` - /// and wants to delegate the planning to another [`ExtensionPlanner`]. + /// + /// `input_dfschema`: the logical plan schema for the inputs to this node + /// + /// Returns an error when the planner knows how to plan the concrete + /// implementation of `node` but errors while doing so. + /// + /// Returns `None` when the planner does not know how to plan the + /// `node` and wants to delegate the planning to another + /// [`ExtensionPlanner`]. fn plan_extension( &self, + planner: &dyn PhysicalPlanner, node: &dyn UserDefinedLogicalNode, - inputs: &[Arc], + logical_inputs: &[&LogicalPlan], + physical_inputs: &[Arc], ctx_state: &ExecutionContextState, ) -> Result>>; } @@ -210,6 +243,24 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { let plan = self.create_initial_plan(logical_plan, ctx_state)?; self.optimize_plan(plan, ctx_state) } + + /// Create a physical expression from a logical expression + /// suitable for evaluation + /// + /// `e`: the expression to convert + /// + /// `input_dfschema`: the logical plan schema for evaluating `e` + /// + /// `input_schema`: the physical schema for evaluating `e` + fn create_physical_expr( + &self, + e: &Expr, + input_dfschema: &DFSchema, + input_schema: &Schema, + ctx_state: &ExecutionContextState, + ) -> Result> { + self.create_physical_expr(e, input_dfschema, input_schema, ctx_state) + } } impl DefaultPhysicalPlanner { @@ -687,7 +738,7 @@ impl DefaultPhysicalPlanner { ))) } LogicalPlan::Extension { node } => { - let inputs = node + let physical_inputs = node .inputs() .into_iter() .map(|input_plan| self.create_initial_plan(input_plan, ctx_state)) @@ -699,7 +750,13 @@ impl DefaultPhysicalPlanner { if let Some(plan) = maybe_plan { Ok(Some(plan)) } else { - planner.plan_extension(node.as_ref(), &inputs, ctx_state) + planner.plan_extension( + self, + node.as_ref(), + &node.inputs(), + &physical_inputs, + ctx_state, + ) } }, )?; @@ -1610,8 +1667,10 @@ mod tests { /// Create a physical plan for an extension node fn plan_extension( &self, + _planner: &dyn PhysicalPlanner, _node: &dyn UserDefinedLogicalNode, - _inputs: &[Arc], + _logical_inputs: &[&LogicalPlan], + _physical_inputs: &[Arc], _ctx_state: &ExecutionContextState, ) -> Result>> { Ok(Some(Arc::new(NoOpExecutionPlan { diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs index 22ebec8b9a994..21b49638d23a1 100644 --- a/datafusion/tests/user_defined_plan.rs +++ b/datafusion/tests/user_defined_plan.rs @@ -321,16 +321,19 @@ impl ExtensionPlanner for TopKPlanner { /// Create a physical plan for an extension node fn plan_extension( &self, + _planner: &dyn PhysicalPlanner, node: &dyn UserDefinedLogicalNode, - inputs: &[Arc], + logical_inputs: &[&LogicalPlan], + physical_inputs: &[Arc], _ctx_state: &ExecutionContextState, ) -> Result>> { Ok( if let Some(topk_node) = node.as_any().downcast_ref::() { - assert_eq!(inputs.len(), 1, "Inconsistent number of inputs"); + assert_eq!(logical_inputs.len(), 1, "Inconsistent number of inputs"); + assert_eq!(physical_inputs.len(), 1, "Inconsistent number of inputs"); // figure out input name Some(Arc::new(TopKExec { - input: inputs[0].clone(), + input: physical_inputs[0].clone(), k: topk_node.k, })) } else {