Skip to content

Commit

Permalink
Use RawTable API in hash join (#827)
Browse files Browse the repository at this point in the history
* Use rawtable API

* Avoid changes

* Check on hash again

* Test fix
  • Loading branch information
Dandandan authored Aug 7, 2021
1 parent 5a7bbcc commit 2c0c062
Showing 1 changed file with 36 additions and 71 deletions.
107 changes: 36 additions & 71 deletions datafusion/src/physical_plan/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use arrow::{
datatypes::{UInt32Type, UInt64Type},
};
use smallvec::{smallvec, SmallVec};
use std::sync::Arc;
use std::{any::Any, usize};
use std::{hash::Hasher, sync::Arc};
use std::{time::Instant, vec};

use async_trait::async_trait;
Expand All @@ -49,6 +49,8 @@ use arrow::array::{
UInt64Array, UInt8Array,
};

use hashbrown::raw::RawTable;

use super::expressions::Column;
use super::hash_utils::create_hashes;
use super::{
Expand All @@ -65,6 +67,7 @@ use super::{
use crate::physical_plan::coalesce_batches::concat_batches;
use crate::physical_plan::{PhysicalExpr, SQLMetric};
use log::debug;
use std::fmt;

// Maps a `u64` hash value based on the left ["on" values] to a list of indices with this key's value.
//
Expand All @@ -78,7 +81,14 @@ use log::debug;
// but the values don't match. Those are checked in the [equal_rows] macro
// TODO: speed up collission check and move away from using a hashbrown HashMap
// https://github.com/apache/arrow-datafusion/issues/50
type JoinHashMap = HashMap<(), SmallVec<[u64; 1]>, IdHashBuilder>;
struct JoinHashMap(RawTable<(u64, SmallVec<[u64; 1]>)>);

impl fmt::Debug for JoinHashMap {
fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> fmt::Result {
Ok(())
}
}

type JoinLeftData = Arc<(JoinHashMap, RecordBatch)>;

/// join execution plan executes partitions in parallel and combines them into a set of
Expand Down Expand Up @@ -303,10 +313,8 @@ impl ExecutionPlan for HashJoinExec {
Ok(acc)
})
.await?;
let mut hashmap = JoinHashMap::with_capacity_and_hasher(
num_rows,
IdHashBuilder {},
);
let mut hashmap =
JoinHashMap(RawTable::with_capacity(num_rows));
let mut hashes_buffer = Vec::new();
let mut offset = 0;
for batch in batches.iter() {
Expand Down Expand Up @@ -358,8 +366,7 @@ impl ExecutionPlan for HashJoinExec {
Ok(acc)
})
.await?;
let mut hashmap =
JoinHashMap::with_capacity_and_hasher(num_rows, IdHashBuilder {});
let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows));
let mut hashes_buffer = Vec::new();
let mut offset = 0;
for batch in batches.iter() {
Expand Down Expand Up @@ -460,7 +467,7 @@ impl ExecutionPlan for HashJoinExec {
fn update_hash(
on: &[Column],
batch: &RecordBatch,
hash: &mut JoinHashMap,
hash_map: &mut JoinHashMap,
offset: usize,
random_state: &RandomState,
hashes_buffer: &mut Vec<u64>,
Expand All @@ -476,18 +483,18 @@ fn update_hash(

// insert hashes to key of the hashmap
for (row, hash_value) in hash_values.iter().enumerate() {
match hash.raw_entry_mut().from_hash(*hash_value, |_| true) {
hashbrown::hash_map::RawEntryMut::Occupied(mut entry) => {
entry.get_mut().push((row + offset) as u64);
}
hashbrown::hash_map::RawEntryMut::Vacant(entry) => {
entry.insert_hashed_nocheck(
*hash_value,
(),
smallvec![(row + offset) as u64],
);
}
};
let item = hash_map
.0
.get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
if let Some((_, indices)) = item {
indices.push((row + offset) as u64);
} else {
hash_map.0.insert(
*hash_value,
(*hash_value, smallvec![(row + offset) as u64]),
|(hash, _)| *hash,
);
}
}
Ok(())
}
Expand Down Expand Up @@ -678,7 +685,7 @@ fn build_join_indexes(
// This possibly contains rows with hash collisions,
// So we have to check here whether rows are equal or not
if let Some((_, indices)) =
left.raw_entry().from_hash(*hash_value, |_| true)
left.0.get(*hash_value, |(hash, _)| *hash_value == *hash)
{
for &i in indices {
// Check hash collisions
Expand Down Expand Up @@ -710,7 +717,7 @@ fn build_join_indexes(
// First visit all of the rows
for (row, hash_value) in hash_values.iter().enumerate() {
if let Some((_, indices)) =
left.raw_entry().from_hash(*hash_value, |_| true)
left.0.get(*hash_value, |(hash, _)| *hash_value == *hash)
{
for &i in indices {
// Collision check
Expand All @@ -728,7 +735,7 @@ fn build_join_indexes(
let mut right_indices = UInt32Builder::new(0);

for (row, hash_value) in hash_values.iter().enumerate() {
match left.raw_entry().from_hash(*hash_value, |_| true) {
match left.0.get(*hash_value, |(hash, _)| *hash_value == *hash) {
Some((_, indices)) => {
for &i in indices {
if equal_rows(
Expand All @@ -755,38 +762,6 @@ fn build_join_indexes(
}
}
}
use core::hash::BuildHasher;

/// `Hasher` that returns the same `u64` value as a hash, to avoid re-hashing
/// it when inserting/indexing or regrowing the `HashMap`
struct IdHasher {
hash: u64,
}

impl Hasher for IdHasher {
fn finish(&self) -> u64 {
self.hash
}

fn write_u64(&mut self, i: u64) {
self.hash = i;
}

fn write(&mut self, _bytes: &[u8]) {
unreachable!("IdHasher should only be used for u64 keys")
}
}

#[derive(Debug)]
struct IdHashBuilder {}

impl BuildHasher for IdHashBuilder {
type Hasher = IdHasher;

fn build_hasher(&self) -> Self::Hasher {
IdHasher { hash: 0 }
}
}

macro_rules! equal_rows_elem {
($array_type:ident, $l: ident, $r: ident, $left: ident, $right: ident) => {{
Expand Down Expand Up @@ -1776,7 +1751,7 @@ mod tests {

#[test]
fn join_with_hash_collision() -> Result<()> {
let mut hashmap_left = HashMap::with_capacity_and_hasher(2, IdHashBuilder {});
let mut hashmap_left = RawTable::with_capacity(2);
let left = build_table_i32(
("a", &vec![10, 20]),
("x", &vec![100, 200]),
Expand All @@ -1788,27 +1763,17 @@ mod tests {
let hashes =
create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?;

// Create hash collisions
match hashmap_left.raw_entry_mut().from_hash(hashes[0], |_| true) {
hashbrown::hash_map::RawEntryMut::Vacant(entry) => {
entry.insert_hashed_nocheck(hashes[0], (), smallvec![0, 1])
}
_ => unreachable!("Hash should not be vacant"),
};
match hashmap_left.raw_entry_mut().from_hash(hashes[1], |_| true) {
hashbrown::hash_map::RawEntryMut::Vacant(entry) => {
entry.insert_hashed_nocheck(hashes[1], (), smallvec![0, 1])
}
_ => unreachable!("Hash should not be vacant"),
};
// Create hash collisions (same hashes)
hashmap_left.insert(hashes[0], (hashes[0], smallvec![0, 1]), |(h, _)| *h);
hashmap_left.insert(hashes[1], (hashes[1], smallvec![0, 1]), |(h, _)| *h);

let right = build_table_i32(
("a", &vec![10, 20]),
("b", &vec![0, 0]),
("c", &vec![30, 40]),
);

let left_data = JoinLeftData::new((hashmap_left, left));
let left_data = JoinLeftData::new((JoinHashMap(hashmap_left), left));
let (l, r) = build_join_indexes(
&left_data,
&right,
Expand Down

0 comments on commit 2c0c062

Please sign in to comment.