diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 9ba866a4c919..c11c9ea2ce4f 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -355,6 +355,11 @@ impl Unnest { expr: Box::new(expr), } } + + /// Create a new Unnest expression. + pub fn new_boxed(boxed: Box) -> Self { + Self { expr: boxed } + } } /// Alias expression diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 2f1ece32ab15..1ffb8fce7b22 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1452,7 +1452,6 @@ pub fn project( _ => projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?), } } - validate_unique_names("Projections", projected_expr.iter())?; Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection) diff --git a/datafusion/expr/src/tree_node.rs b/datafusion/expr/src/tree_node.rs index e57d57188743..f1df8609f903 100644 --- a/datafusion/expr/src/tree_node.rs +++ b/datafusion/expr/src/tree_node.rs @@ -136,8 +136,9 @@ impl TreeNode for Expr { | Expr::Exists { .. } | Expr::ScalarSubquery(_) | Expr::ScalarVariable(_, _) - | Expr::Unnest(_) | Expr::Literal(_) => Transformed::no(self), + Expr::Unnest(Unnest { expr, .. }) => transform_box(expr, &mut f)? 
+ .update_data(|be| Expr::Unnest(Unnest::new_boxed(be))), Expr::Alias(Alias { expr, relation, diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 0fa266e4e01d..7e6eb48795d9 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -20,9 +20,8 @@ use std::sync::Arc; use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; use crate::utils::{ - check_columns_satisfy_exprs, extract_aliases, rebase_expr, - recursive_transform_unnest, resolve_aliases_to_exprs, resolve_columns, - resolve_positions_to_exprs, + check_columns_satisfy_exprs, extract_aliases, rebase_expr, resolve_aliases_to_exprs, + resolve_columns, resolve_positions_to_exprs, transform_bottom_unnest, }; use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result}; @@ -296,46 +295,61 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { input: LogicalPlan, select_exprs: Vec, ) -> Result { - let mut unnest_columns = vec![]; - // from which column used for projection, before the unnest happen - // including non unnest column and unnest column - let mut inner_projection_exprs = vec![]; - - // expr returned here maybe different from the originals in inner_projection_exprs - // for example: - // - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2 - // - unnest(array_col) will be transformed into unnest(array_col).element - // - unnest(array_col) + 1 will be transformed into unnest(array_col).element +1 - let outer_projection_exprs: Vec = select_exprs - .into_iter() - .map(|expr| { - recursive_transform_unnest( - &input, - &mut unnest_columns, - &mut inner_projection_exprs, - expr, - ) - }) - .collect::>>()? - .into_iter() - .flatten() - .collect(); - - // Do the final projection - if unnest_columns.is_empty() { - LogicalPlanBuilder::from(input) - .project(inner_projection_exprs)? 
- .build() - } else { - let columns = unnest_columns.into_iter().map(|col| col.into()).collect(); - // Set preserve_nulls to false to ensure compatibility with DuckDB and PostgreSQL - let unnest_options = UnnestOptions::new().with_preserve_nulls(false); - LogicalPlanBuilder::from(input) - .project(inner_projection_exprs)? - .unnest_columns_with_options(columns, unnest_options)? - .project(outer_projection_exprs)? - .build() + let mut intermediate_plan = input; + let mut intermediate_select_exprs = select_exprs; + // Each expr in select_exprs can contain multiple unnest stages + // The transformation happens bottom-up, one at a time for each iteration + // Only exit the loop once no more unnest transformation is found + for i in 0.. { + let mut unnest_columns = vec![]; + // from which column used for projection, before the unnest happen + // including non unnest column and unnest column + let mut inner_projection_exprs = vec![]; + + // expr returned here maybe different from the originals in inner_projection_exprs + // for example: + // - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2 + // - unnest(array_col) will be transformed into unnest(array_col).element + // - unnest(array_col) + 1 will be transformed into unnest(array_col).element +1 + let outer_projection_exprs: Vec = intermediate_select_exprs + .iter() + .map(|expr| { + transform_bottom_unnest( + &intermediate_plan, + &mut unnest_columns, + &mut inner_projection_exprs, + expr, + ) + }) + .collect::>>()? + .into_iter() + .flatten() + .collect(); + + // No more unnest is possible + if unnest_columns.is_empty() { + // The original expr does not contain any unnest + if i == 0 { + return LogicalPlanBuilder::from(intermediate_plan) + .project(inner_projection_exprs)? 
+ .build(); + } + break; + } else { + let columns = unnest_columns.into_iter().map(|col| col.into()).collect(); + // Set preserve_nulls to false to ensure compatibility with DuckDB and PostgreSQL + let unnest_options = UnnestOptions::new().with_preserve_nulls(false); + let plan = LogicalPlanBuilder::from(intermediate_plan) + .project(inner_projection_exprs)? + .unnest_columns_with_options(columns, unnest_options)? + .build()?; + intermediate_plan = plan; + intermediate_select_exprs = outer_projection_exprs; + } } + LogicalPlanBuilder::from(intermediate_plan) + .project(intermediate_select_exprs)? + .build() } fn plan_selection( diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index bc27d25cf216..2eacbd174fc2 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -22,7 +22,9 @@ use std::collections::HashMap; use arrow_schema::{ DataType, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DECIMAL_DEFAULT_SCALE, }; -use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_common::tree_node::{ + Transformed, TransformedResult, TreeNode, TreeNodeRecursion, +}; use datafusion_common::{ exec_err, internal_err, plan_err, Column, DataFusionError, Result, ScalarValue, }; @@ -267,11 +269,13 @@ pub(crate) fn normalize_ident(id: Ident) -> String { /// - For list column: unnest(col) with type list -> unnest(col) with type list::item /// - For struct column: unnest(struct(field1, field2)) -> unnest(struct).field1, unnest(struct).field2 /// The transformed exprs will be used in the outer projection -pub(crate) fn recursive_transform_unnest( +/// If along the path from root to bottom, there are multiple unnest expressions, the transformation +/// is done only for the bottom expression +pub(crate) fn transform_bottom_unnest( input: &LogicalPlan, unnest_placeholder_columns: &mut Vec, inner_projection_exprs: &mut Vec, - original_expr: Expr, + original_expr: &Expr, ) -> Result> { let mut transform = 
|unnest_expr: &Expr, expr_in_unnest: &Expr| -> Result> { @@ -298,35 +302,53 @@ pub(crate) fn recursive_transform_unnest( .collect::>(); Ok(expr) }; - // expr transformed maybe either the same, or different from the originals exprs - // for example: - // - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2 + // This transformation is only done for list unnest + // struct unnest is done at the root level, and at the later stage + // because the syntax of TreeNode only support transform into 1 Expr, while + // Unnest struct will be transformed into multiple Exprs + // TODO: This can be resolved after this issue is resolved: https://github.com/apache/datafusion/issues/10102 + // + // The transformation looks like: // - unnest(array_col) will be transformed into unnest(array_col) // - unnest(array_col) + 1 will be transformed into unnest(array_col) + 1 - - // Specifically handle root level unnest expr, this is the only place - // unnest on struct can be handled - if let Expr::Unnest(Unnest { expr: ref arg }) = original_expr { - return transform(&original_expr, arg); - } let Transformed { - data: transformed_expr, - transformed, - tnr: _, - } = original_expr.transform_up(|expr: Expr| { - if let Expr::Unnest(Unnest { expr: ref arg }) = expr { - let (data_type, _) = arg.data_type_and_nullable(input.schema())?; - if let DataType::Struct(_) = data_type { - return internal_err!("unnest on struct can ony be applied at the root level of select expression"); - } - let transformed_exprs = transform(&expr, arg)?; - Ok(Transformed::yes(transformed_exprs[0].clone())) - } else { - Ok(Transformed::no(expr)) + data: transformed_expr, + transformed, + tnr: _, + } = original_expr.clone().transform_up(|expr: Expr| { + let is_root_expr = &expr == original_expr; + // Root expr is transformed separately + if is_root_expr { + return Ok(Transformed::no(expr)); + } + if let Expr::Unnest(Unnest { expr: ref arg }) = expr { + let (data_type, _) = 
arg.data_type_and_nullable(input.schema())?; + + if let DataType::Struct(_) = data_type { + return internal_err!("unnest on struct can ony be applied at the root level of select expression"); } - })?; + + let mut transformed_exprs = transform(&expr, arg)?; + // root_expr.push(transformed_exprs[0].clone()); + Ok(Transformed::new( + transformed_exprs.swap_remove(0), + true, + TreeNodeRecursion::Stop, + )) + } else { + Ok(Transformed::no(expr)) + } + })?; if !transformed { + // Because root expr need to transform separately + // unnest struct is only possible here + // The transformation looks like + // - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2 + if let Expr::Unnest(Unnest { expr: ref arg }) = transformed_expr { + return transform(&transformed_expr, arg); + } + if matches!(&transformed_expr, Expr::Column(_)) { inner_projection_exprs.push(transformed_expr.clone()); Ok(vec![transformed_expr]) @@ -351,12 +373,13 @@ mod tests { use arrow_schema::Fields; use datafusion_common::{DFSchema, Result}; use datafusion_expr::{col, lit, unnest, EmptyRelation, LogicalPlan}; + use datafusion_functions::core::expr_ext::FieldAccessor; use datafusion_functions_aggregate::expr_fn::count; - use crate::utils::{recursive_transform_unnest, resolve_positions_to_exprs}; + use crate::utils::{resolve_positions_to_exprs, transform_bottom_unnest}; #[test] - fn test_recursive_transform_unnest() -> Result<()> { + fn test_transform_bottom_unnest() -> Result<()> { let schema = Schema::new(vec![ Field::new( "struct_col", @@ -390,11 +413,11 @@ mod tests { // unnest(struct_col) let original_expr = unnest(col("struct_col")); - let transformed_exprs = recursive_transform_unnest( + let transformed_exprs = transform_bottom_unnest( &input, &mut unnest_placeholder_columns, &mut inner_projection_exprs, - original_expr, + &original_expr, )?; assert_eq!( transformed_exprs, @@ -413,11 +436,11 @@ mod tests { // unnest(array_col) + 1 let original_expr = 
unnest(col("array_col")).add(lit(1i64)); - let transformed_exprs = recursive_transform_unnest( + let transformed_exprs = transform_bottom_unnest( &input, &mut unnest_placeholder_columns, &mut inner_projection_exprs, - original_expr, + &original_expr, )?; assert_eq!( unnest_placeholder_columns, @@ -440,6 +463,62 @@ mod tests { ] ); + // a nested structure struct[[]] + let schema = Schema::new(vec![ + Field::new( + "struct_col", // {array_col: [1,2,3]} + ArrowDataType::Struct(Fields::from(vec![Field::new( + "matrix", + ArrowDataType::List(Arc::new(Field::new( + "matrix_row", + ArrowDataType::List(Arc::new(Field::new( + "item", + ArrowDataType::Int64, + true, + ))), + true, + ))), + true, + )])), + false, + ), + Field::new("int_col", ArrowDataType::Int32, false), + ]); + + let dfschema = DFSchema::try_from(schema)?; + + let input = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: Arc::new(dfschema), + }); + + let mut unnest_placeholder_columns = vec![]; + let mut inner_projection_exprs = vec![]; + + // An expr with multiple unnest + let original_expr = unnest(unnest(col("struct_col").field("matrix"))); + let transformed_exprs = transform_bottom_unnest( + &input, + &mut unnest_placeholder_columns, + &mut inner_projection_exprs, + &original_expr, + )?; + // Only the inner most/ bottom most unnest is transformed + assert_eq!( + transformed_exprs, + vec![unnest(col("unnest(struct_col[matrix])"))] + ); + assert_eq!( + unnest_placeholder_columns, + vec!["unnest(struct_col[matrix])"] + ); + assert_eq!( + inner_projection_exprs, + vec![col("struct_col") + .field("matrix") + .alias("unnest(struct_col[matrix])"),] + ); + Ok(()) } diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt index 11ad8e0bb843..06733f7b1e40 100644 --- a/datafusion/sqllogictest/test_files/unnest.slt +++ b/datafusion/sqllogictest/test_files/unnest.slt @@ -37,6 +37,13 @@ AS VALUES (struct('d', 'e', struct('f')), (struct('x', 
'y', [30,40, 50])), null) ; +statement ok +CREATE TABLE recursive_unnest_table +AS VALUES + (struct([1], 'a'), [[[1],[2]],[[1,1]]], [struct([1],[[1,2]])]), + (struct([2], 'b'), [[[3,4],[5]],[[null,6],null,[7,8]]], [struct([2],[[3],[4]])]) +; + ## Basic unnest expression in select list query I select unnest([1,2,3]); @@ -158,6 +165,37 @@ select unnest(column1), column1 from unnest_table; 6 [6] 12 [12] +## unnest as children of other expr +query I? +select unnest(column1) + 1 , column1 from unnest_table; +---- +2 [1, 2, 3] +3 [1, 2, 3] +4 [1, 2, 3] +5 [4, 5] +6 [4, 5] +7 [6] +13 [12] + +## unnest on multiple columns +query II +select unnest(column1), unnest(column2) from unnest_table; +---- +1 7 +2 NULL +3 NULL +4 8 +5 9 +NULL 10 +6 11 +NULL 12 +12 NULL +NULL 42 +NULL NULL + +query error DataFusion error: Error during planning: unnest\(\) can only be applied to array, struct and null +select unnest('foo'); + query ?II select array_remove(column1, 4), unnest(column2), column3 * 10 from unnest_table; ---- @@ -458,5 +496,64 @@ select unnest(column1) from (select * from (values([1,2,3]), ([4,5,6])) limit 1 5 6 +## FIXME: https://github.com/apache/datafusion/issues/11198 +query error DataFusion error: Error during planning: Projections require unique expression names but the expression "UNNEST\(Column\(Column \{ relation: Some\(Bare \{ table: "unnest_table" \}\), name: "column1" \}\)\)" at position 0 and "UNNEST\(Column\(Column \{ relation: Some\(Bare \{ table: "unnest_table" \}\), name: "column1" \}\)\)" at position 1 have the same name. Consider aliasing \("AS"\) one of them. +select unnest(column1), unnest(column1) from unnest_table; + statement ok drop table unnest_table; + +## unnest list followed by unnest struct +query ??? +select unnest(unnest(column3)), column3 from recursive_unnest_table; +---- +[1] [[1, 2]] [{c0: [1], c1: [[1, 2]]}] +[2] [[3], [4]] [{c0: [2], c1: [[3], [4]]}] + +## unnest->field_access->unnest->unnest +query I? 
+select unnest(unnest(unnest(column3)['c1'])), column3 from recursive_unnest_table; +---- +1 [{c0: [1], c1: [[1, 2]]}] +2 [{c0: [1], c1: [[1, 2]]}] +3 [{c0: [2], c1: [[3], [4]]}] +4 [{c0: [2], c1: [[3], [4]]}] + +## triple list unnest +query I? +select unnest(unnest(unnest(column2))), column2 from recursive_unnest_table; +---- +1 [[[1], [2]], [[1, 1]]] +2 [[[1], [2]], [[1, 1]]] +1 [[[1], [2]], [[1, 1]]] +1 [[[1], [2]], [[1, 1]]] +3 [[[3, 4], [5]], [[, 6], , [7, 8]]] +4 [[[3, 4], [5]], [[, 6], , [7, 8]]] +5 [[[3, 4], [5]], [[, 6], , [7, 8]]] +NULL [[[3, 4], [5]], [[, 6], , [7, 8]]] +6 [[[3, 4], [5]], [[, 6], , [7, 8]]] +7 [[[3, 4], [5]], [[, 6], , [7, 8]]] +8 [[[3, 4], [5]], [[, 6], , [7, 8]]] + + + +query TT +explain select unnest(unnest(unnest(column3)['c1'])), column3 from recursive_unnest_table; +---- +logical_plan +01)Unnest: lists[unnest(unnest(unnest(recursive_unnest_table.column3)[c1]))] structs[] +02)--Projection: unnest(unnest(recursive_unnest_table.column3)[c1]) AS unnest(unnest(unnest(recursive_unnest_table.column3)[c1])), recursive_unnest_table.column3 +03)----Unnest: lists[unnest(unnest(recursive_unnest_table.column3)[c1])] structs[] +04)------Projection: get_field(unnest(recursive_unnest_table.column3), Utf8("c1")) AS unnest(unnest(recursive_unnest_table.column3)[c1]), recursive_unnest_table.column3 +05)--------Unnest: lists[unnest(recursive_unnest_table.column3)] structs[] +06)----------Projection: recursive_unnest_table.column3 AS unnest(recursive_unnest_table.column3), recursive_unnest_table.column3 +07)------------TableScan: recursive_unnest_table projection=[column3] +physical_plan +01)UnnestExec +02)--ProjectionExec: expr=[unnest(unnest(recursive_unnest_table.column3)[c1])@0 as unnest(unnest(unnest(recursive_unnest_table.column3)[c1])), column3@1 as column3] +03)----UnnestExec +04)------ProjectionExec: expr=[get_field(unnest(recursive_unnest_table.column3)@0, c1) as unnest(unnest(recursive_unnest_table.column3)[c1]), column3@1 as column3] 
+05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +06)----------UnnestExec +07)------------ProjectionExec: expr=[column3@0 as unnest(recursive_unnest_table.column3), column3@0 as column3] +08)--------------MemoryExec: partitions=1, partition_sizes=[1] \ No newline at end of file