Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reuse datafusion physical planner in ballista building from protobuf #532

Merged
merged 4 commits into from
Jun 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 26 additions & 116 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use datafusion::execution::context::{
ExecutionConfig, ExecutionContextState, ExecutionProps,
};
use datafusion::logical_plan::{DFSchema, Expr};
use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunction};
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::expressions::col;
use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use datafusion::physical_plan::hash_join::PartitionMode;
Expand All @@ -45,7 +45,6 @@ use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
use datafusion::physical_plan::window_functions::{
BuiltInWindowFunction, WindowFunction,
};
use datafusion::physical_plan::windows::create_window_expr;
use datafusion::physical_plan::windows::WindowAggExec;
use datafusion::physical_plan::{
coalesce_batches::CoalesceBatchesExec,
Expand Down Expand Up @@ -205,76 +204,27 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
)
})?
.clone();

let physical_schema: SchemaRef =
SchemaRef::new((&input_schema).try_into()?);

let catalog_list =
Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;
let ctx_state = ExecutionContextState {
catalog_list,
scalar_functions: Default::default(),
var_provider: Default::default(),
aggregate_functions: Default::default(),
config: ExecutionConfig::new(),
execution_props: ExecutionProps::new(),
};

let ctx_state = ExecutionContextState::new();
let window_agg_expr: Vec<(Expr, String)> = window_agg
.window_expr
.iter()
.zip(window_agg.window_expr_name.iter())
.map(|(expr, name)| expr.try_into().map(|expr| (expr, name.clone())))
.collect::<Result<Vec<_>, _>>()?;

let mut physical_window_expr = vec![];

let df_planner = DefaultPhysicalPlanner::default();

for (expr, name) in &window_agg_expr {
match expr {
Expr::WindowFunction {
fun,
args,
partition_by,
order_by,
window_frame,
..
} => {
let arg = df_planner
.create_physical_expr(
&args[0],
&physical_schema,
&ctx_state,
)
.map_err(|e| {
BallistaError::General(format!("{:?}", e))
})?;
if !partition_by.is_empty() {
return Err(BallistaError::NotImplemented("Window function with partition by is not yet implemented".to_owned()));
}
if !order_by.is_empty() {
return Err(BallistaError::NotImplemented("Window function with order by is not yet implemented".to_owned()));
}
if window_frame.is_some() {
return Err(BallistaError::NotImplemented("Window function with window frame is not yet implemented".to_owned()));
}
let window_expr = create_window_expr(
&fun,
&[arg],
&physical_schema,
name.to_owned(),
)?;
physical_window_expr.push(window_expr);
}
_ => {
return Err(BallistaError::General(
"Invalid expression for WindowAggrExec".to_string(),
));
}
}
}

let physical_window_expr = window_agg_expr
.iter()
.map(|(expr, name)| {
df_planner.create_window_expr_with_name(
expr,
name.to_string(),
&physical_schema,
&ctx_state,
)
})
.collect::<Result<Vec<_>, _>>()?;
Ok(Arc::new(WindowAggExec::try_new(
physical_window_expr,
input,
Expand All @@ -297,7 +247,6 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
AggregateMode::FinalPartitioned
}
};

let group = hash_agg
.group_expr
.iter()
Expand All @@ -306,25 +255,13 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
compile_expr(expr, &input.schema()).map(|e| (e, name.to_string()))
})
.collect::<Result<Vec<_>, _>>()?;

let logical_agg_expr: Vec<(Expr, String)> = hash_agg
.aggr_expr
.iter()
.zip(hash_agg.aggr_expr_name.iter())
.map(|(expr, name)| expr.try_into().map(|expr| (expr, name.clone())))
.collect::<Result<Vec<_>, _>>()?;

let catalog_list =
Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;
let ctx_state = ExecutionContextState {
catalog_list,
scalar_functions: Default::default(),
var_provider: Default::default(),
aggregate_functions: Default::default(),
config: ExecutionConfig::new(),
execution_props: ExecutionProps::new(),
};

let ctx_state = ExecutionContextState::new();
let input_schema = hash_agg
.input_schema
.as_ref()
Expand All @@ -336,37 +273,18 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
.clone();
let physical_schema: SchemaRef =
SchemaRef::new((&input_schema).try_into()?);

let mut physical_aggr_expr = vec![];

