diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs index fc4ac2c9076c..6762124b7e24 100644 --- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs @@ -68,14 +68,15 @@ impl TryInto for &protobuf::LogicalPlanNode { } LogicalPlanType::Selection(selection) => { let input: LogicalPlan = convert_box_required!(selection.input)?; + let expr: Expr = selection + .expr + .as_ref() + .ok_or_else(|| { + BallistaError::General("expression required".to_string()) + })? + .try_into()?; LogicalPlanBuilder::from(input) - .filter( - selection - .expr - .as_ref() - .expect("expression required") - .try_into()?, - )? + .filter(expr)? .build() .map_err(|e| e.into()) } @@ -85,7 +86,7 @@ impl TryInto for &protobuf::LogicalPlanNode { .window_expr .iter() .map(|expr| expr.try_into()) - .collect::, _>>()?; + .collect::, _>>()?; LogicalPlanBuilder::from(input) .window(window_expr)? .build() @@ -97,12 +98,12 @@ impl TryInto for &protobuf::LogicalPlanNode { .group_expr .iter() .map(|expr| expr.try_into()) - .collect::, _>>()?; + .collect::, _>>()?; let aggr_expr = aggregate .aggr_expr .iter() .map(|expr| expr.try_into()) - .collect::, _>>()?; + .collect::, _>>()?; LogicalPlanBuilder::from(input) .aggregate(group_expr, aggr_expr)? .build() diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index b7d837f8e904..80edde979077 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -419,8 +419,11 @@ impl LogicalPlanBuilder { } /// Apply a window functions to extend the schema - pub fn window(&self, window_expr: impl IntoIterator) -> Result { - let window_expr = window_expr.into_iter().collect::>(); + pub fn window( + &self, + window_expr: impl IntoIterator>, + ) -> Result { + let window_expr = normalize_cols(window_expr, &self.plan)?; let all_expr = window_expr.iter(); validate_unique_names("Windows", all_expr.clone(), self.plan.schema())?; let mut window_fields: Vec =