Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-12441: [Rust][DataFusion] Support cartesian join #10092

Closed
wants to merge 14 commits into from
1 change: 1 addition & 0 deletions rust/ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -940,6 +940,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
}
LogicalPlan::Extension { .. } => unimplemented!(),
LogicalPlan::Union { .. } => unimplemented!(),
LogicalPlan::CartesianJoin { .. } => unimplemented!(),
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions rust/benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1374,6 +1374,11 @@ mod tests {
run_query(6).await
}

#[tokio::test]
async fn run_q9() -> Result<()> {
run_query(9).await
}

#[tokio::test]
async fn run_q10() -> Result<()> {
run_query(10).await
Expand Down
4 changes: 3 additions & 1 deletion rust/datafusion/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,9 @@ DataFusion also includes a simple command-line interactive SQL utility. See the
- [ ] MINUS
- [x] Joins
- [x] INNER JOIN
- [ ] CROSS JOIN
- [x] LEFT JOIN
- [x] RIGHT JOIN
- [x] CROSS JOIN
- [ ] OUTER JOIN
- [ ] Window

Expand Down
14 changes: 14 additions & 0 deletions rust/datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,20 @@ impl LogicalPlanBuilder {
}))
}
}
/// Apply a cartesian join
pub fn cartesian_join(&self, right: &LogicalPlan) -> Result<Self> {
let left_fields = self.plan.schema().fields().iter();
let right_fields = right.schema().fields();
let fields = left_fields.chain(right_fields).cloned().collect();

let schema = DFSchema::new(fields)?;

Ok(Self::from(&LogicalPlan::CartesianJoin {
left: Arc::new(self.plan.clone()),
right: Arc::new(right.clone()),
schema: DFSchemaRef::new(schema),
}))
}

