Skip to content

Commit

Permalink
[HSTACK] Fix for apache#10978 - Inconsistent behavior in HashJoin Pro…
Browse files Browse the repository at this point in the history
…jections
  • Loading branch information
adragomir committed Jun 20, 2024
1 parent d8d61d3 commit 3cea7ab
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 22 deletions.
23 changes: 22 additions & 1 deletion datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use datafusion_common::{internal_err, JoinSide, JoinType};
use datafusion_expr::sort_properties::SortProperties;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use datafusion_physical_plan::joins::utils::{project_index_to_exprs, remap_join_projections_join_to_output};

/// The [`JoinSelection`] rule tries to modify a given plan so that it can
/// accommodate infinite sources and optimize joins in the plan according to
Expand Down Expand Up @@ -193,8 +194,28 @@ pub fn swap_hash_join(
Ok(Arc::new(new_join))
} else {
// TODO avoid adding ProjectionExec again and again, only adding Final Projection
// ADR: FIXME the projection inside the hash join functionality is not consistent
// see https://github.com/apache/datafusion/commit/afddb321e9a98ffc1947005c38b6b50a6ef2a401
// Failing to do the below code will create a projection exec with a projection that is
// possibly outside the schema.
let actual_projection = if new_join.projection.is_some() {
let tmp = remap_join_projections_join_to_output(
new_join.left().clone(),
new_join.right().clone(),
new_join.join_type(),
new_join.projection.clone(),
)?.unwrap();
project_index_to_exprs(
&tmp,
&new_join.schema()
)
} else {
swap_reverting_projection(&left.schema(), &right.schema())
};
// let swap_proj = swap_reverting_projection(&left.schema(), &right.schema());

let proj = ProjectionExec::try_new(
swap_reverting_projection(&left.schema(), &right.schema()),
actual_projection,
Arc::new(new_join),
)?;
Ok(Arc::new(proj))
Expand Down
23 changes: 3 additions & 20 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,12 @@ use datafusion_physical_expr::equivalence::{
join_equivalence_properties, ProjectionMapping,
};
use datafusion_physical_expr::expressions::UnKnownColumn;
use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
use datafusion_physical_expr::PhysicalExprRef;

use ahash::RandomState;
use futures::{ready, Stream, StreamExt, TryStreamExt};
use parking_lot::Mutex;
use crate::joins::utils::project_index_to_exprs;

type SharedBitmapBuilder = Mutex<BooleanBufferBuilder>;

Expand Down Expand Up @@ -610,25 +611,6 @@ impl DisplayAs for HashJoinExec {
}
}

/// Turns a list of column indices into `(expression, name)` pairs.
///
/// For every entry of `projection_index`, the field at that position in
/// `schema` is looked up and paired with a `Column` expression carrying the
/// field's name and the same index. The output preserves the order of
/// `projection_index`.
///
/// NOTE(review): `schema.field(..)` presumably panics when an index is out of
/// range for `schema` — callers should validate the projection first; confirm.
fn project_index_to_exprs(
    projection_index: &[usize],
    schema: &SchemaRef,
) -> Vec<(Arc<dyn PhysicalExpr>, String)> {
    // Builds the pair for a single projected index.
    let to_named_column = |&index: &usize| {
        let name = schema.field(index).name();
        let column = datafusion_physical_expr::expressions::Column::new(name, index);
        (Arc::new(column) as Arc<dyn PhysicalExpr>, name.to_owned())
    };

    projection_index.iter().map(to_named_column).collect()
}

impl ExecutionPlan for HashJoinExec {
fn name(&self) -> &'static str {
"HashJoinExec"
Expand Down Expand Up @@ -1566,6 +1548,7 @@ mod tests {
use hashbrown::raw::RawTable;
use rstest::*;
use rstest_reuse::*;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;

fn div_ceil(a: usize, b: usize) -> usize {
(a + b - 1) / b
Expand Down
64 changes: 63 additions & 1 deletion datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,15 @@ use arrow::datatypes::{Field, Schema, SchemaBuilder};
use arrow::record_batch::{RecordBatch, RecordBatchOptions};
use arrow_array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray};
use arrow_buffer::ArrowNativeType;
use arrow_schema::SchemaRef;
use datafusion_common::cast::as_boolean_array;
use datafusion_common::stats::Precision;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{
plan_err, DataFusionError, JoinSide, JoinType, Result, SharedResult,
};
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_physical_expr::equivalence::add_offset_to_expr;
use datafusion_physical_expr::equivalence::{add_offset_to_expr, ProjectionMapping};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::{collect_columns, merge_vectors};
use datafusion_physical_expr::{
Expand All @@ -55,6 +56,7 @@ use futures::future::{BoxFuture, Shared};
use futures::{ready, FutureExt};
use hashbrown::raw::RawTable;
use parking_lot::Mutex;
use crate::common::can_project;

/// Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value.
///
Expand Down Expand Up @@ -728,6 +730,66 @@ pub fn build_join_schema(
(fields.finish(), column_indices)
}

/// This assumes that the projections are relative to the join schema.
/// We need to redo them to point to the actual hash join output schema
pub fn remap_join_projections_join_to_output(
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
join_type: &JoinType,
projection: Option<Vec<usize>>,
) -> datafusion_common::Result<Option<Vec<usize>>> {
match projection {
Some(ref projection) => {
let (join_schema, indices) = build_join_schema(
left.schema().as_ref(),
right.schema().as_ref(),
join_type
);

let join_schema = Arc::new(join_schema);
can_project(&join_schema, Some(projection.clone()).as_ref())?;

let projection_exprs = project_index_to_exprs(
&projection.clone(),
&join_schema
);
let projection_mapping =
ProjectionMapping::try_new(&projection_exprs, &join_schema)?;

// projection mapping contains from and to, get the second one
let dest_physical_exprs = projection_mapping.map.iter().map(|(f, t)| t.clone()).collect::<Vec<_>>();
let dest_columns = dest_physical_exprs.iter().map(|pe| pe.as_any().downcast_ref::<Column>()).collect::<Vec<_>>();
let output = dest_physical_exprs.iter().enumerate().map(|(idx, pe)| {
// :Vec<(Arc<dyn PhysicalExpr>, String)>
// (pe.clone(), dest_column.name().to_owned())
let dest_column = dest_columns.get(idx).unwrap().unwrap();
dest_column.index()
}).collect::<Vec<_>>();
Ok(Some(output))
},
None => Ok(None)
}
}

/// Builds one `(expression, name)` pair per projected column index.
///
/// Each index in `projection_index` selects a field of `schema`; the result
/// contains, in the same order, a `Column` expression constructed from that
/// field's name and index, together with an owned copy of the field name.
///
/// NOTE(review): `schema.field(..)` presumably panics when an index is out of
/// range for `schema` — callers should validate the projection first; confirm.
pub fn project_index_to_exprs(
    projection_index: &[usize],
    schema: &SchemaRef,
) -> Vec<(Arc<dyn PhysicalExpr>, String)> {
    let mut exprs = Vec::with_capacity(projection_index.len());
    for &idx in projection_index {
        let name = schema.field(idx).name();
        let column: Arc<dyn PhysicalExpr> = Arc::new(
            datafusion_physical_expr::expressions::Column::new(name, idx),
        );
        exprs.push((column, name.to_owned()));
    }
    exprs
}

/// A [`OnceAsync`] can be used to run an async closure once, with subsequent calls
/// to [`OnceAsync::once`] returning a [`OnceFut`] to the same asynchronous computation
///
Expand Down

0 comments on commit 3cea7ab

Please sign in to comment.