diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 0e2d552b543b..1a2db87d98a2 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -206,7 +206,7 @@ pub struct HashJoinExec { pub join_type: JoinType, /// The output schema for the join schema: SchemaRef, - /// Build-side data + /// Future that consumes left input and builds the hash table left_fut: OnceAsync, /// Shared the `RandomState` for the hashing algorithm random_state: RandomState, @@ -747,27 +747,38 @@ where Ok(()) } -/// A stream that issues [RecordBatch]es as they arrive from the right of the join. +/// [`Stream`] for [`HashJoinExec`] that does the actual join. +/// +/// This stream: +/// +/// 1. Reads the entire left input (build) and constructs a hash table +/// +/// 2. Streams [RecordBatch]es as they arrive from the right input (probe) and joins +/// them with the contents of the hash table struct HashJoinStream { /// Input schema schema: Arc, - /// columns from the left + /// equijoin columns from the left (build side) on_left: Vec, - /// columns from the right used to compute the hash + /// equijoin columns from the right (probe side) on_right: Vec, - /// join filter + /// optional join filter filter: Option, - /// type of the join + /// type of the join (left, right, semi, etc) join_type: JoinType, - /// future for data from left side + /// future which builds hash table from left side left_fut: OnceFut, - /// Keeps track of the left side rows whether they are visited + /// Which left (build) side rows have been matched while creating output. + /// For some OUTER joins, we need to know which rows have not been matched + /// to produce the correct output. 
visited_left_side: Option, - /// right + /// right (probe) input right: SendableRecordBatchStream, /// Random state used for hashing initialization random_state: RandomState, - /// There is nothing to process anymore and left side is processed in case of left join + /// The join output is complete. For outer joins, this is used to + /// distinguish when the input stream is exhausted and when any unmatched + /// rows are output. is_exhausted: bool, /// Metrics join_metrics: BuildProbeJoinMetrics, @@ -785,37 +796,51 @@ impl RecordBatchStream for HashJoinStream { } } -// Returns build/probe indices satisfying the equality condition. -// On LEFT.b1 = RIGHT.b2 -// LEFT Table: -// a1 b1 c1 -// 1 1 10 -// 3 3 30 -// 5 5 50 -// 7 7 70 -// 9 8 90 -// 11 8 110 -// 13 10 130 -// RIGHT Table: -// a2 b2 c2 -// 2 2 20 -// 4 4 40 -// 6 6 60 -// 8 8 80 -// 10 10 100 -// 12 10 120 -// The result is -// "+----+----+-----+----+----+-----+", -// "| a1 | b1 | c1 | a2 | b2 | c2 |", -// "+----+----+-----+----+----+-----+", -// "| 9 | 8 | 90 | 8 | 8 | 80 |", -// "| 11 | 8 | 110 | 8 | 8 | 80 |", -// "| 13 | 10 | 130 | 10 | 10 | 100 |", -// "| 13 | 10 | 130 | 12 | 10 | 120 |", -// "+----+----+-----+----+----+-----+" -// And the result of build and probe indices are: -// Build indices: 4, 5, 6, 6 -// Probe indices: 3, 3, 4, 5 +/// Returns build/probe indices satisfying the equality condition. 
+/// +/// # Example +/// +/// For `LEFT.b1 = RIGHT.b2`: +/// LEFT Table: +/// ```text +/// a1 b1 c1 +/// 1 1 10 +/// 3 3 30 +/// 5 5 50 +/// 7 7 70 +/// 9 8 90 +/// 11 8 110 +/// 13 10 130 +/// ``` +/// +/// RIGHT Table: +/// ```text +/// a2 b2 c2 +/// 2 2 20 +/// 4 4 40 +/// 6 6 60 +/// 8 8 80 +/// 10 10 100 +/// 12 10 120 +/// ``` +/// +/// The result is +/// ```text +/// "+----+----+-----+----+----+-----+", +/// "| a1 | b1 | c1 | a2 | b2 | c2 |", +/// "+----+----+-----+----+----+-----+", +/// "| 9 | 8 | 90 | 8 | 8 | 80 |", +/// "| 11 | 8 | 110 | 8 | 8 | 80 |", +/// "| 13 | 10 | 130 | 10 | 10 | 100 |", +/// "| 13 | 10 | 130 | 12 | 10 | 120 |", +/// "+----+----+-----+----+----+-----+" +/// ``` +/// +/// And the result of build and probe indices are: +/// ```text +/// Build indices: 4, 5, 6, 6 +/// Probe indices: 3, 3, 4, 5 +/// ``` #[allow(clippy::too_many_arguments)] pub fn build_equal_condition_join_indices( build_hashmap: &T, @@ -1003,13 +1028,14 @@ impl HashJoinStream { cx: &mut std::task::Context<'_>, ) -> Poll>> { let build_timer = self.join_metrics.build_time.timer(); + // build hash table from left (build) side, if not yet done let left_data = match ready!(self.left_fut.get(cx)) { Ok(left_data) => left_data, Err(e) => return Poll::Ready(Some(Err(e))), }; build_timer.done(); - // Reserving memory for visited_left_side bitmap in case it hasn't been initialied yet + // Reserving memory for visited_left_side bitmap in case it hasn't been initialized yet // and join_type requires to store it if self.visited_left_side.is_none() && need_produce_result_in_final(self.join_type) @@ -1024,11 +1050,11 @@ impl HashJoinStream { let visited_left_side = self.visited_left_side.get_or_insert_with(|| { let num_rows = left_data.1.num_rows(); if need_produce_result_in_final(self.join_type) { - // these join type need the bitmap to identify which row has be matched or unmatched. 
- // For the `left semi` join, need to use the bitmap to produce the matched row in the left side - // For the `left` join, need to use the bitmap to produce the unmatched row in the left side with null - // For the `left anti` join, need to use the bitmap to produce the unmatched row in the left side - // For the `full` join, need to use the bitmap to produce the unmatched row in the left side with null + // Some join types need to track which rows have been matched or unmatched: + // `left semi` join: need to use the bitmap to produce the matched row in the left side + // `left` join: need to use the bitmap to produce the unmatched row in the left side with null + // `left anti` join: need to use the bitmap to produce the unmatched row in the left side + // `full` join: need to use the bitmap to produce the unmatched row in the left side with null let mut buffer = BooleanBufferBuilder::new(num_rows); buffer.append_n(num_rows, false); buffer @@ -1037,6 +1063,7 @@ impl HashJoinStream { } }); let mut hashes_buffer = vec![]; + // get next right (probe) input batch self.right .poll_next_unpin(cx) .map(|maybe_batch| match maybe_batch {