Skip to content

Commit

Permalink
Update API for extension planning to include logical plan (#643)
Browse files Browse the repository at this point in the history
* Update API for extension planning to include logical plan

* Review comments
  • Loading branch information
alamb authored Jul 1, 2021
1 parent fddab22 commit 03cfcb2
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 25 deletions.
10 changes: 10 additions & 0 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3376,6 +3376,16 @@ mod tests {
"query not supported".to_string(),
))
}

fn create_physical_expr(
&self,
_expr: &Expr,
_input_dfschema: &crate::logical_plan::DFSchema,
_input_schema: &Schema,
_ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn crate::physical_plan::PhysicalExpr>> {
unimplemented!()
}
}

struct MyQueryPlanner {}
Expand Down
14 changes: 2 additions & 12 deletions datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
use self::{
coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan,
};
use crate::execution::context::ExecutionContextState;
use crate::logical_plan::LogicalPlan;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::{
error::{DataFusionError, Result},
Expand Down Expand Up @@ -122,16 +120,8 @@ impl SQLMetric {
}
}

/// Physical query planner that converts a `LogicalPlan` to an
/// `ExecutionPlan` suitable for execution.
pub trait PhysicalPlanner {
/// Create a physical plan from a logical plan
fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn ExecutionPlan>>;
}
/// Physical planner interface
pub use self::planner::PhysicalPlanner;

/// `ExecutionPlan` represent nodes in the DataFusion Physical Plan.
///
Expand Down
85 changes: 75 additions & 10 deletions datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ use crate::physical_plan::sort::SortExec;
use crate::physical_plan::udf;
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::{hash_utils, Partitioning};
use crate::physical_plan::{
AggregateExpr, ExecutionPlan, PhysicalExpr, PhysicalPlanner, WindowExpr,
};
use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, WindowExpr};
use crate::prelude::JoinType;
use crate::scalar::ScalarValue;
use crate::sql::utils::{generate_sort_key, window_expr_common_partition_keys};
Expand Down Expand Up @@ -172,16 +170,51 @@ fn physical_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
}
}

/// Physical query planner that converts a `LogicalPlan` to an
/// `ExecutionPlan` suitable for execution.
pub trait PhysicalPlanner {
/// Create a physical plan from a logical plan
fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn ExecutionPlan>>;

/// Create a physical expression from a logical expression
/// suitable for evaluation
///
/// `expr`: the expression to convert
///
/// `input_dfschema`: the logical plan schema for evaluating `e`
///
/// `input_schema`: the physical schema for evaluating `e`
fn create_physical_expr(
&self,
expr: &Expr,
input_dfschema: &DFSchema,
input_schema: &Schema,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn PhysicalExpr>>;
}

/// This trait exposes the ability to plan an [`ExecutionPlan`] out of a [`LogicalPlan`].
pub trait ExtensionPlanner {
/// Create a physical plan for a [`UserDefinedLogicalNode`].
/// This errors when the planner knows how to plan the concrete implementation of `node`
/// but errors while doing so, and `None` when the planner does not know how to plan the `node`
/// and wants to delegate the planning to another [`ExtensionPlanner`].
///
/// `input_dfschema`: the logical plan schema for the inputs to this node
///
/// Returns an error when the planner knows how to plan the concrete
/// implementation of `node` but errors while doing so.
///
/// Returns `None` when the planner does not know how to plan the
/// `node` and wants to delegate the planning to another
/// [`ExtensionPlanner`].
fn plan_extension(
&self,
planner: &dyn PhysicalPlanner,
node: &dyn UserDefinedLogicalNode,
inputs: &[Arc<dyn ExecutionPlan>],
logical_inputs: &[&LogicalPlan],
physical_inputs: &[Arc<dyn ExecutionPlan>],
ctx_state: &ExecutionContextState,
) -> Result<Option<Arc<dyn ExecutionPlan>>>;
}
Expand Down Expand Up @@ -210,6 +243,30 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
let plan = self.create_initial_plan(logical_plan, ctx_state)?;
self.optimize_plan(plan, ctx_state)
}

/// Create a physical expression from a logical expression
/// suitable for evaluation
///
/// `e`: the expression to convert
///
/// `input_dfschema`: the logical plan schema for evaluating `e`
///
/// `input_schema`: the physical schema for evaluating `e`
fn create_physical_expr(
&self,
expr: &Expr,
input_dfschema: &DFSchema,
input_schema: &Schema,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn PhysicalExpr>> {
DefaultPhysicalPlanner::create_physical_expr(
self,
expr,
input_dfschema,
input_schema,
ctx_state,
)
}
}

impl DefaultPhysicalPlanner {
Expand Down Expand Up @@ -721,7 +778,7 @@ impl DefaultPhysicalPlanner {
)))
}
LogicalPlan::Extension { node } => {
let inputs = node
let physical_inputs = node
.inputs()
.into_iter()
.map(|input_plan| self.create_initial_plan(input_plan, ctx_state))
Expand All @@ -733,7 +790,13 @@ impl DefaultPhysicalPlanner {
if let Some(plan) = maybe_plan {
Ok(Some(plan))
} else {
planner.plan_extension(node.as_ref(), &inputs, ctx_state)
planner.plan_extension(
self,
node.as_ref(),
&node.inputs(),
&physical_inputs,
ctx_state,
)
}
},
)?;
Expand Down Expand Up @@ -1644,8 +1707,10 @@ mod tests {
/// Create a physical plan for an extension node
fn plan_extension(
&self,
_planner: &dyn PhysicalPlanner,
_node: &dyn UserDefinedLogicalNode,
_inputs: &[Arc<dyn ExecutionPlan>],
_logical_inputs: &[&LogicalPlan],
_physical_inputs: &[Arc<dyn ExecutionPlan>],
_ctx_state: &ExecutionContextState,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
Ok(Some(Arc::new(NoOpExecutionPlan {
Expand Down
9 changes: 6 additions & 3 deletions datafusion/tests/user_defined_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,16 +321,19 @@ impl ExtensionPlanner for TopKPlanner {
/// Create a physical plan for an extension node
fn plan_extension(
&self,
_planner: &dyn PhysicalPlanner,
node: &dyn UserDefinedLogicalNode,
inputs: &[Arc<dyn ExecutionPlan>],
logical_inputs: &[&LogicalPlan],
physical_inputs: &[Arc<dyn ExecutionPlan>],
_ctx_state: &ExecutionContextState,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
Ok(
if let Some(topk_node) = node.as_any().downcast_ref::<TopKPlanNode>() {
assert_eq!(inputs.len(), 1, "Inconsistent number of inputs");
assert_eq!(logical_inputs.len(), 1, "Inconsistent number of inputs");
assert_eq!(physical_inputs.len(), 1, "Inconsistent number of inputs");
// figure out input name
Some(Arc::new(TopKExec {
input: inputs[0].clone(),
input: physical_inputs[0].clone(),
k: topk_node.k,
}))
} else {
Expand Down

0 comments on commit 03cfcb2

Please sign in to comment.