feat(query): enable hashtable state pass from partial to final #9809

Merged 10 commits on Jan 31, 2023
Changes from 7 commits
141 changes: 141 additions & 0 deletions src/common/hashtable/src/tail_array.rs
@@ -0,0 +1,141 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::alloc::Allocator;

use super::table0::Entry;
use super::traits::Keyable;

const SIZE: usize = 4096;

pub struct TailArray<K, V, A: Allocator> {
    allocator: A,
    pub(crate) datas: Vec<Box<[Entry<K, V>; SIZE], A>>,
    pub(crate) num_items: usize,
}

impl<K, V, A> TailArray<K, V, A>
where
    K: Keyable,
    A: Allocator + Clone,
{
    pub fn new(allocator: A) -> Self {
        Self {
            datas: vec![],
            num_items: 0,
            allocator,
        }
    }

    pub fn insert(&mut self, key: K) -> &mut Entry<K, V> {
        let pos = self.num_items % SIZE;
        if pos == 0 {
            let data = unsafe {
                Box::<[Entry<K, V>; SIZE], A>::new_zeroed_in(self.allocator.clone()).assume_init()
            };
            self.datas.push(data);
        }

        let tail = self.datas.last_mut().unwrap();
        unsafe { tail[pos].set_key(key) };

        self.num_items += 1;
        &mut tail[pos]
    }

    pub fn iter(&self) -> TailArrayIter<'_, K, V> {
        TailArrayIter {
            values: self.datas.iter().map(|v| v.as_ref().as_ref()).collect(),
            num_items: self.num_items,
            i: 0,
        }
    }

    #[allow(dead_code)]
    pub fn iter_mut(&mut self) -> TailArrayIterMut<'_, K, V> {
        TailArrayIterMut {
            values: self.datas.iter_mut().map(|v| v.as_mut().as_mut()).collect(),
            num_items: self.num_items,
            i: 0,
        }
    }

    pub fn len(&self) -> usize {
        self.num_items
    }

    pub fn capacity(&self) -> usize {
        self.datas.len() * SIZE
    }
}

pub struct TailArrayIter<'a, K, V> {
    values: Vec<&'a [Entry<K, V>]>,
    num_items: usize,
    i: usize,
}

impl<'a, K, V> Iterator for TailArrayIter<'a, K, V> {
    type Item = &'a Entry<K, V>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.i >= self.num_items {
            None
        } else {
            let array = self.i / SIZE;
            let pos = self.i % SIZE;

            let v = self.values[array];
            let res = &v[pos];
            self.i += 1;
            Some(res)
        }
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.num_items - self.i, Some(self.num_items - self.i))
    }
}

pub struct TailArrayIterMut<'a, K, V> {
    values: Vec<&'a mut [Entry<K, V>]>,
    num_items: usize,
    i: usize,
}

impl<'a, K, V> Iterator for TailArrayIterMut<'a, K, V> {
    type Item = &'a mut Entry<K, V>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.i >= self.num_items {
            None
        } else {
            let array = self.i / SIZE;
            let pos = self.i % SIZE;

            let v = &mut self.values[array];
            // Safety: each entry is yielded at most once, so extending the
            // mutable borrow to 'a through the raw pointer cannot alias.
            let res = unsafe { &mut *v.as_mut_ptr().add(pos) };
            self.i += 1;
            Some(res)
        }
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.num_items - self.i, Some(self.num_items - self.i))
    }
}
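The new TailArray appends entries into lazily allocated, fixed-size 4096-entry chunks, so an entry handed out by insert never moves when the array grows and can keep being referenced while more keys arrive. Below is a self-contained miniature of that chunked-append idea, illustrative only: it uses plain std types and Option slots in place of the crate's Entry<K, V>, Keyable, and caller-supplied Allocator.

// Miniature of the chunked "tail array" idea: chunks are allocated lazily,
// items are appended in place, and iteration walks chunk by chunk up to `len`.
const CHUNK: usize = 4;

struct MiniTailArray<T> {
    chunks: Vec<Box<[Option<T>; CHUNK]>>,
    len: usize,
}

impl<T> MiniTailArray<T> {
    fn new() -> Self {
        Self { chunks: vec![], len: 0 }
    }

    fn push(&mut self, value: T) -> &mut T {
        let pos = self.len % CHUNK;
        if pos == 0 {
            // Start a new fixed-size chunk; earlier chunks never move.
            self.chunks.push(Box::new([(); CHUNK].map(|_| None)));
        }
        let tail = self.chunks.last_mut().unwrap();
        tail[pos] = Some(value);
        self.len += 1;
        tail[pos].as_mut().unwrap()
    }

    fn iter(&self) -> impl Iterator<Item = &T> + '_ {
        self.chunks
            .iter()
            .flat_map(|c| c.iter())
            .take(self.len)
            .map(|v| v.as_ref().unwrap())
    }
}

fn main() {
    let mut arr = MiniTailArray::new();
    for i in 0..10u64 {
        arr.push(i);
    }
    assert_eq!(arr.iter().copied().sum::<u64>(), 45);
    println!("len = {}, chunks = {}", arr.len, arr.chunks.len()); // len = 10, chunks = 3
}

As in TailArray itself, only a global item count is tracked; iteration walks the chunks in order and stops after len items, so the zero-initialized (here: None) tail of the last chunk is never observed.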
5 changes: 0 additions & 5 deletions src/query/expression/src/block.rs
@@ -306,11 +306,6 @@ impl DataBlock {
         self.meta.as_ref()
     }
 
-    #[inline]
-    pub fn meta(&self) -> Result<Option<BlockMetaInfoPtr>> {
-        Ok(self.meta.clone())
-    }
-
     pub fn from_arrow_chunk<A: AsRef<dyn Array>>(
         arrow_chunk: &ArrowChunk<A>,
         schema: &DataSchema,
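The call-site hunks below replace the removed meta(), which cloned the meta pointer and wrapped it in a Result, with get_meta(), which by all appearances borrows it instead (hence the .cloned() calls in the scatter code). A minimal sketch of that calling pattern with stand-in types; MetaPtr and Block here are placeholders, not the real BlockMetaInfoPtr and DataBlock.

// Sketch of the accessor change: borrow by default, clone only when ownership is needed.
#[derive(Clone, Debug, PartialEq)]
struct MetaPtr(String); // stand-in for BlockMetaInfoPtr

struct Block {
    meta: Option<MetaPtr>,
}

impl Block {
    // assumed shape of the kept accessor: returns a borrow instead of a cloned value
    fn get_meta(&self) -> Option<&MetaPtr> {
        self.meta.as_ref()
    }
}

fn main() {
    let block = Block {
        meta: Some(MetaPtr("aggregate state".into())),
    };

    // Borrow when only a reference is needed (e.g. serializing the meta)...
    let borrowed: Option<&MetaPtr> = block.get_meta();
    // ...and clone only where an owned value is required
    // (e.g. add_meta on every scattered output block).
    let owned: Option<MetaPtr> = block.get_meta().cloned();

    assert_eq!(borrowed.cloned(), owned);
}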
@@ -103,7 +103,7 @@ impl Processor for ExchangeMergeSink

         let mut meta = vec![];
         meta.write_scalar_own(data_block.num_rows() as u32)?;
-        bincode::serialize_into(&mut meta, &data_block.meta()?)
+        bincode::serialize_into(&mut meta, &data_block.get_meta())
             .map_err(|_| ErrorCode::BadBytes("block meta serialize error when exchange"))?;

         let chunks = data_block.try_into()?;
@@ -124,7 +124,7 @@ impl Processor for ExchangePublisherSink

         let mut meta = vec![];
         meta.write_scalar_own(data_block.num_rows() as u32)?;
-        bincode::serialize_into(&mut meta, &data_block.meta()?)
+        bincode::serialize_into(&mut meta, &data_block.get_meta())
             .map_err(|_| ErrorCode::BadBytes("block meta serialize error when exchange"))?;

         let chunks = data_block.try_into()?;
@@ -232,7 +232,7 @@ impl Processor for ExchangeTransform
             } else {
                 let mut meta = vec![];
                 meta.write_scalar_own(data_block.num_rows() as u32)?;
-                bincode::serialize_into(&mut meta, &data_block.meta()?).map_err(|_| {
+                bincode::serialize_into(&mut meta, &data_block.get_meta()).map_err(|_| {
                     ErrorCode::BadBytes("block meta serialize error when exchange")
                 })?;

8 changes: 4 additions & 4 deletions src/query/service/src/api/rpc/flight_scatter_hash.rs
@@ -124,10 +124,10 @@ impl FlightScatter for OneHashKeyFlightScatter
         let indices = get_hash_values(&indices, num)?;
         let data_blocks = DataBlock::scatter(data_block, &indices, self.scatter_size)?;

-        let block_meta = data_block.meta()?;
+        let block_meta = data_block.get_meta();
         let mut res = Vec::with_capacity(data_blocks.len());
         for data_block in data_blocks {
-            res.push(data_block.add_meta(block_meta.clone())?);
+            res.push(data_block.add_meta(block_meta.cloned())?);
         }

         Ok(res)
@@ -150,12 +150,12 @@ impl FlightScatter for HashFlightScatter {
             Ok(vec![0; num])
         }?;

-        let block_meta = data_block.meta()?;
+        let block_meta = data_block.get_meta();
         let data_blocks = DataBlock::scatter(data_block, &indices, self.scatter_size)?;

         let mut res = Vec::with_capacity(data_blocks.len());
         for data_block in data_blocks {
-            res.push(data_block.add_meta(block_meta.clone())?);
+            res.push(data_block.add_meta(block_meta.cloned())?);
         }

         Ok(res)
@@ -40,7 +40,7 @@ impl PrecommitBlock

     pub fn write<T: Write>(self, bytes: &mut T) -> Result<()> {
         let data_block = self.0;
-        let serialized_meta = bincode::serialize(&data_block.meta()?).map_err_to_code(
+        let serialized_meta = bincode::serialize(&data_block.get_meta()).map_err_to_code(
             ErrorCode::BadBytes,
             || "precommit block serialize error when exchange",
         )?;
14 changes: 10 additions & 4 deletions src/query/service/src/pipelines/pipeline_builder.rs
@@ -323,16 +323,25 @@ impl PipelineBuilder
             None,
         )?;
 
+        let pass_state_to_final = self.enable_memory_efficient_aggregator(&params);
+
         self.main_pipeline.add_transform(|input, output| {
             TransformAggregator::try_create_partial(
                 AggregatorTransformParams::try_create(input, output, &params)?,
                 self.ctx.clone(),
+                pass_state_to_final,
             )
         })?;
 
         Ok(())
     }
 
+    fn enable_memory_efficient_aggregator(&self, params: &Arc<AggregatorParams>) -> bool {
+        self.ctx.get_cluster().is_empty()
+            && !params.group_columns.is_empty()
+            && self.main_pipeline.output_len() > 1
+    }
+
     fn build_aggregate_final(&mut self, aggregate: &AggregateFinal) -> Result<()> {
         self.build_pipeline(&aggregate.input)?;
 
@@ -344,10 +353,7 @@
             aggregate.limit,
         )?;
 
-        if self.ctx.get_cluster().is_empty()
-            && !params.group_columns.is_empty()
-            && self.main_pipeline.output_len() > 1
-        {
+        if self.enable_memory_efficient_aggregator(&params) {
             return efficiently_memory_final_aggregator(params, &mut self.main_pipeline);
         }
 
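Hoisting the condition into enable_memory_efficient_aggregator keeps the partial and final builders on the same branch: the partial aggregator only passes raw hash-table state forward when the final aggregator is built to consume it. A toy model of that design choice follows (stand-in fields, not the real PipelineBuilder):

// Toy model: a single helper gates both build steps so they cannot disagree.
struct Builder {
    single_node: bool,
    has_group_columns: bool,
    outputs: usize,
}

impl Builder {
    fn enable_memory_efficient_aggregator(&self) -> bool {
        self.single_node && self.has_group_columns && self.outputs > 1
    }

    // Partial aggregator: decides whether to pass hash-table state to the final stage.
    fn build_partial(&self) -> bool {
        self.enable_memory_efficient_aggregator()
    }

    // Final aggregator: decides whether to expect that state instead of serialized blocks.
    fn build_final(&self) -> bool {
        self.enable_memory_efficient_aggregator()
    }
}

fn main() {
    let b = Builder {
        single_node: true,
        has_group_columns: true,
        outputs: 4,
    };
    assert_eq!(b.build_partial(), b.build_final());
    println!("memory-efficient aggregation enabled: {}", b.build_partial());
}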
@@ -0,0 +1,79 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;

use common_expression::BlockMetaInfo;
use common_expression::BlockMetaInfoPtr;
use serde::Deserialize;
use serde::Deserializer;
use serde::Serialize;
use serde::Serializer;

use crate::pipelines::processors::transforms::group_by::ArenaHolder;

#[derive(Debug)]
pub struct AggregateHashStateInfo {
    pub bucket: usize,
    // the state of one sub-hashtable built by the partial aggregator
    pub hash_state: Box<dyn Any + Send + Sync>,
    pub state_holder: Option<ArenaHolder>,
}

impl AggregateHashStateInfo {
    pub fn create(
        bucket: usize,
        hash_state: Box<dyn Any + Send + Sync>,
        state_holder: Option<ArenaHolder>,
    ) -> BlockMetaInfoPtr {
        Box::new(AggregateHashStateInfo {
            bucket,
            hash_state,
            state_holder,
        })
    }
}

impl Serialize for AggregateHashStateInfo {
    fn serialize<S>(&self, _: S) -> Result<S::Ok, S::Error>
    where S: Serializer {
        unreachable!("AggregateHashStateInfo does not support exchanging between multiple nodes")
    }
}

impl<'de> Deserialize<'de> for AggregateHashStateInfo {
    fn deserialize<D>(_: D) -> Result<Self, D::Error>
    where D: Deserializer<'de> {
        unreachable!("AggregateHashStateInfo does not support exchanging between multiple nodes")
    }
}

#[typetag::serde(name = "aggregate_hash_state_info")]
impl BlockMetaInfo for AggregateHashStateInfo {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_mut_any(&mut self) -> &mut dyn Any {
        self
    }

    fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
        unimplemented!("Unimplemented clone for AggregateHashStateInfo")
    }

    fn equals(&self, _: &Box<dyn BlockMetaInfo>) -> bool {
        unimplemented!("Unimplemented equals for AggregateHashStateInfo")
    }
}
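AggregateHashStateInfo is deliberately neither serializable nor clonable: it only travels in-process, carrying a partial aggregator's hash-table state (and its arena holder) to the final aggregator as block meta. Below is a minimal sketch of how a receiving processor could recover that state through as_any and Any downcasts; the trait and the table type are stand-ins, not the real BlockMetaInfo and hash table.

// Sketch: carry an opaque hash-table state as block meta, then downcast it on the final side.
use std::any::Any;

struct AggregateHashStateInfo {
    bucket: usize,
    hash_state: Box<dyn Any + Send + Sync>,
}

trait BlockMetaInfo {
    fn as_any(&self) -> &dyn Any;
}

impl BlockMetaInfo for AggregateHashStateInfo {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn main() {
    // Partial side: wrap a stand-in "hash table" into the meta carried on the block.
    let table: Vec<(u64, u64)> = vec![(1, 10), (2, 20)];
    let meta: Box<dyn BlockMetaInfo> = Box::new(AggregateHashStateInfo {
        bucket: 0,
        hash_state: Box::new(table),
    });

    // Final side: downcast the meta, then downcast the state to the concrete table type.
    let info = meta
        .as_any()
        .downcast_ref::<AggregateHashStateInfo>()
        .expect("expected aggregate hash state meta");
    let table = info
        .hash_state
        .downcast_ref::<Vec<(u64, u64)>>()
        .expect("expected the partial hash table");
    println!("bucket {} carries {} groups", info.bucket, table.len());
}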