Skip to content

Commit

Permalink
use customized type
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiayu Liu committed Jun 5, 2021
1 parent a67ef0d commit 947e18f
Show file tree
Hide file tree
Showing 9 changed files with 320 additions and 150 deletions.
18 changes: 10 additions & 8 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,24 @@
use crate::error::BallistaError;
use crate::serde::{proto_error, protobuf};
use crate::{convert_box_required, convert_required};
use sqlparser::ast::{WindowFrame, WindowFrameBound, WindowFrameUnits};
use std::{
convert::{From, TryInto},
unimplemented,
};

use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::logical_plan::{
abs, acos, asin, atan, ceil, cos, exp, floor, ln, log10, log2, round, signum, sin,
sqrt, tan, trunc, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Operator,
};
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::csv::CsvReadOptions;
use datafusion::physical_plan::window_frames::{
WindowFrame, WindowFrameBound, WindowFrameUnits,
};
use datafusion::physical_plan::window_functions::BuiltInWindowFunction;
use datafusion::scalar::ScalarValue;
use protobuf::logical_plan_node::LogicalPlanType;
use protobuf::{logical_expr_node::ExprType, scalar_type};
use std::{
convert::{From, TryInto},
unimplemented,
};

// use uuid::Uuid;

Expand Down Expand Up @@ -915,7 +916,7 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
.map(|e| e.try_into())
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
let window_frame: Option<WindowFrame> = expr
let window_frame = expr
.window_frame
.as_ref()
.map::<Result<WindowFrame, _>, _>(|e| match e {
Expand Down Expand Up @@ -1337,7 +1338,8 @@ impl TryFrom<protobuf::WindowFrame> for WindowFrame {
end_bound.try_into()
}
})
.transpose()?;
.transpose()?
.unwrap_or(WindowFrameBound::CurrentRow);
Ok(WindowFrame {
units,
start_bound,
Expand Down
22 changes: 12 additions & 10 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,17 @@ use std::{
convert::{TryFrom, TryInto},
};

use super::super::proto_error;
use crate::datasource::DfTableAdapter;
use crate::serde::{protobuf, BallistaError};
use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit};
use datafusion::datasource::CsvFile;
use datafusion::logical_plan::{Expr, JoinType, LogicalPlan};
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::functions::BuiltinScalarFunction;
use datafusion::physical_plan::window_frames::{
WindowFrame, WindowFrameBound, WindowFrameUnits,
};
use datafusion::physical_plan::window_functions::{
BuiltInWindowFunction, WindowFunction,
};
Expand All @@ -38,10 +43,6 @@ use protobuf::{
arrow_type, logical_expr_node::ExprType, scalar_type, DateUnit, PrimitiveScalarType,
ScalarListValue, ScalarType,
};
use sqlparser::ast::{WindowFrame, WindowFrameBound, WindowFrameUnits};

use super::super::proto_error;
use datafusion::physical_plan::functions::BuiltinScalarFunction;

impl protobuf::IntervalUnit {
pub fn from_arrow_interval_unit(interval_unit: &IntervalUnit) -> Self {
Expand Down Expand Up @@ -1027,9 +1028,10 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
.iter()
.map(|e| e.try_into())
.collect::<Result<Vec<_>, _>>()?;
let window_frame = window_frame.as_ref().map(|window_frame| {
let window_frame: protobuf::WindowFrame = window_frame.clone().into();
protobuf::window_expr_node::WindowFrame::Frame(window_frame)
let window_frame = window_frame.map(|window_frame| {
protobuf::window_expr_node::WindowFrame::Frame(
window_frame.clone().into(),
)
});
let window_expr = Box::new(protobuf::WindowExprNode {
expr: Some(Box::new(arg.try_into()?)),
Expand Down Expand Up @@ -1287,9 +1289,9 @@ impl From<WindowFrame> for protobuf::WindowFrame {
protobuf::WindowFrame {
window_frame_units: protobuf::WindowFrameUnits::from(window.units).into(),
start_bound: Some(window.start_bound.into()),
end_bound: window.end_bound.map(|end_bound| {
protobuf::window_frame::EndBound::Bound(end_bound.into())
}),
end_bound: Some(protobuf::window_frame::EndBound::Bound(
window.end_bound.into(),
)),
}
}
}
Expand Down
5 changes: 1 addition & 4 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
fun,
args,
order_by,
window_frame,
..
} => {
let arg = df_planner
.create_physical_expr(
Expand All @@ -251,9 +251,6 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
if !order_by.is_empty() {
return Err(BallistaError::NotImplemented("Window function with order by is not yet implemented".to_owned()));
}
if window_frame.is_some() {
return Err(BallistaError::NotImplemented("Window function with window frame is not yet implemented".to_owned()));
}
let window_expr = create_window_expr(
&fun,
&[arg],
Expand Down
38 changes: 31 additions & 7 deletions datafusion/src/logical_plan/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ use crate::error::{DataFusionError, Result};
use crate::logical_plan::{DFField, DFSchema};
use crate::physical_plan::{
aggregates, expressions::binary_operator_data_type, functions, udf::ScalarUDF,
window_functions,
window_frames, window_functions,
};
use crate::{physical_plan::udaf::AggregateUDF, scalar::ScalarValue};
use aggregates::{AccumulatorFunctionImplementation, StateTypeFunction};
use arrow::{compute::can_cast_types, datatypes::DataType};
use functions::{ReturnTypeFunction, ScalarFunctionImplementation, Signature};
use sqlparser::ast::WindowFrame;
use std::collections::HashSet;
use std::fmt;
use std::sync::Arc;
Expand Down Expand Up @@ -198,7 +197,7 @@ pub enum Expr {
/// List of order by expressions
order_by: Vec<Expr>,
/// Window frame
window_frame: Option<WindowFrame>,
window_frame: Option<window_frames::WindowFrame>,
},
/// aggregate function
AggregateUDF {
Expand Down Expand Up @@ -1285,8 +1284,23 @@ impl fmt::Debug for Expr {
Expr::ScalarUDF { fun, ref args, .. } => {
fmt_function(f, &fun.name, false, args)
}
Expr::WindowFunction { fun, ref args, .. } => {
fmt_function(f, &fun.to_string(), false, args)
Expr::WindowFunction {
fun,
ref args,
window_frame,
..
} => {
fmt_function(f, &fun.to_string(), false, args)?;
if let Some(window_frame) = window_frame {
write!(
f,
" {} BETWEEN {} AND {}",
window_frame.units,
window_frame.start_bound,
window_frame.end_bound
)?;
}
Ok(())
}
Expr::AggregateFunction {
fun,
Expand Down Expand Up @@ -1403,8 +1417,18 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
Expr::ScalarUDF { fun, args, .. } => {
create_function_name(&fun.name, false, args, input_schema)
}
Expr::WindowFunction { fun, args, .. } => {
create_function_name(&fun.to_string(), false, args, input_schema)
Expr::WindowFunction {
fun,
args,
window_frame,
..
} => {
let fun_name =
create_function_name(&fun.to_string(), false, args, input_schema)?;
Ok(match window_frame {
Some(window_frame) => format!("{} {}", fun_name, window_frame),
None => fun_name,
})
}
Expr::AggregateFunction {
fun,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
fun: fun.clone(),
args: expressions[..index].to_vec(),
order_by: expressions[index + 1..].to_vec(),
window_frame: window_frame.clone(),
window_frame: *window_frame,
})
}
Expr::AggregateFunction { fun, distinct, .. } => Ok(Expr::AggregateFunction {
Expand Down
12 changes: 2 additions & 10 deletions datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

//! Physical query planner

use std::sync::Arc;

use super::{
aggregates, cross_join::CrossJoinExec, empty::EmptyExec, expressions::binary,
functions, hash_join::PartitionMode, udaf, union::UnionExec, windows,
Expand All @@ -39,7 +37,6 @@ use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sort::SortExec;
use crate::physical_plan::udf;
use crate::physical_plan::window_frames;
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::{hash_utils, Partitioning};
use crate::physical_plan::{
Expand All @@ -57,6 +54,7 @@ use arrow::datatypes::{Schema, SchemaRef};
use arrow::{compute::can_cast_types, datatypes::DataType};
use expressions::col;
use log::debug;
use std::sync::Arc;

/// This trait exposes the ability to plan an [`ExecutionPlan`] out of a [`LogicalPlan`].
pub trait ExtensionPlanner {
Expand Down Expand Up @@ -748,12 +746,7 @@ impl DefaultPhysicalPlanner {
};

match e {
Expr::WindowFunction {
fun,
args,
window_frame,
..
} => {
Expr::WindowFunction { fun, args, .. } => {
let args = args
.iter()
.map(|e| {
Expand All @@ -765,7 +758,6 @@ impl DefaultPhysicalPlanner {
// "Window function with order by is not yet implemented".to_owned(),
// ));
// }
let _window_frame = window_frames::validate_window_frame(window_frame)?;
windows::create_window_expr(fun, &args, physical_input_schema, name)
}
other => Err(DataFusionError::Internal(format!(
Expand Down
Loading

0 comments on commit 947e18f

Please sign in to comment.