/// Repartition
pub fn repartition(&self, partitioning_scheme: Partitioning) -> Result<Self> {
Expand Down
27 changes: 24 additions & 3 deletions rust/datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,15 @@ pub enum LogicalPlan {
/// The output schema, containing fields from the left and right inputs
schema: DFSchemaRef,
},
/// Join two logical plans on one or more join columns
CartesianJoin {
/// Left input
left: Arc<LogicalPlan>,
/// Right input
right: Arc<LogicalPlan>,
/// The output schema, containing fields from the left and right inputs
schema: DFSchemaRef,
},
/// Repartition the plan based on a partitioning scheme.
Repartition {
/// The incoming logical plan
Expand Down Expand Up @@ -203,6 +212,7 @@ impl LogicalPlan {
LogicalPlan::Aggregate { schema, .. } => &schema,
LogicalPlan::Sort { input, .. } => input.schema(),
LogicalPlan::Join { schema, .. } => &schema,
LogicalPlan::CartesianJoin { schema, .. } => &schema,
LogicalPlan::Repartition { input, .. } => input.schema(),
LogicalPlan::Limit { input, .. } => input.schema(),
LogicalPlan::CreateExternalTable { schema, .. } => &schema,
Expand All @@ -229,6 +239,11 @@ impl LogicalPlan {
right,
schema,
..
}
| LogicalPlan::CartesianJoin {
left,
right,
schema,
} => {
let mut schemas = left.all_schemas();
schemas.extend(right.all_schemas());
Expand Down Expand Up @@ -290,8 +305,9 @@ impl LogicalPlan {
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Limit { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::Explain { .. } => vec![],
LogicalPlan::Union { .. } => {
| LogicalPlan::CartesianJoin { .. }
| LogicalPlan::Explain { .. }
| LogicalPlan::Union { .. } => {
vec![]
}
}
Expand All @@ -307,6 +323,7 @@ impl LogicalPlan {
LogicalPlan::Aggregate { input, .. } => vec![input],
LogicalPlan::Sort { input, .. } => vec![input],
LogicalPlan::Join { left, right, .. } => vec![left, right],
LogicalPlan::CartesianJoin { left, right, .. } => vec![left, right],
LogicalPlan::Limit { input, .. } => vec![input],
LogicalPlan::Extension { node } => node.inputs(),
LogicalPlan::Union { inputs, .. } => inputs.iter().collect(),
Expand Down Expand Up @@ -396,7 +413,8 @@ impl LogicalPlan {
LogicalPlan::Repartition { input, .. } => input.accept(visitor)?,
LogicalPlan::Aggregate { input, .. } => input.accept(visitor)?,
LogicalPlan::Sort { input, .. } => input.accept(visitor)?,
LogicalPlan::Join { left, right, .. } => {
LogicalPlan::Join { left, right, .. }
| LogicalPlan::CartesianJoin { left, right, .. } => {
left.accept(visitor)? && right.accept(visitor)?
}
LogicalPlan::Union { inputs, .. } => {
Expand Down Expand Up @@ -669,6 +687,9 @@ impl LogicalPlan {
keys.iter().map(|(l, r)| format!("{} = {}", l, r)).collect();
write!(f, "Join: {}", join_expr.join(", "))
}
LogicalPlan::CartesianJoin { .. } => {
write!(f, "CartesianJoin:")
}
LogicalPlan::Repartition {
partitioning_scheme,
..
Expand Down
3 changes: 2 additions & 1 deletion rust/datafusion/src/optimizer/constant_folding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ impl OptimizerRule for ConstantFolding {
| LogicalPlan::Explain { .. }
| LogicalPlan::Limit { .. }
| LogicalPlan::Union { .. }
| LogicalPlan::Join { .. } => {
| LogicalPlan::Join { .. }
| LogicalPlan::CartesianJoin { .. } => {
// apply the optimization to all inputs of the plan
let inputs = plan.inputs();
let new_inputs = inputs
Expand Down
27 changes: 27 additions & 0 deletions rust/datafusion/src/optimizer/hash_build_probe_order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {
// we cannot predict the cardinality of the join output
None
}
LogicalPlan::CartesianJoin { left, right, .. } => {
// number of rows is equal to num_left * num_right
get_num_rows(left).and_then(|x| get_num_rows(right).map(|y| x * y))
}
LogicalPlan::Repartition { .. } => {
// we cannot predict how rows will be repartitioned
None
Expand Down Expand Up @@ -138,6 +142,29 @@ impl OptimizerRule for HashBuildProbeOrder {
})
}
}
LogicalPlan::CartesianJoin {
left,
right,
schema,
} => {
let left = self.optimize(left)?;
let right = self.optimize(right)?;
if should_swap_join_order(&left, &right) {
// Swap left and right
Ok(LogicalPlan::CartesianJoin {
left: Arc::new(right),
right: Arc::new(left),
schema: schema.clone(),
})
} else {
// Keep join as is
Ok(LogicalPlan::CartesianJoin {
left: Arc::new(left),
right: Arc::new(right),
schema: schema.clone(),
})
}
}
// Rest: recurse into plan, apply optimization where possible
LogicalPlan::Projection { .. }
| LogicalPlan::Aggregate { .. }
Expand Down
1 change: 1 addition & 0 deletions rust/datafusion/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ fn optimize_plan(
| LogicalPlan::Sort { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::Union { .. }
| LogicalPlan::CartesianJoin { .. }
| LogicalPlan::Extension { .. } => {
let expr = plan.expressions();
// collect all required columns by this plan
Expand Down
5 changes: 5 additions & 0 deletions rust/datafusion/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ pub fn from_plan(
on: on.clone(),
schema: schema.clone(),
}),
LogicalPlan::CartesianJoin { schema, .. } => Ok(LogicalPlan::CartesianJoin {
left: Arc::new(inputs[0].clone()),
right: Arc::new(inputs[1].clone()),
schema: schema.clone(),
}),
LogicalPlan::Limit { n, .. } => Ok(LogicalPlan::Limit {
n: *n,
input: Arc::new(inputs[0].clone()),
Expand Down
Loading