diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index 499fb9cbbcf0..c6b3fc4f2d2c 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -42,6 +42,7 @@ use datafusion_expr::sort_properties::SortProperties; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::joins::utils::{project_index_to_exprs, remap_join_projections_join_to_output}; /// The [`JoinSelection`] rule tries to modify a given plan so that it can /// accommodate infinite sources and optimize joins in the plan according to @@ -193,8 +194,28 @@ pub fn swap_hash_join( Ok(Arc::new(new_join)) } else { // TODO avoid adding ProjectionExec again and again, only adding Final Projection + // ADR: FIXME the projection inside the hash join functionality is not consistent + // see https://github.com/apache/datafusion/commit/afddb321e9a98ffc1947005c38b6b50a6ef2a401 + // Failing to do the below code will create a projection exec with a projection that is + // possibly outside the schema. + let actual_projection = if new_join.projection.is_some() { + let tmp = remap_join_projections_join_to_output( + new_join.left().clone(), + new_join.right().clone(), + new_join.join_type(), + new_join.projection.clone(), + )?.unwrap(); + project_index_to_exprs( + &tmp, + &new_join.schema() + ) + } else { + swap_reverting_projection(&left.schema(), &right.schema()) + }; + // let swap_proj = swap_reverting_projection(&left.schema(), &right.schema()); + let proj = ProjectionExec::try_new( - swap_reverting_projection(&left.schema(), &right.schema()), + actual_projection, Arc::new(new_join), )?; Ok(Arc::new(proj)) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 48d648c89a35..c6dd1e2d7f65 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -75,6 +75,7 @@ use datafusion_expr::Operator; use datafusion_physical_expr_common::datum::compare_op_for_nested; use futures::{ready, Stream, StreamExt, TryStreamExt}; use parking_lot::Mutex; +use crate::joins::utils::project_index_to_exprs; type SharedBitmapBuilder = Mutex; @@ -1548,6 +1549,7 @@ mod tests { use hashbrown::raw::RawTable; use rstest::*; use rstest_reuse::*; + use datafusion_physical_expr_common::physical_expr::PhysicalExpr; fn div_ceil(a: usize, b: usize) -> usize { (a + b - 1) / b diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 89f3feaf07be..458abdf8fef5 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -39,6 +39,7 @@ use arrow::record_batch::{RecordBatch, RecordBatchOptions}; use arrow_array::builder::UInt64Builder; use arrow_array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray}; use arrow_buffer::ArrowNativeType; +use arrow_schema::SchemaRef; use datafusion_common::cast::as_boolean_array; use datafusion_common::stats::Precision; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; @@ -46,7 +47,7 @@ use datafusion_common::{ plan_err, DataFusionError, JoinSide, JoinType, Result, SharedResult, }; use datafusion_expr::interval_arithmetic::Interval; -use datafusion_physical_expr::equivalence::add_offset_to_expr; +use datafusion_physical_expr::equivalence::{add_offset_to_expr, ProjectionMapping}; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::utils::{collect_columns, merge_vectors}; use datafusion_physical_expr::{ @@ -57,6 +58,7 @@ use futures::future::{BoxFuture, Shared}; use futures::{ready, FutureExt}; use hashbrown::raw::RawTable; use parking_lot::Mutex; +use crate::common::can_project; /// Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value. /// @@ -703,6 +705,66 @@ pub fn build_join_schema( (fields.finish(), column_indices) } +/// This assumes that the projections are relative to the join schema. +/// We need to redo them to point to the actual hash join output schema +pub fn remap_join_projections_join_to_output( + left: Arc, + right: Arc, + join_type: &JoinType, + projection: Option>, +) -> datafusion_common::Result>> { + match projection { + Some(ref projection) => { + let (join_schema, indices) = build_join_schema( + left.schema().as_ref(), + right.schema().as_ref(), + join_type + ); + + let join_schema = Arc::new(join_schema); + can_project(&join_schema, Some(projection.clone()).as_ref())?; + + let projection_exprs = project_index_to_exprs( + &projection.clone(), + &join_schema + ); + let projection_mapping = + ProjectionMapping::try_new(&projection_exprs, &join_schema)?; + + // projection mapping contains from and to, get the second one + let dest_physical_exprs = projection_mapping.map.iter().map(|(f, t)| t.clone()).collect::>(); + let dest_columns = dest_physical_exprs.iter().map(|pe| pe.as_any().downcast_ref::()).collect::>(); + let output = dest_physical_exprs.iter().enumerate().map(|(idx, pe)| { + // :Vec<(Arc, String)> + // (pe.clone(), dest_column.name().to_owned()) + let dest_column = dest_columns.get(idx).unwrap().unwrap(); + dest_column.index() + }).collect::>(); + Ok(Some(output)) + }, + None => Ok(None) + } +} + +pub fn project_index_to_exprs( + projection_index: &[usize], + schema: &SchemaRef, +) -> Vec<(Arc, String)> { + projection_index + .iter() + .map(|index| { + let field = schema.field(*index); + ( + Arc::new(datafusion_physical_expr::expressions::Column::new( + field.name(), + *index, + )) as Arc, + field.name().to_owned(), + ) + }) + .collect::>() +} + /// A [`OnceAsync`] can be used to run an async closure once, with subsequent calls /// to [`OnceAsync::once`] returning a [`OnceFut`] to the same asynchronous computation ///