Skip to content

Commit

Permalink
perf(common): Optimize null_bitmap with Set64 (#8941)
Browse files Browse the repository at this point in the history
Co-authored-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
kwannoel and BugenZhao authored Apr 6, 2023
1 parent 29389e1 commit eb20393
Show file tree
Hide file tree
Showing 11 changed files with 116 additions and 51 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 2 additions & 8 deletions src/batch/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::iter::empty;
use std::marker::PhantomData;
use std::sync::Arc;

use fixedbitset::FixedBitSet;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{Array, DataChunk, RowRef};
Expand Down Expand Up @@ -232,13 +231,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
JoinHashMap::with_capacity_and_hasher(build_row_count, PrecomputedBuildHasher);
let mut next_build_row_with_same_key =
ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
let null_matched = {
let mut null_matched = FixedBitSet::with_capacity(self.null_matched.len());
for (idx, col_null_matched) in self.null_matched.into_iter().enumerate() {
null_matched.set(idx, col_null_matched);
}
null_matched
};

let null_matched = self.null_matched.into();

// Build hash map
for (build_chunk_id, build_chunk) in build_side.iter().enumerate() {
Expand Down
11 changes: 2 additions & 9 deletions src/batch/src/executor/join/lookup_join_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@

use std::marker::PhantomData;

use fixedbitset::FixedBitSet;
use futures::StreamExt;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::error::RwError;
use risingwave_common::hash::{HashKey, PrecomputedBuildHasher};
use risingwave_common::hash::{HashKey, NullBitmap, PrecomputedBuildHasher};
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, ToOwnedDatum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
Expand Down Expand Up @@ -67,13 +66,7 @@ impl<K: HashKey> LookupJoinBase<K> {
pub async fn do_execute(mut self: Box<Self>) {
let outer_side_schema = self.outer_side_input.schema().clone();

let null_matched = {
let mut null_matched = FixedBitSet::with_capacity(self.null_safe.len());
for (idx, col_null_matched) in self.null_safe.iter().copied().enumerate() {
null_matched.set(idx, col_null_matched);
}
null_matched
};
let null_matched: NullBitmap = self.null_safe.into();

let mut outer_side_batch_read_stream: BoxedDataChunkListStream =
utils::batch_read(self.outer_side_input.execute(), AT_LEAST_OUTER_SIDE_ROWS);
Expand Down
1 change: 1 addition & 0 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ serde = { version = "1", features = ["derive"] }
serde_default = "0.1"
serde_json = "1"
serde_with = "2"
smallbitset = "0.6.1"
static_assertions = "1"
strum = "0.24"
strum_macros = "0.24"
Expand Down
8 changes: 4 additions & 4 deletions src/common/benches/bench_hash_key_encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ impl HashKeyDispatcher for HashKeyBenchCaseBuilder {
for null_ratio in NULL_RATIOS {
for chunk_size in CHUNK_SIZES {
let id = format!(
"{} {:?}, {} rows, Pr[null]={}",
"{} rows, {} {:?}, Pr[null]={}",
chunk_size,
self.describe,
calc_hash_key_kind(self.data_types()),
chunk_size,
null_ratio
);
let input_chunk = gen_chunk(self.data_types(), *chunk_size, SEED, *null_ratio);
Expand Down Expand Up @@ -194,11 +194,11 @@ fn case_builders() -> Vec<HashKeyBenchCaseBuilder> {
},
HashKeyBenchCaseBuilder {
data_types: vec![DataType::Int32, DataType::Int32, DataType::Int32],
describe: "composite fixed".to_string(),
describe: "composite fixed, case 1".to_string(),
},
HashKeyBenchCaseBuilder {
data_types: vec![DataType::Int32, DataType::Int64, DataType::Int32],
describe: "composite fixed".to_string(),
describe: "composite fixed, case 2".to_string(),
},
HashKeyBenchCaseBuilder {
data_types: vec![DataType::Int32, DataType::Varchar],
Expand Down
88 changes: 72 additions & 16 deletions src/common/src/hash/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::hash::{BuildHasher, Hash, Hasher};
use std::io::{Cursor, Read};

use chrono::{Datelike, Timelike};
use fixedbitset::FixedBitSet;
use smallbitset::Set64;

use crate::array::serial_array::Serial;
use crate::array::{
Expand All @@ -43,6 +43,59 @@ use crate::util::hash_util::Crc32FastBuilder;
use crate::util::iter_util::ZipEqFast;
use crate::util::value_encoding::{deserialize_datum, serialize_datum_into};

/// Maximum number of group-key columns accepted for a single hash key.
///
/// The per-key [`NullBitmap`] keeps one bit per key column inside a single `Set64`,
/// so column indices at or beyond this limit cannot be recorded.
pub const MAX_GROUP_KEYS: usize = 64;

/// Bitmap of per-column flags for a hash key.
///
/// The key serializers set bit `i` when the `i`-th key column is `NULL`; join executors
/// build the same type from their per-column null-safe flags.
/// It is specialized for keys, which usually have only a few columns: the flags live in a
/// single [`Set64`] instead of a growable bitset, so at most [`MAX_GROUP_KEYS`] columns
/// can be tracked.
#[repr(transparent)]
#[derive(Clone, Debug, PartialEq)]
pub struct NullBitmap {
// One bit per key column, indexed by the column's position in the key.
inner: Set64,
}

impl NullBitmap {
fn empty() -> Self {
NullBitmap {
inner: Set64::empty(),
}
}

fn is_empty(&self) -> bool {
self.inner.is_empty()
}

fn set_true(&mut self, idx: usize) {
self.inner.add_inplace(idx);
}

fn contains(&self, x: usize) -> bool {
self.inner.contains(x)
}

pub fn is_subset(&self, other: &NullBitmap) -> bool {
other.inner.contains_all(self.inner)
}
}

impl EstimateSize for NullBitmap {
/// Always reports zero heap bytes: the bitmap's only field is a `Set64` stored inline
/// in the struct, so nothing is counted as heap-allocated.
fn estimated_heap_size(&self) -> usize {
0
}
}

impl<T: AsRef<[bool]> + IntoIterator<Item = bool>> From<T> for NullBitmap {
    /// Builds a bitmap from a sequence of flags: bit `i` is set exactly when the `i`-th
    /// yielded flag is `true`.
    fn from(value: T) -> Self {
        let mut bitmap = NullBitmap::empty();
        value
            .into_iter()
            .enumerate()
            // Only positions whose flag is `true` contribute a bit.
            .filter(|&(_, flag)| flag)
            .for_each(|(position, _)| bitmap.set_true(position));
        bitmap
    }
}

/// A wrapper for u64 hash result.
#[derive(Default, Clone, Copy, Debug, PartialEq)]
pub struct HashCode(pub u64);
Expand Down Expand Up @@ -144,10 +197,10 @@ pub trait HashKey:
) -> ArrayResult<()>;

fn has_null(&self) -> bool {
!self.null_bitmap().is_clear()
!self.null_bitmap().is_empty()
}

fn null_bitmap(&self) -> &FixedBitSet;
fn null_bitmap(&self) -> &NullBitmap;
}

/// Designed for hash keys with at most `N` serialized bytes.
Expand All @@ -157,7 +210,7 @@ pub trait HashKey:
pub struct FixedSizeKey<const N: usize> {
key: [u8; N],
hash_code: u64,
null_bitmap: FixedBitSet,
null_bitmap: NullBitmap,
}

/// Designed for hash keys which can't be represented by [`FixedSizeKey`].
Expand All @@ -168,7 +221,7 @@ pub struct SerializedKey {
// Key encoding.
key: Vec<u8>,
hash_code: u64,
null_bitmap: FixedBitSet,
null_bitmap: NullBitmap,
}

impl<const N: usize> EstimateSize for FixedSizeKey<N> {
Expand Down Expand Up @@ -516,7 +569,7 @@ impl<'a> HashKeySerDe<'a> for ListRef<'a> {

pub struct FixedSizeKeySerializer<const N: usize> {
buffer: [u8; N],
null_bitmap: FixedBitSet,
null_bitmap: NullBitmap,
null_bitmap_idx: usize,
data_len: usize,
hash_code: u64,
Expand All @@ -536,7 +589,7 @@ impl<const N: usize> HashKeySerializer for FixedSizeKeySerializer<N> {
fn from_hash_code(hash_code: HashCode, _estimated_key_size: usize) -> Self {
Self {
buffer: [0u8; N],
null_bitmap: FixedBitSet::with_capacity(u8::BITS as usize),
null_bitmap: NullBitmap::empty(),
null_bitmap_idx: 0,
data_len: 0,
hash_code: hash_code.0,
Expand All @@ -553,7 +606,9 @@ impl<const N: usize> HashKeySerializer for FixedSizeKeySerializer<N> {
self.buffer[self.data_len..(self.data_len + ret.len())].copy_from_slice(ret);
self.data_len += ret.len();
}
None => self.null_bitmap.insert(self.null_bitmap_idx),
None => {
self.null_bitmap.set_true(self.null_bitmap_idx);
}
};
self.null_bitmap_idx += 1;
}
Expand All @@ -569,7 +624,7 @@ impl<const N: usize> HashKeySerializer for FixedSizeKeySerializer<N> {

pub struct FixedSizeKeyDeserializer<const N: usize> {
cursor: Cursor<[u8; N]>,
null_bitmap: FixedBitSet,
null_bitmap: NullBitmap,
null_bitmap_idx: usize,
}

Expand Down Expand Up @@ -600,7 +655,8 @@ impl<const N: usize> HashKeyDeserializer for FixedSizeKeyDeserializer<N> {
pub struct SerializedKeySerializer {
buffer: Vec<u8>,
hash_code: u64,
null_bitmap: FixedBitSet,
null_bitmap: NullBitmap,
null_bitmap_idx: usize,
}

impl HashKeySerializer for SerializedKeySerializer {
Expand All @@ -610,22 +666,22 @@ impl HashKeySerializer for SerializedKeySerializer {
Self {
buffer: Vec::with_capacity(estimated_value_encoding_size),
hash_code: hash_code.0,
null_bitmap: FixedBitSet::new(),
null_bitmap: NullBitmap::empty(),
null_bitmap_idx: 0,
}
}

fn append<'a, D: HashKeySerDe<'a>>(&mut self, data: Option<D>) {
let len_bitmap = self.null_bitmap.len();
self.null_bitmap.grow(len_bitmap + 1);
match data {
Some(v) => {
serialize_datum_into(&Some(v.to_owned_scalar().into()), &mut self.buffer);
}
None => {
serialize_datum_into(&None, &mut self.buffer);
self.null_bitmap.insert(len_bitmap);
self.null_bitmap.set_true(self.null_bitmap_idx);
}
}
self.null_bitmap_idx += 1;
}

fn into_hash_key(self) -> SerializedKey {
Expand Down Expand Up @@ -721,7 +777,7 @@ impl<const N: usize> HashKey for FixedSizeKey<N> {
Ok(())
}

fn null_bitmap(&self) -> &FixedBitSet {
fn null_bitmap(&self) -> &NullBitmap {
&self.null_bitmap
}
}
Expand Down Expand Up @@ -751,7 +807,7 @@ impl HashKey for SerializedKey {
Ok(())
}

fn null_bitmap(&self) -> &FixedBitSet {
fn null_bitmap(&self) -> &NullBitmap {
&self.null_bitmap
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/frontend/planner_test/tests/testdata/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1247,3 +1247,8 @@
BatchExchange { order: [idx_desc.a DESC], dist: Single }
└─BatchSortAgg { group_key: [idx_desc.a], aggs: [count] }
└─BatchScan { table: idx_desc, columns: [idx_desc.a], distribution: UpstreamHashShard(idx_desc.a) }
- name: max group keys exceed 64
sql: |
create table t(v1 int, v2 int, v3 int, v4 int, v5 int, v6 int, v7 int, v8 int, v9 int, v10 int, v11 int, v12 int, v13 int, v14 int, v15 int, v16 int, v17 int, v18 int, v19 int, v20 int, v21 int, v22 int, v23 int, v24 int, v25 int, v26 int, v27 int, v28 int, v29 int, v30 int, v31 int, v32 int, v33 int, v34 int, v35 int, v36 int, v37 int, v38 int, v39 int, v40 int, v41 int, v42 int, v43 int, v44 int, v45 int, v46 int, v47 int, v48 int, v49 int, v50 int, v51 int, v52 int, v53 int, v54 int, v55 int, v56 int, v57 int, v58 int, v59 int, v60 int, v61 int, v62 int, v63 int, v64 int, v65 int);
select * from t group by v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15, v16, v17, v18, v19, v20, v21, v22, v23, v24, v25, v26, v27, v28, v29, v30, v31, v32, v33, v34, v35, v36, v37, v38, v39, v40, v41, v42, v43, v44, v45, v46, v47, v48, v49, v50, v51, v52, v53, v54, v55, v56, v57, v58, v59, v60, v61, v62, v63, v64, v65;
binder_error: 'Bind error: Number of Group Keys: 65, exceeded maximum: 64'
10 changes: 10 additions & 0 deletions src/frontend/src/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use std::fmt::Debug;

use itertools::Itertools;
use risingwave_common::catalog::{Field, Schema, PG_CATALOG_SCHEMA_NAME};
use risingwave_common::error::ErrorCode::BindError;
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::hash::MAX_GROUP_KEYS;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_sqlparser::ast::{DataType as AstDataType, Distinct, Expr, Select, SelectItem};
Expand Down Expand Up @@ -178,6 +180,14 @@ impl Binder {

// Bind GROUP BY clause.
self.context.clause = Some(Clause::GroupBy);
let number_of_group_keys = select.group_by.len();
if number_of_group_keys > MAX_GROUP_KEYS {
return Err(BindError(format!(
"Number of Group Keys: {}, exceeded maximum: {}",
number_of_group_keys, MAX_GROUP_KEYS,
))
.into());
}
let group_by = select
.group_by
.into_iter()
Expand Down
11 changes: 2 additions & 9 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ use std::sync::Arc;
use std::time::Duration;

use await_tree::InstrumentAwait;
use fixedbitset::FixedBitSet;
use futures::{pin_mut, StreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use multimap::MultiMap;
use risingwave_common::array::{Op, RowRef, StreamChunk};
use risingwave_common::catalog::Schema;
use risingwave_common::hash::HashKey;
use risingwave_common::hash::{HashKey, NullBitmap};
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, ToOwnedDatum};
use risingwave_common::util::epoch::EpochPair;
Expand Down Expand Up @@ -538,13 +537,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
.map(|&idx| original_schema[idx].clone())
.collect();

let null_matched: FixedBitSet = {
let mut null_matched = FixedBitSet::with_capacity(null_safe.len());
for (idx, col_null_matched) in null_safe.into_iter().enumerate() {
null_matched.set(idx, col_null_matched);
}
null_matched
};
let null_matched: NullBitmap = null_safe.into();

let need_degree_table_l = need_left_degree(T) && !pk_contained_in_jk_r;
let need_degree_table_r = need_right_degree(T) && !pk_contained_in_jk_l;
Expand Down
Loading

0 comments on commit eb20393

Please sign in to comment.