Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move physical expression planning to datafusion-physical-expr crate #2682

Merged
merged 5 commits into from
Jun 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 2 additions & 63 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::{
hash_build_probe_order::HashBuildProbeOrder, optimizer::PhysicalOptimizerRule,
},
};
pub use datafusion_physical_expr::execution_props::ExecutionProps;
use parking_lot::RwLock;
use std::string::String;
use std::sync::Arc;
Expand Down Expand Up @@ -1128,68 +1129,6 @@ impl SessionConfig {
}
}

/// Holds per-execution properties and data (such as the query start timestamp
/// and the registered scalar-variable providers).
///
/// An instance of this struct is created each time a [`LogicalPlan`] is prepared for
/// execution (optimized). If the same plan is optimized multiple times, a new
/// `ExecutionProps` is created each time.
///
/// It is important that this structure be cheap to create, because instances
/// are also created during predicate pruning and expression simplification.
#[derive(Clone)]
pub struct ExecutionProps {
/// UTC timestamp captured when the props are created by [`ExecutionProps::new`]
/// and overwritten by [`ExecutionProps::start_execution`].
pub(crate) query_execution_start_time: DateTime<Utc>,
/// Providers for scalar variables, keyed by [`VarType`]. This is `None` on a
/// freshly created instance; [`ExecutionProps::add_var_provider`] stores a
/// map here when a provider is registered.
pub var_providers: Option<HashMap<VarType, Arc<dyn VarProvider + Send + Sync>>>,
}

impl Default for ExecutionProps {
fn default() -> Self {
Self::new()
}
}

impl ExecutionProps {
    /// Creates a fresh set of execution properties.
    ///
    /// The start time is initialised to the current UTC time and no variable
    /// providers are registered.
    pub fn new() -> Self {
        Self {
            var_providers: None,
            query_execution_start_time: chrono::Utc::now(),
        }
    }

    /// Records the current UTC time as the query execution start time and
    /// returns a shared reference to the updated props.
    pub fn start_execution(&mut self) -> &Self {
        let now = chrono::Utc::now();
        self.query_execution_start_time = now;
        self
    }

    /// Registers `provider` for `var_type`.
    ///
    /// The provider map is created on first use. Returns the provider that was
    /// previously registered for `var_type`, or `None` if there was none.
    pub fn add_var_provider(
        &mut self,
        var_type: VarType,
        provider: Arc<dyn VarProvider + Send + Sync>,
    ) -> Option<Arc<dyn VarProvider + Send + Sync>> {
        // Lazily materialise the map, then let `insert` hand back whatever
        // entry it displaced.
        self.var_providers
            .get_or_insert_with(Default::default)
            .insert(var_type, provider)
    }

    /// Looks up the provider registered for `var_type`.
    ///
    /// Returns a new `Arc` handle to the provider, or `None` when no provider
    /// map exists yet or the map has no entry for `var_type`.
    pub fn get_var_provider(
        &self,
        var_type: VarType,
    ) -> Option<Arc<dyn VarProvider + Send + Sync>> {
        let registered = self.var_providers.as_ref()?;
        registered.get(&var_type).cloned()
    }
}

/// Execution context for registering data sources and executing queries
#[derive(Clone)]
pub struct SessionState {
Expand Down Expand Up @@ -1652,7 +1591,6 @@ impl FunctionRegistry for TaskContext {
mod tests {
use super::*;
use crate::execution::context::QueryPlanner;
use crate::physical_plan::functions::make_scalar_function;
use crate::test;
use crate::test_util::parquet_test_data;
use crate::variable::VarType;
Expand All @@ -1666,6 +1604,7 @@ mod tests {
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion_expr::Volatility;
use datafusion_physical_expr::functions::make_scalar_function;
use std::fs::File;
use std::sync::Weak;
use std::thread::{self, JoinHandle};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ pub use datafusion_row as row;
#[cfg(feature = "jit")]
pub use datafusion_jit as jit;

pub mod from_slice;
pub use physical_expr::from_slice;

#[cfg(test)]
pub mod test;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/optimizer/simplify_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
use crate::execution::context::ExecutionProps;
use crate::logical_plan::{ExprSimplifiable, SimplifyInfo};
use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule};
use crate::physical_plan::planner::create_physical_expr;
use arrow::array::new_null_array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
Expand All @@ -33,6 +32,7 @@ use datafusion_expr::{
utils::from_plan,
Expr, ExprSchemable, Operator, Volatility,
};
use datafusion_physical_expr::create_physical_expr;

/// Provides simplification information based on schema and properties
pub(crate) struct SimplifyContext<'a, 'b> {
Expand Down
15 changes: 7 additions & 8 deletions datafusion/core/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,21 @@
use std::convert::TryFrom;
use std::{collections::HashSet, sync::Arc};

use arrow::{
array::{new_null_array, ArrayRef, BooleanArray},
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
use datafusion_expr::utils::expr_to_columns;

use crate::execution::context::ExecutionProps;
use crate::physical_plan::planner::create_physical_expr;
use crate::prelude::lit;
use crate::{
error::{DataFusionError, Result},
logical_plan::{Column, DFSchema, Expr, Operator},
optimizer::utils,
physical_plan::{ColumnarValue, PhysicalExpr},
};
use arrow::{
array::{new_null_array, ArrayRef, BooleanArray},
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
use datafusion_expr::utils::expr_to_columns;
use datafusion_physical_expr::create_physical_expr;

/// Interface to pass statistics information to [`PruningPredicate`]
///
Expand Down
Loading