Skip to content

Commit

Permalink
fix(streaming): map watermark in dispatcher with output indices (risi…
Browse files Browse the repository at this point in the history
…ngwavelabs#8506)

Signed-off-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
BugenZhao authored Mar 13, 2023
1 parent cdaa8cf commit 85e450d
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 23 deletions.
5 changes: 1 addition & 4 deletions src/stream/src/executor/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,10 +398,7 @@ where
}

fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option<Watermark> {
upstream_indices
.iter()
.position(|&idx| idx == watermark.col_idx)
.map(|idx| watermark.with_idx(idx))
watermark.transform_with_indices(upstream_indices)
}

fn mapping_message(msg: Message, upstream_indices: &[usize]) -> Option<Message> {
Expand Down
5 changes: 1 addition & 4 deletions src/stream/src/executor/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,7 @@ fn mapping_chunk(chunk: StreamChunk, upstream_indices: &[usize]) -> StreamChunk
}

fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option<Watermark> {
upstream_indices
.iter()
.position(|&idx| idx == watermark.col_idx)
.map(|idx| watermark.with_idx(idx))
watermark.transform_with_indices(upstream_indices)
}

impl ChainExecutor {
Expand Down
37 changes: 27 additions & 10 deletions src/stream/src/executor/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,14 +446,20 @@ pub trait Dispatcher: Debug + 'static {
#[derive(Debug)]
pub struct RoundRobinDataDispatcher {
outputs: Vec<BoxedOutput>,
output_indices: Vec<usize>,
cur: usize,
dispatcher_id: DispatcherId,
}

impl RoundRobinDataDispatcher {
pub fn new(outputs: Vec<BoxedOutput>, dispatcher_id: DispatcherId) -> Self {
pub fn new(
outputs: Vec<BoxedOutput>,
output_indices: Vec<usize>,
dispatcher_id: DispatcherId,
) -> Self {
Self {
outputs,
output_indices,
cur: 0,
dispatcher_id,
}
Expand All @@ -465,6 +471,7 @@ impl Dispatcher for RoundRobinDataDispatcher {

fn dispatch_data(&mut self, chunk: StreamChunk) -> Self::DataFuture<'_> {
async move {
let chunk = chunk.reorder_columns(&self.output_indices);
self.outputs[self.cur].send(Message::Chunk(chunk)).await?;
self.cur += 1;
self.cur %= self.outputs.len();
Expand All @@ -484,9 +491,11 @@ impl Dispatcher for RoundRobinDataDispatcher {

fn dispatch_watermark(&mut self, watermark: Watermark) -> Self::WatermarkFuture<'_> {
async move {
// always broadcast watermark
for output in &mut self.outputs {
output.send(Message::Watermark(watermark.clone())).await?;
if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
// always broadcast watermark
for output in &mut self.outputs {
output.send(Message::Watermark(watermark.clone())).await?;
}
}
Ok(())
}
Expand Down Expand Up @@ -569,9 +578,11 @@ impl Dispatcher for HashDataDispatcher {

fn dispatch_watermark(&mut self, watermark: Watermark) -> Self::WatermarkFuture<'_> {
async move {
// always broadcast watermark
for output in &mut self.outputs {
output.send(Message::Watermark(watermark.clone())).await?;
if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
// always broadcast watermark
for output in &mut self.outputs {
output.send(Message::Watermark(watermark.clone())).await?;
}
}
Ok(())
}
Expand Down Expand Up @@ -751,8 +762,11 @@ impl Dispatcher for BroadcastDispatcher {

fn dispatch_watermark(&mut self, watermark: Watermark) -> Self::WatermarkFuture<'_> {
async move {
for output in self.outputs.values_mut() {
output.send(Message::Watermark(watermark.clone())).await?;
if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
// always broadcast watermark
for output in self.outputs.values_mut() {
output.send(Message::Watermark(watermark.clone())).await?;
}
}
Ok(())
}
Expand Down Expand Up @@ -851,7 +865,10 @@ impl Dispatcher for SimpleDispatcher {
.exactly_one()
.expect("expect exactly one output");

output.send(Message::Watermark(watermark)).await
if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
output.send(Message::Watermark(watermark)).await?;
}
Ok(())
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/stream/src/executor/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,9 @@ async fn test_merger_sum_aggr() {
let dispatcher = DispatchExecutor::new(
receiver_op,
vec![DispatcherImpl::RoundRobin(RoundRobinDataDispatcher::new(
inputs, 0,
inputs,
vec![0],
0,
))],
0,
ctx,
Expand Down
9 changes: 9 additions & 0 deletions src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,15 @@ impl Watermark {
})
}

/// Transform the watermark with the given output indices. If this watermark is not in the
/// output, return `None`.
pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
output_indices
.iter()
.position(|p| *p == self.col_idx)
.map(|new_col_idx| self.with_idx(new_col_idx))
}

pub fn to_protobuf(&self) -> ProstWatermark {
ProstWatermark {
column: Some(ProstInputRef {
Expand Down
5 changes: 1 addition & 4 deletions src/stream/src/executor/rearranged_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,7 @@ fn mapping(upstream_indices: &[usize], msg: Message) -> Option<Message> {
}

fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option<Watermark> {
upstream_indices
.iter()
.position(|&idx| idx == watermark.col_idx)
.map(|idx| watermark.with_idx(idx))
watermark.transform_with_indices(upstream_indices)
}

#[derive(Debug)]
Expand Down

0 comments on commit 85e450d

Please sign in to comment.