Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor: Improve HashJoinExec documentation #7953

Merged
merged 2 commits into from
Oct 29, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 126 additions & 12 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! Defines the join plan for executing partitions in parallel and then joining the results
//! into a set of partitions.
//! [`HashJoinExec`] Partitioned Hash Join Operator

use std::fmt;
use std::mem::size_of;
Expand Down Expand Up @@ -78,29 +77,140 @@ use futures::{ready, Stream, StreamExt, TryStreamExt};

type JoinLeftData = (JoinHashMap, RecordBatch, MemoryReservation);

/// Join execution plan executes partitions in parallel and combines them into a set of
/// partitions.
/// Join execution plan: Evaluates eqijoin predicates in parallel on multiple
/// partitions using a hash table and an optional filter list to apply post
/// join.
///
/// Filter expression expected to contain non-equality predicates that can not be pushed
/// down to any of join inputs.
/// In case of outer join, filter applied to only matched rows.
/// # Join Expressions
///
/// This implementation is optimized for evaluating eqijoin predicates (
/// `<col1> = <col2>`) expressions, which are represented as a list of `Columns`
/// in [`Self::on`].
///
/// Non-equality predicates, which can not pushed down to a join inputs (e.g.
/// `<col1> != <col2>`) are known as "filter expressions" and are evaluated
/// after the equijoin predicates.
///
/// # "Build Side" vs "Probe Side"
///
/// HashJoin takes two inputs, which are referred to as the "build" and the
/// "probe". The build side is the first child, and the probe side is the second
/// child.
///
/// The two inputs are treated differently and it is VERY important that the
/// *smaller* input is placed on the build side to minimize the work of creating
/// the hash table.
///
/// ```text
/// ┌───────────┐
/// │ HashJoin │
/// │ │
/// └───────────┘
/// │ │
/// ┌─────┘ └─────┐
/// ▼ ▼
/// ┌────────────┐ ┌─────────────┐
/// │ Input │ │ Input │
/// │ [0] │ │ [1] │
/// └────────────┘ └─────────────┘
///
/// "build side" "probe side"
/// ```
///
/// Execution proceeds in 2 stages:
///
/// 1. the **build phase** where a hash table is created from the tuples of the
/// build side.
///
/// 2. the **probe phase** where the tuples of the probe side are streamed
/// through, checking for matches of the join keys in the hash table.
///
/// ```text
/// ┌────────────────┐ ┌────────────────┐
/// │ ┌─────────┐ │ │ ┌─────────┐ │
/// │ │ Hash │ │ │ │ Hash │ │
/// │ │ Table │ │ │ │ Table │ │
/// │ │(keys are│ │ │ │(keys are│ │
/// │ │equi join│ │ │ │equi join│ │ Stage 2: batches from
/// Stage 1: the │ │columns) │ │ │ │columns) │ │ the probe side are
/// *entire* build │ │ │ │ │ │ │ │ streamed through, and
/// side is read │ └─────────┘ │ │ └─────────┘ │ checked against the
/// into the hash │ ▲ │ │ ▲ │ contents of the hash
/// table │ HashJoin │ │ HashJoin │ table
/// └──────┼─────────┘ └──────────┼─────┘
/// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
/// │ │
///
/// │ │
/// ┌────────────┐ ┌────────────┐
/// │RecordBatch │ │RecordBatch │
/// └────────────┘ └────────────┘
/// ┌────────────┐ ┌────────────┐
/// │RecordBatch │ │RecordBatch │
/// └────────────┘ └────────────┘
/// ... ...
/// ┌────────────┐ ┌────────────┐
/// │RecordBatch │ │RecordBatch │
/// └────────────┘ └────────────┘
///
/// build side probe side
///
/// ```
///
/// # Example "Optimal" Plans
///
/// The differences in the inputs means that for classic "Star Schema Query",
/// the optimal plan will be a **"Right Deep Tree"** . A Star Schema Query is
/// one where there is one large table and several smaller "dimension" tables,
/// joined on `Foreign Key = Primary Key` predicates.
///
/// A "Right Deep Tree" looks like this large table as the probe side on the
/// lowest join:
///
/// ```text
/// ┌───────────┐
/// │ HashJoin │
/// │ │
/// └───────────┘
/// │ │
/// ┌───────┘ └──────────┐
/// ▼ ▼
/// ┌───────────────┐ ┌───────────┐
/// │ small table 1 │ │ HashJoin │
/// │ "dimension" │ │ │
/// └───────────────┘ └───┬───┬───┘
/// ┌──────────┘ └───────┐
/// │ │
/// ▼ ▼
/// ┌───────────────┐ ┌───────────┐
/// │ small table 2 │ │ HashJoin │
/// │ "dimension" │ │ │
/// └───────────────┘ └───┬───┬───┘
/// ┌────────┘ └────────┐
/// │ │
/// ▼ ▼
/// ┌───────────────┐ ┌───────────────┐
/// │ small table 3 │ │ large table │
/// │ "dimension" │ │ "fact" │
/// └───────────────┘ └───────────────┘
/// ```
#[derive(Debug)]
pub struct HashJoinExec {
/// left (build) side which gets hashed
pub left: Arc<dyn ExecutionPlan>,
/// right (probe) side which are filtered by the hash table
pub right: Arc<dyn ExecutionPlan>,
/// Set of common columns used to join on
/// Set of equijoin columns from the relations: `(left_col, right_col)`
pub on: Vec<(Column, Column)>,
/// Filters which are applied while finding matching rows
pub filter: Option<JoinFilter>,
/// How the join is performed
/// How the join is performed (`OUTER`, `INNER`, etc)
pub join_type: JoinType,
/// The schema once the join is applied
/// The output schema for the join
schema: SchemaRef,
/// Build-side data
left_fut: OnceAsync<JoinLeftData>,
/// Shares the `RandomState` for the hashing algorithm
/// Shared the `RandomState` for the hashing algorithm
random_state: RandomState,
/// Output order
output_order: Option<Vec<PhysicalSortExpr>>,
Expand All @@ -110,12 +220,16 @@ pub struct HashJoinExec {
metrics: ExecutionPlanMetricsSet,
/// Information of index and left / right placement of columns
column_indices: Vec<ColumnIndex>,
/// If null_equals_null is true, null == null else null != null
/// Null matching behavior: If `null_equals_null` is true, rows that have
/// `null`s in both left and right equijoin columns will be matched.
/// Otherwise, rows that have `null`s in the join columns will not be
/// matched and thus will not appear in the output.
pub null_equals_null: bool,
}

impl HashJoinExec {
/// Tries to create a new [HashJoinExec].
///
/// # Error
/// This function errors when it is not possible to join the left and right sides on keys `on`.
pub fn try_new(
Expand Down