diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index 1613e5089860c..de2f2310eedd1 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -42,6 +42,7 @@ use datafusion_common::{internal_err, JoinSide, JoinType}; use datafusion_expr::sort_properties::SortProperties; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; +use datafusion_physical_plan::joins::utils::{project_index_to_exprs, remap_join_projections_join_to_output}; /// The [`JoinSelection`] rule tries to modify a given plan so that it can /// accommodate infinite sources and optimize joins in the plan according to @@ -193,8 +194,28 @@ pub fn swap_hash_join( Ok(Arc::new(new_join)) } else { // TODO avoid adding ProjectionExec again and again, only adding Final Projection + // ADR: FIXME the projection inside the hash join functionality is not consistent + // see https://github.com/apache/datafusion/commit/afddb321e9a98ffc1947005c38b6b50a6ef2a401 + // Failing to do the below code will create a projection exec with a projection that is + // possibly outside the schema. + let actual_projection = if new_join.projection.is_some() { + let tmp = remap_join_projections_join_to_output( + new_join.left().clone(), + new_join.right().clone(), + new_join.join_type(), + new_join.projection.clone(), + )?.unwrap(); + project_index_to_exprs( + &tmp, + &new_join.schema() + ) + } else { + swap_reverting_projection(&left.schema(), &right.schema()) + }; + // let swap_proj = swap_reverting_projection(&left.schema(), &right.schema()); + let proj = ProjectionExec::try_new( - swap_reverting_projection(&left.schema(), &right.schema()), + actual_projection, Arc::new(new_join), )?; Ok(Arc::new(proj)) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 784584f03f0f5..3786a61be045a 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -69,11 +69,12 @@ use datafusion_physical_expr::equivalence::{ join_equivalence_properties, ProjectionMapping, }; use datafusion_physical_expr::expressions::UnKnownColumn; -use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; +use datafusion_physical_expr::PhysicalExprRef; use ahash::RandomState; use futures::{ready, Stream, StreamExt, TryStreamExt}; use parking_lot::Mutex; +use crate::joins::utils::project_index_to_exprs; type SharedBitmapBuilder = Mutex; @@ -610,25 +611,6 @@ impl DisplayAs for HashJoinExec { } } -fn project_index_to_exprs( - projection_index: &[usize], - schema: &SchemaRef, -) -> Vec<(Arc, String)> { - projection_index - .iter() - .map(|index| { - let field = schema.field(*index); - ( - Arc::new(datafusion_physical_expr::expressions::Column::new( - field.name(), - *index, - )) as Arc, - field.name().to_owned(), - ) - }) - .collect::>() -} - impl ExecutionPlan for HashJoinExec { fn name(&self) -> &'static str { "HashJoinExec" @@ -1566,6 +1548,7 @@ mod tests { use hashbrown::raw::RawTable; use rstest::*; use rstest_reuse::*; + use datafusion_physical_expr_common::physical_expr::PhysicalExpr; fn div_ceil(a: usize, b: usize) -> usize { (a + b - 1) / b diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 0d99d7a163567..462ea0fdd034b 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -37,6 +37,7 @@ use arrow::datatypes::{Field, Schema, SchemaBuilder}; use arrow::record_batch::{RecordBatch, RecordBatchOptions}; use arrow_array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray}; use arrow_buffer::ArrowNativeType; +use arrow_schema::SchemaRef; use datafusion_common::cast::as_boolean_array; use datafusion_common::stats::Precision; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; @@ -44,7 +45,7 @@ use datafusion_common::{ plan_err, DataFusionError, JoinSide, JoinType, Result, SharedResult, }; use datafusion_expr::interval_arithmetic::Interval; -use datafusion_physical_expr::equivalence::add_offset_to_expr; +use datafusion_physical_expr::equivalence::{add_offset_to_expr, ProjectionMapping}; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::utils::{collect_columns, merge_vectors}; use datafusion_physical_expr::{ @@ -55,6 +56,7 @@ use futures::future::{BoxFuture, Shared}; use futures::{ready, FutureExt}; use hashbrown::raw::RawTable; use parking_lot::Mutex; +use crate::common::can_project; /// Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value. /// @@ -728,6 +730,66 @@ pub fn build_join_schema( (fields.finish(), column_indices) } +/// This assumes that the projections are relative to the join schema. +/// We need to redo them to point to the actual hash join output schema +pub fn remap_join_projections_join_to_output( + left: Arc, + right: Arc, + join_type: &JoinType, + projection: Option>, +) -> datafusion_common::Result>> { + match projection { + Some(ref projection) => { + let (join_schema, indices) = build_join_schema( + left.schema().as_ref(), + right.schema().as_ref(), + join_type + ); + + let join_schema = Arc::new(join_schema); + can_project(&join_schema, Some(projection.clone()).as_ref())?; + + let projection_exprs = project_index_to_exprs( + &projection.clone(), + &join_schema + ); + let projection_mapping = + ProjectionMapping::try_new(&projection_exprs, &join_schema)?; + + // projection mapping contains from and to, get the second one + let dest_physical_exprs = projection_mapping.map.iter().map(|(f, t)| t.clone()).collect::>(); + let dest_columns = dest_physical_exprs.iter().map(|pe| pe.as_any().downcast_ref::()).collect::>(); + let output = dest_physical_exprs.iter().enumerate().map(|(idx, pe)| { + // :Vec<(Arc, String)> + // (pe.clone(), dest_column.name().to_owned()) + let dest_column = dest_columns.get(idx).unwrap().unwrap(); + dest_column.index() + }).collect::>(); + Ok(Some(output)) + }, + None => Ok(None) + } +} + +pub fn project_index_to_exprs( + projection_index: &[usize], + schema: &SchemaRef, +) -> Vec<(Arc, String)> { + projection_index + .iter() + .map(|index| { + let field = schema.field(*index); + ( + Arc::new(datafusion_physical_expr::expressions::Column::new( + field.name(), + *index, + )) as Arc, + field.name().to_owned(), + ) + }) + .collect::>() +} + /// A [`OnceAsync`] can be used to run an async closure once, with subsequent calls /// to [`OnceAsync::once`] returning a [`OnceFut`] to the same asynchronous computation ///