Support create_physical_expr and ExecutionContextState or DefaultPhysicalPlanner for faster speed #1700

Merged: 4 commits, Jan 31, 2022
ballista/rust/core/src/serde/physical_plan/from_proto.rs (3 changes: 1 addition & 2 deletions)

@@ -622,7 +622,6 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {
         let ctx_state = ExecutionContextState {
             catalog_list,
             scalar_functions: Default::default(),
-            var_provider: Default::default(),
             aggregate_functions: Default::default(),
             config: ExecutionConfig::new(),
             execution_props: ExecutionProps::new(),
@@ -632,7 +631,7 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {

         let fun_expr = functions::create_physical_fun(
             &(&scalar_function).into(),
-            &ctx_state,
+            &ctx_state.execution_props,
         )?;

         Arc::new(ScalarFunctionExpr::new(
datafusion/src/execution/context.rs (40 changes: 34 additions & 6 deletions)

@@ -190,7 +190,6 @@ impl ExecutionContext {
         state: Arc::new(Mutex::new(ExecutionContextState {
             catalog_list,
             scalar_functions: HashMap::new(),
-            var_provider: HashMap::new(),
             aggregate_functions: HashMap::new(),
             config,
             execution_props: ExecutionProps::new(),
@@ -324,8 +323,8 @@ impl ExecutionContext {
         self.state
             .lock()
             .unwrap()
-            .var_provider
-            .insert(variable_type, provider);
+            .execution_props
+            .add_var_provider(variable_type, provider);
     }

     /// Registers a scalar UDF within this context.

Review comment (Contributor Author): The pub fn register_variable( API remains the same.
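A minimal sketch of that unchanged registration path from the caller's side; the helper function and the choice of VarType::System are illustrative, and the import paths assume this era's datafusion::variable module:

use std::sync::Arc;
use datafusion::execution::context::ExecutionContext;
use datafusion::variable::{VarProvider, VarType};

// Sketch: callers keep calling register_variable as before; only the
// storage behind it moved from ExecutionContextState::var_provider to
// ExecutionProps.
fn register_system_vars(
    ctx: &mut ExecutionContext,
    provider: Arc<dyn VarProvider + Send + Sync>,
) {
    ctx.register_variable(VarType::System, provider);
}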
@@ -1115,9 +1114,14 @@ impl ExecutionConfig {
 /// An instance of this struct is created each time a [`LogicalPlan`] is prepared for
 /// execution (optimized). If the same plan is optimized multiple times, a new
 /// `ExecutionProps` is created each time.
+///
+/// It is important that this structure be cheap to create as it is
+/// done so during predicate pruning and expression simplification
 #[derive(Clone)]
 pub struct ExecutionProps {
     pub(crate) query_execution_start_time: DateTime<Utc>,
+    /// providers for scalar variables
+    pub var_providers: Option<HashMap<VarType, Arc<dyn VarProvider + Send + Sync>>>,
 }

 impl Default for ExecutionProps {

Review comment (Member): 👍
@@ -1131,6 +1135,7 @@ impl ExecutionProps {
     pub fn new() -> Self {
         ExecutionProps {
             query_execution_start_time: chrono::Utc::now(),
+            var_providers: None,
         }
     }

@@ -1139,6 +1144,32 @@ impl ExecutionProps {
         self.query_execution_start_time = chrono::Utc::now();
         &*self
     }
+
+    /// Registers a variable provider, returning the existing
+    /// provider, if any
+    pub fn add_var_provider(
+        &mut self,
+        var_type: VarType,
+        provider: Arc<dyn VarProvider + Send + Sync>,
+    ) -> Option<Arc<dyn VarProvider + Send + Sync>> {
+        let mut var_providers = self.var_providers.take().unwrap_or_else(HashMap::new);
+
+        let old_provider = var_providers.insert(var_type, provider);
+
+        self.var_providers = Some(var_providers);
+
+        old_provider
+    }
+
+    /// Returns the provider for the var_type, if any
+    pub fn get_var_provider(
+        &self,
+        var_type: VarType,
+    ) -> Option<Arc<dyn VarProvider + Send + Sync>> {
+        self.var_providers
+            .as_ref()
+            .and_then(|var_providers| var_providers.get(&var_type).map(Arc::clone))
+    }
 }

 /// Execution context for registering data sources and executing queries
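To make the semantics of the two new methods concrete, a small sketch; the helper function is illustrative, not part of the PR:

use std::sync::Arc;
use datafusion::execution::context::ExecutionProps;
use datafusion::variable::{VarProvider, VarType};

// Sketch: the provider map is allocated lazily, which keeps
// ExecutionProps::new() cheap when no variables are registered.
fn install(props: &mut ExecutionProps, provider: Arc<dyn VarProvider + Send + Sync>) {
    // add_var_provider returns the provider it displaced, if any.
    let previous = props.add_var_provider(VarType::System, provider);
    assert!(previous.is_none());

    // get_var_provider clones the Arc; unregistered kinds yield None.
    assert!(props.get_var_provider(VarType::System).is_some());
    assert!(props.get_var_provider(VarType::UserDefined).is_none());
}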
@@ -1148,8 +1179,6 @@ pub struct ExecutionContextState {
     pub catalog_list: Arc<dyn CatalogList>,
     /// Scalar functions that are registered with the context
     pub scalar_functions: HashMap<String, Arc<ScalarUDF>>,
-    /// Variable provider that are registered with the context
-    pub var_provider: HashMap<VarType, Arc<dyn VarProvider + Send + Sync>>,
     /// Aggregate functions registered in the context
     pub aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
     /// Context configuration

Review comment (Contributor Author): While technically this is a breaking change, ExecutionContext::register_variable remains the same, which I think means this change will be minimally disruptive to people using DataFusion.
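For downstream code that did reach into the removed field, the migration is mechanical. A hedged before/after sketch; the helper function is hypothetical:

use std::sync::Arc;
use datafusion::execution::context::ExecutionContextState;
use datafusion::variable::{VarProvider, VarType};

fn system_provider(
    ctx_state: &ExecutionContextState,
) -> Option<Arc<dyn VarProvider + Send + Sync>> {
    // Before: ctx_state.var_provider.get(&VarType::System).map(Arc::clone)
    // After: the providers now live on ExecutionProps.
    ctx_state.execution_props.get_var_provider(VarType::System)
}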
@@ -1174,7 +1203,6 @@ impl ExecutionContextState {
         ExecutionContextState {
             catalog_list: Arc::new(MemoryCatalogList::new()),
             scalar_functions: HashMap::new(),
-            var_provider: HashMap::new(),
             aggregate_functions: HashMap::new(),
             config: ExecutionConfig::new(),
             execution_props: ExecutionProps::new(),
datafusion/src/optimizer/simplify_expressions.rs (30 changes: 13 additions & 17 deletions)

@@ -22,13 +22,13 @@ use arrow::datatypes::{DataType, Field, Schema};
 use arrow::record_batch::RecordBatch;

 use crate::error::DataFusionError;
-use crate::execution::context::{ExecutionContextState, ExecutionProps};
+use crate::execution::context::ExecutionProps;
 use crate::logical_plan::{lit, DFSchemaRef, Expr};
 use crate::logical_plan::{DFSchema, ExprRewriter, LogicalPlan, RewriteRecursion};
 use crate::optimizer::optimizer::OptimizerRule;
 use crate::optimizer::utils;
 use crate::physical_plan::functions::Volatility;
-use crate::physical_plan::planner::DefaultPhysicalPlanner;
+use crate::physical_plan::planner::create_physical_expr;
 use crate::scalar::ScalarValue;
 use crate::{error::Result, logical_plan::Operator};

@@ -223,7 +223,7 @@ impl SimplifyExpressions {
 /// let rewritten = expr.rewrite(&mut const_evaluator).unwrap();
 /// assert_eq!(rewritten, lit(3) + col("a"));
 /// ```
-pub struct ConstEvaluator {
+pub struct ConstEvaluator<'a> {
     /// can_evaluate is used during the depth-first-search of the
     /// Expr tree to track if any siblings (or their descendants) were
     /// non evaluatable (e.g. had a column reference or volatile
@@ -238,13 +238,12 @@ pub struct ConstEvaluator {
     /// descendants) so this Expr can be evaluated
     can_evaluate: Vec<bool>,

-    ctx_state: ExecutionContextState,
-    planner: DefaultPhysicalPlanner,
+    execution_props: &'a ExecutionProps,
     input_schema: DFSchema,
     input_batch: RecordBatch,
 }

-impl ExprRewriter for ConstEvaluator {
+impl<'a> ExprRewriter for ConstEvaluator<'a> {
     fn pre_visit(&mut self, expr: &Expr) -> Result<RewriteRecursion> {
         // Default to being able to evaluate this node
         self.can_evaluate.push(true);

Review comment (Contributor Author): Here is reason 1 for this change: We no longer have to create a whole new ctx_state and planner for each potential constant evaluation 🎉
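A short sketch of the resulting call pattern, mirroring the doc example above; the import paths are assumptions:

use datafusion::execution::context::ExecutionProps;
use datafusion::logical_plan::{col, lit};
use datafusion::optimizer::simplify_expressions::ConstEvaluator;

// Sketch: one borrowed ExecutionProps serves every rewrite; no
// ExecutionContextState clone or DefaultPhysicalPlanner per expression.
fn fold(execution_props: &ExecutionProps) {
    let mut const_evaluator = ConstEvaluator::new(execution_props);
    let expr = lit(1) + lit(2) + col("a");
    let rewritten = expr.rewrite(&mut const_evaluator).unwrap();
    assert_eq!(rewritten, lit(3) + col("a"));
}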
Expand Down Expand Up @@ -282,16 +281,11 @@ impl ExprRewriter for ConstEvaluator {
     }
 }

-impl ConstEvaluator {
+impl<'a> ConstEvaluator<'a> {
     /// Create a new `ConstantEvaluator`. Session constants (such as
     /// the time for `now()` are taken from the passed
     /// `execution_props`.
-    pub fn new(execution_props: &ExecutionProps) -> Self {
-        let planner = DefaultPhysicalPlanner::default();
-        let ctx_state = ExecutionContextState {
-            execution_props: execution_props.clone(),
-            ..ExecutionContextState::new()
-        };
+    pub fn new(execution_props: &'a ExecutionProps) -> Self {
         let input_schema = DFSchema::empty();

         // The dummy column name is unused and doesn't matter as only
@@ -306,8 +300,7 @@ impl ConstEvaluator {

         Self {
             can_evaluate: vec![],
-            ctx_state,
-            planner,
+            execution_props,
             input_schema,
             input_batch,
         }
@@ -364,11 +357,11 @@ impl ConstEvaluator {
             return Ok(s);
         }

-        let phys_expr = self.planner.create_physical_expr(
+        let phys_expr = create_physical_expr(
             &expr,
             &self.input_schema,
             &self.input_batch.schema(),
-            &self.ctx_state,
+            self.execution_props,
         )?;
         let col_val = phys_expr.evaluate(&self.input_batch)?;
         match col_val {
@@ -1141,6 +1134,7 @@ mod tests {
     ) {
         let execution_props = ExecutionProps {
             query_execution_start_time: *date_time,
+            var_providers: None,
         };

         let mut const_evaluator = ConstEvaluator::new(&execution_props);
@@ -1622,6 +1616,7 @@ mod tests {
         let rule = SimplifyExpressions::new();
         let execution_props = ExecutionProps {
             query_execution_start_time: *date_time,
+            var_providers: None,
         };

         let err = rule
@@ -1638,6 +1633,7 @@ mod tests {
         let rule = SimplifyExpressions::new();
         let execution_props = ExecutionProps {
             query_execution_start_time: *date_time,
+            var_providers: None,
         };

         let optimized_plan = rule
datafusion/src/physical_optimizer/pruning.rs (13 changes: 8 additions & 5 deletions)

@@ -37,13 +37,14 @@ use arrow::{
     record_batch::RecordBatch,
 };

+use crate::execution::context::ExecutionProps;
+use crate::physical_plan::planner::create_physical_expr;
 use crate::prelude::lit;
 use crate::{
     error::{DataFusionError, Result},
-    execution::context::ExecutionContextState,
     logical_plan::{Column, DFSchema, Expr, Operator},
     optimizer::utils,
-    physical_plan::{planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr},
+    physical_plan::{ColumnarValue, PhysicalExpr},
 };

 /// Interface to pass statistics information to [`PruningPredicates`]
@@ -129,12 +130,14 @@ impl PruningPredicate {
             .collect::<Vec<_>>();
         let stat_schema = Schema::new(stat_fields);
         let stat_dfschema = DFSchema::try_from(stat_schema.clone())?;
-        let execution_context_state = ExecutionContextState::new();
-        let predicate_expr = DefaultPhysicalPlanner::default().create_physical_expr(
+
+        // TODO allow these properties to be passed in
+        let execution_props = ExecutionProps::new();
+        let predicate_expr = create_physical_expr(
             &logical_predicate_expr,
             &stat_dfschema,
             &stat_schema,
-            &execution_context_state,
+            &execution_props,
         )?;
         Ok(Self {
             schema,

Review comment (Contributor Author): Here is reason 2 for this change: again we save a whole new ctx_state and planner for each potential constant evaluation 🎉
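A sketch of the lighter planning path this enables, with an illustrative statistics schema and predicate:

use std::sync::Arc;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_plan::{col, lit, DFSchema};
use datafusion::physical_plan::planner::create_physical_expr;
use datafusion::physical_plan::PhysicalExpr;

// Sketch: plan a pruning-style predicate with only a cheap, stack-allocated
// ExecutionProps; no ExecutionContextState or planner is required.
fn plan_min_predicate() -> Result<Arc<dyn PhysicalExpr>> {
    let stat_schema = Schema::new(vec![Field::new("a_min", DataType::Int32, false)]);
    let stat_dfschema = DFSchema::try_from(stat_schema.clone())?;
    let execution_props = ExecutionProps::new();
    create_physical_expr(
        &col("a_min").lt_eq(lit(10)),
        &stat_dfschema,
        &stat_schema,
        &execution_props,
    )
}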
datafusion/src/physical_plan/functions.rs (34 changes: 17 additions & 17 deletions)

@@ -33,7 +33,7 @@ use super::{
     type_coercion::{coerce, data_types},
     ColumnarValue, PhysicalExpr,
 };
-use crate::execution::context::ExecutionContextState;
+use crate::execution::context::ExecutionProps;
 use crate::physical_plan::array_expressions;
 use crate::physical_plan::datetime_expressions;
 use crate::physical_plan::expressions::{
@@ -723,7 +723,7 @@ macro_rules! invoke_if_unicode_expressions_feature_flag {
 /// Create a physical scalar function.
 pub fn create_physical_fun(
     fun: &BuiltinScalarFunction,
-    ctx_state: &ExecutionContextState,
+    execution_props: &ExecutionProps,
 ) -> Result<ScalarFunctionImplementation> {
     Ok(match fun {
         // math functions
@@ -820,7 +820,7 @@ pub fn create_physical_fun(
         BuiltinScalarFunction::Now => {
             // bind value for now at plan time
             Arc::new(datetime_expressions::make_now(
-                ctx_state.execution_props.query_execution_start_time,
+                execution_props.query_execution_start_time,
             ))
         }
         BuiltinScalarFunction::InitCap => Arc::new(|args| match args[0].data_type() {
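One detail worth calling out from this hunk: now() is bound once, at plan time. A hedged sketch of that behavior; calling the returned closure directly is illustrative:

use datafusion::error::Result;
use datafusion::execution::context::ExecutionProps;
use datafusion::physical_plan::functions::{create_physical_fun, BuiltinScalarFunction};

// Sketch: the closure captures query_execution_start_time, so every
// now() evaluation within one query yields the same instant rather
// than re-reading the wall clock.
fn bind_now(execution_props: &ExecutionProps) -> Result<()> {
    let now_fn = create_physical_fun(&BuiltinScalarFunction::Now, execution_props)?;
    let first = now_fn(&[])?;
    let second = now_fn(&[])?;
    // first and second are ColumnarValue scalars holding the same timestamp.
    drop((first, second));
    Ok(())
}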
@@ -1157,7 +1157,7 @@ pub fn create_physical_expr(
     fun: &BuiltinScalarFunction,
     input_phy_exprs: &[Arc<dyn PhysicalExpr>],
     input_schema: &Schema,
-    ctx_state: &ExecutionContextState,
+    execution_props: &ExecutionProps,
 ) -> Result<Arc<dyn PhysicalExpr>> {
     let coerced_phy_exprs = coerce(input_phy_exprs, input_schema, &signature(fun))?;

@@ -1254,7 +1254,7 @@
             }
         }),
         // These don't need args and input schema
-        _ => create_physical_fun(fun, ctx_state)?,
+        _ => create_physical_fun(fun, execution_props)?,
     };

     Ok(Arc::new(ScalarFunctionExpr::new(
@@ -1720,14 +1720,14 @@ mod tests {
     ($FUNC:ident, $ARGS:expr, $EXPECTED:expr, $EXPECTED_TYPE:ty, $DATA_TYPE: ident, $ARRAY_TYPE:ident) => {
         // used to provide type annotation
         let expected: Result<Option<$EXPECTED_TYPE>> = $EXPECTED;
-        let ctx_state = ExecutionContextState::new();
+        let execution_props = ExecutionProps::new();

         // any type works here: we evaluate against a literal of `value`
         let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
         let columns: Vec<ArrayRef> = vec![Arc::new(Int32Array::from_slice(&[1]))];

         let expr =
-            create_physical_expr(&BuiltinScalarFunction::$FUNC, $ARGS, &schema, &ctx_state)?;
+            create_physical_expr(&BuiltinScalarFunction::$FUNC, $ARGS, &schema, &execution_props)?;

         // type is correct
         assert_eq!(expr.data_type(&schema)?, DataType::$DATA_TYPE);
@@ -3888,7 +3888,7 @@ mod tests {

     #[test]
     fn test_empty_arguments_error() -> Result<()> {
-        let ctx_state = ExecutionContextState::new();
+        let execution_props = ExecutionProps::new();
         let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

         // pick some arbitrary functions to test
@@ -3900,7 +3900,7 @@ mod tests {
         ];

         for fun in funs.iter() {
-            let expr = create_physical_expr(fun, &[], &schema, &ctx_state);
+            let expr = create_physical_expr(fun, &[], &schema, &execution_props);

             match expr {
                 Ok(..) => {
@@ -3931,13 +3931,13 @@ mod tests {

     #[test]
     fn test_empty_arguments() -> Result<()> {
-        let ctx_state = ExecutionContextState::new();
+        let execution_props = ExecutionProps::new();
         let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

         let funs = [BuiltinScalarFunction::Now, BuiltinScalarFunction::Random];

         for fun in funs.iter() {
-            create_physical_expr(fun, &[], &schema, &ctx_state)?;
+            create_physical_expr(fun, &[], &schema, &execution_props)?;
         }
         Ok(())
     }
@@ -3954,13 +3954,13 @@ mod tests {
Field::new("b", value2.data_type().clone(), false),
]);
let columns: Vec<ArrayRef> = vec![value1, value2];
let ctx_state = ExecutionContextState::new();
let execution_props = ExecutionProps::new();

let expr = create_physical_expr(
&BuiltinScalarFunction::Array,
&[col("a", &schema)?, col("b", &schema)?],
&schema,
&ctx_state,
&execution_props,
)?;

// type is correct
Expand Down Expand Up @@ -4017,7 +4017,7 @@ mod tests {
     fn test_regexp_match() -> Result<()> {
         use arrow::array::ListArray;
         let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
-        let ctx_state = ExecutionContextState::new();
+        let execution_props = ExecutionProps::new();

         let col_value: ArrayRef = Arc::new(StringArray::from_slice(&["aaa-555"]));
         let pattern = lit(ScalarValue::Utf8(Some(r".*-(\d*)".to_string())));
@@ -4026,7 +4026,7 @@
             &BuiltinScalarFunction::RegexpMatch,
             &[col("a", &schema)?, pattern],
             &schema,
-            &ctx_state,
+            &execution_props,
         )?;

         // type is correct
@@ -4056,7 +4056,7 @@
     fn test_regexp_match_all_literals() -> Result<()> {
         use arrow::array::ListArray;
         let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
-        let ctx_state = ExecutionContextState::new();
+        let execution_props = ExecutionProps::new();

         let col_value = lit(ScalarValue::Utf8(Some("aaa-555".to_string())));
         let pattern = lit(ScalarValue::Utf8(Some(r".*-(\d*)".to_string())));
@@ -4065,7 +4065,7 @@
             &BuiltinScalarFunction::RegexpMatch,
             &[col_value, pattern],
             &schema,
-            &ctx_state,
+            &execution_props,
         )?;

         // type is correct