Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor: Improve HashJoinStream docstrings #8070

Merged
merged 4 commits into from
Nov 7, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 74 additions & 47 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ pub struct HashJoinExec {
pub join_type: JoinType,
/// The output schema for the join
schema: SchemaRef,
/// Build-side data
/// Future that consumes left input and builds the hash table
left_fut: OnceAsync<JoinLeftData>,
/// Shared the `RandomState` for the hashing algorithm
random_state: RandomState,
Expand Down Expand Up @@ -747,27 +747,38 @@ where
Ok(())
}

/// A stream that issues [RecordBatch]es as they arrive from the right of the join.
/// [`Stream`] for [`HashJoinExec`] that does the actual join.
///
/// This stream:
///
/// 1. Reads the entire left input (build) and constructs a hash table
///
/// 2. Streams [RecordBatch]es as they arrive from the right input (probe) and joins
/// them with the contents of the hash table
struct HashJoinStream {
/// Input schema
schema: Arc<Schema>,
/// columns from the left
/// equijoin columns from the left (build side)
on_left: Vec<Column>,
/// columns from the right used to compute the hash
/// equijoin columns from the right (probe side)
on_right: Vec<Column>,
/// join filter
/// optional join filter
filter: Option<JoinFilter>,
/// type of the join
/// type of the join (left, right, semi, etc)
join_type: JoinType,
/// future for data from left side
/// future which builds hash table from left side
left_fut: OnceFut<JoinLeftData>,
/// Keeps track of the left side rows whether they are visited
/// Which left (probe) side rows have been matches while creating output.
/// For some OUTER joins, we need to know which rows have not been matched
/// to produce the correct.
visited_left_side: Option<BooleanBufferBuilder>,
/// right
/// right (probe) input
right: SendableRecordBatchStream,
/// Random state used for hashing initialization
random_state: RandomState,
/// There is nothing to process anymore and left side is processed in case of left join
/// The join output is complete. For outer joins, this is used to
/// distinguish when the input stream is exhausted and when any unmatched
/// rows are output.
is_exhausted: bool,
/// Metrics
join_metrics: BuildProbeJoinMetrics,
Expand All @@ -785,37 +796,51 @@ impl RecordBatchStream for HashJoinStream {
}
}

// Returns build/probe indices satisfying the equality condition.
// On LEFT.b1 = RIGHT.b2
// LEFT Table:
// a1 b1 c1
// 1 1 10
// 3 3 30
// 5 5 50
// 7 7 70
// 9 8 90
// 11 8 110
// 13 10 130
// RIGHT Table:
// a2 b2 c2
// 2 2 20
// 4 4 40
// 6 6 60
// 8 8 80
// 10 10 100
// 12 10 120
// The result is
// "+----+----+-----+----+----+-----+",
// "| a1 | b1 | c1 | a2 | b2 | c2 |",
// "+----+----+-----+----+----+-----+",
// "| 9 | 8 | 90 | 8 | 8 | 80 |",
// "| 11 | 8 | 110 | 8 | 8 | 80 |",
// "| 13 | 10 | 130 | 10 | 10 | 100 |",
// "| 13 | 10 | 130 | 12 | 10 | 120 |",
// "+----+----+-----+----+----+-----+"
// And the result of build and probe indices are:
// Build indices: 4, 5, 6, 6
// Probe indices: 3, 3, 4, 5
/// Returns build/probe indices satisfying the equality condition.
///
/// # Example
///
/// For `LEFT.b1 = RIGHT.b2`:
/// LEFT Table:
/// ```text
/// a1 b1 c1
/// 1 1 10
/// 3 3 30
/// 5 5 50
/// 7 7 70
/// 9 8 90
/// 11 8 110
/// 13 10 130
/// ```
///
/// RIGHT Table:
/// ```text
/// a2 b2 c2
/// 2 2 20
/// 4 4 40
/// 6 6 60
/// 8 8 80
/// 10 10 100
/// 12 10 120
/// ```
///
/// The result is
/// ```text
/// "+----+----+-----+----+----+-----+",
/// "| a1 | b1 | c1 | a2 | b2 | c2 |",
/// "+----+----+-----+----+----+-----+",
/// "| 9 | 8 | 90 | 8 | 8 | 80 |",
/// "| 11 | 8 | 110 | 8 | 8 | 80 |",
/// "| 13 | 10 | 130 | 10 | 10 | 100 |",
/// "| 13 | 10 | 130 | 12 | 10 | 120 |",
/// "+----+----+-----+----+----+-----+"
/// ```
///
/// And the result of build and probe indices are:
/// ```text
/// Build indices: 4, 5, 6, 6
/// Probe indices: 3, 3, 4, 5
/// ```
#[allow(clippy::too_many_arguments)]
pub fn build_equal_condition_join_indices<T: JoinHashMapType>(
build_hashmap: &T,
Expand Down Expand Up @@ -1003,13 +1028,14 @@ impl HashJoinStream {
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Result<RecordBatch>>> {
let build_timer = self.join_metrics.build_time.timer();
// build hash table from left (build) side, if not yet done
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that looks great @alamb
Thanks for leaving comments where is build and where is a probe.

let left_data = match ready!(self.left_fut.get(cx)) {
Ok(left_data) => left_data,
Err(e) => return Poll::Ready(Some(Err(e))),
};
build_timer.done();

// Reserving memory for visited_left_side bitmap in case it hasn't been initialied yet
// Reserving memory for visited_left_side bitmap in case it hasn't been initialized yet
// and join_type requires to store it
if self.visited_left_side.is_none()
&& need_produce_result_in_final(self.join_type)
Expand All @@ -1024,11 +1050,11 @@ impl HashJoinStream {
let visited_left_side = self.visited_left_side.get_or_insert_with(|| {
let num_rows = left_data.1.num_rows();
if need_produce_result_in_final(self.join_type) {
// these join type need the bitmap to identify which row has be matched or unmatched.
// For the `left semi` join, need to use the bitmap to produce the matched row in the left side
// For the `left` join, need to use the bitmap to produce the unmatched row in the left side with null
// For the `left anti` join, need to use the bitmap to produce the unmatched row in the left side
// For the `full` join, need to use the bitmap to produce the unmatched row in the left side with null
// Some join types need to track which row has be matched or unmatched:
// `left semi` join: need to use the bitmap to produce the matched row in the left side
// `left` join: need to use the bitmap to produce the unmatched row in the left side with null
// `left anti` join: need to use the bitmap to produce the unmatched row in the left side
// `full` join: need to use the bitmap to produce the unmatched row in the left side with null
let mut buffer = BooleanBufferBuilder::new(num_rows);
buffer.append_n(num_rows, false);
buffer
Expand All @@ -1037,6 +1063,7 @@ impl HashJoinStream {
}
});
let mut hashes_buffer = vec![];
// get next right (probe) input batch
self.right
.poll_next_unpin(cx)
.map(|maybe_batch| match maybe_batch {
Expand Down