Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recursive unnest #11062

Merged
merged 13 commits into from
Jul 2, 2024
5 changes: 5 additions & 0 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,11 @@ impl Unnest {
expr: Box::new(expr),
}
}

/// Create a new Unnest expression from an already boxed expression.
///
/// The given box is stored as-is, so no additional `Box` is allocated here.
pub fn new_boxed(boxed: Box<Expr>) -> Self {
Self { expr: boxed }
}
}

/// Alias expression
Expand Down
1 change: 0 additions & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1452,7 +1452,6 @@ pub fn project(
_ => projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?),
}
}

validate_unique_names("Projections", projected_expr.iter())?;

Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)
Expand Down
3 changes: 2 additions & 1 deletion datafusion/expr/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,9 @@ impl TreeNode for Expr {
| Expr::Exists { .. }
| Expr::ScalarSubquery(_)
| Expr::ScalarVariable(_, _)
| Expr::Unnest(_)
| Expr::Literal(_) => Transformed::no(self),
Expr::Unnest(Unnest { expr, .. }) => transform_box(expr, &mut f)?
.update_data(|be| Expr::Unnest(Unnest::new_boxed(be))),
Expr::Alias(Alias {
expr,
relation,
Expand Down
98 changes: 56 additions & 42 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ use std::sync::Arc;

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use crate::utils::{
check_columns_satisfy_exprs, extract_aliases, rebase_expr,
recursive_transform_unnest, resolve_aliases_to_exprs, resolve_columns,
resolve_positions_to_exprs,
check_columns_satisfy_exprs, extract_aliases, rebase_expr, resolve_aliases_to_exprs,
resolve_columns, resolve_positions_to_exprs, transform_bottom_unnest,
};

use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
Expand Down Expand Up @@ -296,46 +295,61 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
input: LogicalPlan,
select_exprs: Vec<Expr>,
) -> Result<LogicalPlan> {
let mut unnest_columns = vec![];
// from which column used for projection, before the unnest happen
// including non unnest column and unnest column
let mut inner_projection_exprs = vec![];

// expr returned here maybe different from the originals in inner_projection_exprs
// for example:
// - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2
// - unnest(array_col) will be transformed into unnest(array_col).element
// - unnest(array_col) + 1 will be transformed into unnest(array_col).element +1
let outer_projection_exprs: Vec<Expr> = select_exprs
.into_iter()
.map(|expr| {
recursive_transform_unnest(
&input,
&mut unnest_columns,
&mut inner_projection_exprs,
expr,
)
})
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
.collect();

// Do the final projection
if unnest_columns.is_empty() {
LogicalPlanBuilder::from(input)
.project(inner_projection_exprs)?
.build()
} else {
let columns = unnest_columns.into_iter().map(|col| col.into()).collect();
// Set preserve_nulls to false to ensure compatibility with DuckDB and PostgreSQL
let unnest_options = UnnestOptions::new().with_preserve_nulls(false);
LogicalPlanBuilder::from(input)
.project(inner_projection_exprs)?
.unnest_columns_with_options(columns, unnest_options)?
.project(outer_projection_exprs)?
.build()
let mut intermediate_plan = input;
let mut intermediate_select_exprs = select_exprs;
// Each expr in select_exprs can contain multiple unnest stages.
// The transformation happens bottom-up, one stage per iteration.
// Only exit the loop once no more unnest transformations are found.
for i in 0.. {
let mut unnest_columns = vec![];
// the columns used for the projection before the unnest happens,
// including both non-unnest columns and unnest columns
let mut inner_projection_exprs = vec![];

// exprs returned here may be different from the originals in inner_projection_exprs
// for example:
// - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2
// - unnest(array_col) will be transformed into unnest(array_col).element
// - unnest(array_col) + 1 will be transformed into unnest(array_col).element +1
let outer_projection_exprs: Vec<Expr> = intermediate_select_exprs
.iter()
.map(|expr| {
transform_bottom_unnest(
&intermediate_plan,
&mut unnest_columns,
&mut inner_projection_exprs,
expr,
)
})
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
.collect();

// No more unnest is possible
if unnest_columns.is_empty() {
// The original expr does not contain any unnest
if i == 0 {
return LogicalPlanBuilder::from(intermediate_plan)
.project(inner_projection_exprs)?
.build();
}
break;
} else {
let columns = unnest_columns.into_iter().map(|col| col.into()).collect();
// Set preserve_nulls to false to ensure compatibility with DuckDB and PostgreSQL
let unnest_options = UnnestOptions::new().with_preserve_nulls(false);
let plan = LogicalPlanBuilder::from(intermediate_plan)
.project(inner_projection_exprs)?
.unnest_columns_with_options(columns, unnest_options)?
.build()?;
intermediate_plan = plan;
intermediate_select_exprs = outer_projection_exprs;
}
}
LogicalPlanBuilder::from(intermediate_plan)
.project(intermediate_select_exprs)?
.build()
}

fn plan_selection(
Expand Down
109 changes: 93 additions & 16 deletions datafusion/sql/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use std::collections::HashMap;
use arrow_schema::{
DataType, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DECIMAL_DEFAULT_SCALE,
};
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
};
use datafusion_common::{
exec_err, internal_err, plan_err, Column, DataFusionError, Result, ScalarValue,
};
Expand Down Expand Up @@ -267,11 +269,13 @@ pub(crate) fn normalize_ident(id: Ident) -> String {
/// - For list column: unnest(col) with type list -> unnest(col) with type list::item
/// - For struct column: unnest(struct(field1, field2)) -> unnest(struct).field1, unnest(struct).field2
/// The transformed exprs will be used in the outer projection
pub(crate) fn recursive_transform_unnest(
/// If along the path from root to bottom, there are multiple unnest expressions, the transformation
/// is done only for the bottom expression
pub(crate) fn transform_bottom_unnest(
Copy link
Contributor

@jayzhan211 jayzhan211 Jun 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would transforming List / Struct separately help readability here?

something like

transform_bottom_unnest() {
 // common logic
 fn transform();

 list:
     transform_list // list specific logic
 struct:
     transform_struct // struct specific logic (only unnest with the top level)
}

input: &LogicalPlan,
unnest_placeholder_columns: &mut Vec<String>,
inner_projection_exprs: &mut Vec<Expr>,
original_expr: Expr,
original_expr: &Expr,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could avoid clone in L297 too.

) -> Result<Vec<Expr>> {
let mut transform =
|unnest_expr: &Expr, expr_in_unnest: &Expr| -> Result<Vec<Expr>> {
Expand Down Expand Up @@ -306,21 +310,32 @@ pub(crate) fn recursive_transform_unnest(

// Specifically handle root level unnest expr, this is the only place
// unnest on struct can be handled
if let Expr::Unnest(Unnest { expr: ref arg }) = original_expr {
return transform(&original_expr, arg);
}
// if let Expr::Unnest(Unnest { expr: ref arg }) = original_expr {
// return transform(&original_expr, arg);
// }
let mut root_expr = vec![];
let Transformed {
data: transformed_expr,
transformed,
tnr: _,
} = original_expr.transform_up(|expr: Expr| {
} = original_expr.clone().transform_up(|expr: Expr| {
let is_root_expr = &expr == original_expr;

if let Expr::Unnest(Unnest { expr: ref arg }) = expr {
let (data_type, _) = arg.data_type_and_nullable(input.schema())?;
if let DataType::Struct(_) = data_type {
return internal_err!("unnest on struct can ony be applied at the root level of select expression");
if is_root_expr{
root_expr.extend(transform(original_expr, arg)?);
return Ok(Transformed::new(expr,true,TreeNodeRecursion::Stop));
}
if !is_root_expr {
if let DataType::Struct(_) = data_type {
return internal_err!("unnest on struct can ony be applied at the root level of select expression");
}
}

let transformed_exprs = transform(&expr, arg)?;
Ok(Transformed::yes(transformed_exprs[0].clone()))
// root_expr.push(transformed_exprs[0].clone());
Ok(Transformed::new(transformed_exprs[0].clone(),true,TreeNodeRecursion::Stop))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use swap_remove(0) to avoid clone

} else {
Ok(Transformed::no(expr))
}
Expand All @@ -338,6 +353,11 @@ pub(crate) fn recursive_transform_unnest(
Ok(vec![Expr::Column(Column::from_name(column_name))])
}
} else {
// A workaround: at the root level, an unnest on a struct column
// can be transformed into multiple exprs
if !root_expr.is_empty() {
return Ok(root_expr);
}
Ok(vec![transformed_expr])
}
}
Expand All @@ -351,12 +371,13 @@ mod tests {
use arrow_schema::Fields;
use datafusion_common::{DFSchema, Result};
use datafusion_expr::{col, lit, unnest, EmptyRelation, LogicalPlan};
use datafusion_functions::core::expr_ext::FieldAccessor;
use datafusion_functions_aggregate::expr_fn::count;

use crate::utils::{recursive_transform_unnest, resolve_positions_to_exprs};
use crate::utils::{resolve_positions_to_exprs, transform_bottom_unnest};

#[test]
fn test_recursive_transform_unnest() -> Result<()> {
fn test_transform_bottom_unnest() -> Result<()> {
let schema = Schema::new(vec![
Field::new(
"struct_col",
Expand Down Expand Up @@ -390,11 +411,11 @@ mod tests {

// unnest(struct_col)
let original_expr = unnest(col("struct_col"));
let transformed_exprs = recursive_transform_unnest(
let transformed_exprs = transform_bottom_unnest(
&input,
&mut unnest_placeholder_columns,
&mut inner_projection_exprs,
original_expr,
&original_expr,
)?;
assert_eq!(
transformed_exprs,
Expand All @@ -413,11 +434,11 @@ mod tests {

// unnest(array_col) + 1
let original_expr = unnest(col("array_col")).add(lit(1i64));
let transformed_exprs = recursive_transform_unnest(
let transformed_exprs = transform_bottom_unnest(
&input,
&mut unnest_placeholder_columns,
&mut inner_projection_exprs,
original_expr,
&original_expr,
)?;
assert_eq!(
unnest_placeholder_columns,
Expand All @@ -440,6 +461,62 @@ mod tests {
]
);

// a nested structure struct[[]]
let schema = Schema::new(vec![
Field::new(
"struct_col", // {array_col: [1,2,3]}
ArrowDataType::Struct(Fields::from(vec![Field::new(
"matrix",
ArrowDataType::List(Arc::new(Field::new(
"matrix_row",
ArrowDataType::List(Arc::new(Field::new(
"item",
ArrowDataType::Int64,
true,
))),
true,
))),
true,
)])),
false,
),
Field::new("int_col", ArrowDataType::Int32, false),
]);

let dfschema = DFSchema::try_from(schema)?;

let input = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::new(dfschema),
});

let mut unnest_placeholder_columns = vec![];
let mut inner_projection_exprs = vec![];

// An expr with multiple unnest
let original_expr = unnest(unnest(col("struct_col").field("matrix")));
let transformed_exprs = transform_bottom_unnest(
&input,
&mut unnest_placeholder_columns,
&mut inner_projection_exprs,
&original_expr,
)?;
// Only the innermost (bottom-most) unnest is transformed
assert_eq!(
transformed_exprs,
vec![unnest(col("unnest(struct_col[matrix])"))]
);
assert_eq!(
unnest_placeholder_columns,
vec!["unnest(struct_col[matrix])"]
);
assert_eq!(
inner_projection_exprs,
vec![col("struct_col")
.field("matrix")
.alias("unnest(struct_col[matrix])"),]
);

Ok(())
}

Expand Down
Loading