Skip to content

Commit

Permalink
[ARROW-12441] [DataFusion] Cross join implementation (#11)
Browse files Browse the repository at this point in the history
* Cross join implementation

* Add to ballista, debug line

* Add to tpch test, format

* Simplify a bit

* Row-by-row processing for the left side to keep memory down

* Fix

* Fmt

* Clippy

* Fix doc, don't include as much debug info in memoryexec debug

* Use join

* Fix doc

* Add test cases with partitions

* Make clear that mutex is locked for very short amount of time

* Unwrap the lock
  • Loading branch information
Dandandan authored Apr 22, 2021
1 parent 395d9d6 commit 57eeb64
Show file tree
Hide file tree
Showing 17 changed files with 479 additions and 26 deletions.
1 change: 1 addition & 0 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -940,6 +940,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
}
LogicalPlan::Extension { .. } => unimplemented!(),
LogicalPlan::Union { .. } => unimplemented!(),
LogicalPlan::CrossJoin { .. } => unimplemented!(),
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1374,6 +1374,11 @@ mod tests {
run_query(6).await
}

#[tokio::test]
async fn run_q9() -> Result<()> {
run_query(9).await
}

#[tokio::test]
async fn run_q10() -> Result<()> {
run_query(10).await
Expand Down
4 changes: 3 additions & 1 deletion datafusion/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,9 @@ DataFusion also includes a simple command-line interactive SQL utility. See the
- [ ] MINUS
- [x] Joins
- [x] INNER JOIN
- [ ] CROSS JOIN
- [x] LEFT JOIN
- [x] RIGHT JOIN
- [x] CROSS JOIN
- [ ] OUTER JOIN
- [ ] Window

Expand Down
10 changes: 10 additions & 0 deletions datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,16 @@ impl LogicalPlanBuilder {
}))
}
}
/// Apply a cross join
pub fn cross_join(&self, right: &LogicalPlan) -> Result<Self> {
let schema = self.plan.schema().join(right.schema())?;

Ok(Self::from(&LogicalPlan::CrossJoin {
left: Arc::new(self.plan.clone()),
right: Arc::new(right.clone()),
schema: DFSchemaRef::new(schema),
}))
}

