From a2773a357e596c6286f5e63d103a7479d966b0a3 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 2 May 2021 12:55:48 +0200 Subject: [PATCH 01/10] Fix left join unmatched rows --- datafusion/src/physical_plan/hash_join.rs | 103 ++++++++++++++++----- datafusion/src/physical_plan/hash_utils.rs | 2 +- datafusion/tests/sql.rs | 2 +- 3 files changed, 80 insertions(+), 27 deletions(-) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 8e6b0428c041..3ab0ee76af5b 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -27,13 +27,13 @@ use arrow::{ TimestampMicrosecondArray, TimestampNanosecondArray, UInt32BufferBuilder, UInt32Builder, UInt64BufferBuilder, UInt64Builder, }, - compute, + compute::{self, take}, datatypes::{TimeUnit, UInt32Type, UInt64Type}, }; use smallvec::{smallvec, SmallVec}; -use std::time::Instant; -use std::{any::Any, collections::HashSet}; +use std::{any::Any, usize}; use std::{hash::Hasher, sync::Arc}; +use std::{time::Instant, vec}; use async_trait::async_trait; use futures::{Stream, StreamExt, TryStreamExt}; @@ -370,6 +370,12 @@ impl ExecutionPlan for HashJoinExec { let on_right = self.on.iter().map(|on| on.1.clone()).collect::>(); let column_indices = self.column_indices_from_schema()?; + let num_rows = left_data.1.num_rows(); + let visited_left_side = if self.join_type == JoinType::Left { + vec![false; num_rows] + } else { + vec![] + }; Ok(Box::pin(HashJoinStream { schema: self.schema.clone(), on_left, @@ -384,6 +390,8 @@ impl ExecutionPlan for HashJoinExec { num_output_rows: 0, join_time: 0, random_state: self.random_state.clone(), + visited_left_side: visited_left_side, + is_exhausted: false, })) } } @@ -453,6 +461,11 @@ struct HashJoinStream { join_time: usize, /// Random state used for hashing initialization random_state: RandomState, + /// Keeps track of the left side rows whether they are visited + /// TODO: use a more memory 
efficient data structure + visited_left_side: Vec, + /// There is nothing to process anymore and left side is proced in case of left join + is_exhausted: bool, } impl RecordBatchStream for HashJoinStream { @@ -473,7 +486,7 @@ fn build_batch_from_indices( left_indices: UInt64Array, right_indices: UInt32Array, column_indices: &[ColumnIndex], -) -> ArrowResult { +) -> ArrowResult<(RecordBatch, UInt64Array)> { // build the columns of the new [RecordBatch]: // 1. pick whether the column is from the left or right // 2. based on the pick, `take` items from the different RecordBatches @@ -489,7 +502,7 @@ fn build_batch_from_indices( }; columns.push(array); } - RecordBatch::try_new(Arc::new(schema.clone()), columns) + RecordBatch::try_new(Arc::new(schema.clone()), columns).map(|x| (x, left_indices)) } #[allow(clippy::too_many_arguments)] @@ -502,7 +515,7 @@ fn build_batch( schema: &Schema, column_indices: &[ColumnIndex], random_state: &RandomState, -) -> ArrowResult { +) -> ArrowResult<(RecordBatch, UInt64Array)> { let (left_indices, right_indices) = build_join_indexes( &left_data, &batch, @@ -617,13 +630,6 @@ fn build_join_indexes( let mut left_indices = UInt64Builder::new(0); let mut right_indices = UInt32Builder::new(0); - // Keep track of which item is visited in the build input - // TODO: this can be stored more efficiently with a marker - // https://issues.apache.org/jira/browse/ARROW-11116 - // TODO: Fix LEFT join with multiple right batches - // https://issues.apache.org/jira/browse/ARROW-10971 - let mut is_visited = HashSet::new(); - // First visit all of the rows for (row, hash_value) in hash_values.iter().enumerate() { if let Some((_, indices)) = @@ -634,20 +640,10 @@ fn build_join_indexes( if equal_rows(i as usize, row, &left_join_values, &keys_values)? 
{ left_indices.append_value(i)?; right_indices.append_value(row as u32)?; - is_visited.insert(i); } } }; } - // Add the remaining left rows to the result set with None on the right side - for (_, indices) in left { - for i in indices.iter() { - if !is_visited.contains(i) { - left_indices.append_slice(&indices)?; - right_indices.append_null()?; - } - } - } Ok((left_indices.finish(), right_indices.finish())) } JoinType::Right => { @@ -1001,6 +997,35 @@ pub fn create_hashes<'a>( Ok(hashes_buffer) } +fn produce_unmatched( + visited_left_side: &[bool], + schema: &SchemaRef, + column_indices: &[ColumnIndex], + left_data: &JoinLeftData, +) -> ArrowResult { + let unmatched_indices: Vec = visited_left_side + .iter() + .enumerate() + .filter(|&(_, &value)| !value) + .map(|(index, _)| index as u64) + .collect(); + let indices = UInt64Array::from_iter_values(unmatched_indices); + let num_rows = indices.len(); + let mut columns: Vec> = Vec::with_capacity(schema.fields().len()); + for (idx, column_index) in column_indices.iter().enumerate() { + let array = if column_index.is_left { + let array = left_data.1.column(column_index.index); + take(array.as_ref(), &indices, None).unwrap() + } else { + let datatype = schema.field(idx).data_type(); + arrow::array::new_null_array(datatype, num_rows) + }; + + columns.push(array); + } + RecordBatch::try_new(schema.clone(), columns) +} + impl Stream for HashJoinStream { type Item = ArrowResult; @@ -1025,14 +1050,42 @@ impl Stream for HashJoinStream { ); self.num_input_batches += 1; self.num_input_rows += batch.num_rows(); - if let Ok(ref batch) = result { + if let Ok((ref batch, ref left_side)) = result { self.join_time += start.elapsed().as_millis() as usize; self.num_output_batches += 1; self.num_output_rows += batch.num_rows(); + + if self.join_type == JoinType::Left { + left_side.iter().flatten().for_each(|x| { + self.visited_left_side[x as usize] = true; + }); + } } - Some(result) + Some(result.map(|x| x.0)) } other => { + let start = 
Instant::now(); + // For the left join, produce rows for unmatched rows + if self.join_type == JoinType::Left && !self.is_exhausted { + let result = produce_unmatched( + &self.visited_left_side, + &self.schema, + &self.column_indices, + &self.left_data, + ); + if let Ok(ref batch) = result { + self.num_input_batches += 1; + self.num_input_rows += batch.num_rows(); + if let Ok(ref batch) = result { + self.join_time += start.elapsed().as_millis() as usize; + self.num_output_batches += 1; + self.num_output_rows += batch.num_rows(); + } + } + self.is_exhausted = true; + return Some(result); + } + debug!( "Processed {} probe-side input batches containing {} rows and \ produced {} output batches containing {} rows in {} ms", diff --git a/datafusion/src/physical_plan/hash_utils.rs b/datafusion/src/physical_plan/hash_utils.rs index a38cc092123d..54da1249e5c5 100644 --- a/datafusion/src/physical_plan/hash_utils.rs +++ b/datafusion/src/physical_plan/hash_utils.rs @@ -22,7 +22,7 @@ use arrow::datatypes::{Field, Schema}; use std::collections::HashSet; /// All valid types of joins. 
-#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum JoinType { /// Inner join Inner, diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index 716929405c3a..39f75e7a6601 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -31,7 +31,7 @@ use arrow::{ util::display::array_value_to_string, }; -use datafusion::execution::context::ExecutionContext; +use datafusion::{execution::context::ExecutionContext, prelude::ExecutionConfig}; use datafusion::logical_plan::LogicalPlan; use datafusion::prelude::create_udf; use datafusion::{ From 3d7a01b2535164892fcbdec4d4f18f3e45bf4788 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 2 May 2021 13:07:50 +0200 Subject: [PATCH 02/10] Add test for multiple batches --- datafusion/src/physical_plan/hash_join.rs | 50 +++++++++++++++++++++++ datafusion/tests/sql.rs | 2 +- 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 3ab0ee76af5b..2477c46deef4 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -1352,6 +1352,56 @@ mod tests { Ok(()) } + fn build_table_two_batches( + a: (&str, &Vec), + b: (&str, &Vec), + c: (&str, &Vec), + ) -> Arc { + let batch = build_table_i32(a, b, c); + let schema = batch.schema(); + Arc::new( + MemoryExec::try_new(&vec![vec![batch.clone(), batch.clone()]], schema, None) + .unwrap(), + ) + } + + #[tokio::test] + async fn join_left_multi_batch() { + let left = build_table( + ("a1", &vec![1, 2, 3]), + ("b1", &vec![4, 5, 7]), // 7 does not exist on the right + ("c1", &vec![7, 8, 9]), + ); + let right = build_table_two_batches( + ("a2", &vec![10, 20, 30]), + ("b1", &vec![4, 5, 6]), + ("c2", &vec![70, 80, 90]), + ); + let on = &[("b1", "b1")]; + + let join = join(left, right, on, &JoinType::Left).unwrap(); + + let columns = columns(&join.schema()); + assert_eq!(columns, vec!["a1", "b1", "c1", 
"a2", "c2"]); + + let stream = join.execute(0).await.unwrap(); + let batches = common::collect(stream).await.unwrap(); + + let expected = vec![ + "+----+----+----+----+----+", + "| a1 | b1 | c1 | a2 | c2 |", + "+----+----+----+----+----+", + "| 1 | 4 | 7 | 10 | 70 |", + "| 1 | 4 | 7 | 10 | 70 |", + "| 2 | 5 | 8 | 20 | 80 |", + "| 2 | 5 | 8 | 20 | 80 |", + "| 3 | 7 | 9 | | |", + "+----+----+----+----+----+", + ]; + + assert_batches_sorted_eq!(expected, &batches); + } + #[tokio::test] async fn join_left_one() -> Result<()> { let left = build_table( diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index 39f75e7a6601..a0876a22950f 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -31,7 +31,6 @@ use arrow::{ util::display::array_value_to_string, }; -use datafusion::{execution::context::ExecutionContext, prelude::ExecutionConfig}; use datafusion::logical_plan::LogicalPlan; use datafusion::prelude::create_udf; use datafusion::{ @@ -42,6 +41,7 @@ use datafusion::{ error::{DataFusionError, Result}, physical_plan::ColumnarValue, }; +use datafusion::{execution::context::ExecutionContext, prelude::ExecutionConfig}; #[tokio::test] async fn nyc() -> Result<()> { From 62e6f256c36a9dfd34bb42e861c44f8d06ced9d0 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 2 May 2021 13:32:56 +0200 Subject: [PATCH 03/10] Clippy --- datafusion/src/physical_plan/hash_join.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 2477c46deef4..c5074490d759 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -1360,7 +1360,7 @@ mod tests { let batch = build_table_i32(a, b, c); let schema = batch.schema(); Arc::new( - MemoryExec::try_new(&vec![vec![batch.clone(), batch.clone()]], schema, None) + MemoryExec::try_new(&[vec![batch.clone(), batch]], schema, None) .unwrap(), ) } From 
141c708c106e44f976c237ff507968ec6db9d810 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 2 May 2021 13:34:58 +0200 Subject: [PATCH 04/10] Import cleanup --- datafusion/src/physical_plan/hash_join.rs | 4 ++-- datafusion/tests/sql.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index c5074490d759..37bd7562868d 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -27,7 +27,7 @@ use arrow::{ TimestampMicrosecondArray, TimestampNanosecondArray, UInt32BufferBuilder, UInt32Builder, UInt64BufferBuilder, UInt64Builder, }, - compute::{self, take}, + compute, datatypes::{TimeUnit, UInt32Type, UInt64Type}, }; use smallvec::{smallvec, SmallVec}; @@ -1015,7 +1015,7 @@ fn produce_unmatched( for (idx, column_index) in column_indices.iter().enumerate() { let array = if column_index.is_left { let array = left_data.1.column(column_index.index); - take(array.as_ref(), &indices, None).unwrap() + compute::take(array.as_ref(), &indices, None).unwrap() } else { let datatype = schema.field(idx).data_type(); arrow::array::new_null_array(datatype, num_rows) diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index a0876a22950f..716929405c3a 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -31,6 +31,7 @@ use arrow::{ util::display::array_value_to_string, }; +use datafusion::execution::context::ExecutionContext; use datafusion::logical_plan::LogicalPlan; use datafusion::prelude::create_udf; use datafusion::{ @@ -41,7 +42,6 @@ use datafusion::{ error::{DataFusionError, Result}, physical_plan::ColumnarValue, }; -use datafusion::{execution::context::ExecutionContext, prelude::ExecutionConfig}; #[tokio::test] async fn nyc() -> Result<()> { From 660d91d413b18db397706195d46a0d260cf64a1e Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 2 May 2021 13:54:03 +0200 Subject: [PATCH 05/10] Add 
test for empty right side --- datafusion/src/physical_plan/hash_join.rs | 37 +++++++++++++++++++++-- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 37bd7562868d..e721842ed4b4 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -464,7 +464,7 @@ struct HashJoinStream { /// Keeps track of the left side rows whether they are visited /// TODO: use a more memory efficient data structure visited_left_side: Vec, - /// There is nothing to process anymore and left side is proced in case of left join + /// There is nothing to process anymore and left side is processed in case of left join is_exhausted: bool, } @@ -1360,8 +1360,7 @@ mod tests { let batch = build_table_i32(a, b, c); let schema = batch.schema(); Arc::new( - MemoryExec::try_new(&[vec![batch.clone(), batch]], schema, None) - .unwrap(), + MemoryExec::try_new(&[vec![batch.clone(), batch]], schema, None).unwrap(), ) } @@ -1402,6 +1401,38 @@ mod tests { assert_batches_sorted_eq!(expected, &batches); } + #[tokio::test] + async fn join_left_empty_right() { + let left = build_table( + ("a1", &vec![1, 2, 3]), + ("b1", &vec![4, 5, 7]), // 7 does not exist on the right + ("c1", &vec![7, 8, 9]), + ); + let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![])); + let on = &[("b1", "b1")]; + let schema = right.schema(); + let right = Arc::new(MemoryExec::try_new(&[vec![right]], schema, None).unwrap()); + let join = join(left, right, on, &JoinType::Left).unwrap(); + + let columns = columns(&join.schema()); + assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "c2"]); + + let stream = join.execute(0).await.unwrap(); + let batches = common::collect(stream).await.unwrap(); + + let expected = vec![ + "+----+----+----+----+----+", + "| a1 | b1 | c1 | a2 | c2 |", + "+----+----+----+----+----+", + "| 1 | 4 | 7 | | |", + "| 2 | 5 | 8 | | |", + "| 3 | 7 | 9 | | 
|", + "+----+----+----+----+----+", + ]; + + assert_batches_sorted_eq!(expected, &batches); + } + #[tokio::test] async fn join_left_one() -> Result<()> { let left = build_table( From 30ee9368c215aea53e1e1a9890a09d704b0dcf1d Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 2 May 2021 14:04:09 +0200 Subject: [PATCH 06/10] Add some comments --- datafusion/src/physical_plan/hash_join.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index e721842ed4b4..8b75bc8e4226 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -997,18 +997,22 @@ pub fn create_hashes<'a>( Ok(hashes_buffer) } +// Produces a batch for left-side rows that are not marked as being visited during the whole join fn produce_unmatched( visited_left_side: &[bool], schema: &SchemaRef, column_indices: &[ColumnIndex], left_data: &JoinLeftData, ) -> ArrowResult { + // Find indices which didn't match any right row (are false) let unmatched_indices: Vec = visited_left_side .iter() .enumerate() .filter(|&(_, &value)| !value) .map(|(index, _)| index as u64) .collect(); + + // generate batches by taking values from the left side and generating columns filled with null on the right side let indices = UInt64Array::from_iter_values(unmatched_indices); let num_rows = indices.len(); let mut columns: Vec> = Vec::with_capacity(schema.fields().len()); From 144082b0991df1e85d8f3473149c61d92ee36485 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 2 May 2021 14:05:07 +0200 Subject: [PATCH 07/10] Fix comment --- datafusion/src/physical_plan/hash_join.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 8b75bc8e4226..d2e484ecf119 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -1372,7 +1372,7 @@ mod 
tests { async fn join_left_multi_batch() { let left = build_table( ("a1", &vec![1, 2, 3]), - ("b1", &vec![4, 5, 7]), // 7 does not exist on the right + ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]), ); let right = build_table_two_batches( From 638e6049fc584c3e817a8912d0c505221d5aa3ef Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 2 May 2021 14:05:24 +0200 Subject: [PATCH 08/10] Fix comment --- datafusion/src/physical_plan/hash_join.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index d2e484ecf119..4c1cf7303afb 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -1372,7 +1372,7 @@ mod tests { async fn join_left_multi_batch() { let left = build_table( ("a1", &vec![1, 2, 3]), - ("b1", &vec![4, 5, 7]), + ("b1", &vec![4, 5, 7]), // 7 does not exist on the right ("c1", &vec![7, 8, 9]), ); let right = build_table_two_batches( @@ -1409,7 +1409,7 @@ mod tests { async fn join_left_empty_right() { let left = build_table( ("a1", &vec![1, 2, 3]), - ("b1", &vec![4, 5, 7]), // 7 does not exist on the right + ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]), ); let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![])); From 2b4fe02635df0a68264952c7f59e0f7dc998dc55 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 2 May 2021 14:51:42 +0200 Subject: [PATCH 09/10] Link to GH issue --- datafusion/src/physical_plan/hash_join.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 4c1cf7303afb..7e8e46e9a7f9 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -462,8 +462,7 @@ struct HashJoinStream { /// Random state used for hashing initialization random_state: RandomState, /// Keeps track of the left side rows whether they are 
visited - /// TODO: use a more memory efficient data structure - visited_left_side: Vec, + visited_left_side: Vec, // TODO: use a more memory efficient data structure, https://github.com/apache/arrow-datafusion/issues/240 /// There is nothing to process anymore and left side is processed in case of left join is_exhausted: bool, } From ab760af0ceee524b4a615e514da75701b959501e Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 2 May 2021 22:52:22 +0200 Subject: [PATCH 10/10] Use explicit pattern match --- datafusion/src/physical_plan/hash_join.rs | 52 +++++++++++++---------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 7e8e46e9a7f9..3398494e3c46 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -371,10 +371,9 @@ impl ExecutionPlan for HashJoinExec { let column_indices = self.column_indices_from_schema()?; let num_rows = left_data.1.num_rows(); - let visited_left_side = if self.join_type == JoinType::Left { - vec![false; num_rows] - } else { - vec![] + let visited_left_side = match self.join_type { + JoinType::Left => vec![false; num_rows], + JoinType::Inner | JoinType::Right => vec![], }; Ok(Box::pin(HashJoinStream { schema: self.schema.clone(), on_left, @@ -1058,10 +1057,13 @@ impl Stream for HashJoinStream { self.num_output_batches += 1; self.num_output_rows += batch.num_rows(); - if self.join_type == JoinType::Left { - left_side.iter().flatten().for_each(|x| { - self.visited_left_side[x as usize] = true; - }); + match self.join_type { + JoinType::Left => { + left_side.iter().flatten().for_each(|x| { + self.visited_left_side[x as usize] = true; + }); + } + JoinType::Inner | JoinType::Right => {} } } Some(result.map(|x| x.0)) @@ -1069,24 +1071,28 @@ impl Stream for HashJoinStream { other => { let start = Instant::now(); // For the left join, produce rows for unmatched rows - if self.join_type == 
JoinType::Left && !self.is_exhausted { - let result = produce_unmatched( - &self.visited_left_side, - &self.schema, - &self.column_indices, - &self.left_data, - ); - if let Ok(ref batch) = result { - self.num_input_batches += 1; - self.num_input_rows += batch.num_rows(); + match self.join_type { + JoinType::Left if !self.is_exhausted => { + let result = produce_unmatched( + &self.visited_left_side, + &self.schema, + &self.column_indices, + &self.left_data, + ); if let Ok(ref batch) = result { - self.join_time += start.elapsed().as_millis() as usize; - self.num_output_batches += 1; - self.num_output_rows += batch.num_rows(); + self.num_input_batches += 1; + self.num_input_rows += batch.num_rows(); + if let Ok(ref batch) = result { + self.join_time += + start.elapsed().as_millis() as usize; + self.num_output_batches += 1; + self.num_output_rows += batch.num_rows(); + } } + self.is_exhausted = true; + return Some(result); } - self.is_exhausted = true; - return Some(result); + JoinType::Left | JoinType::Inner | JoinType::Right => {} } debug!(