Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix 593, reduce cloning by taking ownership in logical planner's from fn #610

Merged
merged 1 commit into from
Jun 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
.iter()
.map(|expr| expr.try_into())
.collect::<Result<Vec<_>, _>>()?;
LogicalPlanBuilder::from(&input)
LogicalPlanBuilder::from(input)
.project(x)?
.build()
.map_err(|e| e.into())
}
LogicalPlanType::Selection(selection) => {
let input: LogicalPlan = convert_box_required!(selection.input)?;
LogicalPlanBuilder::from(&input)
LogicalPlanBuilder::from(input)
.filter(
selection
.expr
Expand All @@ -86,7 +86,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
.iter()
.map(|expr| expr.try_into())
.collect::<Result<Vec<_>, _>>()?;
LogicalPlanBuilder::from(&input)
LogicalPlanBuilder::from(input)
.window(window_expr)?
.build()
.map_err(|e| e.into())
Expand All @@ -103,7 +103,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
.iter()
.map(|expr| expr.try_into())
.collect::<Result<Vec<_>, _>>()?;
LogicalPlanBuilder::from(&input)
LogicalPlanBuilder::from(input)
.aggregate(group_expr, aggr_expr)?
.build()
.map_err(|e| e.into())
Expand Down Expand Up @@ -162,7 +162,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
.iter()
.map(|expr| expr.try_into())
.collect::<Result<Vec<Expr>, _>>()?;
LogicalPlanBuilder::from(&input)
LogicalPlanBuilder::from(input)
.sort(sort_expr)?
.build()
.map_err(|e| e.into())
Expand Down Expand Up @@ -193,7 +193,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
}
};

