From 592245013d19329d16b15249c4df19e3b649a1ea Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 9 Aug 2021 18:00:57 +0200 Subject: [PATCH 1/5] Fix right, full join handling --- datafusion/src/physical_plan/hash_join.rs | 22 ++++++++++------------ datafusion/tests/sql.rs | 6 ------ 2 files changed, 10 insertions(+), 18 deletions(-) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index fa75437e3fea..ec2b812a2949 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -22,8 +22,9 @@ use ahash::RandomState; use arrow::{ array::{ - ArrayData, ArrayRef, BooleanArray, LargeStringArray, PrimitiveArray, - UInt32BufferBuilder, UInt32Builder, UInt64BufferBuilder, UInt64Builder, + ArrayBuilder, ArrayData, ArrayRef, BooleanArray, LargeStringArray, + PrimitiveArray, UInt32BufferBuilder, UInt32Builder, UInt64BufferBuilder, + UInt64Builder, }, compute, datatypes::{UInt32Type, UInt64Type}, @@ -737,6 +738,7 @@ fn build_join_indexes( for (row, hash_value) in hash_values.iter().enumerate() { match left.0.get(*hash_value, |(hash, _)| *hash_value == *hash) { Some((_, indices)) => { + let mut no_match = true; for &i in indices { if equal_rows( i as usize, @@ -745,9 +747,12 @@ fn build_join_indexes( &keys_values, )? { left_indices.append_value(i)?; - } else { - left_indices.append_null()?; + right_indices.append_value(row as u32)?; + no_match = false; } + } + if no_match { + left_indices.append_null()?; right_indices.append_value(row as u32)?; } } @@ -768,7 +773,7 @@ macro_rules! equal_rows_elem { let left_array = $l.as_any().downcast_ref::<$array_type>().unwrap(); let right_array = $r.as_any().downcast_ref::<$array_type>().unwrap(); - match (left_array.is_null($left), left_array.is_null($right)) { + match (left_array.is_null($left), right_array.is_null($right)) { (false, false) => left_array.value($left) == right_array.value($right), _ => false, } @@ -1373,7 +1378,6 @@ mod tests { #[tokio::test] // Disable until https://github.com/apache/arrow-datafusion/issues/843 fixed - #[cfg(not(feature = "force_hash_collisions"))] async fn join_full_multi_batch() { let left = build_table( ("a1", &vec![1, 2, 3]), @@ -1639,8 +1643,6 @@ mod tests { } #[tokio::test] - // Disable until https://github.com/apache/arrow-datafusion/issues/843 fixed - #[cfg(not(feature = "force_hash_collisions"))] async fn join_right_one() -> Result<()> { let left = build_table( ("a1", &vec![1, 2, 3]), @@ -1677,8 +1679,6 @@ mod tests { } #[tokio::test] - // Disable until https://github.com/apache/arrow-datafusion/issues/843 fixed - #[cfg(not(feature = "force_hash_collisions"))] async fn partitioned_join_right_one() -> Result<()> { let left = build_table( ("a1", &vec![1, 2, 3]), @@ -1716,8 +1716,6 @@ mod tests { } #[tokio::test] - // Disable until https://github.com/apache/arrow-datafusion/issues/843 fixed - #[cfg(not(feature = "force_hash_collisions"))] async fn join_full_one() -> Result<()> { let left = build_table( ("a1", &vec![1, 2, 3]), diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index 046e4f28ec42..0c33bd477266 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -1797,8 +1797,6 @@ async fn equijoin_left_and_condition_from_right() -> Result<()> { } #[tokio::test] -// Disable until https://github.com/apache/arrow-datafusion/issues/843 fixed -#[cfg(not(feature = "force_hash_collisions"))] async fn equijoin_right_and_condition_from_left() -> Result<()> { let mut ctx = create_join_context("t1_id", "t2_id")?; let sql = @@ -1852,8 +1850,6 @@ async fn left_join() -> Result<()> { } #[tokio::test] -// Disable until https://github.com/apache/arrow-datafusion/issues/843 fixed -#[cfg(not(feature = "force_hash_collisions"))] async fn right_join() -> Result<()> { let mut ctx = create_join_context("t1_id", "t2_id")?; let equivalent_sql = [ @@ -1874,8 +1870,6 @@ async fn right_join() -> Result<()> { } #[tokio::test] -// Disable until https://github.com/apache/arrow-datafusion/issues/843 fixed -#[cfg(not(feature = "force_hash_collisions"))] async fn full_join() -> Result<()> { let mut ctx = create_join_context("t1_id", "t2_id")?; let equivalent_sql = [ From 0364b5fd61d2a77ee2925fc312ff872ac17d3344 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 9 Aug 2021 18:08:20 +0200 Subject: [PATCH 2/5] Remove comment --- datafusion/src/physical_plan/hash_join.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index ec2b812a2949..28c0724109c9 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -1377,7 +1377,6 @@ mod tests { } #[tokio::test] - // Disable until https://github.com/apache/arrow-datafusion/issues/843 fixed async fn join_full_multi_batch() { let left = build_table( ("a1", &vec![1, 2, 3]), From 83b00005bb91519c3b3d39c8e813399c5075cb21 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 9 Aug 2021 18:21:19 +0200 Subject: [PATCH 3/5] Clippy --- datafusion/src/physical_plan/hash_join.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 28c0724109c9..d0ac87705f82 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -22,9 +22,8 @@ use ahash::RandomState; use arrow::{ array::{ - ArrayBuilder, ArrayData, ArrayRef, BooleanArray, LargeStringArray, - PrimitiveArray, UInt32BufferBuilder, UInt32Builder, UInt64BufferBuilder, - UInt64Builder, + ArrayData, ArrayRef, BooleanArray, LargeStringArray, PrimitiveArray, + UInt32BufferBuilder, UInt32Builder, UInt64BufferBuilder, UInt64Builder, }, compute, datatypes::{UInt32Type, UInt64Type}, From 33a6f5ae91f60889a805e2e66318abc0da60e8c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Mon, 9 Aug 2021 18:30:23 +0200 Subject: [PATCH 4/5] Update datafusion/src/physical_plan/hash_join.rs Co-authored-by: Andrew Lamb --- datafusion/src/physical_plan/hash_join.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index d0ac87705f82..3ac1cdc2613f 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -750,6 +750,8 @@ fn build_join_indexes( no_match = false; } } + // If no rows matched left, still must keep the right + // with all nulls for left if no_match { left_indices.append_null()?; right_indices.append_value(row as u32)?; From b7f3792dec377aadb26ca79b6f12cffd8a2d2dac Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 9 Aug 2021 19:44:42 +0200 Subject: [PATCH 5/5] Fmt --- datafusion/src/physical_plan/hash_join.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 3ac1cdc2613f..99708249fc6a 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -750,8 +750,8 @@ fn build_join_indexes( no_match = false; } } - // If no rows matched left, still must keep the right - // with all nulls for left + // If no rows matched left, still must keep the right + // with all nulls for left if no_match { left_indices.append_null()?; right_indices.append_value(row as u32)?;