let df_planner = DefaultPhysicalPlanner::default();
for (expr, name) in &logical_agg_expr {
match expr {
Expr::AggregateFunction { fun, args, .. } => {
let arg = df_planner
.create_physical_expr(
&args[0],
&physical_schema,
&ctx_state,
)
.map_err(|e| {
BallistaError::General(format!("{:?}", e))
})?;
physical_aggr_expr.push(create_aggregate_expr(
&fun,
false,
&[arg],
&physical_schema,
name.to_string(),
)?);
}
_ => {
return Err(BallistaError::General(
"Invalid expression for HashAggregateExec".to_string(),
))
}
}
}
let physical_aggr_expr = logical_agg_expr
.iter()
.map(|(expr, name)| {
df_planner.create_aggregate_expr_with_name(
expr,
name.to_string(),
&physical_schema,
&ctx_state,
)
})
.collect::<Result<Vec<_>, _>>()?;
Ok(Arc::new(HashAggregateExec::try_new(
agg_mode,
group,
Expand Down Expand Up @@ -484,15 +402,7 @@ fn compile_expr(
schema: &Schema,
) -> Result<Arc<dyn PhysicalExpr>, BallistaError> {
let df_planner = DefaultPhysicalPlanner::default();
let catalog_list = Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;
let state = ExecutionContextState {
catalog_list,
scalar_functions: HashMap::new(),
var_provider: HashMap::new(),
aggregate_functions: HashMap::new(),
config: ExecutionConfig::new(),
execution_props: ExecutionProps::new(),
};
let state = ExecutionContextState::new();
let expr: Expr = expr.try_into()?;
df_planner
.create_physical_expr(&expr, schema, &state)
Expand Down
116 changes: 95 additions & 21 deletions datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -731,34 +731,82 @@ impl DefaultPhysicalPlanner {
}
}

/// Create a window expression from a logical expression
pub fn create_window_expr(
/// Create a window expression with a name from a logical expression
pub fn create_window_expr_with_name(
&self,
e: &Expr,
logical_input_schema: &DFSchema,
name: String,
physical_input_schema: &Schema,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn WindowExpr>> {
// unpack aliased logical expressions, e.g. "sum(col) over () as total"
let (name, e) = match e {
Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()),
_ => (e.name(logical_input_schema)?, e),
};

match e {
Expr::WindowFunction { fun, args, .. } => {
Expr::WindowFunction {
fun,
args,
partition_by,
order_by,
window_frame,
} => {
let args = args
.iter()
.map(|e| {
self.create_physical_expr(e, physical_input_schema, ctx_state)
})
.collect::<Result<Vec<_>>>()?;
// if !order_by.is_empty() {
// return Err(DataFusionError::NotImplemented(
// "Window function with order by is not yet implemented".to_owned(),
// ));
// }
windows::create_window_expr(fun, &args, physical_input_schema, name)
let partition_by = partition_by
.iter()
.map(|e| {
self.create_physical_expr(e, physical_input_schema, ctx_state)
})
.collect::<Result<Vec<_>>>()?;
let order_by = order_by
.iter()
.map(|e| match e {
Expr::Sort {
expr,
asc,
nulls_first,
} => self.create_physical_sort_expr(
expr,
&physical_input_schema,
SortOptions {
descending: !*asc,
nulls_first: *nulls_first,
},
&ctx_state,
),
_ => Err(DataFusionError::Plan(
"Sort only accepts sort expressions".to_string(),
)),
})
.collect::<Result<Vec<_>>>()?;
if !partition_by.is_empty() {
return Err(DataFusionError::NotImplemented(
"window expression with non-empty partition by clause is not yet supported"
.to_owned(),
));
}
if !order_by.is_empty() {
return Err(DataFusionError::NotImplemented(
"window expression with non-empty order by clause is not yet supported"
.to_owned(),
));
}
if window_frame.is_some() {
return Err(DataFusionError::NotImplemented(
"window expression with window frame definition is not yet supported"
.to_owned(),
));
}
windows::create_window_expr(
fun,
name,
&args,
&partition_by,
&order_by,
*window_frame,
physical_input_schema,
)
}
other => Err(DataFusionError::Internal(format!(
"Invalid window expression '{:?}'",
Expand All @@ -767,20 +815,30 @@ impl DefaultPhysicalPlanner {
}
}

/// Create an aggregate expression from a logical expression
pub fn create_aggregate_expr(
/// Create a window expression from a logical expression or an alias
pub fn create_window_expr(
&self,
e: &Expr,
logical_input_schema: &DFSchema,
physical_input_schema: &Schema,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn AggregateExpr>> {
// unpack aliased logical expressions, e.g. "sum(col) as total"
) -> Result<Arc<dyn WindowExpr>> {
// unpack aliased logical expressions, e.g. "sum(col) over () as total"
let (name, e) = match e {
Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()),
_ => (e.name(logical_input_schema)?, e),
};
self.create_window_expr_with_name(e, name, physical_input_schema, ctx_state)
}

/// Create an aggregate expression with a name from a logical expression
pub fn create_aggregate_expr_with_name(
&self,
e: &Expr,
name: String,
physical_input_schema: &Schema,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn AggregateExpr>> {
match e {
Expr::AggregateFunction {
fun,
Expand Down Expand Up @@ -819,7 +877,23 @@ impl DefaultPhysicalPlanner {
}
}

/// Create an aggregate expression from a logical expression
/// Create an aggregate expression from a logical expression or an alias
pub fn create_aggregate_expr(
&self,
e: &Expr,
logical_input_schema: &DFSchema,
physical_input_schema: &Schema,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn AggregateExpr>> {
// unpack aliased logical expressions, e.g. "sum(col) as total"
let (name, e) = match e {
Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()),
_ => (e.name(logical_input_schema)?, e),
};
self.create_aggregate_expr_with_name(e, name, physical_input_schema, ctx_state)
}

/// Create a physical sort expression from a logical expression
pub fn create_physical_sort_expr(
&self,
e: &Expr,
Expand Down
Loading