LogicalPlanBuilder::from(&input)
LogicalPlanBuilder::from(input)
.repartition(partitioning_scheme)?
.build()
.map_err(|e| e.into())
Expand Down Expand Up @@ -223,14 +223,14 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
}
LogicalPlanType::Explain(explain) => {
let input: LogicalPlan = convert_box_required!(explain.input)?;
LogicalPlanBuilder::from(&input)
LogicalPlanBuilder::from(input)
.explain(explain.verbose)?
.build()
.map_err(|e| e.into())
}
LogicalPlanType::Limit(limit) => {
let input: LogicalPlan = convert_box_required!(limit.input)?;
LogicalPlanBuilder::from(&input)
LogicalPlanBuilder::from(input)
.limit(limit.limit as usize)?
.build()
.map_err(|e| e.into())
Expand All @@ -255,7 +255,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
protobuf::JoinType::Semi => JoinType::Semi,
protobuf::JoinType::Anti => JoinType::Anti,
};
LogicalPlanBuilder::from(&convert_box_required!(join.left)?)
LogicalPlanBuilder::from(convert_box_required!(join.left)?)
.join(
&convert_box_required!(join.right)?,
join_type,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1073,7 +1073,7 @@ mod tests {
let ctx = create_ctx(&tmp_dir, partition_count)?;

let table = ctx.table("test")?;
let logical_plan = LogicalPlanBuilder::from(&table.to_logical_plan())
let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan())
.project(vec![col("c2")])?
.build()?;

Expand Down Expand Up @@ -2566,7 +2566,7 @@ mod tests {

let t = ctx.table("t")?;

let plan = LogicalPlanBuilder::from(&t.to_logical_plan())
let plan = LogicalPlanBuilder::from(t.to_logical_plan())
.project(vec![
col("a"),
col("b"),
Expand Down
22 changes: 13 additions & 9 deletions datafusion/src/execution/dataframe_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ impl DataFrame for DataFrameImpl {

/// Create a projection based on arbitrary expressions
fn select(&self, expr_list: Vec<Expr>) -> Result<Arc<dyn DataFrame>> {
let plan = LogicalPlanBuilder::from(&self.plan)
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
.project(expr_list)?
.build()?;
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
}

/// Create a filter based on a predicate expression
fn filter(&self, predicate: Expr) -> Result<Arc<dyn DataFrame>> {
let plan = LogicalPlanBuilder::from(&self.plan)
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
.filter(predicate)?
.build()?;
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
Expand All @@ -83,21 +83,25 @@ impl DataFrame for DataFrameImpl {
group_expr: Vec<Expr>,
aggr_expr: Vec<Expr>,
) -> Result<Arc<dyn DataFrame>> {
let plan = LogicalPlanBuilder::from(&self.plan)
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
.aggregate(group_expr, aggr_expr)?
.build()?;
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
}

/// Limit the number of rows
fn limit(&self, n: usize) -> Result<Arc<dyn DataFrame>> {
let plan = LogicalPlanBuilder::from(&self.plan).limit(n)?.build()?;
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
.limit(n)?
.build()?;
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
}

/// Sort by specified sorting expressions
fn sort(&self, expr: Vec<Expr>) -> Result<Arc<dyn DataFrame>> {
let plan = LogicalPlanBuilder::from(&self.plan).sort(expr)?.build()?;
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
.sort(expr)?
.build()?;
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
}

Expand All @@ -109,7 +113,7 @@ impl DataFrame for DataFrameImpl {
left_cols: &[&str],
right_cols: &[&str],
) -> Result<Arc<dyn DataFrame>> {
let plan = LogicalPlanBuilder::from(&self.plan)
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
.join(
&right.to_logical_plan(),
join_type,
Expand All @@ -124,7 +128,7 @@ impl DataFrame for DataFrameImpl {
&self,
partitioning_scheme: Partitioning,
) -> Result<Arc<dyn DataFrame>> {
let plan = LogicalPlanBuilder::from(&self.plan)
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
.repartition(partitioning_scheme)?
.build()?;
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
Expand Down Expand Up @@ -161,7 +165,7 @@ impl DataFrame for DataFrameImpl {
}

fn explain(&self, verbose: bool) -> Result<Arc<dyn DataFrame>> {
let plan = LogicalPlanBuilder::from(&self.plan)
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
.explain(verbose)?
.build()?;
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
Expand All @@ -173,7 +177,7 @@ impl DataFrame for DataFrameImpl {
}

fn union(&self, dataframe: Arc<dyn DataFrame>) -> Result<Arc<dyn DataFrame>> {
let plan = LogicalPlanBuilder::from(&self.plan)
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
.union(dataframe.to_logical_plan())?
.build()?;
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
Expand Down
36 changes: 16 additions & 20 deletions datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,15 @@ pub struct LogicalPlanBuilder {

impl LogicalPlanBuilder {
/// Create a builder from an existing plan
pub fn from(plan: &LogicalPlan) -> Self {
Self { plan: plan.clone() }
pub fn from(plan: LogicalPlan) -> Self {
Self { plan }
}

/// Create an empty relation.
///
/// `produce_one_row` set to true means this empty node needs to produce a placeholder row.
pub fn empty(produce_one_row: bool) -> Self {
Self::from(&LogicalPlan::EmptyRelation {
Self::from(LogicalPlan::EmptyRelation {
produce_one_row,
schema: DFSchemaRef::new(DFSchema::empty()),
})
Expand Down Expand Up @@ -182,7 +182,7 @@ impl LogicalPlanBuilder {
limit: None,
};

Ok(Self::from(&table_scan))
Ok(Self::from(table_scan))
}

/// Apply a projection.
Expand Down Expand Up @@ -214,7 +214,7 @@ impl LogicalPlanBuilder {

let schema = DFSchema::new(exprlist_to_fields(&projected_expr, input_schema)?)?;

Ok(Self::from(&LogicalPlan::Projection {
Ok(Self::from(LogicalPlan::Projection {
expr: projected_expr,
input: Arc::new(self.plan.clone()),
schema: DFSchemaRef::new(schema),
Expand All @@ -224,15 +224,15 @@ impl LogicalPlanBuilder {
/// Apply a filter
pub fn filter(&self, expr: Expr) -> Result<Self> {
let expr = normalize_col(expr, &self.plan.all_schemas())?;
Ok(Self::from(&LogicalPlan::Filter {
Ok(Self::from(LogicalPlan::Filter {
predicate: expr,
input: Arc::new(self.plan.clone()),
}))
}

/// Apply a limit
pub fn limit(&self, n: usize) -> Result<Self> {
Ok(Self::from(&LogicalPlan::Limit {
Ok(Self::from(LogicalPlan::Limit {
n,
input: Arc::new(self.plan.clone()),
}))
Expand All @@ -241,19 +241,15 @@ impl LogicalPlanBuilder {
/// Apply a sort
pub fn sort(&self, exprs: impl IntoIterator<Item = Expr>) -> Result<Self> {
let schemas = self.plan.all_schemas();
Ok(Self::from(&LogicalPlan::Sort {
Ok(Self::from(LogicalPlan::Sort {
expr: normalize_cols(exprs, &schemas)?,
input: Arc::new(self.plan.clone()),
}))
}

/// Apply a union
pub fn union(&self, plan: LogicalPlan) -> Result<Self> {
Ok(Self::from(&union_with_alias(
self.plan.clone(),
plan,
None,
)?))
Ok(Self::from(union_with_alias(self.plan.clone(), plan, None)?))
}

/// Apply a join with on constraint
Expand Down Expand Up @@ -287,7 +283,7 @@ impl LogicalPlanBuilder {
&JoinConstraint::On,
)?;

Ok(Self::from(&LogicalPlan::Join {
Ok(Self::from(LogicalPlan::Join {
left: Arc::new(self.plan.clone()),
right: Arc::new(right.clone()),
on,
Expand Down Expand Up @@ -323,7 +319,7 @@ impl LogicalPlanBuilder {
&JoinConstraint::Using,
)?;

Ok(Self::from(&LogicalPlan::Join {
Ok(Self::from(LogicalPlan::Join {
left: Arc::new(self.plan.clone()),
right: Arc::new(right.clone()),
on,
Expand All @@ -336,7 +332,7 @@ impl LogicalPlanBuilder {
/// Apply a cross join
pub fn cross_join(&self, right: &LogicalPlan) -> Result<Self> {
let schema = self.plan.schema().join(right.schema())?;
Ok(Self::from(&LogicalPlan::CrossJoin {
Ok(Self::from(LogicalPlan::CrossJoin {
left: Arc::new(self.plan.clone()),
right: Arc::new(right.clone()),
schema: DFSchemaRef::new(schema),
Expand All @@ -345,7 +341,7 @@ impl LogicalPlanBuilder {

/// Repartition
pub fn repartition(&self, partitioning_scheme: Partitioning) -> Result<Self> {
Ok(Self::from(&LogicalPlan::Repartition {
Ok(Self::from(LogicalPlan::Repartition {
input: Arc::new(self.plan.clone()),
partitioning_scheme,
}))
Expand All @@ -359,7 +355,7 @@ impl LogicalPlanBuilder {
let mut window_fields: Vec<DFField> =
exprlist_to_fields(all_expr, self.plan.schema())?;
window_fields.extend_from_slice(self.plan.schema().fields());
Ok(Self::from(&LogicalPlan::Window {
Ok(Self::from(LogicalPlan::Window {
input: Arc::new(self.plan.clone()),
window_expr,
schema: Arc::new(DFSchema::new(window_fields)?),
Expand All @@ -384,7 +380,7 @@ impl LogicalPlanBuilder {
let aggr_schema =
DFSchema::new(exprlist_to_fields(all_expr, self.plan.schema())?)?;

Ok(Self::from(&LogicalPlan::Aggregate {
Ok(Self::from(LogicalPlan::Aggregate {
input: Arc::new(self.plan.clone()),
group_expr,
aggr_expr,
Expand All @@ -401,7 +397,7 @@ impl LogicalPlanBuilder {

let schema = LogicalPlan::explain_schema();

Ok(Self::from(&LogicalPlan::Explain {
Ok(Self::from(LogicalPlan::Explain {
verbose,
plan: Arc::new(self.plan.clone()),
stringified_plans,
Expand Down
Loading