Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use RawTable API in hash join #827

Merged
merged 4 commits into from
Aug 7, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 36 additions & 71 deletions datafusion/src/physical_plan/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use arrow::{
datatypes::{UInt32Type, UInt64Type},
};
use smallvec::{smallvec, SmallVec};
use std::sync::Arc;
use std::{any::Any, usize};
use std::{hash::Hasher, sync::Arc};
use std::{time::Instant, vec};

use async_trait::async_trait;
Expand All @@ -49,6 +49,8 @@ use arrow::array::{
UInt64Array, UInt8Array,
};

use hashbrown::raw::RawTable;

use super::expressions::Column;
use super::hash_utils::create_hashes;
use super::{
Expand All @@ -65,6 +67,7 @@ use super::{
use crate::physical_plan::coalesce_batches::concat_batches;
use crate::physical_plan::{PhysicalExpr, SQLMetric};
use log::debug;
use std::fmt;

// Maps a `u64` hash value based on the left ["on" values] to a list of indices with this key's value.
//
Expand All @@ -78,7 +81,14 @@ use log::debug;
// but the values don't match. Those are checked in the [equal_rows] macro
// TODO: speed up collission check and move away from using a hashbrown HashMap
// https://github.com/apache/arrow-datafusion/issues/50
type JoinHashMap = HashMap<(), SmallVec<[u64; 1]>, IdHashBuilder>;
struct JoinHashMap(RawTable<(u64, SmallVec<[u64; 1]>)>);

impl fmt::Debug for JoinHashMap {
fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> fmt::Result {
Ok(())
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Todo

}

type JoinLeftData = Arc<(JoinHashMap, RecordBatch)>;

/// join execution plan executes partitions in parallel and combines them into a set of
Expand Down Expand Up @@ -303,10 +313,8 @@ impl ExecutionPlan for HashJoinExec {
Ok(acc)
})
.await?;
let mut hashmap = JoinHashMap::with_capacity_and_hasher(
num_rows,
IdHashBuilder {},
);
let mut hashmap =
JoinHashMap(RawTable::with_capacity(num_rows));
let mut hashes_buffer = Vec::new();
let mut offset = 0;
for batch in batches.iter() {
Expand Down Expand Up @@ -358,8 +366,7 @@ impl ExecutionPlan for HashJoinExec {
Ok(acc)
})
.await?;
let mut hashmap =
JoinHashMap::with_capacity_and_hasher(num_rows, IdHashBuilder {});
let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows));
let mut hashes_buffer = Vec::new();
let mut offset = 0;
for batch in batches.iter() {
Expand Down Expand Up @@ -460,7 +467,7 @@ impl ExecutionPlan for HashJoinExec {
fn update_hash(
on: &[Column],
batch: &RecordBatch,
hash: &mut JoinHashMap,
hash_map: &mut JoinHashMap,
offset: usize,
random_state: &RandomState,
hashes_buffer: &mut Vec<u64>,
Expand All @@ -476,18 +483,18 @@ fn update_hash(

// insert hashes to key of the hashmap
for (row, hash_value) in hash_values.iter().enumerate() {
match hash.raw_entry_mut().from_hash(*hash_value, |_| true) {
hashbrown::hash_map::RawEntryMut::Occupied(mut entry) => {
entry.get_mut().push((row + offset) as u64);
}
hashbrown::hash_map::RawEntryMut::Vacant(entry) => {
entry.insert_hashed_nocheck(
*hash_value,
(),
smallvec![(row + offset) as u64],
);
}
};
let item = hash_map
.0
.get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't fully understand how this works in the case of hash collisions. Don't you also have to check if the existing values in the table (aka the _) are the same as new row that came in to ensure two rows don't have the same hash values?

Although now that I write this, I am not sure how the existing structure handles hash collisions either as it seems to only look at the hash values and not the values in the rows themselves.

I found creating a test for collisions incredibly hard, btw, as I couldn't actually find any u32 values that collided and I blew out the memory on my machine searching for u64 values that collided 😆

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't fully understand how this works in the case of hash collisions. Don't you also have to check if the existing values in the table (aka the _) are the same as new row that came in to ensure two rows don't have the same hash values?

This doesn't check for actual values but AFAIK is required to avoid mapping/finding different hash values in the same bucket and finding the wrong bucket for a different hash. I tested it earlier and it seems the case should also be extremely rare too (couldn't get a single collission within 100M values). Problem of having the hash values in a same bucket is that another u64 hash could point to the wrong bucket in the probing phase.
I couldn't find a way yet to test this properly. The case of hash collissions by having the offsets present in different u64 has a unit test.

See here for the note where it says that there could be a same bucket for different hashes:
https://docs.rs/hashbrown/0.11.2/src/hashbrown/raw/mod.rs.html#1043-1053

Also the u64 output of the hasher seem of high quality so very hard to get collissions there indeed (at least for single columns, with multiple columns combine_hashes also affects the quality).
The actual case for detecting whether there is a colission of the values for the offsets happens at the moment elsewhere in the code.

I am not sure whether it makes sense to do the "value equality" check inside the eq parameter of the insert for the hash join. Probably it will only add time if there are not a lot of actual u64 collissions, as it should be done in the probing too anyway.

if let Some((_, indices)) = item {
indices.push((row + offset) as u64);
} else {
hash_map.0.insert(
*hash_value,
(*hash_value, smallvec![(row + offset) as u64]),
|(hash, _)| *hash,
);
}
}
Ok(())
}
Expand Down Expand Up @@ -678,7 +685,7 @@ fn build_join_indexes(
// This possibly contains rows with hash collisions,
// So we have to check here whether rows are equal or not
if let Some((_, indices)) =
left.raw_entry().from_hash(*hash_value, |_| true)
left.0.get(*hash_value, |(hash, _)| *hash_value == *hash)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

caching the hash in the value is pretty slick

{
for &i in indices {
// Check hash collisions
Expand Down Expand Up @@ -710,7 +717,7 @@ fn build_join_indexes(
// First visit all of the rows
for (row, hash_value) in hash_values.iter().enumerate() {
if let Some((_, indices)) =
left.raw_entry().from_hash(*hash_value, |_| true)
left.0.get(*hash_value, |(hash, _)| *hash_value == *hash)
{
for &i in indices {
// Collision check
Expand All @@ -728,7 +735,7 @@ fn build_join_indexes(
let mut right_indices = UInt32Builder::new(0);

for (row, hash_value) in hash_values.iter().enumerate() {
match left.raw_entry().from_hash(*hash_value, |_| true) {
match left.0.get(*hash_value, |(hash, _)| *hash_value == *hash) {
Some((_, indices)) => {
for &i in indices {
if equal_rows(
Expand All @@ -755,38 +762,6 @@ fn build_join_indexes(
}
}
}
use core::hash::BuildHasher;

/// `Hasher` that returns the same `u64` value as a hash, to avoid re-hashing
/// it when inserting/indexing or regrowing the `HashMap`
struct IdHasher {
hash: u64,
}

impl Hasher for IdHasher {
fn finish(&self) -> u64 {
self.hash
}

fn write_u64(&mut self, i: u64) {
self.hash = i;
}

fn write(&mut self, _bytes: &[u8]) {
unreachable!("IdHasher should only be used for u64 keys")
}
}

#[derive(Debug)]
struct IdHashBuilder {}

impl BuildHasher for IdHashBuilder {
type Hasher = IdHasher;

fn build_hasher(&self) -> Self::Hasher {
IdHasher { hash: 0 }
}
}

macro_rules! equal_rows_elem {
($array_type:ident, $l: ident, $r: ident, $left: ident, $right: ident) => {{
Expand Down Expand Up @@ -1776,7 +1751,7 @@ mod tests {

#[test]
fn join_with_hash_collision() -> Result<()> {
let mut hashmap_left = HashMap::with_capacity_and_hasher(2, IdHashBuilder {});
let mut hashmap_left = RawTable::with_capacity(2);
let left = build_table_i32(
("a", &vec![10, 20]),
("x", &vec![100, 200]),
Expand All @@ -1788,27 +1763,17 @@ mod tests {
let hashes =
create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?;

// Create hash collisions
match hashmap_left.raw_entry_mut().from_hash(hashes[0], |_| true) {
hashbrown::hash_map::RawEntryMut::Vacant(entry) => {
entry.insert_hashed_nocheck(hashes[0], (), smallvec![0, 1])
}
_ => unreachable!("Hash should not be vacant"),
};
match hashmap_left.raw_entry_mut().from_hash(hashes[1], |_| true) {
hashbrown::hash_map::RawEntryMut::Vacant(entry) => {
entry.insert_hashed_nocheck(hashes[1], (), smallvec![0, 1])
}
_ => unreachable!("Hash should not be vacant"),
};
// Create hash collisions (same hashes)
hashmap_left.insert(hashes[0], (hashes[0], smallvec![0, 1]), |(h, _)| *h);
hashmap_left.insert(hashes[1], (hashes[1], smallvec![0, 1]), |(h, _)| *h);

let right = build_table_i32(
("a", &vec![10, 20]),
("b", &vec![0, 0]),
("c", &vec![30, 40]),
);

let left_data = JoinLeftData::new((hashmap_left, left));
let left_data = JoinLeftData::new((JoinHashMap(hashmap_left), left));
let (l, r) = build_join_indexes(
&left_data,
&right,
Expand Down