From 158650d45d4105b3d8bf57de429dad2ac23ba047 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Tue, 31 Jan 2023 00:11:24 +0800 Subject: [PATCH 1/9] update --- src/common/hashtable/src/hashtable.rs | 63 ++++++++- src/common/hashtable/src/lib.rs | 1 + .../hashtable/src/simple_unsized_hashtable.rs | 49 +++++++ src/common/hashtable/src/tail_array.rs | 133 ++++++++++++++++++ src/common/hashtable/src/traits.rs | 6 + .../hashtable/src/twolevel_hashtable.rs | 6 + src/common/hashtable/src/unsized_hashtable.rs | 65 +++++++++ .../aggregator/aggregator_partial.rs | 57 ++++++++ .../aggregator/aggregator_twolevel.rs | 4 + .../transforms/transform_aggregator.rs | 2 + 10 files changed, 379 insertions(+), 7 deletions(-) create mode 100644 src/common/hashtable/src/tail_array.rs diff --git a/src/common/hashtable/src/hashtable.rs b/src/common/hashtable/src/hashtable.rs index b4594388e3c2..d76088740a02 100644 --- a/src/common/hashtable/src/hashtable.rs +++ b/src/common/hashtable/src/hashtable.rs @@ -27,6 +27,9 @@ use super::table0::Table0IterMut; use super::traits::HashtableLike; use super::traits::Keyable; use super::utils::ZeroEntry; +use crate::tail_array::TailArray; +use crate::tail_array::TailArrayIter; +use crate::tail_array::TailArrayIterMut; use crate::FastHash; pub struct Hashtable> @@ -36,6 +39,7 @@ where { pub(crate) zero: ZeroEntry, pub(crate) table: Table0, A>, A>, + pub(crate) tails: Option>, } unsafe impl Send for Hashtable {} @@ -77,6 +81,7 @@ where Self { table: Table0::with_capacity_in(capacity, allocator), zero: ZeroEntry(None), + tails: None, } } #[inline(always)] @@ -145,6 +150,11 @@ where return Ok(zero); } } + + if let Some(tails) = &mut self.tails { + return Ok(tails.insert(key)); + } + self.table.check_grow(); self.table.insert(key) } @@ -159,8 +169,10 @@ where } } pub fn iter(&self) -> HashtableIter<'_, K, V> { + let tail_iter = self.tails.as_ref().map(|tails| tails.iter()); HashtableIter { - inner: self.zero.iter().chain(self.table.iter()), + 
inner: Some(self.zero.iter().chain(self.table.iter())), + tail_iter, } } } @@ -191,7 +203,8 @@ where } pub struct HashtableIter<'a, K, V> { - pub inner: std::iter::Chain>, Table0Iter<'a, K, V>>, + pub inner: Option>, Table0Iter<'a, K, V>>>, + pub tail_iter: Option>, } impl<'a, K, V> Iterator for HashtableIter<'a, K, V> @@ -200,12 +213,26 @@ where K: Keyable type Item = &'a Entry; fn next(&mut self) -> Option { - self.inner.next() + if let Some(it) = self.inner.as_mut() { + if let Some(e) = it.next() { + return Some(e); + } + self.inner = None; + } + + if let Some(it) = self.tail_iter.as_mut() { + if let Some(e) = it.next() { + return Some(e); + } + self.tail_iter = None; + } + None } } pub struct HashtableIterMut<'a, K, V> { - inner: std::iter::Chain>, Table0IterMut<'a, K, V>>, + inner: Option>, Table0IterMut<'a, K, V>>>, + tail_iter: Option>, } impl<'a, K, V> Iterator for HashtableIterMut<'a, K, V> @@ -214,14 +241,27 @@ where K: Keyable type Item = &'a mut Entry; fn next(&mut self) -> Option { - self.inner.next() + if let Some(it) = self.inner.as_mut() { + if let Some(e) = it.next() { + return Some(e); + } + self.inner = None; + } + + if let Some(it) = self.tail_iter.as_mut() { + if let Some(e) = it.next() { + return Some(e); + } + self.tail_iter = None; + } + None } } impl HashtableLike for Hashtable where K: Keyable + FastHash, - A: Allocator + Clone + 'static, + A: Allocator + Default + Clone + 'static, { type Key = K; type Value = V; @@ -309,13 +349,22 @@ where } fn iter(&self) -> Self::Iterator<'_> { + let tail_iter = self.tails.as_ref().map(|tails| tails.iter()); HashtableIter { - inner: self.zero.iter().chain(self.table.iter()), + inner: Some(self.zero.iter().chain(self.table.iter())), + tail_iter, } } fn clear(&mut self) { self.zero.0.take(); self.table.clear(); + let _ = self.tails.take(); + } + + fn enable_tail_array(&mut self) { + if self.tails.is_none() { + self.tails = Some(TailArray::new(Default::default())); + } } } diff --git 
a/src/common/hashtable/src/lib.rs b/src/common/hashtable/src/lib.rs index 4eaec69a5c10..cd0a12b28f2e 100644 --- a/src/common/hashtable/src/lib.rs +++ b/src/common/hashtable/src/lib.rs @@ -30,6 +30,7 @@ mod simple_unsized_hashtable; #[allow(dead_code)] mod table1; mod table_empty; +mod tail_array; mod traits; mod twolevel_hashtable; mod unsized_hashtable; diff --git a/src/common/hashtable/src/simple_unsized_hashtable.rs b/src/common/hashtable/src/simple_unsized_hashtable.rs index 3cb4be0c53ec..7a364409b0d4 100644 --- a/src/common/hashtable/src/simple_unsized_hashtable.rs +++ b/src/common/hashtable/src/simple_unsized_hashtable.rs @@ -32,6 +32,9 @@ use crate::table0::Table0IterMut; use crate::table_empty::TableEmpty; use crate::table_empty::TableEmptyIter; use crate::table_empty::TableEmptyIterMut; +use crate::tail_array::TailArray; +use crate::tail_array::TailArrayIter; +use crate::tail_array::TailArrayIterMut; use crate::unsized_hashtable::FallbackKey; /// Simple unsized hashtable is used for storing unsized keys in arena. It can be worked with HashMethodSerializer. 
@@ -46,6 +49,7 @@ where pub(crate) key_size: usize, pub(crate) table_empty: TableEmpty, pub(crate) table: Table0, A>, A>, + pub(crate) tails: Option>, pub(crate) _phantom: PhantomData, } @@ -113,6 +117,7 @@ where key_size: 0, table_empty: TableEmpty::new_in(allocator.clone()), table: Table0::with_capacity_in(capacity, allocator), + tails: None, _phantom: PhantomData, } } @@ -160,6 +165,13 @@ where ) }), _ => { + if let Some(tails) = &mut self.tails { + let key = FallbackKey::new(key); + return Ok(SimpleUnsizedHashtableEntryMutRef( + SimpleUnsizedHashtableEntryMutRefInner::Table(tails.insert(key)), + )); + } + self.table.check_grow(); self.table .insert(FallbackKey::new(key)) @@ -195,6 +207,7 @@ where K: UnsizedKeyable + ?Sized { it_empty: Option>, it: Option>, + tail_it: Option>, _phantom: PhantomData<&'a mut K>, } @@ -220,6 +233,15 @@ where K: UnsizedKeyable + ?Sized } self.it = None; } + + if let Some(it) = self.tail_it.as_mut() { + if let Some(e) = it.next() { + return Some(SimpleUnsizedHashtableEntryRef( + SimpleUnsizedHashtableEntryRefInner::Table(e), + )); + } + self.tail_it = None; + } None } } @@ -229,6 +251,7 @@ where K: UnsizedKeyable + ?Sized { it_empty: Option>, it: Option>, + tail_it: Option>, _phantom: PhantomData<&'a mut K>, } @@ -255,6 +278,15 @@ where K: UnsizedKeyable + ?Sized } self.it = None; } + + if let Some(it) = self.tail_it.as_mut() { + if let Some(e) = it.next() { + return Some(SimpleUnsizedHashtableEntryMutRef( + SimpleUnsizedHashtableEntryMutRefInner::Table(e), + )); + } + self.tail_it = None; + } None } } @@ -535,6 +567,15 @@ where A: Allocator + Clone + Default }), _ => { + if let Some(tails) = &mut self.tails { + let s = self.arena.alloc_slice_copy(key); + let key = FallbackKey::new(s); + + return Ok(SimpleUnsizedHashtableEntryMutRef( + SimpleUnsizedHashtableEntryMutRefInner::Table(tails.insert(key)), + )); + } + self.table.check_grow(); match self.table.insert(FallbackKey::new(key)) { Ok(e) => { @@ -604,13 +645,21 @@ where A: 
Allocator + Clone + Default SimpleUnsizedHashtableIter { it_empty: Some(self.table_empty.iter()), it: Some(self.table.iter()), + tail_it: self.tails.as_ref().map(|x| x.iter()), _phantom: PhantomData, } } + fn enable_tail_array(&mut self) { + if self.tails.is_none() { + self.tails = Some(TailArray::new(Default::default())); + } + } + fn clear(&mut self) { self.table_empty.clear(); self.table.clear(); + let _ = self.tails.take(); drop(std::mem::take(&mut self.arena)); } } diff --git a/src/common/hashtable/src/tail_array.rs b/src/common/hashtable/src/tail_array.rs new file mode 100644 index 000000000000..4767da98ab91 --- /dev/null +++ b/src/common/hashtable/src/tail_array.rs @@ -0,0 +1,133 @@ +// Copyright 2021 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::alloc::Allocator; + +use super::table0::Entry; +use super::traits::Keyable; +use crate::container::Container; +use crate::container::StackContainer; + +const SIZE: usize = 8192; + +pub struct TailArray { + allocator: A, + pub(crate) datas: Vec, SIZE, A>>, + pub(crate) num_items: usize, +} + +impl TailArray +where + K: Keyable, + A: Allocator + Clone, +{ + pub fn new(allocator: A) -> Self { + Self { + datas: vec![], + num_items: 0, + allocator, + } + } + + pub fn insert(&mut self, key: K) -> &mut Entry { + let pos = self.num_items % SIZE; + if pos == 0 { + let container = unsafe { StackContainer::new_zeroed(SIZE, self.allocator.clone()) }; + self.datas.push(container); + } + + let tail = self.datas.last_mut().unwrap(); + unsafe { tail[pos].set_key(key) }; + + self.num_items += 1; + &mut tail[pos] + } + + pub fn iter(&self) -> TailArrayIter<'_, K, V> { + TailArrayIter { + values: self.datas.iter().map(|v| v.as_ref()).collect(), + num_items: self.num_items, + i: 0, + } + } + + #[allow(dead_code)] + pub fn iter_mut(&mut self) -> TailArrayIterMut<'_, K, V> { + TailArrayIterMut { + values: self.datas.iter_mut().map(|v| v.as_mut()).collect(), + num_items: self.num_items, + i: 0, + } + } +} + +pub struct TailArrayIter<'a, K, V> { + values: Vec<&'a [Entry]>, + num_items: usize, + i: usize, +} + +impl<'a, K, V> Iterator for TailArrayIter<'a, K, V> { + type Item = &'a Entry; + + fn next(&mut self) -> Option { + if self.i >= self.num_items { + None + } else { + let array = self.i / SIZE; + let pos = self.i % SIZE; + + let v = self.values[array]; + let res = &v.as_ref()[pos]; + self.i += 1; + Some(res) + } + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + (self.num_items - self.i, Some(self.num_items - self.i)) + } +} + +pub struct TailArrayIterMut<'a, K, V> { + values: Vec<&'a mut [Entry]>, + num_items: usize, + i: usize, +} + +impl<'a, K, V> Iterator for TailArrayIterMut<'a, K, V> +where Self: 'a +{ + type Item = &'a mut Entry where Self: 'a ; + + fn 
next(&mut self) -> Option { + if self.i >= self.num_items { + None + } else { + let array = self.i / SIZE; + let pos = self.i % SIZE; + + let v = &mut self.values[array]; + let res = unsafe { &mut *(v.as_ptr().add(pos) as *mut _) }; + self.i += 1; + Some(res) + } + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + (self.num_items - self.i, Some(self.num_items - self.i)) + } +} diff --git a/src/common/hashtable/src/traits.rs b/src/common/hashtable/src/traits.rs index 8eb9a8d0802c..3b7e27f679eb 100644 --- a/src/common/hashtable/src/traits.rs +++ b/src/common/hashtable/src/traits.rs @@ -491,5 +491,11 @@ pub trait HashtableLike { fn iter(&self) -> Self::Iterator<'_>; + /// Note: don't call this function unless you want what you are doing. + /// This function will make hashtable not like `hashtable` but a linked list. + /// If it's enabled, we will push all new keys into the tail of the linked list. + /// This is used in partial aggregation + fn enable_tail_array(&mut self) {} + fn clear(&mut self); } diff --git a/src/common/hashtable/src/twolevel_hashtable.rs b/src/common/hashtable/src/twolevel_hashtable.rs index 17fd52ca8400..94c34807b7bd 100644 --- a/src/common/hashtable/src/twolevel_hashtable.rs +++ b/src/common/hashtable/src/twolevel_hashtable.rs @@ -165,6 +165,12 @@ impl> Hashtable inner_table.clear(); } } + + fn enable_tail_array(&mut self) { + for inner_table in &mut self.tables { + inner_table.enable_tail_array(); + } + } } pub struct TwoLevelHashtableIter { diff --git a/src/common/hashtable/src/unsized_hashtable.rs b/src/common/hashtable/src/unsized_hashtable.rs index 5fc79484d30c..a26db2314ab9 100644 --- a/src/common/hashtable/src/unsized_hashtable.rs +++ b/src/common/hashtable/src/unsized_hashtable.rs @@ -37,6 +37,9 @@ use crate::table0::Table0IterMut; use crate::table_empty::TableEmpty; use crate::table_empty::TableEmptyIter; use crate::table_empty::TableEmptyIterMut; +use crate::tail_array::TailArray; +use crate::tail_array::TailArrayIter; +use 
crate::tail_array::TailArrayIterMut; pub struct UnsizedHashtable> where @@ -50,6 +53,7 @@ where pub(crate) table2: Table0, V, HeapContainer, V>, A>, A>, pub(crate) table3: Table0, V, HeapContainer, V>, A>, A>, pub(crate) table4: Table0, A>, A>, + pub(crate) tails: Option>, pub(crate) _phantom: PhantomData, } @@ -123,6 +127,7 @@ where table2: Table0::with_capacity_in(capacity, allocator.clone()), table3: Table0::with_capacity_in(capacity, allocator.clone()), table4: Table0::with_capacity_in(capacity, allocator), + tails: None, _phantom: PhantomData, } } @@ -160,6 +165,16 @@ where key: *const K, ) -> Result, UnsizedHashtableEntryMutRef<'_, K, V>> { let key = (*key).as_bytes(); + + if !key.is_empty() { + if let Some(tails) = &mut self.tails { + let key = FallbackKey::new(key); + return Ok(UnsizedHashtableEntryMutRef( + UnsizedHashtableEntryMutRefInner::Table4(tails.insert(key)), + )); + } + } + match key.len() { _ if key.last().copied() == Some(0) => { self.table4.check_grow(); @@ -271,6 +286,7 @@ where K: UnsizedKeyable + ?Sized it_2: Option, V>>, it_3: Option, V>>, it_4: Option>, + tail_it: Option>, _phantom: PhantomData<&'a mut K>, } @@ -320,6 +336,15 @@ where K: UnsizedKeyable + ?Sized } self.it_4 = None; } + + if let Some(it) = self.tail_it.as_mut() { + if let Some(e) = it.next() { + return Some(UnsizedHashtableEntryRef( + UnsizedHashtableEntryRefInner::Table4(e), + )); + } + self.tail_it = None; + } None } } @@ -332,6 +357,8 @@ where K: UnsizedKeyable + ?Sized it_2: Option, V>>, it_3: Option, V>>, it_4: Option>, + + tail_it: Option>, _phantom: PhantomData<&'a mut K>, } @@ -381,6 +408,15 @@ where K: UnsizedKeyable + ?Sized } self.it_4 = None; } + + if let Some(it) = self.tail_it.as_mut() { + if let Some(e) = it.next() { + return Some(UnsizedHashtableEntryMutRef( + UnsizedHashtableEntryMutRefInner::Table4(e), + )); + } + self.tail_it = None; + } None } } @@ -877,6 +913,16 @@ where A: Allocator + Clone + Default key: &Self::Key, ) -> Result, Self::EntryMutRef<'_>> 
{ let key = key.as_bytes(); + + if !key.is_empty() { + if let Some(tails) = &mut self.tails { + let key = FallbackKey::new(key); + return Ok(UnsizedHashtableEntryMutRef( + UnsizedHashtableEntryMutRefInner::Table4(tails.insert(key)), + )); + } + } + match key.len() { _ if key.last().copied() == Some(0) => { self.table4.check_grow(); @@ -988,6 +1034,16 @@ where A: Allocator + Clone + Default hash: u64, ) -> Result, Self::EntryMutRef<'_>> { let key = key.as_bytes(); + + if !key.is_empty() { + if let Some(tails) = &mut self.tails { + let key = FallbackKey::new(key); + return Ok(UnsizedHashtableEntryMutRef( + UnsizedHashtableEntryMutRefInner::Table4(tails.insert(key)), + )); + } + } + match key.len() { _ if key.last().copied() == Some(0) => { self.table4.check_grow(); @@ -1103,16 +1159,25 @@ where A: Allocator + Clone + Default it_2: Some(self.table2.iter()), it_3: Some(self.table3.iter()), it_4: Some(self.table4.iter()), + tail_it: self.tails.as_ref().map(|x| x.iter()), _phantom: PhantomData, } } + fn enable_tail_array(&mut self) { + if self.tails.is_none() { + self.tails = Some(TailArray::new(Default::default())); + } + } + fn clear(&mut self) { self.table0.clear(); self.table1.clear(); self.table2.clear(); self.table3.clear(); self.table4.clear(); + + let _ = self.tails.take(); // NOTE: Bump provides the reset function to free memory, but it will cause memory leakage(maybe a bug). // In fact, we don't need to call the drop function. rust will call it, But we call it to improve the readability of the code. 
drop(std::mem::take(&mut self.arena)); diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_partial.rs index 8b7653ff24ba..ee36418005db 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_partial.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_partial.rs @@ -43,6 +43,8 @@ where Method: HashMethod + PolymorphicKeysHelper pub hash_table: Method::HashTable, pub params: Arc, pub generated: bool, + pub should_expand_table: bool, + pub input_rows: usize, } impl + Send> @@ -57,6 +59,8 @@ impl + S area: Some(Area::create()), states_dropped: false, generated: false, + should_expand_table: true, + input_rows: 0, }) } @@ -183,6 +187,8 @@ impl + S let mut group_key_builder = self .method .keys_column_builder(state_groups_len, value_size); + + // TODO use batch for group_entity in self.hash_table.iter() { let place = Into::::into(*group_entity.get()); @@ -208,6 +214,46 @@ impl + S columns.push(group_key_col); Ok(vec![DataBlock::new_from_columns(columns)]) } + + fn should_expand_table(&self) -> bool { + /// ideas from https://github.com/apache/impala/blob/b3e9c4a65fa63da6f373c9ecec41fe4247e5e7d8/be/src/exec/grouping-aggregator.cc + static STREAMING_HT_MIN_REDUCTION: [(usize, f64); 3] = + [(0, 0.0), (256 * 1024, 1.1), (2 * 1024 * 1024, 2.0)]; + + let ht_mem = self.hash_table.bytes_len(); + let ht_rows = self.hash_table.len(); + + if ht_rows == 0 { + return true; + } + + let mut cache_level = 0; + loop { + if cache_level + 1 < STREAMING_HT_MIN_REDUCTION.len() + && ht_mem >= STREAMING_HT_MIN_REDUCTION[cache_level + 1].0 + { + cache_level += 1; + continue; + } + break; + } + + let aggregated_input_rows = self.input_rows; + let current_reduction = aggregated_input_rows as f64 / ht_rows as f64; + // TODO ADD estimated reduction, currently we use current reduction + let estimated_reduction 
= current_reduction; + let min_reduction = STREAMING_HT_MIN_REDUCTION[cache_level].1; + + if estimated_reduction <= min_reduction { + println!( + "YY: {:?} {:?} {}", + estimated_reduction, + min_reduction, + estimated_reduction > min_reduction + ); + } + estimated_reduction > min_reduction + } } impl + Send> Aggregator @@ -216,6 +262,7 @@ impl + S const NAME: &'static str = "GroupByPartialTransform"; fn consume(&mut self, block: DataBlock) -> Result<()> { + self.input_rows += block.num_rows(); let block = block.convert_to_full(); // 1.1 and 1.2. let group_columns = Self::group_columns(&block, &self.params.group_columns); @@ -243,6 +290,16 @@ impl + S fn generate(&mut self) -> Result> { self.generate_data() } + + fn check_expandsion(&mut self) { + if self.should_expand_table { + self.should_expand_table = self.should_expand_table(); + + if !self.should_expand_table { + self.hash_table.enable_tail_array(); + } + } + } } impl> diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_twolevel.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_twolevel.rs index 3d112f888972..a8680ee9bd0d 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_twolevel.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_twolevel.rs @@ -114,6 +114,8 @@ where method: two_level_method, hash_table: two_level_hashtable, generated: false, + should_expand_table: true, + input_rows: self.input_rows, }, }) } @@ -252,6 +254,8 @@ impl Aggregator for TwoLevelAggregator { #[inline(always)] fn consume(&mut self, data: DataBlock) -> Result<()> { + // only checked in two level aggregator for high cardinality group by + self.inner.check_expandsion(); self.inner.consume(data) } diff --git a/src/query/service/src/pipelines/processors/transforms/transform_aggregator.rs b/src/query/service/src/pipelines/processors/transforms/transform_aggregator.rs index 394c2f972b3f..39b00d71452e 
100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_aggregator.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_aggregator.rs @@ -103,6 +103,8 @@ impl TransformAggregator { pub trait Aggregator: Sized + Send { const NAME: &'static str; + fn check_expandsion(&mut self) {} + fn consume(&mut self, data: DataBlock) -> Result<()>; // Generate could be called multiple times util it returns empty. fn generate(&mut self) -> Result>; From 3e5bb9c5a283c7f79c0476342fb562347e450e74 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Tue, 31 Jan 2023 15:35:24 +0800 Subject: [PATCH 2/9] enable hashstate info --- src/common/hashtable/src/container.rs | 2 +- src/common/hashtable/src/hashtable.rs | 43 +++++++-- .../hashtable/src/simple_unsized_hashtable.rs | 28 +++++- src/common/hashtable/src/tail_array.rs | 26 ++++-- src/common/hashtable/src/unsized_hashtable.rs | 23 ++++- src/common/hashtable/tests/it/main.rs | 58 ++++++++++++ src/query/expression/src/block.rs | 5 - .../api/rpc/exchange/exchange_sink_merge.rs | 2 +- .../api/rpc/exchange/exchange_sink_shuffle.rs | 2 +- .../api/rpc/exchange/exchange_transform.rs | 2 +- .../src/api/rpc/flight_scatter_hash.rs | 8 +- .../api/rpc/packets/packet_data_precommit.rs | 2 +- .../service/src/pipelines/pipeline_builder.rs | 31 ++++++- .../aggregator/aggregate_hashstate_info.rs | 68 ++++++++++++++ .../aggregator/aggregator_final_parallel.rs | 91 +++++++++++++++---- .../aggregator/aggregator_partial.rs | 27 ++++-- .../aggregator/aggregator_twolevel.rs | 19 +++- .../processors/transforms/aggregator/mod.rs | 2 + .../group_by/aggregator_keys_builder.rs | 14 +-- .../group_by/aggregator_polymorphic_keys.rs | 24 ++--- .../transforms/group_by/aggregator_state.rs | 26 ++++++ .../processors/transforms/group_by/mod.rs | 1 + .../transforms/transform_aggregator.rs | 17 +++- .../transforms/transform_convert_grouping.rs | 51 +++++++++-- .../fuse/src/operations/operation_log.rs | 4 +- 25 files 
changed, 473 insertions(+), 103 deletions(-) create mode 100644 src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_hashstate_info.rs diff --git a/src/common/hashtable/src/container.rs b/src/common/hashtable/src/container.rs index 551b9a36a0fd..427b4d55b4d0 100644 --- a/src/common/hashtable/src/container.rs +++ b/src/common/hashtable/src/container.rs @@ -66,7 +66,7 @@ unsafe impl Container for HeapContainer { #[inline(always)] fn len(&self) -> usize { - self.as_ref().len() + self.0.len() } fn heap_bytes(&self) -> usize { diff --git a/src/common/hashtable/src/hashtable.rs b/src/common/hashtable/src/hashtable.rs index d76088740a02..07ac20d977af 100644 --- a/src/common/hashtable/src/hashtable.rs +++ b/src/common/hashtable/src/hashtable.rs @@ -90,11 +90,22 @@ where } #[inline(always)] pub fn len(&self) -> usize { - self.zero.is_some() as usize + self.table.len() + // AsRef it's cost + let tail_len = match &self.tails { + Some(tails) => tails.len(), + None => 0, + }; + + self.zero.is_some() as usize + self.table.len() + tail_len } #[inline(always)] pub fn capacity(&self) -> usize { - self.zero.is_some() as usize + self.table.capacity() + // AsRef it's cost + let tail_capacity = match &self.tails { + Some(tails) => tails.capacity(), + None => 0, + }; + self.zero.is_some() as usize + self.table.capacity() + tail_capacity } #[inline(always)] pub fn entry(&self, key: &K) -> Option<&Entry> { @@ -171,7 +182,8 @@ where pub fn iter(&self) -> HashtableIter<'_, K, V> { let tail_iter = self.tails.as_ref().map(|tails| tails.iter()); HashtableIter { - inner: Some(self.zero.iter().chain(self.table.iter())), + empty_iter: self.zero.iter(), + table0_iter: Some(self.table.iter()), tail_iter, } } @@ -203,7 +215,8 @@ where } pub struct HashtableIter<'a, K, V> { - pub inner: Option>, Table0Iter<'a, K, V>>>, + pub empty_iter: std::option::Iter<'a, Entry>, + pub table0_iter: Option>, pub tail_iter: Option>, } @@ -213,11 +226,15 @@ where K: Keyable type Item = &'a 
Entry; fn next(&mut self) -> Option { - if let Some(it) = self.inner.as_mut() { + if let Some(e) = self.empty_iter.next() { + return Some(e); + } + + if let Some(it) = self.table0_iter.as_mut() { if let Some(e) = it.next() { return Some(e); } - self.inner = None; + self.table0_iter = None; } if let Some(it) = self.tail_iter.as_mut() { @@ -231,7 +248,8 @@ where K: Keyable } pub struct HashtableIterMut<'a, K, V> { - inner: Option>, Table0IterMut<'a, K, V>>>, + empty_iter: std::option::IterMut<'a, Entry>, + table0_iter: Option>, tail_iter: Option>, } @@ -241,11 +259,15 @@ where K: Keyable type Item = &'a mut Entry; fn next(&mut self) -> Option { - if let Some(it) = self.inner.as_mut() { + if let Some(e) = self.empty_iter.next() { + return Some(e); + } + + if let Some(it) = self.table0_iter.as_mut() { if let Some(e) = it.next() { return Some(e); } - self.inner = None; + self.table0_iter = None; } if let Some(it) = self.tail_iter.as_mut() { @@ -351,7 +373,8 @@ where fn iter(&self) -> Self::Iterator<'_> { let tail_iter = self.tails.as_ref().map(|tails| tails.iter()); HashtableIter { - inner: Some(self.zero.iter().chain(self.table.iter())), + empty_iter: self.zero.iter(), + table0_iter: Some(self.table.iter()), tail_iter, } } diff --git a/src/common/hashtable/src/simple_unsized_hashtable.rs b/src/common/hashtable/src/simple_unsized_hashtable.rs index 7a364409b0d4..2112b07fc587 100644 --- a/src/common/hashtable/src/simple_unsized_hashtable.rs +++ b/src/common/hashtable/src/simple_unsized_hashtable.rs @@ -129,12 +129,24 @@ where #[inline(always)] pub fn len(&self) -> usize { - self.table_empty.len() + self.table.len() + // AsRef it's cost + let tail_len = match &self.tails { + Some(tails) => tails.len(), + None => 0, + }; + + self.table_empty.len() + self.table.len() + tail_len } #[inline(always)] pub fn capacity(&self) -> usize { - self.table_empty.capacity() + self.table.capacity() + // AsRef it's cost + let tail_capacity = match &self.tails { + Some(tails) => 
tails.capacity(), + None => 0, + }; + + self.table_empty.capacity() + self.table.capacity() + tail_capacity } /// # Safety @@ -166,6 +178,7 @@ where }), _ => { if let Some(tails) = &mut self.tails { + self.key_size += key.len(); let key = FallbackKey::new(key); return Ok(SimpleUnsizedHashtableEntryMutRef( SimpleUnsizedHashtableEntryMutRefInner::Table(tails.insert(key)), @@ -568,6 +581,7 @@ where A: Allocator + Clone + Default _ => { if let Some(tails) = &mut self.tails { + self.key_size += key.len(); let s = self.arena.alloc_slice_copy(key); let key = FallbackKey::new(s); @@ -618,6 +632,16 @@ where A: Allocator + Clone + Default ) }), _ => { + if let Some(tails) = &mut self.tails { + self.key_size += key.len(); + let s = self.arena.alloc_slice_copy(key); + let key = FallbackKey::new_with_hash(s, hash); + + return Ok(SimpleUnsizedHashtableEntryMutRef( + SimpleUnsizedHashtableEntryMutRefInner::Table(tails.insert(key)), + )); + } + self.table.check_grow(); match self .table diff --git a/src/common/hashtable/src/tail_array.rs b/src/common/hashtable/src/tail_array.rs index 4767da98ab91..646a14a63350 100644 --- a/src/common/hashtable/src/tail_array.rs +++ b/src/common/hashtable/src/tail_array.rs @@ -16,14 +16,12 @@ use std::alloc::Allocator; use super::table0::Entry; use super::traits::Keyable; -use crate::container::Container; -use crate::container::StackContainer; -const SIZE: usize = 8192; +const SIZE: usize = 4096; pub struct TailArray { allocator: A, - pub(crate) datas: Vec, SIZE, A>>, + pub(crate) datas: Vec; SIZE], A>>, pub(crate) num_items: usize, } @@ -43,8 +41,10 @@ where pub fn insert(&mut self, key: K) -> &mut Entry { let pos = self.num_items % SIZE; if pos == 0 { - let container = unsafe { StackContainer::new_zeroed(SIZE, self.allocator.clone()) }; - self.datas.push(container); + let data = unsafe { + Box::<[Entry; SIZE], A>::new_zeroed_in(self.allocator.clone()).assume_init() + }; + self.datas.push(data); } let tail = self.datas.last_mut().unwrap(); @@ 
-56,7 +56,7 @@ where pub fn iter(&self) -> TailArrayIter<'_, K, V> { TailArrayIter { - values: self.datas.iter().map(|v| v.as_ref()).collect(), + values: self.datas.iter().map(|v| v.as_ref().as_ref()).collect(), num_items: self.num_items, i: 0, } @@ -65,11 +65,19 @@ where #[allow(dead_code)] pub fn iter_mut(&mut self) -> TailArrayIterMut<'_, K, V> { TailArrayIterMut { - values: self.datas.iter_mut().map(|v| v.as_mut()).collect(), + values: self.datas.iter_mut().map(|v| v.as_mut().as_mut()).collect(), num_items: self.num_items, i: 0, } } + + pub fn len(&self) -> usize { + self.num_items + } + + pub fn capacity(&self) -> usize { + self.datas.len() * SIZE + } } pub struct TailArrayIter<'a, K, V> { @@ -89,7 +97,7 @@ impl<'a, K, V> Iterator for TailArrayIter<'a, K, V> { let pos = self.i % SIZE; let v = self.values[array]; - let res = &v.as_ref()[pos]; + let res = &v[pos]; self.i += 1; Some(res) } diff --git a/src/common/hashtable/src/unsized_hashtable.rs b/src/common/hashtable/src/unsized_hashtable.rs index a26db2314ab9..0f2100eb95fd 100644 --- a/src/common/hashtable/src/unsized_hashtable.rs +++ b/src/common/hashtable/src/unsized_hashtable.rs @@ -139,20 +139,34 @@ where #[inline(always)] pub fn len(&self) -> usize { + // AsRef it's cost + let tail_len = match &self.tails { + Some(tails) => tails.len(), + None => 0, + }; + self.table0.len() + self.table1.len() + self.table2.len() + self.table3.len() + self.table4.len() + + tail_len } #[inline(always)] pub fn capacity(&self) -> usize { + // AsRef it's cost + let tail_capacity = match &self.tails { + Some(tails) => tails.capacity(), + None => 0, + }; + self.table0.capacity() + self.table1.capacity() + self.table2.capacity() + self.table3.capacity() + self.table4.capacity() + + tail_capacity } /// # Safety @@ -168,6 +182,7 @@ where if !key.is_empty() { if let Some(tails) = &mut self.tails { + self.key_size += key.len(); let key = FallbackKey::new(key); return Ok(UnsizedHashtableEntryMutRef( 
UnsizedHashtableEntryMutRefInner::Table4(tails.insert(key)), @@ -916,7 +931,9 @@ where A: Allocator + Clone + Default if !key.is_empty() { if let Some(tails) = &mut self.tails { - let key = FallbackKey::new(key); + self.key_size += key.len(); + let s = self.arena.alloc_slice_copy(key); + let key = FallbackKey::new(s); return Ok(UnsizedHashtableEntryMutRef( UnsizedHashtableEntryMutRefInner::Table4(tails.insert(key)), )); @@ -1037,7 +1054,9 @@ where A: Allocator + Clone + Default if !key.is_empty() { if let Some(tails) = &mut self.tails { - let key = FallbackKey::new(key); + self.key_size += key.len(); + let s = self.arena.alloc_slice_copy(key); + let key = FallbackKey::new(s); return Ok(UnsizedHashtableEntryMutRef( UnsizedHashtableEntryMutRefInner::Table4(tails.insert(key)), )); diff --git a/src/common/hashtable/tests/it/main.rs b/src/common/hashtable/tests/it/main.rs index 13c503acdafd..1949e92146a4 100644 --- a/src/common/hashtable/tests/it/main.rs +++ b/src/common/hashtable/tests/it/main.rs @@ -12,9 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::any::Any; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; +use ahash::HashSet; +use ahash::HashSetExt; use common_hashtable::HashMap; use common_hashtable::HashtableLike; use common_hashtable::StackHashMap; @@ -138,3 +141,58 @@ fn test_unsized_hash_map() { drop(hashtable); assert_eq!(COUNT.load(Ordering::Relaxed), 0); } + +#[test] +fn test_tail_array() { + let mut table0 = HashMap::::new(); + let mut table1 = HashMap::::new(); + + unsafe { + for i in 0..100 { + match table0.insert_and_entry(i) { + Ok(x) => { + x.write(i); + } + Err(_) => {} + } + + match table1.insert_and_entry(i) { + Ok(x) => { + x.write(i); + } + Err(_) => {} + } + } + assert_eq!(table0.len(), table1.len()); + table0.enable_tail_array(); + + for i in 50..100 { + match table0.insert_and_entry(i) { + Ok(x) => { + x.write(i); + } + Err(_) => {} + } + + match table1.insert_and_entry(i) { + Ok(x) => { + x.write(i); + } + Err(_) => {} + } + } + assert_eq!(table0.len(), 150); + assert_eq!(table1.len(), 100); + + let mut set = HashSet::new(); + // all keys in table0 could be found in table1 + for key in table0.iter().map(|e| e.key()) { + set.insert(*key); + assert!(table1.get(key).is_some()); + } + assert_eq!(set.len(), 100); + } + + let hashtable = UnsizedHashMap::<[u8], u64>::new(); + let _: Box = Box::new(hashtable); +} diff --git a/src/query/expression/src/block.rs b/src/query/expression/src/block.rs index 4d9c1236137f..9acce95a6277 100644 --- a/src/query/expression/src/block.rs +++ b/src/query/expression/src/block.rs @@ -306,11 +306,6 @@ impl DataBlock { self.meta.as_ref() } - #[inline] - pub fn meta(&self) -> Result> { - Ok(self.meta.clone()) - } - pub fn from_arrow_chunk>( arrow_chunk: &ArrowChunk, schema: &DataSchema, diff --git a/src/query/service/src/api/rpc/exchange/exchange_sink_merge.rs b/src/query/service/src/api/rpc/exchange/exchange_sink_merge.rs index 2e2db195aae1..ab6b806aff45 100644 --- a/src/query/service/src/api/rpc/exchange/exchange_sink_merge.rs +++ 
b/src/query/service/src/api/rpc/exchange/exchange_sink_merge.rs @@ -103,7 +103,7 @@ impl Processor for ExchangeMergeSink { let mut meta = vec![]; meta.write_scalar_own(data_block.num_rows() as u32)?; - bincode::serialize_into(&mut meta, &data_block.meta()?) + bincode::serialize_into(&mut meta, &data_block.get_meta()) .map_err(|_| ErrorCode::BadBytes("block meta serialize error when exchange"))?; let chunks = data_block.try_into()?; diff --git a/src/query/service/src/api/rpc/exchange/exchange_sink_shuffle.rs b/src/query/service/src/api/rpc/exchange/exchange_sink_shuffle.rs index 0bad9a9a3e2a..5df6221c1b4c 100644 --- a/src/query/service/src/api/rpc/exchange/exchange_sink_shuffle.rs +++ b/src/query/service/src/api/rpc/exchange/exchange_sink_shuffle.rs @@ -124,7 +124,7 @@ impl Processor for ExchangePublisherSink { let mut meta = vec![]; meta.write_scalar_own(data_block.num_rows() as u32)?; - bincode::serialize_into(&mut meta, &data_block.meta()?) + bincode::serialize_into(&mut meta, &data_block.get_meta()) .map_err(|_| ErrorCode::BadBytes("block meta serialize error when exchange"))?; let chunks = data_block.try_into()?; diff --git a/src/query/service/src/api/rpc/exchange/exchange_transform.rs b/src/query/service/src/api/rpc/exchange/exchange_transform.rs index dc07e0d27e75..62d53014f023 100644 --- a/src/query/service/src/api/rpc/exchange/exchange_transform.rs +++ b/src/query/service/src/api/rpc/exchange/exchange_transform.rs @@ -232,7 +232,7 @@ impl Processor for ExchangeTransform { } else { let mut meta = vec![]; meta.write_scalar_own(data_block.num_rows() as u32)?; - bincode::serialize_into(&mut meta, &data_block.meta()?).map_err(|_| { + bincode::serialize_into(&mut meta, &data_block.get_meta()).map_err(|_| { ErrorCode::BadBytes("block meta serialize error when exchange") })?; diff --git a/src/query/service/src/api/rpc/flight_scatter_hash.rs b/src/query/service/src/api/rpc/flight_scatter_hash.rs index cba7c98c60ee..5cd2508bbf83 100644 --- 
a/src/query/service/src/api/rpc/flight_scatter_hash.rs +++ b/src/query/service/src/api/rpc/flight_scatter_hash.rs @@ -124,10 +124,10 @@ impl FlightScatter for OneHashKeyFlightScatter { let indices = get_hash_values(&indices, num)?; let data_blocks = DataBlock::scatter(data_block, &indices, self.scatter_size)?; - let block_meta = data_block.meta()?; + let block_meta = data_block.get_meta(); let mut res = Vec::with_capacity(data_blocks.len()); for data_block in data_blocks { - res.push(data_block.add_meta(block_meta.clone())?); + res.push(data_block.add_meta(block_meta.cloned())?); } Ok(res) @@ -150,12 +150,12 @@ impl FlightScatter for HashFlightScatter { Ok(vec![0; num]) }?; - let block_meta = data_block.meta()?; + let block_meta = data_block.get_meta(); let data_blocks = DataBlock::scatter(data_block, &indices, self.scatter_size)?; let mut res = Vec::with_capacity(data_blocks.len()); for data_block in data_blocks { - res.push(data_block.add_meta(block_meta.clone())?); + res.push(data_block.add_meta(block_meta.cloned())?); } Ok(res) diff --git a/src/query/service/src/api/rpc/packets/packet_data_precommit.rs b/src/query/service/src/api/rpc/packets/packet_data_precommit.rs index d718e2252b5d..2f95b13894d6 100644 --- a/src/query/service/src/api/rpc/packets/packet_data_precommit.rs +++ b/src/query/service/src/api/rpc/packets/packet_data_precommit.rs @@ -40,7 +40,7 @@ impl PrecommitBlock { pub fn write(self, bytes: &mut T) -> Result<()> { let data_block = self.0; - let serialized_meta = bincode::serialize(&data_block.meta()?).map_err_to_code( + let serialized_meta = bincode::serialize(&data_block.get_meta()).map_err_to_code( ErrorCode::BadBytes, || "precommit block serialize error when exchange", )?; diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs index 392cafcf8b5b..81935ced04c3 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs 
@@ -51,6 +51,7 @@ use common_sql::plans::JoinType; use common_sql::ColumnBinding; use common_sql::IndexType; +use super::processors::transforms::group_by::ArenaHolder; use crate::pipelines::processors::transforms::efficiently_memory_final_aggregator; use crate::pipelines::processors::transforms::HashJoinDesc; use crate::pipelines::processors::transforms::RightSemiAntiJoinCompactor; @@ -84,6 +85,7 @@ pub struct PipelineBuilder { ctx: Arc, main_pipeline: Pipeline, pub pipelines: Vec, + pub arena_holder: Option, } impl PipelineBuilder { @@ -92,6 +94,7 @@ impl PipelineBuilder { ctx, pipelines: vec![], main_pipeline: Pipeline::create(), + arena_holder: None, } } @@ -323,16 +326,29 @@ impl PipelineBuilder { None, )?; + let pass_state_to_final = self.enable_memory_efficient_aggregator(&params); + + let arena_holder = ArenaHolder::create(); + self.arena_holder = Some(arena_holder.clone()); + self.main_pipeline.add_transform(|input, output| { TransformAggregator::try_create_partial( AggregatorTransformParams::try_create(input, output, &params)?, self.ctx.clone(), + pass_state_to_final, + arena_holder.clone(), ) })?; Ok(()) } + fn enable_memory_efficient_aggregator(&self, params: &Arc) -> bool { + self.ctx.get_cluster().is_empty() + && !params.group_columns.is_empty() + && self.main_pipeline.output_len() > 1 + } + fn build_aggregate_final(&mut self, aggregate: &AggregateFinal) -> Result<()> { self.build_pipeline(&aggregate.input)?; @@ -344,11 +360,16 @@ impl PipelineBuilder { aggregate.limit, )?; - if self.ctx.get_cluster().is_empty() - && !params.group_columns.is_empty() - && self.main_pipeline.output_len() > 1 - { - return efficiently_memory_final_aggregator(params, &mut self.main_pipeline); + if self.enable_memory_efficient_aggregator(&params) { + let arena_holder = self + .arena_holder + .take() + .ok_or_else(|| ErrorCode::Internal("ArenaHolder is not initialized"))?; + return efficiently_memory_final_aggregator( + params, + arena_holder, + &mut self.main_pipeline, + ); }
self.main_pipeline.resize(1)?; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_hashstate_info.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_hashstate_info.rs new file mode 100644 index 000000000000..0d43680539d1 --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_hashstate_info.rs @@ -0,0 +1,68 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::any::Any; + +use common_expression::BlockMetaInfo; +use common_expression::BlockMetaInfoPtr; +use serde::Deserialize; +use serde::Deserializer; +use serde::Serialize; +use serde::Serializer; + +#[derive(Debug)] +pub struct AggregateHashStateInfo { + pub bucket: usize, + // a subhashtable state + pub hash_state: Box, +} + +impl AggregateHashStateInfo { + pub fn create(bucket: usize, hash_state: Box) -> BlockMetaInfoPtr { + Box::new(AggregateHashStateInfo { bucket, hash_state }) + } +} + +impl Serialize for AggregateHashStateInfo { + fn serialize(&self, _: S) -> Result + where S: Serializer { + unreachable!("AggregateHashStateInfo does not support exchanging between multiple nodes") + } +} + +impl<'de> Deserialize<'de> for AggregateHashStateInfo { + fn deserialize(_: D) -> Result + where D: Deserializer<'de> { + unreachable!("AggregateHashStateInfo does not support exchanging between multiple nodes") + } +} + +#[typetag::serde(name = "aggregate_info")] +impl BlockMetaInfo for AggregateHashStateInfo { + fn as_any(&self) -> &dyn Any { + self + } + + fn as_mut_any(&mut self) -> &mut dyn Any { + self + } + + fn clone_self(&self) -> Box { + unimplemented!("Unimplemented clone for AggregateHashStateInfo") + } + + fn equals(&self, _: &Box) -> bool { + unimplemented!("Unimplemented equals for AggregateHashStateInfo") + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs index 6c2f55cfc467..e4a5b76fea40 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs @@ -32,6 +32,7 @@ use common_hashtable::HashtableLike; use tracing::info; use super::estimated_key_size; +use super::AggregateHashStateInfo; use 
crate::pipelines::processors::transforms::aggregator::aggregate_info::AggregateInfo; use crate::pipelines::processors::transforms::group_by::Area; use crate::pipelines::processors::transforms::group_by::GroupColumnsBuilder; @@ -150,8 +151,8 @@ where Method: HashMethod + PolymorphicKeysHelper + Send + 'static hash_table: Method::HashTable, pub(crate) reach_limit: bool, - // used for deserialization only, so we can reuse it during the loop - temp_place: Option, + // used for deserialization only if has agg, so we can reuse it during the loop + temp_place: StateAddr, } impl BucketAggregator @@ -161,8 +162,8 @@ where Method: HashMethod + PolymorphicKeysHelper + Send + 'static let mut area = Area::create(); let hash_table = method.create_hash_table()?; let temp_place = match params.aggregate_functions.is_empty() { - true => None, - false => Some(params.alloc_layout(&mut area)), + true => StateAddr::new(0), + false => params.alloc_layout(&mut area), }; Ok(Self { @@ -175,11 +176,67 @@ where Method: HashMethod + PolymorphicKeysHelper + Send + 'static }) } + fn merge_partial_hashstates(&mut self, hashtable: &mut Method::HashTable) -> Result<()> { + if !HAS_AGG { + unsafe { + for key in hashtable.iter() { + let _ = self.hash_table.insert_and_entry(key.key()); + } + if let Some(limit) = self.params.limit { + if self.hash_table.len() >= limit { + return Ok(()); + } + } + } + } else { + let aggregate_functions = &self.params.aggregate_functions; + let offsets_aggregate_states = &self.params.offsets_aggregate_states; + + for entry in hashtable.iter() { + let key = entry.key(); + unsafe { + match self.hash_table.insert(key) { + Ok(e) => { + // just set new places and the arena will be keeped in partial state + e.write(*entry.get()); + } + Err(place) => { + // place already exists + // that means we should merge the aggregation + let place = StateAddr::new(*place); + let old_place = StateAddr::new(*entry.get()); + + for (idx, aggregate_function) in 
aggregate_functions.iter().enumerate() + { + let final_place = place.next(offsets_aggregate_states[idx]); + let state_place = old_place.next(offsets_aggregate_states[idx]); + aggregate_function.merge(final_place, state_place)?; + aggregate_function.drop_state(state_place); + } + } + } + } + } + } + hashtable.clear(); + Ok(()) + } + pub fn merge_blocks(&mut self, blocks: Vec) -> Result> { if blocks.is_empty() { return Ok(vec![]); } - for data_block in blocks { + for mut data_block in blocks { + if let Some(mut meta) = data_block.take_meta() { + let info = meta + .as_mut_any() + .downcast_mut::() + .unwrap(); + let hashtable = info.hash_state.downcast_mut::().unwrap(); + self.merge_partial_hashstates(hashtable)?; + continue; + } + let block = data_block.convert_to_full(); // 1.1 and 1.2. let aggregate_function_len = self.params.aggregate_functions.len(); @@ -224,17 +281,15 @@ where Method: HashMethod + PolymorphicKeysHelper + Send + 'static let aggregate_functions = &self.params.aggregate_functions; let offsets_aggregate_states = &self.params.offsets_aggregate_states; - if let Some(temp_place) = self.temp_place { - for (row, place) in places.iter() { - for (idx, aggregate_function) in aggregate_functions.iter().enumerate() { - let final_place = place.next(offsets_aggregate_states[idx]); - let state_place = temp_place.next(offsets_aggregate_states[idx]); - - let mut data = - unsafe { states_binary_columns[idx].index_unchecked(*row) }; - aggregate_function.deserialize(state_place, &mut data)?; - aggregate_function.merge(final_place, state_place)?; - } + + for (row, place) in places.iter() { + for (idx, aggregate_function) in aggregate_functions.iter().enumerate() { + let final_place = place.next(offsets_aggregate_states[idx]); + let state_place = self.temp_place.next(offsets_aggregate_states[idx]); + + let mut data = unsafe { states_binary_columns[idx].index_unchecked(*row) }; + aggregate_function.deserialize(state_place, &mut data)?; + 
aggregate_function.merge(final_place, state_place)?; } } } @@ -374,9 +429,9 @@ where Method: HashMethod + PolymorphicKeysHelper + Send + 'static } } - if let Some(temp_place) = self.temp_place { + if HAS_AGG { for (state_offset, function) in state_offsets.iter().zip(functions.iter()) { - let place = temp_place.next(*state_offset); + let place = self.temp_place.next(*state_offset); unsafe { function.drop_state(place) } } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_partial.rs index ee36418005db..e16e875bb1e7 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_partial.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_partial.rs @@ -28,6 +28,7 @@ use common_hashtable::HashtableLike; use super::estimated_key_size; use crate::pipelines::processors::transforms::group_by::Area; +use crate::pipelines::processors::transforms::group_by::ArenaHolder; use crate::pipelines::processors::transforms::group_by::KeysColumnBuilder; use crate::pipelines::processors::transforms::group_by::PolymorphicKeysHelper; use crate::pipelines::processors::transforms::transform_aggregator::Aggregator; @@ -45,12 +46,20 @@ where Method: HashMethod + PolymorphicKeysHelper pub generated: bool, pub should_expand_table: bool, pub input_rows: usize, + pub pass_state_to_final: bool, + pub arena_holder: ArenaHolder, + pub two_level_mode: bool, } impl + Send> PartialAggregator { - pub fn create(method: Method, params: Arc) -> Result { + pub fn create( + method: Method, + params: Arc, + pass_state_to_final: bool, + arena_holder: ArenaHolder, + ) -> Result { let hash_table = method.create_hash_table()?; Ok(Self { params, @@ -61,6 +70,9 @@ impl + S generated: false, should_expand_table: true, input_rows: 0, + pass_state_to_final, + arena_holder, + two_level_mode: false, }) } @@ -167,7 +179,6 @@ impl + S 
#[inline(always)] fn generate_data(&mut self) -> Result> { if self.generated || self.hash_table.len() == 0 { - self.drop_states(); return Ok(vec![]); } self.generated = true; @@ -245,12 +256,11 @@ impl + S let min_reduction = STREAMING_HT_MIN_REDUCTION[cache_level].1; if estimated_reduction <= min_reduction { - println!( - "YY: {:?} {:?} {}", + tracing::info!( + "HashTable expansion is disabled because the reduce factor is too low: estimated_reduction: {}, min_reduction: {}", estimated_reduction, - min_reduction, - estimated_reduction > min_reduction - ); + min_reduction + ) } estimated_reduction > min_reduction } @@ -332,9 +342,8 @@ impl> } } } - - self.hash_table.clear(); drop(self.area.take()); + self.hash_table.clear(); self.states_dropped = true; } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_twolevel.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_twolevel.rs index a8680ee9bd0d..f94dd02ce1a3 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_twolevel.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_twolevel.rs @@ -33,6 +33,7 @@ use tracing::info; use super::estimated_key_size; use crate::pipelines::processors::transforms::aggregator::aggregate_info::AggregateInfo; use crate::pipelines::processors::transforms::aggregator::aggregator_final_parallel::ParallelFinalAggregator; +use crate::pipelines::processors::transforms::aggregator::AggregateHashStateInfo; use crate::pipelines::processors::transforms::aggregator::PartialAggregator; use crate::pipelines::processors::transforms::aggregator::SingleStateAggregator; use crate::pipelines::processors::transforms::group_by::KeysColumnBuilder; @@ -116,6 +117,9 @@ where generated: false, should_expand_table: true, input_rows: self.input_rows, + pass_state_to_final: self.pass_state_to_final, + arena_holder: self.arena_holder.clone(), + two_level_mode: true, }, }) } @@ 
-157,6 +161,14 @@ where continue; } + if agg.pass_state_to_final { + let table = std::mem::replace(inner_table, agg.method.method.create_hash_table()?); + let rows = table.len(); + let meta = AggregateHashStateInfo::create(bucket, Box::new(table)); + let block = DataBlock::new_with_meta(vec![], rows, Some(meta)); + return Ok(vec![block]); + } + let capacity = inner_table.len(); let iterator = inner_table.iter(); @@ -215,8 +227,11 @@ where return Ok(data_blocks); } - drop(agg.area.take()); - agg.states_dropped = true; + if agg.pass_state_to_final { + agg.arena_holder.put_area(agg.area.take()); + agg.states_dropped = true; + } + Ok(data_blocks) } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs index 0a74b193b18d..9844a0022565 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+mod aggregate_hashstate_info; mod aggregate_info; mod aggregator_final; mod aggregator_final_parallel; @@ -21,6 +22,7 @@ mod aggregator_single_key; mod aggregator_twolevel; mod utils; +pub use aggregate_hashstate_info::AggregateHashStateInfo; pub use aggregate_info::AggregateInfo; pub use aggregate_info::OverflowInfo; pub use aggregator_final_parallel::BucketAggregator; diff --git a/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_keys_builder.rs b/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_keys_builder.rs index d2b7fc667bed..74c9077793d8 100644 --- a/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_keys_builder.rs +++ b/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_keys_builder.rs @@ -49,25 +49,25 @@ impl<'a, T: Number> KeysColumnBuilder for FixedKeysColumnBuilder<'a, T> { } } -pub struct SerializedKeysColumnBuilder<'a> { +pub struct StringKeysColumnBuilder<'a> { pub inner_builder: StringColumnBuilder, - initial: usize, + _initial: usize, _phantom: PhantomData<&'a ()>, } -impl<'a> SerializedKeysColumnBuilder<'a> { +impl<'a> StringKeysColumnBuilder<'a> { pub fn create(capacity: usize, value_capacity: usize) -> Self { - SerializedKeysColumnBuilder { + StringKeysColumnBuilder { inner_builder: StringColumnBuilder::with_capacity(capacity, value_capacity), _phantom: PhantomData, - initial: value_capacity, + _initial: value_capacity, } } } -impl<'a> KeysColumnBuilder for SerializedKeysColumnBuilder<'a> { +impl<'a> KeysColumnBuilder for StringKeysColumnBuilder<'a> { type T = &'a [u8]; fn append_value(&mut self, v: &'a [u8]) { @@ -76,7 +76,7 @@ impl<'a> KeysColumnBuilder for SerializedKeysColumnBuilder<'a> { } fn finish(self) -> Column { - debug_assert!(self.initial == self.inner_builder.data.len()); + debug_assert_eq!(self._initial, self.inner_builder.data.len()); Column::String(self.inner_builder.build()) } } diff --git 
a/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_polymorphic_keys.rs b/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_polymorphic_keys.rs index a4a18e774da7..b39652077005 100644 --- a/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_polymorphic_keys.rs +++ b/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_polymorphic_keys.rs @@ -45,7 +45,7 @@ use crate::pipelines::processors::transforms::group_by::aggregator_groups_builde use crate::pipelines::processors::transforms::group_by::aggregator_groups_builder::SerializedKeysGroupColumnsBuilder; use crate::pipelines::processors::transforms::group_by::aggregator_keys_builder::FixedKeysColumnBuilder; use crate::pipelines::processors::transforms::group_by::aggregator_keys_builder::KeysColumnBuilder; -use crate::pipelines::processors::transforms::group_by::aggregator_keys_builder::SerializedKeysColumnBuilder; +use crate::pipelines::processors::transforms::group_by::aggregator_keys_builder::StringKeysColumnBuilder; use crate::pipelines::processors::transforms::group_by::aggregator_keys_iter::FixedKeysColumnIter; use crate::pipelines::processors::transforms::group_by::aggregator_keys_iter::KeysColumnIter; use crate::pipelines::processors::transforms::group_by::aggregator_keys_iter::SerializedKeysColumnIter; @@ -66,7 +66,7 @@ use crate::pipelines::processors::AggregatorParams; // use common_expression::HashMethodSerializer; // use databend_query::pipelines::processors::transforms::group_by::PolymorphicKeysHelper; // use databend_query::pipelines::processors::transforms::group_by::aggregator_state::SerializedKeysAggregatorState; -// use databend_query::pipelines::processors::transforms::group_by::aggregator_keys_builder::SerializedKeysColumnBuilder; +// use databend_query::pipelines::processors::transforms::group_by::aggregator_keys_builder::StringKeysColumnBuilder; // // impl PolymorphicKeysHelper for HashMethodSerializer { // 
type State = SerializedKeysAggregatorState; @@ -78,9 +78,9 @@ use crate::pipelines::processors::AggregatorParams; // } // } // -// type ColumnBuilder = SerializedKeysColumnBuilder; +// type ColumnBuilder = StringKeysColumnBuilder; // fn state_array_builder(&self, capacity: usize) -> Self::ColumnBuilder { -// SerializedKeysColumnBuilder { +// StringKeysColumnBuilder { // inner_builder: MutableStringColumn::with_capacity(capacity), // } // } @@ -89,7 +89,7 @@ use crate::pipelines::processors::AggregatorParams; pub trait PolymorphicKeysHelper { const SUPPORT_TWO_LEVEL: bool; - type HashTable: HashtableLike + Send; + type HashTable: HashtableLike + Send + Sync + 'static; fn create_hash_table(&self) -> Result; type ColumnBuilder<'a>: KeysColumnBuilder @@ -402,13 +402,13 @@ impl PolymorphicKeysHelper for HashMethodSingleString { Ok(UnsizedHashMap::new()) } - type ColumnBuilder<'a> = SerializedKeysColumnBuilder<'a>; + type ColumnBuilder<'a> = StringKeysColumnBuilder<'a>; fn keys_column_builder( &self, capacity: usize, value_capacity: usize, - ) -> SerializedKeysColumnBuilder<'_> { - SerializedKeysColumnBuilder::create(capacity, value_capacity) + ) -> StringKeysColumnBuilder<'_> { + StringKeysColumnBuilder::create(capacity, value_capacity) } type KeysColumnIter = SerializedKeysColumnIter; @@ -442,13 +442,13 @@ impl PolymorphicKeysHelper for HashMethodSerializer { Ok(SimpleUnsizedHashMap::new()) } - type ColumnBuilder<'a> = SerializedKeysColumnBuilder<'a>; + type ColumnBuilder<'a> = StringKeysColumnBuilder<'a>; fn keys_column_builder( &self, capacity: usize, value_capacity: usize, - ) -> SerializedKeysColumnBuilder<'_> { - SerializedKeysColumnBuilder::create(capacity, value_capacity) + ) -> StringKeysColumnBuilder<'_> { + StringKeysColumnBuilder::create(capacity, value_capacity) } type KeysColumnIter = SerializedKeysColumnIter; @@ -475,7 +475,7 @@ impl PolymorphicKeysHelper for HashMethodSerializer { #[derive(Clone)] pub struct TwoLevelHashMethod { - method: Method, + 
pub(crate) method: Method, } impl TwoLevelHashMethod { diff --git a/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_state.rs b/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_state.rs index 95dfbb31ba10..dea3643c9c4a 100644 --- a/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_state.rs @@ -13,9 +13,12 @@ // limitations under the License. use std::alloc::Layout; +use std::collections::VecDeque; use std::ptr::NonNull; +use std::sync::Arc; use bumpalo::Bump; +use parking_lot::RwLock; pub struct Area { bump: Bump, @@ -32,3 +35,26 @@ impl Area { } unsafe impl Send for Area {} + +#[derive(Clone)] +pub struct ArenaHolder { + values: Arc>>, +} + +impl ArenaHolder { + pub fn create() -> ArenaHolder { + ArenaHolder { + values: Arc::new(RwLock::new(VecDeque::new())), + } + } + + pub fn put_area(&self, area: Option) { + if let Some(area) = area { + let mut values = self.values.write(); + values.push_back(area); + tracing::info!("Putting arena into holder, current size: {}", values.len()); + } + } +} + +unsafe impl Send for ArenaHolder {} diff --git a/src/query/service/src/pipelines/processors/transforms/group_by/mod.rs b/src/query/service/src/pipelines/processors/transforms/group_by/mod.rs index f839227d36ad..61984a0083a6 100644 --- a/src/query/service/src/pipelines/processors/transforms/group_by/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/group_by/mod.rs @@ -26,5 +26,6 @@ pub use aggregator_keys_iter::KeysColumnIter; pub use aggregator_polymorphic_keys::PolymorphicKeysHelper; pub use aggregator_polymorphic_keys::TwoLevelHashMethod; pub use aggregator_state::Area; +pub use aggregator_state::ArenaHolder; pub use aggregator_state_entity::StateEntityMutRef; pub use aggregator_state_entity::StateEntityRef; diff --git 
a/src/query/service/src/pipelines/processors/transforms/transform_aggregator.rs b/src/query/service/src/pipelines/processors/transforms/transform_aggregator.rs index 39b00d71452e..fdcbcec17405 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_aggregator.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_aggregator.rs @@ -20,6 +20,7 @@ use common_exception::ErrorCode; use common_exception::Result; use common_expression::*; +use super::group_by::ArenaHolder; use crate::pipelines::processors::port::InputPort; use crate::pipelines::processors::port::OutputPort; use crate::pipelines::processors::processor::Event; @@ -69,6 +70,8 @@ impl TransformAggregator { pub fn try_create_partial( transform_params: AggregatorTransformParams, ctx: Arc, + pass_state_to_final: bool, + arena_holder: ArenaHolder, ) -> Result { let aggregator_params = transform_params.aggregator_params.clone(); @@ -86,14 +89,24 @@ impl TransformAggregator { HashMethodKind::T(method) => AggregatorTransform::create( ctx, transform_params, - PartialAggregator::::create(method, aggregator_params)?, + PartialAggregator::::create( + method, + aggregator_params, + pass_state_to_final, + arena_holder + )?, ), }), false => with_mappedhash_method!(|T| match transform_params.method.clone() { HashMethodKind::T(method) => AggregatorTransform::create( ctx, transform_params, - PartialAggregator::::create(method, aggregator_params)?, + PartialAggregator::::create( + method, + aggregator_params, + pass_state_to_final, + arena_holder + )?, ), }), } diff --git a/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs b/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs index 68c55417526e..9a13ce3ca0a5 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs @@ -37,6 +37,8 @@ use 
serde::Deserializer; use serde::Serialize; use serde::Serializer; +use super::aggregator::AggregateHashStateInfo; +use super::group_by::ArenaHolder; use crate::pipelines::processors::transforms::aggregator::AggregateInfo; use crate::pipelines::processors::transforms::aggregator::BucketAggregator; use crate::pipelines::processors::transforms::group_by::KeysColumnIter; @@ -101,6 +103,8 @@ struct InputPortState { bucket: isize, } +/// A helper class that Map +/// AggregateInfo/AggregateHashStateInfo ---> ConvertGroupingMetaInfo { meta: blocks with Option } pub struct TransformConvertGrouping> { output: Arc, inputs: Vec, @@ -186,11 +190,10 @@ impl> TransformConvertGroupin } fn add_bucket(&mut self, data_block: DataBlock) -> isize { - let data_block_meta: Option<&AggregateInfo> = data_block + if let Some(info) = data_block .get_meta() - .and_then(|meta| meta.as_any().downcast_ref::()); - - if let Some(info) = data_block_meta { + .and_then(|meta| meta.as_any().downcast_ref::()) + { if info.overflow.is_none() && info.bucket > SINGLE_LEVEL_BUCKET_NUM { let bucket = info.bucket; match self.buckets_blocks.entry(bucket) { @@ -206,6 +209,23 @@ impl> TransformConvertGroupin } } + // check if it's local state + if let Some(info) = data_block + .get_meta() + .and_then(|meta| meta.as_any().downcast_ref::()) + { + let bucket = info.bucket as isize; + match self.buckets_blocks.entry(bucket) { + Entry::Vacant(v) => { + v.insert(vec![data_block]); + } + Entry::Occupied(mut v) => { + v.get_mut().push(data_block); + } + }; + return bucket; + } + self.unsplitted_blocks.push(data_block); SINGLE_LEVEL_BUCKET_NUM } @@ -389,6 +409,7 @@ fn build_convert_grouping + S method: Method, pipeline: &mut Pipeline, params: Arc, + arena_holder: ArenaHolder, ) -> Result<()> { let input_nums = pipeline.output_len(); let transform = TransformConvertGrouping::create(method.clone(), params.clone(), input_nums)?; @@ -405,12 +426,19 @@ fn build_convert_grouping + S pipeline.resize(input_nums)?; 
pipeline.add_transform(|input, output| { - MergeBucketTransform::try_create(input, output, method.clone(), params.clone()) + MergeBucketTransform::try_create( + input, + output, + method.clone(), + params.clone(), + arena_holder.clone(), + ) }) } pub fn efficiently_memory_final_aggregator( params: Arc, + arena_holder: ArenaHolder, pipeline: &mut Pipeline, ) -> Result<()> { let group_cols = &params.group_columns; @@ -419,7 +447,7 @@ pub fn efficiently_memory_final_aggregator( let method = DataBlock::choose_hash_method(&sample_block, group_cols)?; with_hash_method!(|T| match method { - HashMethodKind::T(v) => build_convert_grouping(v, pipeline, params.clone()), + HashMethodKind::T(v) => build_convert_grouping(v, pipeline, params.clone(), arena_holder), }) } @@ -432,6 +460,9 @@ struct MergeBucketTransform + input_block: Option, output_blocks: Vec, + + // holds the state from partial group by + _arena_holder: ArenaHolder, } impl + Send + 'static> @@ -442,6 +473,7 @@ impl + Send + 'static> output: Arc, method: Method, params: Arc, + arena_holder: ArenaHolder, ) -> Result { Ok(ProcessorPtr::create(Box::new(MergeBucketTransform { input, @@ -450,6 +482,7 @@ impl + Send + 'static> params, input_block: None, output_blocks: vec![], + _arena_holder: arena_holder, }))) } } @@ -505,9 +538,9 @@ impl + Send + 'static> Proces fn process(&mut self) -> Result<()> { if let Some(mut data_block) = self.input_block.take() { let mut blocks = vec![]; - if let Some(meta) = data_block.take_meta() { - if let Some(meta) = meta.as_any().downcast_ref::() { - blocks.extend(meta.blocks.iter().cloned()); + if let Some(mut meta) = data_block.take_meta() { + if let Some(meta) = meta.as_mut_any().downcast_mut::() { + std::mem::swap(&mut blocks, &mut meta.blocks); } } diff --git a/src/query/storages/fuse/src/operations/operation_log.rs b/src/query/storages/fuse/src/operations/operation_log.rs index d5df741ebd2d..0fbd1323cf36 100644 --- a/src/query/storages/fuse/src/operations/operation_log.rs +++
b/src/query/storages/fuse/src/operations/operation_log.rs @@ -53,10 +53,10 @@ impl TryFrom<&DataBlock> for AppendOperationLogEntry { fn try_from(block: &DataBlock) -> Result { let err = ErrorCode::Internal(format!( "invalid data block meta of AppendOperation log, {:?}", - block.meta() + block.get_meta() )); - if let Some(meta) = block.meta()? { + if let Some(meta) = block.get_meta() { let cast = meta.as_any().downcast_ref::(); return match cast { None => Err(err), From ed533d5b2a46ca6924b1bf2cd901e5bf01def3ce Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Tue, 31 Jan 2023 16:36:07 +0800 Subject: [PATCH 3/9] remove tail array in hashtable --- src/common/hashtable/src/container.rs | 2 +- src/common/hashtable/src/hashtable.rs | 90 ++----------------- src/common/hashtable/src/lib.rs | 1 - .../hashtable/src/simple_unsized_hashtable.rs | 77 +--------------- src/common/hashtable/src/traits.rs | 6 -- .../hashtable/src/twolevel_hashtable.rs | 6 -- src/common/hashtable/src/unsized_hashtable.rs | 84 ----------------- src/common/hashtable/tests/it/main.rs | 58 ------------ .../aggregator/aggregator_partial.rs | 51 ----------- .../aggregator/aggregator_twolevel.rs | 3 - .../transforms/transform_aggregator.rs | 2 - 11 files changed, 12 insertions(+), 368 deletions(-) diff --git a/src/common/hashtable/src/container.rs b/src/common/hashtable/src/container.rs index 427b4d55b4d0..551b9a36a0fd 100644 --- a/src/common/hashtable/src/container.rs +++ b/src/common/hashtable/src/container.rs @@ -66,7 +66,7 @@ unsafe impl Container for HeapContainer { #[inline(always)] fn len(&self) -> usize { - self.0.len() + self.as_ref().len() } fn heap_bytes(&self) -> usize { diff --git a/src/common/hashtable/src/hashtable.rs b/src/common/hashtable/src/hashtable.rs index 07ac20d977af..b4594388e3c2 100644 --- a/src/common/hashtable/src/hashtable.rs +++ b/src/common/hashtable/src/hashtable.rs @@ -27,9 +27,6 @@ use super::table0::Table0IterMut; use super::traits::HashtableLike; use 
super::traits::Keyable; use super::utils::ZeroEntry; -use crate::tail_array::TailArray; -use crate::tail_array::TailArrayIter; -use crate::tail_array::TailArrayIterMut; use crate::FastHash; pub struct Hashtable> @@ -39,7 +36,6 @@ where { pub(crate) zero: ZeroEntry, pub(crate) table: Table0, A>, A>, - pub(crate) tails: Option>, } unsafe impl Send for Hashtable {} @@ -81,7 +77,6 @@ where Self { table: Table0::with_capacity_in(capacity, allocator), zero: ZeroEntry(None), - tails: None, } } #[inline(always)] @@ -90,22 +85,11 @@ where } #[inline(always)] pub fn len(&self) -> usize { - // AsRef it's cost - let tail_len = match &self.tails { - Some(tails) => tails.len(), - None => 0, - }; - - self.zero.is_some() as usize + self.table.len() + tail_len + self.zero.is_some() as usize + self.table.len() } #[inline(always)] pub fn capacity(&self) -> usize { - // AsRef it's cost - let tail_capacity = match &self.tails { - Some(tails) => tails.capacity(), - None => 0, - }; - self.zero.is_some() as usize + self.table.capacity() + tail_capacity + self.zero.is_some() as usize + self.table.capacity() } #[inline(always)] pub fn entry(&self, key: &K) -> Option<&Entry> { @@ -161,11 +145,6 @@ where return Ok(zero); } } - - if let Some(tails) = &mut self.tails { - return Ok(tails.insert(key)); - } - self.table.check_grow(); self.table.insert(key) } @@ -180,11 +159,8 @@ where } } pub fn iter(&self) -> HashtableIter<'_, K, V> { - let tail_iter = self.tails.as_ref().map(|tails| tails.iter()); HashtableIter { - empty_iter: self.zero.iter(), - table0_iter: Some(self.table.iter()), - tail_iter, + inner: self.zero.iter().chain(self.table.iter()), } } } @@ -215,9 +191,7 @@ where } pub struct HashtableIter<'a, K, V> { - pub empty_iter: std::option::Iter<'a, Entry>, - pub table0_iter: Option>, - pub tail_iter: Option>, + pub inner: std::iter::Chain>, Table0Iter<'a, K, V>>, } impl<'a, K, V> Iterator for HashtableIter<'a, K, V> @@ -226,31 +200,12 @@ where K: Keyable type Item = &'a Entry; fn 
next(&mut self) -> Option { - if let Some(e) = self.empty_iter.next() { - return Some(e); - } - - if let Some(it) = self.table0_iter.as_mut() { - if let Some(e) = it.next() { - return Some(e); - } - self.table0_iter = None; - } - - if let Some(it) = self.tail_iter.as_mut() { - if let Some(e) = it.next() { - return Some(e); - } - self.tail_iter = None; - } - None + self.inner.next() } } pub struct HashtableIterMut<'a, K, V> { - empty_iter: std::option::IterMut<'a, Entry>, - table0_iter: Option>, - tail_iter: Option>, + inner: std::iter::Chain>, Table0IterMut<'a, K, V>>, } impl<'a, K, V> Iterator for HashtableIterMut<'a, K, V> @@ -259,31 +214,14 @@ where K: Keyable type Item = &'a mut Entry; fn next(&mut self) -> Option { - if let Some(e) = self.empty_iter.next() { - return Some(e); - } - - if let Some(it) = self.table0_iter.as_mut() { - if let Some(e) = it.next() { - return Some(e); - } - self.table0_iter = None; - } - - if let Some(it) = self.tail_iter.as_mut() { - if let Some(e) = it.next() { - return Some(e); - } - self.tail_iter = None; - } - None + self.inner.next() } } impl HashtableLike for Hashtable where K: Keyable + FastHash, - A: Allocator + Default + Clone + 'static, + A: Allocator + Clone + 'static, { type Key = K; type Value = V; @@ -371,23 +309,13 @@ where } fn iter(&self) -> Self::Iterator<'_> { - let tail_iter = self.tails.as_ref().map(|tails| tails.iter()); HashtableIter { - empty_iter: self.zero.iter(), - table0_iter: Some(self.table.iter()), - tail_iter, + inner: self.zero.iter().chain(self.table.iter()), } } fn clear(&mut self) { self.zero.0.take(); self.table.clear(); - let _ = self.tails.take(); - } - - fn enable_tail_array(&mut self) { - if self.tails.is_none() { - self.tails = Some(TailArray::new(Default::default())); - } } } diff --git a/src/common/hashtable/src/lib.rs b/src/common/hashtable/src/lib.rs index cd0a12b28f2e..4eaec69a5c10 100644 --- a/src/common/hashtable/src/lib.rs +++ b/src/common/hashtable/src/lib.rs @@ -30,7 +30,6 @@ mod 
simple_unsized_hashtable; #[allow(dead_code)] mod table1; mod table_empty; -mod tail_array; mod traits; mod twolevel_hashtable; mod unsized_hashtable; diff --git a/src/common/hashtable/src/simple_unsized_hashtable.rs b/src/common/hashtable/src/simple_unsized_hashtable.rs index 2112b07fc587..3cb4be0c53ec 100644 --- a/src/common/hashtable/src/simple_unsized_hashtable.rs +++ b/src/common/hashtable/src/simple_unsized_hashtable.rs @@ -32,9 +32,6 @@ use crate::table0::Table0IterMut; use crate::table_empty::TableEmpty; use crate::table_empty::TableEmptyIter; use crate::table_empty::TableEmptyIterMut; -use crate::tail_array::TailArray; -use crate::tail_array::TailArrayIter; -use crate::tail_array::TailArrayIterMut; use crate::unsized_hashtable::FallbackKey; /// Simple unsized hashtable is used for storing unsized keys in arena. It can be worked with HashMethodSerializer. @@ -49,7 +46,6 @@ where pub(crate) key_size: usize, pub(crate) table_empty: TableEmpty, pub(crate) table: Table0, A>, A>, - pub(crate) tails: Option>, pub(crate) _phantom: PhantomData, } @@ -117,7 +113,6 @@ where key_size: 0, table_empty: TableEmpty::new_in(allocator.clone()), table: Table0::with_capacity_in(capacity, allocator), - tails: None, _phantom: PhantomData, } } @@ -129,24 +124,12 @@ where #[inline(always)] pub fn len(&self) -> usize { - // AsRef it's cost - let tail_len = match &self.tails { - Some(tails) => tails.len(), - None => 0, - }; - - self.table_empty.len() + self.table.len() + tail_len + self.table_empty.len() + self.table.len() } #[inline(always)] pub fn capacity(&self) -> usize { - // AsRef it's cost - let tail_capacity = match &self.tails { - Some(tails) => tails.capacity(), - None => 0, - }; - - self.table_empty.capacity() + self.table.capacity() + tail_capacity + self.table_empty.capacity() + self.table.capacity() } /// # Safety @@ -177,14 +160,6 @@ where ) }), _ => { - if let Some(tails) = &mut self.tails { - self.key_size += key.len(); - let key = FallbackKey::new(key); - return 
Ok(SimpleUnsizedHashtableEntryMutRef( - SimpleUnsizedHashtableEntryMutRefInner::Table(tails.insert(key)), - )); - } - self.table.check_grow(); self.table .insert(FallbackKey::new(key)) @@ -220,7 +195,6 @@ where K: UnsizedKeyable + ?Sized { it_empty: Option>, it: Option>, - tail_it: Option>, _phantom: PhantomData<&'a mut K>, } @@ -246,15 +220,6 @@ where K: UnsizedKeyable + ?Sized } self.it = None; } - - if let Some(it) = self.tail_it.as_mut() { - if let Some(e) = it.next() { - return Some(SimpleUnsizedHashtableEntryRef( - SimpleUnsizedHashtableEntryRefInner::Table(e), - )); - } - self.tail_it = None; - } None } } @@ -264,7 +229,6 @@ where K: UnsizedKeyable + ?Sized { it_empty: Option>, it: Option>, - tail_it: Option>, _phantom: PhantomData<&'a mut K>, } @@ -291,15 +255,6 @@ where K: UnsizedKeyable + ?Sized } self.it = None; } - - if let Some(it) = self.tail_it.as_mut() { - if let Some(e) = it.next() { - return Some(SimpleUnsizedHashtableEntryMutRef( - SimpleUnsizedHashtableEntryMutRefInner::Table(e), - )); - } - self.tail_it = None; - } None } } @@ -580,16 +535,6 @@ where A: Allocator + Clone + Default }), _ => { - if let Some(tails) = &mut self.tails { - self.key_size += key.len(); - let s = self.arena.alloc_slice_copy(key); - let key = FallbackKey::new(s); - - return Ok(SimpleUnsizedHashtableEntryMutRef( - SimpleUnsizedHashtableEntryMutRefInner::Table(tails.insert(key)), - )); - } - self.table.check_grow(); match self.table.insert(FallbackKey::new(key)) { Ok(e) => { @@ -632,16 +577,6 @@ where A: Allocator + Clone + Default ) }), _ => { - if let Some(tails) = &mut self.tails { - self.key_size += key.len(); - let s = self.arena.alloc_slice_copy(key); - let key = FallbackKey::new_with_hash(s, hash); - - return Ok(SimpleUnsizedHashtableEntryMutRef( - SimpleUnsizedHashtableEntryMutRefInner::Table(tails.insert(key)), - )); - } - self.table.check_grow(); match self .table @@ -669,21 +604,13 @@ where A: Allocator + Clone + Default SimpleUnsizedHashtableIter { it_empty: 
Some(self.table_empty.iter()), it: Some(self.table.iter()), - tail_it: self.tails.as_ref().map(|x| x.iter()), _phantom: PhantomData, } } - fn enable_tail_array(&mut self) { - if self.tails.is_none() { - self.tails = Some(TailArray::new(Default::default())); - } - } - fn clear(&mut self) { self.table_empty.clear(); self.table.clear(); - let _ = self.tails.take(); drop(std::mem::take(&mut self.arena)); } } diff --git a/src/common/hashtable/src/traits.rs b/src/common/hashtable/src/traits.rs index 3b7e27f679eb..8eb9a8d0802c 100644 --- a/src/common/hashtable/src/traits.rs +++ b/src/common/hashtable/src/traits.rs @@ -491,11 +491,5 @@ pub trait HashtableLike { fn iter(&self) -> Self::Iterator<'_>; - /// Note: don't call this function unless you want what you are doing. - /// This function will make hashtable not like `hashtable` but a linked list. - /// If it's enabled, we will push all new keys into the tail of the linked list. - /// This is used in partial aggregation - fn enable_tail_array(&mut self) {} - fn clear(&mut self); } diff --git a/src/common/hashtable/src/twolevel_hashtable.rs b/src/common/hashtable/src/twolevel_hashtable.rs index 94c34807b7bd..17fd52ca8400 100644 --- a/src/common/hashtable/src/twolevel_hashtable.rs +++ b/src/common/hashtable/src/twolevel_hashtable.rs @@ -165,12 +165,6 @@ impl> Hashtable inner_table.clear(); } } - - fn enable_tail_array(&mut self) { - for inner_table in &mut self.tables { - inner_table.enable_tail_array(); - } - } } pub struct TwoLevelHashtableIter { diff --git a/src/common/hashtable/src/unsized_hashtable.rs b/src/common/hashtable/src/unsized_hashtable.rs index 0f2100eb95fd..5fc79484d30c 100644 --- a/src/common/hashtable/src/unsized_hashtable.rs +++ b/src/common/hashtable/src/unsized_hashtable.rs @@ -37,9 +37,6 @@ use crate::table0::Table0IterMut; use crate::table_empty::TableEmpty; use crate::table_empty::TableEmptyIter; use crate::table_empty::TableEmptyIterMut; -use crate::tail_array::TailArray; -use 
crate::tail_array::TailArrayIter; -use crate::tail_array::TailArrayIterMut; pub struct UnsizedHashtable> where @@ -53,7 +50,6 @@ where pub(crate) table2: Table0, V, HeapContainer, V>, A>, A>, pub(crate) table3: Table0, V, HeapContainer, V>, A>, A>, pub(crate) table4: Table0, A>, A>, - pub(crate) tails: Option>, pub(crate) _phantom: PhantomData, } @@ -127,7 +123,6 @@ where table2: Table0::with_capacity_in(capacity, allocator.clone()), table3: Table0::with_capacity_in(capacity, allocator.clone()), table4: Table0::with_capacity_in(capacity, allocator), - tails: None, _phantom: PhantomData, } } @@ -139,34 +134,20 @@ where #[inline(always)] pub fn len(&self) -> usize { - // AsRef it's cost - let tail_len = match &self.tails { - Some(tails) => tails.len(), - None => 0, - }; - self.table0.len() + self.table1.len() + self.table2.len() + self.table3.len() + self.table4.len() - + tail_len } #[inline(always)] pub fn capacity(&self) -> usize { - // AsRef it's cost - let tail_capacity = match &self.tails { - Some(tails) => tails.capacity(), - None => 0, - }; - self.table0.capacity() + self.table1.capacity() + self.table2.capacity() + self.table3.capacity() + self.table4.capacity() - + tail_capacity } /// # Safety @@ -179,17 +160,6 @@ where key: *const K, ) -> Result, UnsizedHashtableEntryMutRef<'_, K, V>> { let key = (*key).as_bytes(); - - if !key.is_empty() { - if let Some(tails) = &mut self.tails { - self.key_size += key.len(); - let key = FallbackKey::new(key); - return Ok(UnsizedHashtableEntryMutRef( - UnsizedHashtableEntryMutRefInner::Table4(tails.insert(key)), - )); - } - } - match key.len() { _ if key.last().copied() == Some(0) => { self.table4.check_grow(); @@ -301,7 +271,6 @@ where K: UnsizedKeyable + ?Sized it_2: Option, V>>, it_3: Option, V>>, it_4: Option>, - tail_it: Option>, _phantom: PhantomData<&'a mut K>, } @@ -351,15 +320,6 @@ where K: UnsizedKeyable + ?Sized } self.it_4 = None; } - - if let Some(it) = self.tail_it.as_mut() { - if let Some(e) = it.next() { - 
return Some(UnsizedHashtableEntryRef( - UnsizedHashtableEntryRefInner::Table4(e), - )); - } - self.tail_it = None; - } None } } @@ -372,8 +332,6 @@ where K: UnsizedKeyable + ?Sized it_2: Option, V>>, it_3: Option, V>>, it_4: Option>, - - tail_it: Option>, _phantom: PhantomData<&'a mut K>, } @@ -423,15 +381,6 @@ where K: UnsizedKeyable + ?Sized } self.it_4 = None; } - - if let Some(it) = self.tail_it.as_mut() { - if let Some(e) = it.next() { - return Some(UnsizedHashtableEntryMutRef( - UnsizedHashtableEntryMutRefInner::Table4(e), - )); - } - self.tail_it = None; - } None } } @@ -928,18 +877,6 @@ where A: Allocator + Clone + Default key: &Self::Key, ) -> Result, Self::EntryMutRef<'_>> { let key = key.as_bytes(); - - if !key.is_empty() { - if let Some(tails) = &mut self.tails { - self.key_size += key.len(); - let s = self.arena.alloc_slice_copy(key); - let key = FallbackKey::new(s); - return Ok(UnsizedHashtableEntryMutRef( - UnsizedHashtableEntryMutRefInner::Table4(tails.insert(key)), - )); - } - } - match key.len() { _ if key.last().copied() == Some(0) => { self.table4.check_grow(); @@ -1051,18 +988,6 @@ where A: Allocator + Clone + Default hash: u64, ) -> Result, Self::EntryMutRef<'_>> { let key = key.as_bytes(); - - if !key.is_empty() { - if let Some(tails) = &mut self.tails { - self.key_size += key.len(); - let s = self.arena.alloc_slice_copy(key); - let key = FallbackKey::new(s); - return Ok(UnsizedHashtableEntryMutRef( - UnsizedHashtableEntryMutRefInner::Table4(tails.insert(key)), - )); - } - } - match key.len() { _ if key.last().copied() == Some(0) => { self.table4.check_grow(); @@ -1178,25 +1103,16 @@ where A: Allocator + Clone + Default it_2: Some(self.table2.iter()), it_3: Some(self.table3.iter()), it_4: Some(self.table4.iter()), - tail_it: self.tails.as_ref().map(|x| x.iter()), _phantom: PhantomData, } } - fn enable_tail_array(&mut self) { - if self.tails.is_none() { - self.tails = Some(TailArray::new(Default::default())); - } - } - fn clear(&mut self) { 
self.table0.clear(); self.table1.clear(); self.table2.clear(); self.table3.clear(); self.table4.clear(); - - let _ = self.tails.take(); // NOTE: Bump provides the reset function to free memory, but it will cause memory leakage(maybe a bug). // In fact, we don't need to call the drop function. rust will call it, But we call it to improve the readability of the code. drop(std::mem::take(&mut self.arena)); diff --git a/src/common/hashtable/tests/it/main.rs b/src/common/hashtable/tests/it/main.rs index 1949e92146a4..13c503acdafd 100644 --- a/src/common/hashtable/tests/it/main.rs +++ b/src/common/hashtable/tests/it/main.rs @@ -12,12 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::any::Any; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; -use ahash::HashSet; -use ahash::HashSetExt; use common_hashtable::HashMap; use common_hashtable::HashtableLike; use common_hashtable::StackHashMap; @@ -141,58 +138,3 @@ fn test_unsized_hash_map() { drop(hashtable); assert_eq!(COUNT.load(Ordering::Relaxed), 0); } - -#[test] -fn test_tail_array() { - let mut table0 = HashMap::::new(); - let mut table1 = HashMap::::new(); - - unsafe { - for i in 0..100 { - match table0.insert_and_entry(i) { - Ok(x) => { - x.write(i); - } - Err(_) => {} - } - - match table1.insert_and_entry(i) { - Ok(x) => { - x.write(i); - } - Err(_) => {} - } - } - assert_eq!(table0.len(), table1.len()); - table0.enable_tail_array(); - - for i in 50..100 { - match table0.insert_and_entry(i) { - Ok(x) => { - x.write(i); - } - Err(_) => {} - } - - match table1.insert_and_entry(i) { - Ok(x) => { - x.write(i); - } - Err(_) => {} - } - } - assert_eq!(table0.len(), 150); - assert_eq!(table1.len(), 100); - - let mut set = HashSet::new(); - // all keys in table0 could be found in table1 - for key in table0.iter().map(|e| e.key()) { - set.insert(*key); - assert!(table1.get(key).is_some()); - } - assert_eq!(set.len(), 100); - } - - let 
hashtable = UnsizedHashMap::<[u8], u64>::new(); - let _: Box = Box::new(hashtable); -} diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_partial.rs index e16e875bb1e7..086a3534aa5c 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_partial.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_partial.rs @@ -44,7 +44,6 @@ where Method: HashMethod + PolymorphicKeysHelper pub hash_table: Method::HashTable, pub params: Arc, pub generated: bool, - pub should_expand_table: bool, pub input_rows: usize, pub pass_state_to_final: bool, pub arena_holder: ArenaHolder, @@ -68,7 +67,6 @@ impl + S area: Some(Area::create()), states_dropped: false, generated: false, - should_expand_table: true, input_rows: 0, pass_state_to_final, arena_holder, @@ -225,45 +223,6 @@ impl + S columns.push(group_key_col); Ok(vec![DataBlock::new_from_columns(columns)]) } - - fn should_expand_table(&self) -> bool { - /// ideas from https://github.com/apache/impala/blob/b3e9c4a65fa63da6f373c9ecec41fe4247e5e7d8/be/src/exec/grouping-aggregator.cc - static STREAMING_HT_MIN_REDUCTION: [(usize, f64); 3] = - [(0, 0.0), (256 * 1024, 1.1), (2 * 1024 * 1024, 2.0)]; - - let ht_mem = self.hash_table.bytes_len(); - let ht_rows = self.hash_table.len(); - - if ht_rows == 0 { - return true; - } - - let mut cache_level = 0; - loop { - if cache_level + 1 < STREAMING_HT_MIN_REDUCTION.len() - && ht_mem >= STREAMING_HT_MIN_REDUCTION[cache_level + 1].0 - { - cache_level += 1; - continue; - } - break; - } - - let aggregated_input_rows = self.input_rows; - let current_reduction = aggregated_input_rows as f64 / ht_rows as f64; - // TODO ADD estimated reduction, currently we use current reduction - let estimated_reduction = current_reduction; - let min_reduction = STREAMING_HT_MIN_REDUCTION[cache_level].1; - - if estimated_reduction <= 
min_reduction { - tracing::info!( - "HashTable expansion is disabled because the reduce factor is too low: estimated_reduction: {}, min_reduction: {}", - estimated_reduction, - min_reduction - ) - } - estimated_reduction > min_reduction - } } impl + Send> Aggregator @@ -300,16 +259,6 @@ impl + S fn generate(&mut self) -> Result> { self.generate_data() } - - fn check_expandsion(&mut self) { - if self.should_expand_table { - self.should_expand_table = self.should_expand_table(); - - if !self.should_expand_table { - self.hash_table.enable_tail_array(); - } - } - } } impl> diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_twolevel.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_twolevel.rs index f94dd02ce1a3..2e9cea89e03a 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_twolevel.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_twolevel.rs @@ -115,7 +115,6 @@ where method: two_level_method, hash_table: two_level_hashtable, generated: false, - should_expand_table: true, input_rows: self.input_rows, pass_state_to_final: self.pass_state_to_final, arena_holder: self.arena_holder.clone(), @@ -269,8 +268,6 @@ impl Aggregator for TwoLevelAggregator { #[inline(always)] fn consume(&mut self, data: DataBlock) -> Result<()> { - // only checked in two level aggregator for high cardinality group by - self.inner.check_expandsion(); self.inner.consume(data) } diff --git a/src/query/service/src/pipelines/processors/transforms/transform_aggregator.rs b/src/query/service/src/pipelines/processors/transforms/transform_aggregator.rs index fdcbcec17405..dd5b421f5b4a 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_aggregator.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_aggregator.rs @@ -116,8 +116,6 @@ impl TransformAggregator { pub trait Aggregator: Sized + Send { const NAME: &'static str; 
- fn check_expandsion(&mut self) {} - fn consume(&mut self, data: DataBlock) -> Result<()>; // Generate could be called multiple times util it returns empty. fn generate(&mut self) -> Result>; From 5e777a93347b842d34f35c182e24108a702b3c2c Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Tue, 31 Jan 2023 17:22:59 +0800 Subject: [PATCH 4/9] keep holder in meta --- .../service/src/pipelines/pipeline_builder.rs | 18 ++------------- .../aggregator/aggregate_hashstate_info.rs | 15 +++++++++++-- .../aggregator/aggregator_final_parallel.rs | 20 ++++++++++++----- .../aggregator/aggregator_partial.rs | 15 +++++++++---- .../aggregator/aggregator_twolevel.rs | 15 ++++++++----- .../transforms/group_by/aggregator_state.rs | 22 +++++++++---------- .../transforms/transform_aggregator.rs | 5 +---- .../transforms/transform_convert_grouping.rs | 19 +++------------- 8 files changed, 66 insertions(+), 63 deletions(-) diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs index 81935ced04c3..ecb71a9e4168 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -51,7 +51,7 @@ use common_sql::plans::JoinType; use common_sql::ColumnBinding; use common_sql::IndexType; -use super::processors::transforms::group_by::ArenaHolder; + use crate::pipelines::processors::transforms::efficiently_memory_final_aggregator; use crate::pipelines::processors::transforms::HashJoinDesc; use crate::pipelines::processors::transforms::RightSemiAntiJoinCompactor; @@ -85,7 +85,6 @@ pub struct PipelineBuilder { ctx: Arc, main_pipeline: Pipeline, pub pipelines: Vec, - pub arena_holder: Option, } impl PipelineBuilder { @@ -94,7 +93,6 @@ impl PipelineBuilder { ctx, pipelines: vec![], main_pipeline: Pipeline::create(), - arena_holder: None, } } @@ -328,15 +326,11 @@ impl PipelineBuilder { let pass_state_to_final = self.enable_memory_efficient_aggregator(¶ms); - let 
arena_holder = ArenaHolder::create(); - self.arena_holder = Some(arena_holder.clone()); - self.main_pipeline.add_transform(|input, output| { TransformAggregator::try_create_partial( AggregatorTransformParams::try_create(input, output, ¶ms)?, self.ctx.clone(), pass_state_to_final, - arena_holder.clone(), ) })?; @@ -361,15 +355,7 @@ impl PipelineBuilder { )?; if self.enable_memory_efficient_aggregator(¶ms) { - let arena_holder = self - .arena_holder - .take() - .ok_or_else(|| ErrorCode::Internal("ArenaHolder is not initialized"))?; - return efficiently_memory_final_aggregator( - params, - arena_holder, - &mut self.main_pipeline, - ); + return efficiently_memory_final_aggregator(params, &mut self.main_pipeline); } self.main_pipeline.resize(1)?; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_hashstate_info.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_hashstate_info.rs index 0d43680539d1..0990afb73ee4 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_hashstate_info.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_hashstate_info.rs @@ -21,16 +21,27 @@ use serde::Deserializer; use serde::Serialize; use serde::Serializer; +use crate::pipelines::processors::transforms::group_by::ArenaHolder; + #[derive(Debug)] pub struct AggregateHashStateInfo { pub bucket: usize, // a subhashtable state pub hash_state: Box, + pub state_holder: Option, } impl AggregateHashStateInfo { - pub fn create(bucket: usize, hash_state: Box) -> BlockMetaInfoPtr { - Box::new(AggregateHashStateInfo { bucket, hash_state }) + pub fn create( + bucket: usize, + hash_state: Box, + state_holder: Option, + ) -> BlockMetaInfoPtr { + Box::new(AggregateHashStateInfo { + bucket, + hash_state, + state_holder, + }) } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs 
b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs index e4a5b76fea40..5fe46e498dfa 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs @@ -35,6 +35,7 @@ use super::estimated_key_size; use super::AggregateHashStateInfo; use crate::pipelines::processors::transforms::aggregator::aggregate_info::AggregateInfo; use crate::pipelines::processors::transforms::group_by::Area; +use crate::pipelines::processors::transforms::group_by::ArenaHolder; use crate::pipelines::processors::transforms::group_by::GroupColumnsBuilder; use crate::pipelines::processors::transforms::group_by::KeysColumnIter; use crate::pipelines::processors::transforms::group_by::PolymorphicKeysHelper; @@ -149,6 +150,7 @@ where Method: HashMethod + PolymorphicKeysHelper + Send + 'static method: Method, params: Arc, hash_table: Method::HashTable, + state_holders: Vec>, pub(crate) reach_limit: bool, // used for deserialization only if has agg, so we can reuse it during the loop @@ -172,6 +174,7 @@ where Method: HashMethod + PolymorphicKeysHelper + Send + 'static params, hash_table, reach_limit: false, + state_holders: Vec::new(), temp_place, }) } @@ -226,6 +229,7 @@ where Method: HashMethod + PolymorphicKeysHelper + Send + 'static if blocks.is_empty() { return Ok(vec![]); } + for mut data_block in blocks { if let Some(mut meta) = data_block.take_meta() { let info = meta @@ -233,6 +237,7 @@ where Method: HashMethod + PolymorphicKeysHelper + Send + 'static .downcast_mut::() .unwrap(); let hashtable = info.hash_state.downcast_mut::().unwrap(); + self.state_holders.push(info.state_holder.take()); self.merge_partial_hashstates(hashtable)?; continue; } @@ -397,12 +402,8 @@ where Method: HashMethod + PolymorphicKeysHelper + Send + 'static places } -} -impl Drop for BucketAggregator -where Method: HashMethod + 
PolymorphicKeysHelper + Send + 'static -{ - fn drop(&mut self) { + fn drop_states(&mut self) { let aggregator_params = self.params.as_ref(); let aggregate_functions = &aggregator_params.aggregate_functions; let offsets_aggregate_states = &aggregator_params.offsets_aggregate_states; @@ -435,5 +436,14 @@ where Method: HashMethod + PolymorphicKeysHelper + Send + 'static unsafe { function.drop_state(place) } } } + self.state_holders.clear(); + } +} + +impl Drop for BucketAggregator +where Method: HashMethod + PolymorphicKeysHelper + Send + 'static +{ + fn drop(&mut self) { + self.drop_states(); } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_partial.rs index 086a3534aa5c..b70207dacdea 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_partial.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_partial.rs @@ -40,13 +40,13 @@ where Method: HashMethod + PolymorphicKeysHelper pub states_dropped: bool, pub area: Option, + pub area_holder: Option, pub method: Method, pub hash_table: Method::HashTable, pub params: Arc, pub generated: bool, pub input_rows: usize, pub pass_state_to_final: bool, - pub arena_holder: ArenaHolder, pub two_level_mode: bool, } @@ -57,7 +57,6 @@ impl + S method: Method, params: Arc, pass_state_to_final: bool, - arena_holder: ArenaHolder, ) -> Result { let hash_table = method.create_hash_table()?; Ok(Self { @@ -65,11 +64,11 @@ impl + S method, hash_table, area: Some(Area::create()), + area_holder: None, states_dropped: false, generated: false, input_rows: 0, pass_state_to_final, - arena_holder, two_level_mode: false, }) } @@ -173,6 +172,13 @@ impl + S .map(|&index| block.get_by_offset(index)) .collect::>() } + + pub fn try_holder_state(&mut self) { + let area = self.area.take(); + if self.area.is_some() { + self.area_holder = 
Some(ArenaHolder::create(area)); + } + } #[inline(always)] fn generate_data(&mut self) -> Result> { @@ -263,7 +269,7 @@ impl + S impl> PartialAggregator -{ +{ pub fn drop_states(&mut self) { if !self.states_dropped { let aggregator_params = self.params.as_ref(); @@ -292,6 +298,7 @@ impl> } } drop(self.area.take()); + drop(self.area_holder.take()); self.hash_table.clear(); self.states_dropped = true; } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_twolevel.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_twolevel.rs index 2e9cea89e03a..d5e665346108 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_twolevel.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_twolevel.rs @@ -110,6 +110,7 @@ where Ok(TwoLevelAggregator:: { inner: PartialAggregator::> { area: self.area.take(), + area_holder: None, params: self.params.clone(), states_dropped: false, method: two_level_method, @@ -117,7 +118,6 @@ where generated: false, input_rows: self.input_rows, pass_state_to_final: self.pass_state_to_final, - arena_holder: self.arena_holder.clone(), two_level_mode: true, }, }) @@ -163,7 +163,12 @@ where if agg.pass_state_to_final { let table = std::mem::replace(inner_table, agg.method.method.create_hash_table()?); let rows = table.len(); - let meta = AggregateHashStateInfo::create(bucket, Box::new(table)); + agg.try_holder_state(); + let meta = AggregateHashStateInfo::create( + bucket, + Box::new(table), + agg.area_holder.clone(), + ); let block = DataBlock::new_with_meta(vec![], rows, Some(meta)); return Ok(vec![block]); } @@ -226,9 +231,9 @@ where return Ok(data_blocks); } - if agg.pass_state_to_final { - agg.arena_holder.put_area(agg.area.take()); - agg.states_dropped = true; + if !agg.pass_state_to_final { + drop(agg.area.take()); + drop(agg.area_holder.take()); } Ok(data_blocks) diff --git 
a/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_state.rs b/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_state.rs index dea3643c9c4a..a49468bd4854 100644 --- a/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_state.rs @@ -13,12 +13,12 @@ // limitations under the License. use std::alloc::Layout; -use std::collections::VecDeque; +use std::fmt::Debug; use std::ptr::NonNull; use std::sync::Arc; use bumpalo::Bump; -use parking_lot::RwLock; + pub struct Area { bump: Bump, @@ -38,23 +38,23 @@ unsafe impl Send for Area {} #[derive(Clone)] pub struct ArenaHolder { - values: Arc>>, + _data: Arc>, } impl ArenaHolder { - pub fn create() -> ArenaHolder { + pub fn create(area: Option) -> ArenaHolder { + tracing::info!("Putting one arena into holder"); ArenaHolder { - values: Arc::new(RwLock::new(VecDeque::new())), + _data: Arc::new(area), } } +} - pub fn put_area(&self, area: Option) { - if let Some(area) = area { - let mut values = self.values.write(); - values.push_back(area); - tracing::info!("Putting arena into holder, current size: {}", values.len()); - } +impl Debug for ArenaHolder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ArenaHolder").finish() } } unsafe impl Send for ArenaHolder {} +unsafe impl Sync for ArenaHolder {} diff --git a/src/query/service/src/pipelines/processors/transforms/transform_aggregator.rs b/src/query/service/src/pipelines/processors/transforms/transform_aggregator.rs index dd5b421f5b4a..caf6a31deb02 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_aggregator.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_aggregator.rs @@ -20,7 +20,7 @@ use common_exception::ErrorCode; use common_exception::Result; use common_expression::*; -use super::group_by::ArenaHolder; + use 
crate::pipelines::processors::port::InputPort; use crate::pipelines::processors::port::OutputPort; use crate::pipelines::processors::processor::Event; @@ -71,7 +71,6 @@ impl TransformAggregator { transform_params: AggregatorTransformParams, ctx: Arc, pass_state_to_final: bool, - arena_holder: ArenaHolder, ) -> Result { let aggregator_params = transform_params.aggregator_params.clone(); @@ -93,7 +92,6 @@ impl TransformAggregator { method, aggregator_params, pass_state_to_final, - arena_holder )?, ), }), @@ -105,7 +103,6 @@ impl TransformAggregator { method, aggregator_params, pass_state_to_final, - arena_holder )?, ), }), diff --git a/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs b/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs index 9a13ce3ca0a5..59001d5f5e4b 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs @@ -38,7 +38,7 @@ use serde::Serialize; use serde::Serializer; use super::aggregator::AggregateHashStateInfo; -use super::group_by::ArenaHolder; + use crate::pipelines::processors::transforms::aggregator::AggregateInfo; use crate::pipelines::processors::transforms::aggregator::BucketAggregator; use crate::pipelines::processors::transforms::group_by::KeysColumnIter; @@ -409,7 +409,6 @@ fn build_convert_grouping + S method: Method, pipeline: &mut Pipeline, params: Arc, - arena_holder: ArenaHolder, ) -> Result<()> { let input_nums = pipeline.output_len(); let transform = TransformConvertGrouping::create(method.clone(), params.clone(), input_nums)?; @@ -426,19 +425,12 @@ fn build_convert_grouping + S pipeline.resize(input_nums)?; pipeline.add_transform(|input, output| { - MergeBucketTransform::try_create( - input, - output, - method.clone(), - params.clone(), - arena_holder.clone(), - ) + MergeBucketTransform::try_create(input, output, method.clone(), 
params.clone()) }) } pub fn efficiently_memory_final_aggregator( params: Arc, - arena_holder: ArenaHolder, pipeline: &mut Pipeline, ) -> Result<()> { let group_cols = ¶ms.group_columns; @@ -447,7 +439,7 @@ pub fn efficiently_memory_final_aggregator( let method = DataBlock::choose_hash_method(&sample_block, group_cols)?; with_hash_method!(|T| match method { - HashMethodKind::T(v) => build_convert_grouping(v, pipeline, params.clone(), arena_holder), + HashMethodKind::T(v) => build_convert_grouping(v, pipeline, params.clone()), }) } @@ -460,9 +452,6 @@ struct MergeBucketTransform + input_block: Option, output_blocks: Vec, - - // holds the state from partial group by - _arena_holder: ArenaHolder, } impl + Send + 'static> @@ -473,7 +462,6 @@ impl + Send + 'static> output: Arc, method: Method, params: Arc, - arena_holder: ArenaHolder, ) -> Result { Ok(ProcessorPtr::create(Box::new(MergeBucketTransform { input, @@ -482,7 +470,6 @@ impl + Send + 'static> params, input_block: None, output_blocks: vec![], - _arena_holder: arena_holder, }))) } } From 55ed033030984ccecba6abb23db43001b1799a33 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Tue, 31 Jan 2023 17:44:29 +0800 Subject: [PATCH 5/9] keep holder in meta --- src/query/service/src/pipelines/pipeline_builder.rs | 1 - .../transforms/aggregator/aggregator_final_parallel.rs | 2 +- .../processors/transforms/aggregator/aggregator_partial.rs | 6 +++--- .../processors/transforms/group_by/aggregator_state.rs | 1 - .../pipelines/processors/transforms/transform_aggregator.rs | 1 - .../processors/transforms/transform_convert_grouping.rs | 1 - 6 files changed, 4 insertions(+), 8 deletions(-) diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs index ecb71a9e4168..572eb8e792a6 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -51,7 +51,6 @@ use common_sql::plans::JoinType; use 
common_sql::ColumnBinding; use common_sql::IndexType; - use crate::pipelines::processors::transforms::efficiently_memory_final_aggregator; use crate::pipelines::processors::transforms::HashJoinDesc; use crate::pipelines::processors::transforms::RightSemiAntiJoinCompactor; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs index 5fe46e498dfa..c78c8871ae59 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs @@ -174,7 +174,7 @@ where Method: HashMethod + PolymorphicKeysHelper + Send + 'static params, hash_table, reach_limit: false, - state_holders: Vec::new(), + state_holders: Vec::with_capacity(16), temp_place, }) } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_partial.rs index b70207dacdea..dac525bc50a4 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_partial.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_partial.rs @@ -172,10 +172,10 @@ impl + S .map(|&index| block.get_by_offset(index)) .collect::>() } - + pub fn try_holder_state(&mut self) { let area = self.area.take(); - if self.area.is_some() { + if area.is_some() { self.area_holder = Some(ArenaHolder::create(area)); } } @@ -269,7 +269,7 @@ impl + S impl> PartialAggregator -{ +{ pub fn drop_states(&mut self) { if !self.states_dropped { let aggregator_params = self.params.as_ref(); diff --git a/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_state.rs b/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_state.rs index a49468bd4854..cf94f65c3528 100644 --- 
a/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_state.rs @@ -19,7 +19,6 @@ use std::sync::Arc; use bumpalo::Bump; - pub struct Area { bump: Bump, } diff --git a/src/query/service/src/pipelines/processors/transforms/transform_aggregator.rs b/src/query/service/src/pipelines/processors/transforms/transform_aggregator.rs index caf6a31deb02..319fee9df4d9 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_aggregator.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_aggregator.rs @@ -20,7 +20,6 @@ use common_exception::ErrorCode; use common_exception::Result; use common_expression::*; - use crate::pipelines::processors::port::InputPort; use crate::pipelines::processors::port::OutputPort; use crate::pipelines::processors::processor::Event; diff --git a/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs b/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs index 59001d5f5e4b..b7b653bf4522 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs @@ -38,7 +38,6 @@ use serde::Serialize; use serde::Serializer; use super::aggregator::AggregateHashStateInfo; - use crate::pipelines::processors::transforms::aggregator::AggregateInfo; use crate::pipelines::processors::transforms::aggregator::BucketAggregator; use crate::pipelines::processors::transforms::group_by::KeysColumnIter; From 0f59b80d2e93e24c4d8b10a0e75f58dc540bb0ee Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Tue, 31 Jan 2023 18:23:04 +0800 Subject: [PATCH 6/9] add unique serde info --- .../transforms/aggregator/aggregate_hashstate_info.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_hashstate_info.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_hashstate_info.rs index 0990afb73ee4..60f27e582610 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_hashstate_info.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_hashstate_info.rs @@ -59,7 +59,7 @@ impl<'de> Deserialize<'de> for AggregateHashStateInfo { } } -#[typetag::serde(name = "aggregate_info")] +#[typetag::serde(name = "aggregate_hash_state_info")] impl BlockMetaInfo for AggregateHashStateInfo { fn as_any(&self) -> &dyn Any { self From 1574e560917fded33c58b4ebc2730f2ca8e03f1c Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Tue, 31 Jan 2023 18:44:28 +0800 Subject: [PATCH 7/9] fix unwrap --- .../aggregator/aggregator_final_parallel.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs index c78c8871ae59..557854f6755f 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs @@ -232,14 +232,12 @@ where Method: HashMethod + PolymorphicKeysHelper + Send + 'static for mut data_block in blocks { if let Some(mut meta) = data_block.take_meta() { - let info = meta - .as_mut_any() - .downcast_mut::() - .unwrap(); - let hashtable = info.hash_state.downcast_mut::().unwrap(); - self.state_holders.push(info.state_holder.take()); - self.merge_partial_hashstates(hashtable)?; - continue; + if let Some(info) = meta.as_mut_any().downcast_mut::() { + let hashtable = info.hash_state.downcast_mut::().unwrap(); + 
self.state_holders.push(info.state_holder.take()); + self.merge_partial_hashstates(hashtable)?; + continue; + } } let block = data_block.convert_to_full(); From 20433b93f74ae977ee2c2808525687fb2f0fa494 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Tue, 31 Jan 2023 20:45:37 +0800 Subject: [PATCH 8/9] fix(query): remove tail array --- src/common/hashtable/src/tail_array.rs | 141 ------------------------- 1 file changed, 141 deletions(-) delete mode 100644 src/common/hashtable/src/tail_array.rs diff --git a/src/common/hashtable/src/tail_array.rs b/src/common/hashtable/src/tail_array.rs deleted file mode 100644 index 646a14a63350..000000000000 --- a/src/common/hashtable/src/tail_array.rs +++ /dev/null @@ -1,141 +0,0 @@ -// Copyright 2021 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::alloc::Allocator; - -use super::table0::Entry; -use super::traits::Keyable; - -const SIZE: usize = 4096; - -pub struct TailArray { - allocator: A, - pub(crate) datas: Vec; SIZE], A>>, - pub(crate) num_items: usize, -} - -impl TailArray -where - K: Keyable, - A: Allocator + Clone, -{ - pub fn new(allocator: A) -> Self { - Self { - datas: vec![], - num_items: 0, - allocator, - } - } - - pub fn insert(&mut self, key: K) -> &mut Entry { - let pos = self.num_items % SIZE; - if pos == 0 { - let data = unsafe { - Box::<[Entry; SIZE], A>::new_zeroed_in(self.allocator.clone()).assume_init() - }; - self.datas.push(data); - } - - let tail = self.datas.last_mut().unwrap(); - unsafe { tail[pos].set_key(key) }; - - self.num_items += 1; - &mut tail[pos] - } - - pub fn iter(&self) -> TailArrayIter<'_, K, V> { - TailArrayIter { - values: self.datas.iter().map(|v| v.as_ref().as_ref()).collect(), - num_items: self.num_items, - i: 0, - } - } - - #[allow(dead_code)] - pub fn iter_mut(&mut self) -> TailArrayIterMut<'_, K, V> { - TailArrayIterMut { - values: self.datas.iter_mut().map(|v| v.as_mut().as_mut()).collect(), - num_items: self.num_items, - i: 0, - } - } - - pub fn len(&self) -> usize { - self.num_items - } - - pub fn capacity(&self) -> usize { - self.datas.len() * SIZE - } -} - -pub struct TailArrayIter<'a, K, V> { - values: Vec<&'a [Entry]>, - num_items: usize, - i: usize, -} - -impl<'a, K, V> Iterator for TailArrayIter<'a, K, V> { - type Item = &'a Entry; - - fn next(&mut self) -> Option { - if self.i >= self.num_items { - None - } else { - let array = self.i / SIZE; - let pos = self.i % SIZE; - - let v = self.values[array]; - let res = &v[pos]; - self.i += 1; - Some(res) - } - } - - #[inline] - fn size_hint(&self) -> (usize, Option) { - (self.num_items - self.i, Some(self.num_items - self.i)) - } -} - -pub struct TailArrayIterMut<'a, K, V> { - values: Vec<&'a mut [Entry]>, - num_items: usize, - i: usize, -} - -impl<'a, K, V> Iterator for TailArrayIterMut<'a, K, 
V> -where Self: 'a -{ - type Item = &'a mut Entry where Self: 'a ; - - fn next(&mut self) -> Option { - if self.i >= self.num_items { - None - } else { - let array = self.i / SIZE; - let pos = self.i % SIZE; - - let v = &mut self.values[array]; - let res = unsafe { &mut *(v.as_ptr().add(pos) as *mut _) }; - self.i += 1; - Some(res) - } - } - - #[inline] - fn size_hint(&self) -> (usize, Option) { - (self.num_items - self.i, Some(self.num_items - self.i)) - } -} From b6a652e175dcb9a403e4a812aec4b1868808fd46 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Tue, 31 Jan 2023 20:46:42 +0800 Subject: [PATCH 9/9] fix(query): fix copyright --- .../transforms/aggregator/aggregate_hashstate_info.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_hashstate_info.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_hashstate_info.rs index 60f27e582610..fe438a1428ce 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_hashstate_info.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_hashstate_info.rs @@ -1,4 +1,4 @@ -// Copyright 2022 Datafuse Labs. +// Copyright 2023 Datafuse Labs. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License.