Skip to content

Commit

Permalink
refactor(stream sort): use new SortBuffer in SortExecutor (rising…
Browse files Browse the repository at this point in the history
…wavelabs#8891)

Signed-off-by: Richard Chien <stdrc@outlook.com>
  • Loading branch information
stdrc authored Apr 6, 2023
1 parent efe56ec commit 8f8c191
Show file tree
Hide file tree
Showing 6 changed files with 783 additions and 383 deletions.
38 changes: 18 additions & 20 deletions src/stream/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use futures_async_stream::try_stream;
use iter_chunks::IterChunks;
use itertools::Itertools;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::StreamChunk;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::buffer::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::Schema;
use risingwave_common::hash::{HashKey, PrecomputedBuildHasher};
Expand Down Expand Up @@ -128,9 +128,11 @@ impl<K: HashKey, S: StateStore> ExecutorInner<K, S> {
}
}

trait Emitter: Default {
trait Emitter {
type StateStore: StateStore;

fn new(result_table: &StateTable<Self::StateStore>) -> Self;

fn emit_from_changes<'a>(
&'a mut self,
chunk_builder: &'a mut ChunkBuilder,
Expand All @@ -154,6 +156,12 @@ struct EmitOnUpdates<S: StateStore> {
impl<S: StateStore> Emitter for EmitOnUpdates<S> {
type StateStore = S;

fn new(_result_table: &StateTable<Self::StateStore>) -> Self {
Self {
_phantom: PhantomData,
}
}

#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
async fn emit_from_changes<'a>(
&'a mut self,
Expand Down Expand Up @@ -182,21 +190,19 @@ impl<S: StateStore> Emitter for EmitOnUpdates<S> {
}
}

impl<S: StateStore> Default for EmitOnUpdates<S> {
fn default() -> Self {
Self {
_phantom: PhantomData,
}
}
}

struct EmitOnWindowClose<S: StateStore> {
buffer: SortBuffer<S>,
}

impl<S: StateStore> Emitter for EmitOnWindowClose<S> {
type StateStore = S;

fn new(result_table: &StateTable<Self::StateStore>) -> Self {
Self {
buffer: SortBuffer::new(0, result_table),
}
}

#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
async fn emit_from_changes<'a>(
&'a mut self,
Expand All @@ -222,22 +228,14 @@ impl<S: StateStore> Emitter for EmitOnWindowClose<S> {
#[for_await]
for row in self.buffer.consume(watermark.clone(), result_table) {
let row = row?;
if let Some(chunk) = chunk_builder.append_record(Record::Insert { new_row: row }) {
if let Some(chunk) = chunk_builder.append_row(Op::Insert, row) {
yield chunk;
}
}
}
}
}

impl<S: StateStore> Default for EmitOnWindowClose<S> {
fn default() -> Self {
Self {
buffer: SortBuffer::new(),
}
}
}

struct ExecutionVars<K: HashKey, S: StateStore, E: Emitter<StateStore = S>> {
stats: ExecutionStats,

Expand Down Expand Up @@ -640,7 +638,7 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
buffered_watermarks: vec![None; this.group_key_indices.len()],
window_watermark: None,
chunk_builder: ChunkBuilder::new(this.chunk_size, &this.info.schema.data_types()),
chunk_emitter: E::default(),
chunk_emitter: E::new(&this.result_table),
};

// TODO(rc): use something like a `ColumnMapping` type
Expand Down
3 changes: 2 additions & 1 deletion src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ mod sink;
mod sort;
mod sort_buffer;
mod sort_buffer_v0;
mod sort_v0;
pub mod source;
mod stream_reader;
pub mod subtask;
Expand Down Expand Up @@ -130,7 +131,7 @@ pub use rearranged_chain::RearrangedChainExecutor;
pub use receiver::ReceiverExecutor;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
pub use sink::SinkExecutor;
pub use sort::SortExecutor;
pub use sort::*;
pub use source::*;
pub use temporal_join::*;
pub use top_n::{
Expand Down
Loading

0 comments on commit 8f8c191

Please sign in to comment.