/// Repartition
pub fn repartition(&self, partitioning_scheme: Partitioning) -> Result<Self> {
Expand Down
27 changes: 24 additions & 3 deletions datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,15 @@ pub enum LogicalPlan {
/// The output schema, containing fields from the left and right inputs
schema: DFSchemaRef,
},
/// Apply Cross Join to two logical plans
CrossJoin {
/// Left input
left: Arc<LogicalPlan>,
/// Right input
right: Arc<LogicalPlan>,
/// The output schema, containing fields from the left and right inputs
schema: DFSchemaRef,
},
/// Repartition the plan based on a partitioning scheme.
Repartition {
/// The incoming logical plan
Expand Down Expand Up @@ -203,6 +212,7 @@ impl LogicalPlan {
LogicalPlan::Aggregate { schema, .. } => &schema,
LogicalPlan::Sort { input, .. } => input.schema(),
LogicalPlan::Join { schema, .. } => &schema,
LogicalPlan::CrossJoin { schema, .. } => &schema,
LogicalPlan::Repartition { input, .. } => input.schema(),
LogicalPlan::Limit { input, .. } => input.schema(),
LogicalPlan::CreateExternalTable { schema, .. } => &schema,
Expand All @@ -229,6 +239,11 @@ impl LogicalPlan {
right,
schema,
..
}
| LogicalPlan::CrossJoin {
left,
right,
schema,
} => {
let mut schemas = left.all_schemas();
schemas.extend(right.all_schemas());
Expand Down Expand Up @@ -290,8 +305,9 @@ impl LogicalPlan {
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Limit { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::Explain { .. } => vec![],
LogicalPlan::Union { .. } => {
| LogicalPlan::CrossJoin { .. }
| LogicalPlan::Explain { .. }
| LogicalPlan::Union { .. } => {
vec![]
}
}
Expand All @@ -307,6 +323,7 @@ impl LogicalPlan {
LogicalPlan::Aggregate { input, .. } => vec![input],
LogicalPlan::Sort { input, .. } => vec![input],
LogicalPlan::Join { left, right, .. } => vec![left, right],
LogicalPlan::CrossJoin { left, right, .. } => vec![left, right],
LogicalPlan::Limit { input, .. } => vec![input],
LogicalPlan::Extension { node } => node.inputs(),
LogicalPlan::Union { inputs, .. } => inputs.iter().collect(),
Expand Down Expand Up @@ -396,7 +413,8 @@ impl LogicalPlan {
LogicalPlan::Repartition { input, .. } => input.accept(visitor)?,
LogicalPlan::Aggregate { input, .. } => input.accept(visitor)?,
LogicalPlan::Sort { input, .. } => input.accept(visitor)?,
LogicalPlan::Join { left, right, .. } => {
LogicalPlan::Join { left, right, .. }
| LogicalPlan::CrossJoin { left, right, .. } => {
left.accept(visitor)? && right.accept(visitor)?
}
LogicalPlan::Union { inputs, .. } => {
Expand Down Expand Up @@ -669,6 +687,9 @@ impl LogicalPlan {
keys.iter().map(|(l, r)| format!("{} = {}", l, r)).collect();
write!(f, "Join: {}", join_expr.join(", "))
}
LogicalPlan::CrossJoin { .. } => {
write!(f, "CrossJoin:")
}
LogicalPlan::Repartition {
partitioning_scheme,
..
Expand Down
3 changes: 2 additions & 1 deletion datafusion/src/optimizer/constant_folding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ impl OptimizerRule for ConstantFolding {
| LogicalPlan::Explain { .. }
| LogicalPlan::Limit { .. }
| LogicalPlan::Union { .. }
| LogicalPlan::Join { .. } => {
| LogicalPlan::Join { .. }
| LogicalPlan::CrossJoin { .. } => {
// apply the optimization to all inputs of the plan
let inputs = plan.inputs();
let new_inputs = inputs
Expand Down
3 changes: 2 additions & 1 deletion datafusion/src/optimizer/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,8 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
.collect::<HashSet<_>>();
issue_filters(state, used_columns, plan)
}
LogicalPlan::Join { left, right, .. } => {
LogicalPlan::Join { left, right, .. }
| LogicalPlan::CrossJoin { left, right, .. } => {
let (pushable_to_left, pushable_to_right, keep) =
get_join_predicates(&state, &left.schema(), &right.schema());

Expand Down
27 changes: 27 additions & 0 deletions datafusion/src/optimizer/hash_build_probe_order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {
// we cannot predict the cardinality of the join output
None
}
LogicalPlan::CrossJoin { left, right, .. } => {
// number of rows is equal to num_left * num_right
get_num_rows(left).and_then(|x| get_num_rows(right).map(|y| x * y))
}
LogicalPlan::Repartition { .. } => {
// we cannot predict how rows will be repartitioned
None
Expand Down Expand Up @@ -138,6 +142,29 @@ impl OptimizerRule for HashBuildProbeOrder {
})
}
}
LogicalPlan::CrossJoin {
left,
right,
schema,
} => {
let left = self.optimize(left)?;
let right = self.optimize(right)?;
if should_swap_join_order(&left, &right) {
// Swap left and right
Ok(LogicalPlan::CrossJoin {
left: Arc::new(right),
right: Arc::new(left),
schema: schema.clone(),
})
} else {
// Keep join as is
Ok(LogicalPlan::CrossJoin {
left: Arc::new(left),
right: Arc::new(right),
schema: schema.clone(),
})
}
}
// Rest: recurse into plan, apply optimization where possible
LogicalPlan::Projection { .. }
| LogicalPlan::Aggregate { .. }
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ fn optimize_plan(
| LogicalPlan::Sort { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::Union { .. }
| LogicalPlan::CrossJoin { .. }
| LogicalPlan::Extension { .. } => {
let expr = plan.expressions();
// collect all required columns by this plan
Expand Down
5 changes: 5 additions & 0 deletions datafusion/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ pub fn from_plan(
on: on.clone(),
schema: schema.clone(),
}),
LogicalPlan::CrossJoin { schema, .. } => Ok(LogicalPlan::CrossJoin {
left: Arc::new(inputs[0].clone()),
right: Arc::new(inputs[1].clone()),
schema: schema.clone(),
}),
LogicalPlan::Limit { n, .. } => Ok(LogicalPlan::Limit {
n: *n,
input: Arc::new(inputs[0].clone()),
Expand Down
Loading

0 comments on commit 57eeb64

Please sign in to comment.