diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 0b3d50306063..5dd57d1b7079 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -622,7 +622,6 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {
                 let ctx_state = ExecutionContextState {
                     catalog_list,
                     scalar_functions: Default::default(),
-                    var_provider: Default::default(),
                     aggregate_functions: Default::default(),
                     config: ExecutionConfig::new(),
                     execution_props: ExecutionProps::new(),
@@ -632,7 +631,7 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {

                 let fun_expr = functions::create_physical_fun(
                     &(&scalar_function).into(),
-                    &ctx_state,
+                    &ctx_state.execution_props,
                 )?;

                 Arc::new(ScalarFunctionExpr::new(
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 6ed8223f0c52..ca86d0f0a019 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -190,7 +190,6 @@ impl ExecutionContext {
             state: Arc::new(Mutex::new(ExecutionContextState {
                 catalog_list,
                 scalar_functions: HashMap::new(),
-                var_provider: HashMap::new(),
                 aggregate_functions: HashMap::new(),
                 config,
                 execution_props: ExecutionProps::new(),
@@ -324,8 +323,8 @@ impl ExecutionContext {
         self.state
             .lock()
             .unwrap()
-            .var_provider
-            .insert(variable_type, provider);
+            .execution_props
+            .add_var_provider(variable_type, provider);
     }

     /// Registers a scalar UDF within this context.
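// ---------------------------------------------------------------------------
// Example (illustrative sketch, not part of the patch): with this change a
// provider registered through `ExecutionContext::register_variable` is stored
// in `ExecutionProps::var_providers` and can be read back with
// `get_var_provider`. The module paths and the `MyVarProvider` type below are
// assumptions for illustration only.
// ---------------------------------------------------------------------------
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::context::{ExecutionContext, ExecutionProps};
use datafusion::scalar::ScalarValue;
use datafusion::variable::{VarProvider, VarType};

/// Hypothetical provider that resolves every user-defined variable to 42.
struct MyVarProvider;

impl VarProvider for MyVarProvider {
    fn get_value(&self, _var_names: Vec<String>) -> Result<ScalarValue> {
        Ok(ScalarValue::Int64(Some(42)))
    }
}

fn register_examples() {
    // Through the context: the provider ends up in the state's ExecutionProps.
    let mut ctx = ExecutionContext::new();
    ctx.register_variable(VarType::UserDefined, Arc::new(MyVarProvider));

    // Or directly on a standalone ExecutionProps.
    let mut props = ExecutionProps::new();
    // First registration returns None; a second call for the same VarType
    // would hand back the previously stored provider.
    assert!(props
        .add_var_provider(VarType::UserDefined, Arc::new(MyVarProvider))
        .is_none());
    assert!(props.get_var_provider(VarType::UserDefined).is_some());
    assert!(props.get_var_provider(VarType::System).is_none());
}
// ---------------------------------------------------------------------------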
@@ -1115,9 +1114,14 @@ impl ExecutionConfig {
 /// An instance of this struct is created each time a [`LogicalPlan`] is prepared for
 /// execution (optimized). If the same plan is optimized multiple times, a new
 /// `ExecutionProps` is created each time.
+///
+/// It is important that this structure be cheap to create, as it is created
+/// during predicate pruning and expression simplification.
 #[derive(Clone)]
 pub struct ExecutionProps {
     pub(crate) query_execution_start_time: DateTime<Utc>,
+    /// providers for scalar variables
+    pub var_providers: Option<HashMap<VarType, Arc<dyn VarProvider + Send + Sync>>>,
 }

 impl Default for ExecutionProps {
@@ -1131,6 +1135,7 @@ impl ExecutionProps {
     pub fn new() -> Self {
         ExecutionProps {
             query_execution_start_time: chrono::Utc::now(),
+            var_providers: None,
         }
     }

@@ -1139,6 +1144,32 @@ impl ExecutionProps {
         self.query_execution_start_time = chrono::Utc::now();
         &*self
     }
+
+    /// Registers a variable provider, returning the existing
+    /// provider, if any
+    pub fn add_var_provider(
+        &mut self,
+        var_type: VarType,
+        provider: Arc<dyn VarProvider + Send + Sync>,
+    ) -> Option<Arc<dyn VarProvider + Send + Sync>> {
+        let mut var_providers = self.var_providers.take().unwrap_or_else(HashMap::new);
+
+        let old_provider = var_providers.insert(var_type, provider);
+
+        self.var_providers = Some(var_providers);
+
+        old_provider
+    }
+
+    /// Returns the provider for the var_type, if any
+    pub fn get_var_provider(
+        &self,
+        var_type: VarType,
+    ) -> Option<Arc<dyn VarProvider + Send + Sync>> {
+        self.var_providers
+            .as_ref()
+            .and_then(|var_providers| var_providers.get(&var_type).map(Arc::clone))
+    }
 }

 /// Execution context for registering data sources and executing queries
@@ -1148,8 +1179,6 @@ pub struct ExecutionContextState {
     pub catalog_list: Arc<dyn CatalogList>,
     /// Scalar functions that are registered with the context
     pub scalar_functions: HashMap<String, Arc<ScalarUDF>>,
-    /// Variable provider that are registered with the context
-    pub var_provider: HashMap<VarType, Arc<dyn VarProvider + Send + Sync>>,
     /// Aggregate functions registered in the context
     pub aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
     /// Context configuration
@@ -1174,7 +1203,6 @@ impl ExecutionContextState {
         ExecutionContextState {
             catalog_list: Arc::new(MemoryCatalogList::new()),
             scalar_functions: HashMap::new(),
-            var_provider: HashMap::new(),
             aggregate_functions: HashMap::new(),
             config: ExecutionConfig::new(),
             execution_props: ExecutionProps::new(),
diff --git a/datafusion/src/optimizer/simplify_expressions.rs b/datafusion/src/optimizer/simplify_expressions.rs
index 7127a8fa94d6..6f5235e852b7 100644
--- a/datafusion/src/optimizer/simplify_expressions.rs
+++ b/datafusion/src/optimizer/simplify_expressions.rs
@@ -22,13 +22,13 @@ use arrow::datatypes::{DataType, Field, Schema};
 use arrow::record_batch::RecordBatch;

 use crate::error::DataFusionError;
-use crate::execution::context::{ExecutionContextState, ExecutionProps};
+use crate::execution::context::ExecutionProps;
 use crate::logical_plan::{lit, DFSchemaRef, Expr};
 use crate::logical_plan::{DFSchema, ExprRewriter, LogicalPlan, RewriteRecursion};
 use crate::optimizer::optimizer::OptimizerRule;
 use crate::optimizer::utils;
 use crate::physical_plan::functions::Volatility;
-use crate::physical_plan::planner::DefaultPhysicalPlanner;
+use crate::physical_plan::planner::create_physical_expr;
 use crate::scalar::ScalarValue;
 use crate::{error::Result, logical_plan::Operator};

@@ -223,7 +223,7 @@ impl SimplifyExpressions {
 /// let rewritten = expr.rewrite(&mut const_evaluator).unwrap();
 /// assert_eq!(rewritten, lit(3) + col("a"));
 /// ```
-pub struct ConstEvaluator {
+pub struct ConstEvaluator<'a> {
     /// can_evaluate is used during the depth-first-search of the
     /// Expr tree to track if any siblings (or their descendants) were
     /// non evaluatable (e.g.
had a column reference or volatile @@ -238,13 +238,12 @@ pub struct ConstEvaluator { /// descendants) so this Expr can be evaluated can_evaluate: Vec, - ctx_state: ExecutionContextState, - planner: DefaultPhysicalPlanner, + execution_props: &'a ExecutionProps, input_schema: DFSchema, input_batch: RecordBatch, } -impl ExprRewriter for ConstEvaluator { +impl<'a> ExprRewriter for ConstEvaluator<'a> { fn pre_visit(&mut self, expr: &Expr) -> Result { // Default to being able to evaluate this node self.can_evaluate.push(true); @@ -282,16 +281,11 @@ impl ExprRewriter for ConstEvaluator { } } -impl ConstEvaluator { +impl<'a> ConstEvaluator<'a> { /// Create a new `ConstantEvaluator`. Session constants (such as /// the time for `now()` are taken from the passed /// `execution_props`. - pub fn new(execution_props: &ExecutionProps) -> Self { - let planner = DefaultPhysicalPlanner::default(); - let ctx_state = ExecutionContextState { - execution_props: execution_props.clone(), - ..ExecutionContextState::new() - }; + pub fn new(execution_props: &'a ExecutionProps) -> Self { let input_schema = DFSchema::empty(); // The dummy column name is unused and doesn't matter as only @@ -306,8 +300,7 @@ impl ConstEvaluator { Self { can_evaluate: vec![], - ctx_state, - planner, + execution_props, input_schema, input_batch, } @@ -364,11 +357,11 @@ impl ConstEvaluator { return Ok(s); } - let phys_expr = self.planner.create_physical_expr( + let phys_expr = create_physical_expr( &expr, &self.input_schema, &self.input_batch.schema(), - &self.ctx_state, + self.execution_props, )?; let col_val = phys_expr.evaluate(&self.input_batch)?; match col_val { @@ -1141,6 +1134,7 @@ mod tests { ) { let execution_props = ExecutionProps { query_execution_start_time: *date_time, + var_providers: None, }; let mut const_evaluator = ConstEvaluator::new(&execution_props); @@ -1622,6 +1616,7 @@ mod tests { let rule = SimplifyExpressions::new(); let execution_props = ExecutionProps { query_execution_start_time: *date_time, + var_providers: None, }; let err = rule @@ -1638,6 +1633,7 @@ mod tests { let rule = SimplifyExpressions::new(); let execution_props = ExecutionProps { query_execution_start_time: *date_time, + var_providers: None, }; let optimized_plan = rule diff --git a/datafusion/src/physical_optimizer/pruning.rs b/datafusion/src/physical_optimizer/pruning.rs index 22b854b93a59..7bbffd1546fd 100644 --- a/datafusion/src/physical_optimizer/pruning.rs +++ b/datafusion/src/physical_optimizer/pruning.rs @@ -37,13 +37,14 @@ use arrow::{ record_batch::RecordBatch, }; +use crate::execution::context::ExecutionProps; +use crate::physical_plan::planner::create_physical_expr; use crate::prelude::lit; use crate::{ error::{DataFusionError, Result}, - execution::context::ExecutionContextState, logical_plan::{Column, DFSchema, Expr, Operator}, optimizer::utils, - physical_plan::{planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr}, + physical_plan::{ColumnarValue, PhysicalExpr}, }; /// Interface to pass statistics information to [`PruningPredicates`] @@ -129,12 +130,14 @@ impl PruningPredicate { .collect::>(); let stat_schema = Schema::new(stat_fields); let stat_dfschema = DFSchema::try_from(stat_schema.clone())?; - let execution_context_state = ExecutionContextState::new(); - let predicate_expr = DefaultPhysicalPlanner::default().create_physical_expr( + + // TODO allow these properties to be passed in + let execution_props = ExecutionProps::new(); + let predicate_expr = create_physical_expr( &logical_predicate_expr, &stat_dfschema, 
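// ---------------------------------------------------------------------------
// Example (illustrative sketch, not part of the patch): `ConstEvaluator` now
// borrows an `ExecutionProps` (note the new `'a` lifetime) instead of cloning
// a whole `ExecutionContextState`, so constructing one per expression stays
// cheap. Module paths below are assumptions for illustration.
// ---------------------------------------------------------------------------
use datafusion::error::Result;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_plan::{col, lit, Expr};
use datafusion::optimizer::simplify_expressions::ConstEvaluator;

fn fold_constants(expr: Expr) -> Result<Expr> {
    // `now()` and friends are evaluated against this timestamp.
    let execution_props = ExecutionProps::new();
    let mut const_evaluator = ConstEvaluator::new(&execution_props);
    expr.rewrite(&mut const_evaluator)
}

fn demo() -> Result<()> {
    // `1 + 2 + a` folds to `3 + a`, mirroring the doc example above.
    let rewritten = fold_constants(lit(1) + lit(2) + col("a"))?;
    assert_eq!(rewritten, lit(3) + col("a"));
    Ok(())
}
// ---------------------------------------------------------------------------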
&stat_schema, - &execution_context_state, + &execution_props, )?; Ok(Self { schema, diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs index 2c1946e9da37..644defce1545 100644 --- a/datafusion/src/physical_plan/functions.rs +++ b/datafusion/src/physical_plan/functions.rs @@ -33,7 +33,7 @@ use super::{ type_coercion::{coerce, data_types}, ColumnarValue, PhysicalExpr, }; -use crate::execution::context::ExecutionContextState; +use crate::execution::context::ExecutionProps; use crate::physical_plan::array_expressions; use crate::physical_plan::datetime_expressions; use crate::physical_plan::expressions::{ @@ -723,7 +723,7 @@ macro_rules! invoke_if_unicode_expressions_feature_flag { /// Create a physical scalar function. pub fn create_physical_fun( fun: &BuiltinScalarFunction, - ctx_state: &ExecutionContextState, + execution_props: &ExecutionProps, ) -> Result { Ok(match fun { // math functions @@ -820,7 +820,7 @@ pub fn create_physical_fun( BuiltinScalarFunction::Now => { // bind value for now at plan time Arc::new(datetime_expressions::make_now( - ctx_state.execution_props.query_execution_start_time, + execution_props.query_execution_start_time, )) } BuiltinScalarFunction::InitCap => Arc::new(|args| match args[0].data_type() { @@ -1157,7 +1157,7 @@ pub fn create_physical_expr( fun: &BuiltinScalarFunction, input_phy_exprs: &[Arc], input_schema: &Schema, - ctx_state: &ExecutionContextState, + execution_props: &ExecutionProps, ) -> Result> { let coerced_phy_exprs = coerce(input_phy_exprs, input_schema, &signature(fun))?; @@ -1254,7 +1254,7 @@ pub fn create_physical_expr( } }), // These don't need args and input schema - _ => create_physical_fun(fun, ctx_state)?, + _ => create_physical_fun(fun, execution_props)?, }; Ok(Arc::new(ScalarFunctionExpr::new( @@ -1720,14 +1720,14 @@ mod tests { ($FUNC:ident, $ARGS:expr, $EXPECTED:expr, $EXPECTED_TYPE:ty, $DATA_TYPE: ident, $ARRAY_TYPE:ident) => { // used to provide type annotation let expected: Result> = $EXPECTED; - let ctx_state = ExecutionContextState::new(); + let execution_props = ExecutionProps::new(); // any type works here: we evaluate against a literal of `value` let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); let columns: Vec = vec![Arc::new(Int32Array::from_slice(&[1]))]; let expr = - create_physical_expr(&BuiltinScalarFunction::$FUNC, $ARGS, &schema, &ctx_state)?; + create_physical_expr(&BuiltinScalarFunction::$FUNC, $ARGS, &schema, &execution_props)?; // type is correct assert_eq!(expr.data_type(&schema)?, DataType::$DATA_TYPE); @@ -3888,7 +3888,7 @@ mod tests { #[test] fn test_empty_arguments_error() -> Result<()> { - let ctx_state = ExecutionContextState::new(); + let execution_props = ExecutionProps::new(); let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); // pick some arbitrary functions to test @@ -3900,7 +3900,7 @@ mod tests { ]; for fun in funs.iter() { - let expr = create_physical_expr(fun, &[], &schema, &ctx_state); + let expr = create_physical_expr(fun, &[], &schema, &execution_props); match expr { Ok(..) 
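// ---------------------------------------------------------------------------
// Example (illustrative sketch, not part of the patch): planning a built-in
// scalar function now only needs `ExecutionProps`; for `now()` the
// `query_execution_start_time` is bound at plan time. This mirrors the
// updated test macro above; module paths are assumptions for illustration.
// ---------------------------------------------------------------------------
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::execution::context::ExecutionProps;
use datafusion::physical_plan::expressions::col;
use datafusion::physical_plan::functions::{create_physical_expr, BuiltinScalarFunction};
use datafusion::physical_plan::PhysicalExpr;

fn plan_sqrt_and_now(schema: &Schema) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
    let execution_props = ExecutionProps::new();
    // sqrt(a): needs no session state at all.
    let sqrt = create_physical_expr(
        &BuiltinScalarFunction::Sqrt,
        &[col("a", schema)?],
        schema,
        &execution_props,
    )?;
    // now(): bound to execution_props.query_execution_start_time.
    let now =
        create_physical_expr(&BuiltinScalarFunction::Now, &[], schema, &execution_props)?;
    Ok(vec![sqrt, now])
}

fn demo() -> Result<()> {
    let schema = Schema::new(vec![Field::new("a", DataType::Float64, false)]);
    let _exprs = plan_sqrt_and_now(&schema)?;
    Ok(())
}
// ---------------------------------------------------------------------------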
=> { @@ -3931,13 +3931,13 @@ mod tests { #[test] fn test_empty_arguments() -> Result<()> { - let ctx_state = ExecutionContextState::new(); + let execution_props = ExecutionProps::new(); let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); let funs = [BuiltinScalarFunction::Now, BuiltinScalarFunction::Random]; for fun in funs.iter() { - create_physical_expr(fun, &[], &schema, &ctx_state)?; + create_physical_expr(fun, &[], &schema, &execution_props)?; } Ok(()) } @@ -3954,13 +3954,13 @@ mod tests { Field::new("b", value2.data_type().clone(), false), ]); let columns: Vec = vec![value1, value2]; - let ctx_state = ExecutionContextState::new(); + let execution_props = ExecutionProps::new(); let expr = create_physical_expr( &BuiltinScalarFunction::Array, &[col("a", &schema)?, col("b", &schema)?], &schema, - &ctx_state, + &execution_props, )?; // type is correct @@ -4017,7 +4017,7 @@ mod tests { fn test_regexp_match() -> Result<()> { use arrow::array::ListArray; let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]); - let ctx_state = ExecutionContextState::new(); + let execution_props = ExecutionProps::new(); let col_value: ArrayRef = Arc::new(StringArray::from_slice(&["aaa-555"])); let pattern = lit(ScalarValue::Utf8(Some(r".*-(\d*)".to_string()))); @@ -4026,7 +4026,7 @@ mod tests { &BuiltinScalarFunction::RegexpMatch, &[col("a", &schema)?, pattern], &schema, - &ctx_state, + &execution_props, )?; // type is correct @@ -4056,7 +4056,7 @@ mod tests { fn test_regexp_match_all_literals() -> Result<()> { use arrow::array::ListArray; let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); - let ctx_state = ExecutionContextState::new(); + let execution_props = ExecutionProps::new(); let col_value = lit(ScalarValue::Utf8(Some("aaa-555".to_string()))); let pattern = lit(ScalarValue::Utf8(Some(r".*-(\d*)".to_string()))); @@ -4065,7 +4065,7 @@ mod tests { &BuiltinScalarFunction::RegexpMatch, &[col_value, pattern], &schema, - &ctx_state, + &execution_props, )?; // type is correct diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index 226e3f392497..bf8be3df720b 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -22,7 +22,7 @@ use super::{ aggregates, empty::EmptyExec, expressions::binary, functions, hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec, windows, }; -use crate::execution::context::ExecutionContextState; +use crate::execution::context::{ExecutionContextState, ExecutionProps}; use crate::logical_plan::plan::{ Aggregate, EmptyRelation, Filter, Join, Projection, Sort, TableScan, Window, }; @@ -299,12 +299,11 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { input_schema: &Schema, ctx_state: &ExecutionContextState, ) -> Result> { - DefaultPhysicalPlanner::create_physical_expr( - self, + create_physical_expr( expr, input_dfschema, input_schema, - ctx_state, + &ctx_state.execution_props, ) } } @@ -440,7 +439,7 @@ impl DefaultPhysicalPlanner { expr, asc, nulls_first, - } => self.create_physical_sort_expr( + } => create_physical_sort_expr( expr, logical_input_schema, &physical_input_schema, @@ -448,7 +447,7 @@ impl DefaultPhysicalPlanner { descending: !*asc, nulls_first: *nulls_first, }, - ctx_state, + &ctx_state.execution_props, ), _ => unreachable!(), }) @@ -464,11 +463,11 @@ impl DefaultPhysicalPlanner { let window_expr = window_expr .iter() .map(|e| { - self.create_window_expr( + create_window_expr( e, logical_input_schema, 
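// ---------------------------------------------------------------------------
// Example (illustrative sketch, not part of the patch): with the planner
// methods turned into free functions, a logical expression can be lowered to
// a physical one with only an `ExecutionProps`; no `DefaultPhysicalPlanner`
// or `ExecutionContextState` is required. Module paths are assumptions.
// ---------------------------------------------------------------------------
use std::convert::TryFrom;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_plan::{col, lit, DFSchema};
use datafusion::physical_plan::planner::create_physical_expr;
use datafusion::physical_plan::PhysicalExpr;

fn lower_predicate() -> Result<Arc<dyn PhysicalExpr>> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let df_schema = DFSchema::try_from(schema.clone())?;
    // a > 5, lowered against the schema with default ExecutionProps.
    create_physical_expr(
        &col("a").gt(lit(5)),
        &df_schema,
        &schema,
        &ExecutionProps::new(),
    )
}
// ---------------------------------------------------------------------------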
&physical_input_schema, - ctx_state, + &ctx_state.execution_props, ) }) .collect::>>()?; @@ -507,11 +506,11 @@ impl DefaultPhysicalPlanner { let aggregates = aggr_expr .iter() .map(|e| { - self.create_aggregate_expr( + create_aggregate_expr( e, logical_input_schema, &physical_input_schema, - ctx_state, + &ctx_state.execution_props, ) }) .collect::>>()?; @@ -688,7 +687,7 @@ impl DefaultPhysicalPlanner { expr, asc, nulls_first, - } => self.create_physical_sort_expr( + } => create_physical_sort_expr( expr, input_dfschema, &input_schema, @@ -696,7 +695,7 @@ impl DefaultPhysicalPlanner { descending: !*asc, nulls_first: *nulls_first, }, - ctx_state, + &ctx_state.execution_props, ), _ => Err(DataFusionError::Plan( "Sort only accepts sort expressions".to_string(), @@ -866,517 +865,487 @@ impl DefaultPhysicalPlanner { exec_plan }.boxed() } +} - /// Create a physical expression from a logical expression - pub fn create_physical_expr( - &self, - e: &Expr, - input_dfschema: &DFSchema, - input_schema: &Schema, - ctx_state: &ExecutionContextState, - ) -> Result> { - match e { - Expr::Alias(expr, ..) => Ok(self.create_physical_expr( - expr, - input_dfschema, - input_schema, - ctx_state, - )?), - Expr::Column(c) => { - let idx = input_dfschema.index_of_column(c)?; - Ok(Arc::new(Column::new(&c.name, idx))) - } - Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))), - Expr::ScalarVariable(variable_names) => { - if &variable_names[0][0..2] == "@@" { - match ctx_state.var_provider.get(&VarType::System) { - Some(provider) => { - let scalar_value = - provider.get_value(variable_names.clone())?; - Ok(Arc::new(Literal::new(scalar_value))) - } - _ => Err(DataFusionError::Plan( - "No system variable provider found".to_string(), - )), +/// Create a physical expression from a logical expression ([Expr]) +pub fn create_physical_expr( + e: &Expr, + input_dfschema: &DFSchema, + input_schema: &Schema, + execution_props: &ExecutionProps, +) -> Result> { + match e { + Expr::Alias(expr, ..) 
=> Ok(create_physical_expr( + expr, + input_dfschema, + input_schema, + execution_props, + )?), + Expr::Column(c) => { + let idx = input_dfschema.index_of_column(c)?; + Ok(Arc::new(Column::new(&c.name, idx))) + } + Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))), + Expr::ScalarVariable(variable_names) => { + if &variable_names[0][0..2] == "@@" { + match execution_props.get_var_provider(VarType::System) { + Some(provider) => { + let scalar_value = provider.get_value(variable_names.clone())?; + Ok(Arc::new(Literal::new(scalar_value))) } - } else { - match ctx_state.var_provider.get(&VarType::UserDefined) { - Some(provider) => { - let scalar_value = - provider.get_value(variable_names.clone())?; - Ok(Arc::new(Literal::new(scalar_value))) - } - _ => Err(DataFusionError::Plan( - "No user defined variable provider found".to_string(), - )), + _ => Err(DataFusionError::Plan( + "No system variable provider found".to_string(), + )), + } + } else { + match execution_props.get_var_provider(VarType::UserDefined) { + Some(provider) => { + let scalar_value = provider.get_value(variable_names.clone())?; + Ok(Arc::new(Literal::new(scalar_value))) } + _ => Err(DataFusionError::Plan( + "No user defined variable provider found".to_string(), + )), } } - Expr::BinaryExpr { left, op, right } => { - let lhs = self.create_physical_expr( - left, - input_dfschema, - input_schema, - ctx_state, - )?; - let rhs = self.create_physical_expr( - right, + } + Expr::BinaryExpr { left, op, right } => { + let lhs = create_physical_expr( + left, + input_dfschema, + input_schema, + execution_props, + )?; + let rhs = create_physical_expr( + right, + input_dfschema, + input_schema, + execution_props, + )?; + binary(lhs, *op, rhs, input_schema) + } + Expr::Case { + expr, + when_then_expr, + else_expr, + .. + } => { + let expr: Option> = if let Some(e) = expr { + Some(create_physical_expr( + e.as_ref(), input_dfschema, input_schema, - ctx_state, - )?; - binary(lhs, *op, rhs, input_schema) - } - Expr::Case { - expr, - when_then_expr, - else_expr, - .. - } => { - let expr: Option> = if let Some(e) = expr { - Some(self.create_physical_expr( - e.as_ref(), + execution_props, + )?) + } else { + None + }; + let when_expr = when_then_expr + .iter() + .map(|(w, _)| { + create_physical_expr( + w.as_ref(), input_dfschema, input_schema, - ctx_state, - )?) - } else { - None - }; - let when_expr = when_then_expr - .iter() - .map(|(w, _)| { - self.create_physical_expr( - w.as_ref(), - input_dfschema, - input_schema, - ctx_state, - ) - }) - .collect::>>()?; - let then_expr = when_then_expr - .iter() - .map(|(_, t)| { - self.create_physical_expr( - t.as_ref(), - input_dfschema, - input_schema, - ctx_state, - ) - }) - .collect::>>()?; - let when_then_expr: Vec<(Arc, Arc)> = - when_expr - .iter() - .zip(then_expr.iter()) - .map(|(w, t)| (w.clone(), t.clone())) - .collect(); - let else_expr: Option> = if let Some(e) = else_expr - { - Some(self.create_physical_expr( - e.as_ref(), + execution_props, + ) + }) + .collect::>>()?; + let then_expr = when_then_expr + .iter() + .map(|(_, t)| { + create_physical_expr( + t.as_ref(), input_dfschema, input_schema, - ctx_state, - )?) 
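// ---------------------------------------------------------------------------
// Example (illustrative sketch, not part of the patch): the `ScalarVariable`
// arm above now looks providers up through `ExecutionProps`. The helper below
// is a rough restatement of that dispatch (names beginning with "@@" use the
// System provider, everything else the UserDefined one); it is illustrative
// only and uses `starts_with` in place of the slice comparison above.
// ---------------------------------------------------------------------------
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::ExecutionProps;
use datafusion::scalar::ScalarValue;
use datafusion::variable::VarType;

fn resolve_scalar_variable(
    execution_props: &ExecutionProps,
    variable_names: Vec<String>,
) -> Result<ScalarValue> {
    let var_type = if variable_names[0].starts_with("@@") {
        VarType::System
    } else {
        VarType::UserDefined
    };
    match execution_props.get_var_provider(var_type) {
        Some(provider) => provider.get_value(variable_names),
        None => Err(DataFusionError::Plan(
            "No variable provider found for this variable type".to_string(),
        )),
    }
}
// ---------------------------------------------------------------------------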
- } else { - None - }; - Ok(Arc::new(CaseExpr::try_new( - expr, - &when_then_expr, - else_expr, - )?)) - } - Expr::Cast { expr, data_type } => expressions::cast( - self.create_physical_expr(expr, input_dfschema, input_schema, ctx_state)?, - input_schema, - data_type.clone(), - ), - Expr::TryCast { expr, data_type } => expressions::try_cast( - self.create_physical_expr(expr, input_dfschema, input_schema, ctx_state)?, - input_schema, - data_type.clone(), - ), - Expr::Not(expr) => expressions::not( - self.create_physical_expr(expr, input_dfschema, input_schema, ctx_state)?, - input_schema, - ), - Expr::Negative(expr) => expressions::negative( - self.create_physical_expr(expr, input_dfschema, input_schema, ctx_state)?, - input_schema, - ), - Expr::IsNull(expr) => expressions::is_null(self.create_physical_expr( + execution_props, + ) + }) + .collect::>>()?; + let when_then_expr: Vec<(Arc, Arc)> = + when_expr + .iter() + .zip(then_expr.iter()) + .map(|(w, t)| (w.clone(), t.clone())) + .collect(); + let else_expr: Option> = if let Some(e) = else_expr { + Some(create_physical_expr( + e.as_ref(), + input_dfschema, + input_schema, + execution_props, + )?) + } else { + None + }; + Ok(Arc::new(CaseExpr::try_new( expr, - input_dfschema, + &when_then_expr, + else_expr, + )?)) + } + Expr::Cast { expr, data_type } => expressions::cast( + create_physical_expr(expr, input_dfschema, input_schema, execution_props)?, + input_schema, + data_type.clone(), + ), + Expr::TryCast { expr, data_type } => expressions::try_cast( + create_physical_expr(expr, input_dfschema, input_schema, execution_props)?, + input_schema, + data_type.clone(), + ), + Expr::Not(expr) => expressions::not( + create_physical_expr(expr, input_dfschema, input_schema, execution_props)?, + input_schema, + ), + Expr::Negative(expr) => expressions::negative( + create_physical_expr(expr, input_dfschema, input_schema, execution_props)?, + input_schema, + ), + Expr::IsNull(expr) => expressions::is_null(create_physical_expr( + expr, + input_dfschema, + input_schema, + execution_props, + )?), + Expr::IsNotNull(expr) => expressions::is_not_null(create_physical_expr( + expr, + input_dfschema, + input_schema, + execution_props, + )?), + Expr::GetIndexedField { expr, key } => Ok(Arc::new(GetIndexedFieldExpr::new( + create_physical_expr(expr, input_dfschema, input_schema, execution_props)?, + key.clone(), + ))), + + Expr::ScalarFunction { fun, args } => { + let physical_args = args + .iter() + .map(|e| { + create_physical_expr(e, input_dfschema, input_schema, execution_props) + }) + .collect::>>()?; + functions::create_physical_expr( + fun, + &physical_args, input_schema, - ctx_state, - )?), - Expr::IsNotNull(expr) => expressions::is_not_null( - self.create_physical_expr(expr, input_dfschema, input_schema, ctx_state)?, - ), - Expr::GetIndexedField { expr, key } => { - Ok(Arc::new(GetIndexedFieldExpr::new( - self.create_physical_expr( - expr, - input_dfschema, - input_schema, - ctx_state, - )?, - key.clone(), - ))) - } - - Expr::ScalarFunction { fun, args } => { - let physical_args = args - .iter() - .map(|e| { - self.create_physical_expr( - e, - input_dfschema, - input_schema, - ctx_state, - ) - }) - .collect::>>()?; - functions::create_physical_expr( - fun, - &physical_args, + execution_props, + ) + } + Expr::ScalarUDF { fun, args } => { + let mut physical_args = vec![]; + for e in args { + physical_args.push(create_physical_expr( + e, + input_dfschema, input_schema, - ctx_state, - ) + execution_props, + )?); } - Expr::ScalarUDF { fun, args } => { - let mut 
physical_args = vec![]; - for e in args { - physical_args.push(self.create_physical_expr( - e, - input_dfschema, - input_schema, - ctx_state, - )?); - } - udf::create_physical_expr( - fun.clone().as_ref(), - &physical_args, - input_schema, - ) - } - Expr::Between { + udf::create_physical_expr(fun.clone().as_ref(), &physical_args, input_schema) + } + Expr::Between { + expr, + negated, + low, + high, + } => { + let value_expr = create_physical_expr( expr, - negated, - low, + input_dfschema, + input_schema, + execution_props, + )?; + let low_expr = + create_physical_expr(low, input_dfschema, input_schema, execution_props)?; + let high_expr = create_physical_expr( high, - } => { - let value_expr = self.create_physical_expr( + input_dfschema, + input_schema, + execution_props, + )?; + + // rewrite the between into the two binary operators + let binary_expr = binary( + binary(value_expr.clone(), Operator::GtEq, low_expr, input_schema)?, + Operator::And, + binary(value_expr.clone(), Operator::LtEq, high_expr, input_schema)?, + input_schema, + ); + + if *negated { + expressions::not(binary_expr?, input_schema) + } else { + binary_expr + } + } + Expr::InList { + expr, + list, + negated, + } => match expr.as_ref() { + Expr::Literal(ScalarValue::Utf8(None)) => { + Ok(expressions::lit(ScalarValue::Boolean(None))) + } + _ => { + let value_expr = create_physical_expr( expr, input_dfschema, input_schema, - ctx_state, + execution_props, )?; - let low_expr = self.create_physical_expr( - low, - input_dfschema, - input_schema, - ctx_state, - )?; - let high_expr = self.create_physical_expr( - high, - input_dfschema, - input_schema, - ctx_state, - )?; - - // rewrite the between into the two binary operators - let binary_expr = binary( - binary(value_expr.clone(), Operator::GtEq, low_expr, input_schema)?, - Operator::And, - binary(value_expr.clone(), Operator::LtEq, high_expr, input_schema)?, - input_schema, - ); - - if *negated { - expressions::not(binary_expr?, input_schema) - } else { - binary_expr - } - } - Expr::InList { - expr, - list, - negated, - } => match expr.as_ref() { - Expr::Literal(ScalarValue::Utf8(None)) => { - Ok(expressions::lit(ScalarValue::Boolean(None))) - } - _ => { - let value_expr = self.create_physical_expr( - expr, - input_dfschema, - input_schema, - ctx_state, - )?; - let value_expr_data_type = value_expr.data_type(input_schema)?; + let value_expr_data_type = value_expr.data_type(input_schema)?; - let list_exprs = list - .iter() - .map(|expr| match expr { - Expr::Literal(ScalarValue::Utf8(None)) => self - .create_physical_expr( - expr, - input_dfschema, - input_schema, - ctx_state, - ), - _ => { - let list_expr = self.create_physical_expr( - expr, - input_dfschema, - input_schema, - ctx_state, - )?; - let list_expr_data_type = - list_expr.data_type(input_schema)?; - - if list_expr_data_type == value_expr_data_type { - Ok(list_expr) - } else if can_cast_types( - &list_expr_data_type, - &value_expr_data_type, - ) { - expressions::cast( - list_expr, - input_schema, - value_expr.data_type(input_schema)?, - ) - } else { - Err(DataFusionError::Plan(format!( - "Unsupported CAST from {:?} to {:?}", - list_expr_data_type, value_expr_data_type - ))) - } - } - }) - .collect::>>()?; - - expressions::in_list(value_expr, list_exprs, negated) - } - }, - other => Err(DataFusionError::NotImplemented(format!( - "Physical plan does not support logical expression {:?}", - other - ))), - } - } - - /// Create a window expression with a name from a logical expression - pub fn create_window_expr_with_name( 
- &self, - e: &Expr, - name: impl Into, - logical_input_schema: &DFSchema, - physical_input_schema: &Schema, - ctx_state: &ExecutionContextState, - ) -> Result> { - let name = name.into(); - match e { - Expr::WindowFunction { - fun, - args, - partition_by, - order_by, - window_frame, - } => { - let args = args + let list_exprs = list .iter() - .map(|e| { - self.create_physical_expr( - e, - logical_input_schema, - physical_input_schema, - ctx_state, - ) - }) - .collect::>>()?; - let partition_by = partition_by - .iter() - .map(|e| { - self.create_physical_expr( - e, - logical_input_schema, - physical_input_schema, - ctx_state, - ) - }) - .collect::>>()?; - let order_by = order_by - .iter() - .map(|e| match e { - Expr::Sort { + .map(|expr| match expr { + Expr::Literal(ScalarValue::Utf8(None)) => create_physical_expr( expr, - asc, - nulls_first, - } => self.create_physical_sort_expr( - expr, - logical_input_schema, - physical_input_schema, - SortOptions { - descending: !*asc, - nulls_first: *nulls_first, - }, - ctx_state, + input_dfschema, + input_schema, + execution_props, ), - _ => Err(DataFusionError::Plan( - "Sort only accepts sort expressions".to_string(), - )), + _ => { + let list_expr = create_physical_expr( + expr, + input_dfschema, + input_schema, + execution_props, + )?; + let list_expr_data_type = + list_expr.data_type(input_schema)?; + + if list_expr_data_type == value_expr_data_type { + Ok(list_expr) + } else if can_cast_types( + &list_expr_data_type, + &value_expr_data_type, + ) { + expressions::cast( + list_expr, + input_schema, + value_expr.data_type(input_schema)?, + ) + } else { + Err(DataFusionError::Plan(format!( + "Unsupported CAST from {:?} to {:?}", + list_expr_data_type, value_expr_data_type + ))) + } + } }) .collect::>>()?; - if window_frame.is_some() { - return Err(DataFusionError::NotImplemented( - "window expression with window frame definition is not yet supported" - .to_owned(), - )); - } - windows::create_window_expr( - fun, - name, - &args, - &partition_by, - &order_by, - *window_frame, - physical_input_schema, - ) + + expressions::in_list(value_expr, list_exprs, negated) } - other => Err(DataFusionError::Internal(format!( - "Invalid window expression '{:?}'", - other - ))), - } + }, + other => Err(DataFusionError::NotImplemented(format!( + "Physical plan does not support logical expression {:?}", + other + ))), } +} - /// Create a window expression from a logical expression or an alias - pub fn create_window_expr( - &self, - e: &Expr, - logical_input_schema: &DFSchema, - physical_input_schema: &Schema, - ctx_state: &ExecutionContextState, - ) -> Result> { - // unpack aliased logical expressions, e.g. 
"sum(col) over () as total" - let (name, e) = match e { - Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()), - _ => (physical_name(e)?, e), - }; - self.create_window_expr_with_name( - e, - name, - logical_input_schema, - physical_input_schema, - ctx_state, - ) +/// Create a window expression with a name from a logical expression +pub fn create_window_expr_with_name( + e: &Expr, + name: impl Into, + logical_input_schema: &DFSchema, + physical_input_schema: &Schema, + execution_props: &ExecutionProps, +) -> Result> { + let name = name.into(); + match e { + Expr::WindowFunction { + fun, + args, + partition_by, + order_by, + window_frame, + } => { + let args = args + .iter() + .map(|e| { + create_physical_expr( + e, + logical_input_schema, + physical_input_schema, + execution_props, + ) + }) + .collect::>>()?; + let partition_by = partition_by + .iter() + .map(|e| { + create_physical_expr( + e, + logical_input_schema, + physical_input_schema, + execution_props, + ) + }) + .collect::>>()?; + let order_by = order_by + .iter() + .map(|e| match e { + Expr::Sort { + expr, + asc, + nulls_first, + } => create_physical_sort_expr( + expr, + logical_input_schema, + physical_input_schema, + SortOptions { + descending: !*asc, + nulls_first: *nulls_first, + }, + execution_props, + ), + _ => Err(DataFusionError::Plan( + "Sort only accepts sort expressions".to_string(), + )), + }) + .collect::>>()?; + if window_frame.is_some() { + return Err(DataFusionError::NotImplemented( + "window expression with window frame definition is not yet supported" + .to_owned(), + )); + } + windows::create_window_expr( + fun, + name, + &args, + &partition_by, + &order_by, + *window_frame, + physical_input_schema, + ) + } + other => Err(DataFusionError::Internal(format!( + "Invalid window expression '{:?}'", + other + ))), } +} - /// Create an aggregate expression with a name from a logical expression - pub fn create_aggregate_expr_with_name( - &self, - e: &Expr, - name: impl Into, - logical_input_schema: &DFSchema, - physical_input_schema: &Schema, - ctx_state: &ExecutionContextState, - ) -> Result> { - match e { - Expr::AggregateFunction { +/// Create a window expression from a logical expression or an alias +pub fn create_window_expr( + e: &Expr, + logical_input_schema: &DFSchema, + physical_input_schema: &Schema, + execution_props: &ExecutionProps, +) -> Result> { + // unpack aliased logical expressions, e.g. "sum(col) over () as total" + let (name, e) = match e { + Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()), + _ => (physical_name(e)?, e), + }; + create_window_expr_with_name( + e, + name, + logical_input_schema, + physical_input_schema, + execution_props, + ) +} + +/// Create an aggregate expression with a name from a logical expression +pub fn create_aggregate_expr_with_name( + e: &Expr, + name: impl Into, + logical_input_schema: &DFSchema, + physical_input_schema: &Schema, + execution_props: &ExecutionProps, +) -> Result> { + match e { + Expr::AggregateFunction { + fun, + distinct, + args, + .. + } => { + let args = args + .iter() + .map(|e| { + create_physical_expr( + e, + logical_input_schema, + physical_input_schema, + execution_props, + ) + }) + .collect::>>()?; + aggregates::create_aggregate_expr( fun, - distinct, - args, - .. 
- } => { - let args = args - .iter() - .map(|e| { - self.create_physical_expr( - e, - logical_input_schema, - physical_input_schema, - ctx_state, - ) - }) - .collect::>>()?; - aggregates::create_aggregate_expr( - fun, - *distinct, - &args, - physical_input_schema, - name, - ) - } - Expr::AggregateUDF { fun, args, .. } => { - let args = args - .iter() - .map(|e| { - self.create_physical_expr( - e, - logical_input_schema, - physical_input_schema, - ctx_state, - ) - }) - .collect::>>()?; + *distinct, + &args, + physical_input_schema, + name, + ) + } + Expr::AggregateUDF { fun, args, .. } => { + let args = args + .iter() + .map(|e| { + create_physical_expr( + e, + logical_input_schema, + physical_input_schema, + execution_props, + ) + }) + .collect::>>()?; - udaf::create_aggregate_expr(fun, &args, physical_input_schema, name) - } - other => Err(DataFusionError::Internal(format!( - "Invalid aggregate expression '{:?}'", - other - ))), + udaf::create_aggregate_expr(fun, &args, physical_input_schema, name) } + other => Err(DataFusionError::Internal(format!( + "Invalid aggregate expression '{:?}'", + other + ))), } +} - /// Create an aggregate expression from a logical expression or an alias - pub fn create_aggregate_expr( - &self, - e: &Expr, - logical_input_schema: &DFSchema, - physical_input_schema: &Schema, - ctx_state: &ExecutionContextState, - ) -> Result> { - // unpack (nested) aliased logical expressions, e.g. "sum(col) as total" - let (name, e) = match e { - Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()), - _ => (physical_name(e)?, e), - }; - - self.create_aggregate_expr_with_name( - e, - name, - logical_input_schema, - physical_input_schema, - ctx_state, - ) - } +/// Create an aggregate expression from a logical expression or an alias +pub fn create_aggregate_expr( + e: &Expr, + logical_input_schema: &DFSchema, + physical_input_schema: &Schema, + execution_props: &ExecutionProps, +) -> Result> { + // unpack (nested) aliased logical expressions, e.g. "sum(col) as total" + let (name, e) = match e { + Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()), + _ => (physical_name(e)?, e), + }; - /// Create a physical sort expression from a logical expression - pub fn create_physical_sort_expr( - &self, - e: &Expr, - input_dfschema: &DFSchema, - input_schema: &Schema, - options: SortOptions, - ctx_state: &ExecutionContextState, - ) -> Result { - Ok(PhysicalSortExpr { - expr: self.create_physical_expr( - e, - input_dfschema, - input_schema, - ctx_state, - )?, - options, - }) - } + create_aggregate_expr_with_name( + e, + name, + logical_input_schema, + physical_input_schema, + execution_props, + ) +} + +/// Create a physical sort expression from a logical expression +pub fn create_physical_sort_expr( + e: &Expr, + input_dfschema: &DFSchema, + input_schema: &Schema, + options: SortOptions, + execution_props: &ExecutionProps, +) -> Result { + Ok(PhysicalSortExpr { + expr: create_physical_expr(e, input_dfschema, input_schema, execution_props)?, + options, + }) +} +impl DefaultPhysicalPlanner { /// Handles capturing the various plans for EXPLAIN queries /// /// Returns
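// ---------------------------------------------------------------------------
// Example (illustrative sketch, not part of the patch): the sort, window and
// aggregate helpers are now free functions as well, so callers outside the
// planner can build a physical sort expression from an `ExecutionProps`
// alone. Module paths are assumptions for illustration.
// ---------------------------------------------------------------------------
use std::convert::TryFrom;

use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_plan::{col, DFSchema};
use datafusion::physical_plan::planner::create_physical_sort_expr;

fn sort_by_a_descending() -> Result<()> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
    let df_schema = DFSchema::try_from(schema.clone())?;
    // `a DESC NULLS LAST`, lowered without a DefaultPhysicalPlanner instance.
    let _sort_expr = create_physical_sort_expr(
        &col("a"),
        &df_schema,
        &schema,
        SortOptions {
            descending: true,
            nulls_first: false,
        },
        &ExecutionProps::new(),
    )?;
    Ok(())
}
// ---------------------------------------------------------------------------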