Skip to content

Commit

Permalink
fix 593
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiayu Liu committed Jun 23, 2021
1 parent d55a105 commit a632633
Show file tree
Hide file tree
Showing 14 changed files with 136 additions and 135 deletions.
18 changes: 9 additions & 9 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
.iter()
.map(|expr| expr.try_into())
.collect::<Result<Vec<_>, _>>()?;
LogicalPlanBuilder::from(&input)
LogicalPlanBuilder::from(input)
.project(x)?
.build()
.map_err(|e| e.into())
}
LogicalPlanType::Selection(selection) => {
let input: LogicalPlan = convert_box_required!(selection.input)?;
LogicalPlanBuilder::from(&input)
LogicalPlanBuilder::from(input)
.filter(
selection
.expr
Expand All @@ -86,7 +86,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
.iter()
.map(|expr| expr.try_into())
.collect::<Result<Vec<_>, _>>()?;
LogicalPlanBuilder::from(&input)
LogicalPlanBuilder::from(input)
.window(window_expr)?
.build()
.map_err(|e| e.into())
Expand All @@ -103,7 +103,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
.iter()
.map(|expr| expr.try_into())
.collect::<Result<Vec<_>, _>>()?;
LogicalPlanBuilder::from(&input)
LogicalPlanBuilder::from(input)
.aggregate(group_expr, aggr_expr)?
.build()
.map_err(|e| e.into())
Expand Down Expand Up @@ -162,7 +162,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
.iter()
.map(|expr| expr.try_into())
.collect::<Result<Vec<Expr>, _>>()?;
LogicalPlanBuilder::from(&input)
LogicalPlanBuilder::from(input)
.sort(sort_expr)?
.build()
.map_err(|e| e.into())
Expand Down Expand Up @@ -193,7 +193,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
}
};

