Skip to content

Commit

Permalink
dry
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
BugenZhao committed Mar 13, 2023
1 parent fdf7cb3 commit c907a56
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 12 deletions.
5 changes: 1 addition & 4 deletions src/stream/src/executor/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,10 +398,7 @@ where
}

fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option<Watermark> {
upstream_indices
.iter()
.position(|&idx| idx == watermark.col_idx)
.map(|idx| watermark.with_idx(idx))
watermark.transform_with_indices(upstream_indices)
}

fn mapping_message(msg: Message, upstream_indices: &[usize]) -> Option<Message> {
Expand Down
5 changes: 1 addition & 4 deletions src/stream/src/executor/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,7 @@ fn mapping_chunk(chunk: StreamChunk, upstream_indices: &[usize]) -> StreamChunk
}

fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option<Watermark> {
upstream_indices
.iter()
.position(|&idx| idx == watermark.col_idx)
.map(|idx| watermark.with_idx(idx))
watermark.transform_with_indices(upstream_indices)
}

impl ChainExecutor {
Expand Down
5 changes: 1 addition & 4 deletions src/stream/src/executor/rearranged_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,7 @@ fn mapping(upstream_indices: &[usize], msg: Message) -> Option<Message> {
}

fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option<Watermark> {
upstream_indices
.iter()
.position(|&idx| idx == watermark.col_idx)
.map(|idx| watermark.with_idx(idx))
watermark.transform_with_indices(upstream_indices)
}

#[derive(Debug)]
Expand Down

0 comments on commit c907a56

Please sign in to comment.