From 2c0c06248667bfeb9c56a4c2119b3a7994b9fc1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 7 Aug 2021 07:55:04 +0200 Subject: [PATCH] Use `RawTable` API in hash join (#827) * Use rawtable API * Avoid changes * Check on hash again * Test fix --- datafusion/src/physical_plan/hash_join.rs | 107 ++++++++-------------- 1 file changed, 36 insertions(+), 71 deletions(-) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 1a174bb11d10..1a57c404e96e 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -29,8 +29,8 @@ use arrow::{ datatypes::{UInt32Type, UInt64Type}, }; use smallvec::{smallvec, SmallVec}; +use std::sync::Arc; use std::{any::Any, usize}; -use std::{hash::Hasher, sync::Arc}; use std::{time::Instant, vec}; use async_trait::async_trait; @@ -49,6 +49,8 @@ use arrow::array::{ UInt64Array, UInt8Array, }; +use hashbrown::raw::RawTable; + use super::expressions::Column; use super::hash_utils::create_hashes; use super::{ @@ -65,6 +67,7 @@ use super::{ use crate::physical_plan::coalesce_batches::concat_batches; use crate::physical_plan::{PhysicalExpr, SQLMetric}; use log::debug; +use std::fmt; // Maps a `u64` hash value based on the left ["on" values] to a list of indices with this key's value. // @@ -78,7 +81,14 @@ use log::debug; // but the values don't match. Those are checked in the [equal_rows] macro // TODO: speed up collission check and move away from using a hashbrown HashMap // https://github.com/apache/arrow-datafusion/issues/50 -type JoinHashMap = HashMap<(), SmallVec<[u64; 1]>, IdHashBuilder>; +struct JoinHashMap(RawTable<(u64, SmallVec<[u64; 1]>)>); + +impl fmt::Debug for JoinHashMap { + fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> fmt::Result { + Ok(()) + } +} + type JoinLeftData = Arc<(JoinHashMap, RecordBatch)>; /// join execution plan executes partitions in parallel and combines them into a set of @@ -303,10 +313,8 @@ impl ExecutionPlan for HashJoinExec { Ok(acc) }) .await?; - let mut hashmap = JoinHashMap::with_capacity_and_hasher( - num_rows, - IdHashBuilder {}, - ); + let mut hashmap = + JoinHashMap(RawTable::with_capacity(num_rows)); let mut hashes_buffer = Vec::new(); let mut offset = 0; for batch in batches.iter() { @@ -358,8 +366,7 @@ impl ExecutionPlan for HashJoinExec { Ok(acc) }) .await?; - let mut hashmap = - JoinHashMap::with_capacity_and_hasher(num_rows, IdHashBuilder {}); + let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows)); let mut hashes_buffer = Vec::new(); let mut offset = 0; for batch in batches.iter() { @@ -460,7 +467,7 @@ impl ExecutionPlan for HashJoinExec { fn update_hash( on: &[Column], batch: &RecordBatch, - hash: &mut JoinHashMap, + hash_map: &mut JoinHashMap, offset: usize, random_state: &RandomState, hashes_buffer: &mut Vec, @@ -476,18 +483,18 @@ fn update_hash( // insert hashes to key of the hashmap for (row, hash_value) in hash_values.iter().enumerate() { - match hash.raw_entry_mut().from_hash(*hash_value, |_| true) { - hashbrown::hash_map::RawEntryMut::Occupied(mut entry) => { - entry.get_mut().push((row + offset) as u64); - } - hashbrown::hash_map::RawEntryMut::Vacant(entry) => { - entry.insert_hashed_nocheck( - *hash_value, - (), - smallvec![(row + offset) as u64], - ); - } - }; + let item = hash_map + .0 + .get_mut(*hash_value, |(hash, _)| *hash_value == *hash); + if let Some((_, indices)) = item { + indices.push((row + offset) as u64); + } else { + hash_map.0.insert( + *hash_value, + (*hash_value, smallvec![(row + offset) as u64]), + |(hash, _)| *hash, + ); + } } Ok(()) } @@ -678,7 +685,7 @@ fn build_join_indexes( // This possibly contains rows with hash collisions, // So we have to check here whether rows are equal or not if let Some((_, indices)) = - left.raw_entry().from_hash(*hash_value, |_| true) + left.0.get(*hash_value, |(hash, _)| *hash_value == *hash) { for &i in indices { // Check hash collisions @@ -710,7 +717,7 @@ fn build_join_indexes( // First visit all of the rows for (row, hash_value) in hash_values.iter().enumerate() { if let Some((_, indices)) = - left.raw_entry().from_hash(*hash_value, |_| true) + left.0.get(*hash_value, |(hash, _)| *hash_value == *hash) { for &i in indices { // Collision check @@ -728,7 +735,7 @@ fn build_join_indexes( let mut right_indices = UInt32Builder::new(0); for (row, hash_value) in hash_values.iter().enumerate() { - match left.raw_entry().from_hash(*hash_value, |_| true) { + match left.0.get(*hash_value, |(hash, _)| *hash_value == *hash) { Some((_, indices)) => { for &i in indices { if equal_rows( @@ -755,38 +762,6 @@ fn build_join_indexes( } } } -use core::hash::BuildHasher; - -/// `Hasher` that returns the same `u64` value as a hash, to avoid re-hashing -/// it when inserting/indexing or regrowing the `HashMap` -struct IdHasher { - hash: u64, -} - -impl Hasher for IdHasher { - fn finish(&self) -> u64 { - self.hash - } - - fn write_u64(&mut self, i: u64) { - self.hash = i; - } - - fn write(&mut self, _bytes: &[u8]) { - unreachable!("IdHasher should only be used for u64 keys") - } -} - -#[derive(Debug)] -struct IdHashBuilder {} - -impl BuildHasher for IdHashBuilder { - type Hasher = IdHasher; - - fn build_hasher(&self) -> Self::Hasher { - IdHasher { hash: 0 } - } -} macro_rules! equal_rows_elem { ($array_type:ident, $l: ident, $r: ident, $left: ident, $right: ident) => {{ @@ -1776,7 +1751,7 @@ mod tests { #[test] fn join_with_hash_collision() -> Result<()> { - let mut hashmap_left = HashMap::with_capacity_and_hasher(2, IdHashBuilder {}); + let mut hashmap_left = RawTable::with_capacity(2); let left = build_table_i32( ("a", &vec![10, 20]), ("x", &vec![100, 200]), @@ -1788,19 +1763,9 @@ mod tests { let hashes = create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?; - // Create hash collisions - match hashmap_left.raw_entry_mut().from_hash(hashes[0], |_| true) { - hashbrown::hash_map::RawEntryMut::Vacant(entry) => { - entry.insert_hashed_nocheck(hashes[0], (), smallvec![0, 1]) - } - _ => unreachable!("Hash should not be vacant"), - }; - match hashmap_left.raw_entry_mut().from_hash(hashes[1], |_| true) { - hashbrown::hash_map::RawEntryMut::Vacant(entry) => { - entry.insert_hashed_nocheck(hashes[1], (), smallvec![0, 1]) - } - _ => unreachable!("Hash should not be vacant"), - }; + // Create hash collisions (same hashes) + hashmap_left.insert(hashes[0], (hashes[0], smallvec![0, 1]), |(h, _)| *h); + hashmap_left.insert(hashes[1], (hashes[1], smallvec![0, 1]), |(h, _)| *h); let right = build_table_i32( ("a", &vec![10, 20]), @@ -1808,7 +1773,7 @@ mod tests { ("c", &vec![30, 40]), ); - let left_data = JoinLeftData::new((hashmap_left, left)); + let left_data = JoinLeftData::new((JoinHashMap(hashmap_left), left)); let (l, r) = build_join_indexes( &left_data, &right,