LogicalPlanBuilder::from(&input)
LogicalPlanBuilder::from(input)
.repartition(partitioning_scheme)?
.build()
.map_err(|e| e.into())
Expand Down Expand Up @@ -223,14 +223,14 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
}
LogicalPlanType::Explain(explain) => {
let input: LogicalPlan = convert_box_required!(explain.input)?;
LogicalPlanBuilder::from(&input)
LogicalPlanBuilder::from(input)
.explain(explain.verbose)?
.build()
.map_err(|e| e.into())
}
LogicalPlanType::Limit(limit) => {
let input: LogicalPlan = convert_box_required!(limit.input)?;
LogicalPlanBuilder::from(&input)
LogicalPlanBuilder::from(input)
.limit(limit.limit as usize)?
.build()
.map_err(|e| e.into())
Expand All @@ -255,7 +255,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
protobuf::JoinType::Semi => JoinType::Semi,
protobuf::JoinType::Anti => JoinType::Anti,
};
LogicalPlanBuilder::from(&convert_box_required!(join.left)?)
LogicalPlanBuilder::from(convert_box_required!(join.left)?)
.join(
&convert_box_required!(join.right)?,
join_type,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1073,7 +1073,7 @@ mod tests {
let ctx = create_ctx(&tmp_dir, partition_count)?;

let table = ctx.table("test")?;
let logical_plan = LogicalPlanBuilder::from(&table.to_logical_plan())
let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan())
.project(vec![col("c2")])?
.build()?;

Expand Down Expand Up @@ -2566,7 +2566,7 @@ mod tests {

let t = ctx.table("t")?;

let plan = LogicalPlanBuilder::from(&t.to_logical_plan())
let plan = LogicalPlanBuilder::from(t.to_logical_plan())
.project(vec![
col("a"),
col("b"),
Expand Down
22 changes: 13 additions & 9 deletions datafusion/src/execution/dataframe_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ impl DataFrame for DataFrameImpl {

/// Create a projection based on arbitrary expressions
fn select(&self, expr_list: Vec<Expr>) -> Result<Arc<dyn DataFrame>> {
let plan = LogicalPlanBuilder::from(&self.plan)
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
.project(expr_list)?
.build()?;
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
}

/// Create a filter based on a predicate expression
fn filter(&self, predicate: Expr) -> Result<Arc<dyn DataFrame>> {
let plan = LogicalPlanBuilder::from(&self.plan)
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
.filter(predicate)?
.build()?;
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
Expand All @@ -83,21 +83,25 @@ impl DataFrame for DataFrameImpl {
group_expr: Vec<Expr>,
aggr_expr: Vec<Expr>,
) -> Result<Arc<dyn DataFrame>> {
let plan = LogicalPlanBuilder::from(&self.plan)
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
.aggregate(group_expr, aggr_expr)?
.build()?;
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
}

/// Limit the number of rows
fn limit(&self, n: usize) -> Result<Arc<dyn DataFrame>> {
let plan = LogicalPlanBuilder::from(&self.plan).limit(n)?.build()?;
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
.limit(n)?
.build()?;
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
}

/// Sort by specified sorting expressions
fn sort(&self, expr: Vec<Expr>) -> Result<Arc<dyn DataFrame>> {
let plan = LogicalPlanBuilder::from(&self.plan).sort(expr)?.build()?;
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
.sort(expr)?
.build()?;
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
}

Expand All @@ -109,7 +113,7 @@ impl DataFrame for DataFrameImpl {
left_cols: &[&str],
right_cols: &[&str],
) -> Result<Arc<dyn DataFrame>> {
let plan = LogicalPlanBuilder::from(&self.plan)
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
.join(
&right.to_logical_plan(),
join_type,
Expand All @@ -124,7 +128,7 @@ impl DataFrame for DataFrameImpl {
&self,
partitioning_scheme: Partitioning,
) -> Result<Arc<dyn DataFrame>> {
let plan = LogicalPlanBuilder::from(&self.plan)
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
.repartition(partitioning_scheme)?
.build()?;
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
Expand Down Expand Up @@ -161,7 +165,7 @@ impl DataFrame for DataFrameImpl {
}

fn explain(&self, verbose: bool) -> Result<Arc<dyn DataFrame>> {
let plan = LogicalPlanBuilder::from(&self.plan)
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
.explain(verbose)?
.build()?;
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
Expand All @@ -173,7 +177,7 @@ impl DataFrame for DataFrameImpl {
}

fn union(&self, dataframe: Arc<dyn DataFrame>) -> Result<Arc<dyn DataFrame>> {
let plan = LogicalPlanBuilder::from(&self.plan)
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
.union(dataframe.to_logical_plan())?
.build()?;
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
Expand Down
36 changes: 16 additions & 20 deletions datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,15 @@ pub struct LogicalPlanBuilder {

impl LogicalPlanBuilder {
/// Create a builder from an existing plan
pub fn from(plan: &LogicalPlan) -> Self {
Self { plan: plan.clone() }
pub fn from(plan: LogicalPlan) -> Self {
Self { plan }
}

/// Create an empty relation.
///
/// `produce_one_row` set to true means this empty node needs to produce a placeholder row.
pub fn empty(produce_one_row: bool) -> Self {
Self::from(&LogicalPlan::EmptyRelation {
Self::from(LogicalPlan::EmptyRelation {
produce_one_row,
schema: DFSchemaRef::new(DFSchema::empty()),
})
Expand Down Expand Up @@ -182,7 +182,7 @@ impl LogicalPlanBuilder {
limit: None,
};

Ok(Self::from(&table_scan))
Ok(Self::from(table_scan))
}

/// Apply a projection.
Expand Down Expand Up @@ -214,7 +214,7 @@ impl LogicalPlanBuilder {

let schema = DFSchema::new(exprlist_to_fields(&projected_expr, input_schema)?)?;

Ok(Self::from(&LogicalPlan::Projection {
Ok(Self::from(LogicalPlan::Projection {
expr: projected_expr,
input: Arc::new(self.plan.clone()),
schema: DFSchemaRef::new(schema),
Expand All @@ -224,15 +224,15 @@ impl LogicalPlanBuilder {
/// Apply a filter
pub fn filter(&self, expr: Expr) -> Result<Self> {
let expr = normalize_col(expr, &self.plan.all_schemas())?;
Ok(Self::from(&LogicalPlan::Filter {
Ok(Self::from(LogicalPlan::Filter {
predicate: expr,
input: Arc::new(self.plan.clone()),
}))
}

/// Apply a limit
pub fn limit(&self, n: usize) -> Result<Self> {
Ok(Self::from(&LogicalPlan::Limit {
Ok(Self::from(LogicalPlan::Limit {
n,
input: Arc::new(self.plan.clone()),
}))
Expand All @@ -241,19 +241,15 @@ impl LogicalPlanBuilder {
/// Apply a sort
pub fn sort(&self, exprs: impl IntoIterator<Item = Expr>) -> Result<Self> {
let schemas = self.plan.all_schemas();
Ok(Self::from(&LogicalPlan::Sort {
Ok(Self::from(LogicalPlan::Sort {
expr: normalize_cols(exprs, &schemas)?,
input: Arc::new(self.plan.clone()),
}))
}

/// Apply a union
pub fn union(&self, plan: LogicalPlan) -> Result<Self> {
Ok(Self::from(&union_with_alias(
self.plan.clone(),
plan,
None,
)?))
Ok(Self::from(union_with_alias(self.plan.clone(), plan, None)?))
}

/// Apply a join with on constraint
Expand Down Expand Up @@ -287,7 +283,7 @@ impl LogicalPlanBuilder {
&JoinConstraint::On,
)?;

Ok(Self::from(&LogicalPlan::Join {
Ok(Self::from(LogicalPlan::Join {
left: Arc::new(self.plan.clone()),
right: Arc::new(right.clone()),
on,
Expand Down Expand Up @@ -323,7 +319,7 @@ impl LogicalPlanBuilder {
&JoinConstraint::Using,
)?;

Ok(Self::from(&LogicalPlan::Join {
Ok(Self::from(LogicalPlan::Join {
left: Arc::new(self.plan.clone()),
right: Arc::new(right.clone()),
on,
Expand All @@ -336,7 +332,7 @@ impl LogicalPlanBuilder {
/// Apply a cross join
pub fn cross_join(&self, right: &LogicalPlan) -> Result<Self> {
let schema = self.plan.schema().join(right.schema())?;
Ok(Self::from(&LogicalPlan::CrossJoin {
Ok(Self::from(LogicalPlan::CrossJoin {
left: Arc::new(self.plan.clone()),
right: Arc::new(right.clone()),
schema: DFSchemaRef::new(schema),
Expand All @@ -345,7 +341,7 @@ impl LogicalPlanBuilder {

/// Repartition
pub fn repartition(&self, partitioning_scheme: Partitioning) -> Result<Self> {
Ok(Self::from(&LogicalPlan::Repartition {
Ok(Self::from(LogicalPlan::Repartition {
input: Arc::new(self.plan.clone()),
partitioning_scheme,
}))
Expand All @@ -368,7 +364,7 @@ impl LogicalPlanBuilder {
exprlist_to_fields(all_expr, self.plan.schema())?;
window_fields.extend_from_slice(self.plan.schema().fields());

Ok(Self::from(&LogicalPlan::Window {
Ok(Self::from(LogicalPlan::Window {
input: Arc::new(self.plan.clone()),
window_expr,
schema: Arc::new(DFSchema::new(window_fields)?),
Expand All @@ -393,7 +389,7 @@ impl LogicalPlanBuilder {
let aggr_schema =
DFSchema::new(exprlist_to_fields(all_expr, self.plan.schema())?)?;

Ok(Self::from(&LogicalPlan::Aggregate {
Ok(Self::from(LogicalPlan::Aggregate {
input: Arc::new(self.plan.clone()),
group_expr,
aggr_expr,
Expand All @@ -410,7 +406,7 @@ impl LogicalPlanBuilder {

let schema = LogicalPlan::explain_schema();

Ok(Self::from(&LogicalPlan::Explain {
Ok(Self::from(LogicalPlan::Explain {
verbose,
plan: Arc::new(self.plan.clone()),
stringified_plans,
Expand Down
Loading

0 comments on commit a632633

Please sign in to comment.