feat(stream): handling watermark in temporal join (#12302)
chenzl25 authored Sep 14, 2023
1 parent 1ecea63 commit db0c099
Showing 2 changed files with 109 additions and 10 deletions.
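
For context, the patch below makes the temporal join forward watermarks coming from its left (stream) input instead of dropping them: align_input now yields them as a new InternalMessage::WaterMark variant, right-side watermarks stay ignored, and the executor remaps the column index before re-emitting. A minimal, self-contained sketch of that routing, using simplified stand-in types rather than RisingWave's actual Message/Watermark definitions:

#[allow(dead_code)]
#[derive(Debug)]
enum Side {
    Left,
    Right,
}

#[allow(dead_code)]
#[derive(Debug)]
enum Message {
    Chunk(&'static str),
    Watermark(usize), // column index the watermark is defined on
}

#[allow(dead_code)]
#[derive(Debug)]
enum InternalMessage {
    Chunk(&'static str),
    WaterMark(usize),
}

// Route one non-barrier upstream message the way the patched `align_input` does:
// chunks pass through, left watermarks are forwarded, right watermarks are ignored.
fn route(side: Side, msg: Message) -> Option<InternalMessage> {
    match (side, msg) {
        (_, Message::Chunk(c)) => Some(InternalMessage::Chunk(c)),
        (Side::Left, Message::Watermark(col)) => Some(InternalMessage::WaterMark(col)),
        (Side::Right, Message::Watermark(_)) => None, // right side watermark is ignored
    }
}

fn main() {
    assert!(matches!(
        route(Side::Left, Message::Watermark(3)),
        Some(InternalMessage::WaterMark(3))
    ));
    assert!(route(Side::Right, Message::Watermark(0)).is_none());
}
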
71 changes: 71 additions & 0 deletions e2e_test/streaming/temporal_join/temporal_join_watermark.slt
@@ -0,0 +1,71 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table stream(id1 int, a1 int, b1 int, v1 timestamp with time zone, watermark for v1 as v1 - INTERVAL '10' SECOND) APPEND ONLY;

# FIXME: If we don't insert first, it would cause a panic when creating eowc_mv.
statement ok
insert into stream values (1, 1, 1, '2023-09-14 06:00:00');

statement ok
create table version(id2 int, a2 int, b2 int, primary key (id2));

statement ok
create materialized view temporal_join_mv as select id1, a1, id2, v1 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2;

statement ok
create materialized view eowc_mv as select window_start, count(id1) from tumble(temporal_join_mv, v1, interval '5 s') group by window_start emit on window close;

query IIII rowsort
select * from temporal_join_mv;
----
1 1 NULL 2023-09-14 06:00:00+00:00

query IIII rowsort
select * from eowc_mv;
----

statement ok
insert into stream values (2, 2, 2, '2023-09-14 06:00:25');

sleep 5s

query IIII rowsort
select * from temporal_join_mv;
----
1 1 NULL 2023-09-14 06:00:00+00:00
2 2 NULL 2023-09-14 06:00:25+00:00


query IIII rowsort
select * from eowc_mv;
----
2023-09-14 06:00:00+00:00 1

statement ok
insert into stream values (3, 3, 3, '2023-09-14 06:00:45');

sleep 5s

query IIII rowsort
select * from temporal_join_mv;
----
1 1 NULL 2023-09-14 06:00:00+00:00
2 2 NULL 2023-09-14 06:00:25+00:00
3 3 NULL 2023-09-14 06:00:45+00:00

query IIII rowsort
select * from eowc_mv;
----
2023-09-14 06:00:00+00:00 1
2023-09-14 06:00:25+00:00 1

statement ok
drop table stream cascade;

statement ok
drop table version cascade;
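
The expected results above follow directly from the watermark definition: with watermark for v1 as v1 - INTERVAL '10' SECOND, the insert at 06:00:25 advances the watermark to 06:00:15, which closes the 5-second tumble window [06:00:00, 06:00:05) holding the first row and lets eowc_mv emit it; the later insert at 06:00:45 then closes [06:00:25, 06:00:30). A small sketch of that window-close arithmetic, using plain integer seconds instead of real timestamps purely for illustration:

// Tumble-window close check under emit-on-window-close, in seconds since 06:00:00.
fn tumble_start(ts: i64, size: i64) -> i64 {
    ts - ts.rem_euclid(size)
}

fn window_closed(window_start: i64, size: i64, watermark: i64) -> bool {
    // A window [start, start + size) may be emitted once the watermark reaches its end.
    watermark >= window_start + size
}

fn main() {
    let size = 5; // interval '5 s'
    let delay = 10; // watermark for v1 as v1 - INTERVAL '10' SECOND

    // The first row (06:00:00) falls into the window [06:00:00, 06:00:05).
    let w0 = tumble_start(0, size);

    // Inserting 06:00:25 advances the watermark to 06:00:15 and closes that window,
    // so eowc_mv emits its first result row.
    assert!(window_closed(w0, size, 25 - delay));

    // Inserting 06:00:45 advances the watermark to 06:00:35 and closes the
    // window [06:00:25, 06:00:30) that holds the second row.
    let w1 = tumble_start(25, size);
    assert!(window_closed(w1, size, 45 - delay));
}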



48 changes: 38 additions & 10 deletions src/stream/src/executor/temporal_join.rs
@@ -44,7 +44,9 @@ use crate::cache::{cache_may_stale, new_with_hasher_in, ManagedLruCache};
use crate::common::metrics::MetricsInfo;
use crate::common::JoinStreamChunkBuilder;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{ActorContextRef, BoxedExecutor, JoinType, JoinTypePrimitive, PkIndices};
use crate::executor::{
ActorContextRef, BoxedExecutor, JoinType, JoinTypePrimitive, PkIndices, Watermark,
};
use crate::task::AtomicU64Ref;

pub struct TemporalJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitive> {
@@ -235,15 +237,16 @@ impl<K: HashKey, S: StateStore> TemporalSide<K, S> {
enum InternalMessage {
Chunk(StreamChunk),
Barrier(Vec<StreamChunk>, Barrier),
WaterMark(Watermark),
}

#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
pub async fn chunks_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) {
async fn chunks_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) {
#[for_await]
for item in stream {
match item? {
Message::Watermark(_) => {
// TODO: https://github.com/risingwavelabs/risingwave/issues/6042
// ignore
}
Message::Chunk(c) => yield c,
Message::Barrier(b) if b.epoch != expected_barrier.epoch => {
@@ -254,6 +257,23 @@ pub async fn chunks_until_barrier(stream: impl MessageStream, expected_barrier:
}
}

#[try_stream(ok = InternalMessage, error = StreamExecutorError)]
async fn internal_messages_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) {
#[for_await]
for item in stream {
match item? {
Message::Watermark(w) => {
yield InternalMessage::WaterMark(w);
}
Message::Chunk(c) => yield InternalMessage::Chunk(c),
Message::Barrier(b) if b.epoch != expected_barrier.epoch => {
return Err(StreamExecutorError::align_barrier(expected_barrier, b));
}
Message::Barrier(_) => return Ok(()),
}
}
}

// Align the left and right inputs according to their barriers,
// such that in the produced stream, an aligned interval starts with
// any number of `InternalMessage::Chunk(left_chunk)` and followed by
@@ -285,18 +305,20 @@ async fn align_input(left: Box<dyn Executor>, right: Box<dyn Executor>) {
}
Some(Either::Right(Ok(Message::Barrier(b)))) => {
#[for_await]
for chunk in chunks_until_barrier(left.by_ref(), b.clone()) {
yield InternalMessage::Chunk(chunk?);
for internal_message in
internal_messages_until_barrier(left.by_ref(), b.clone())
{
yield internal_message?;
}
yield InternalMessage::Barrier(right_chunks, b);
break 'inner;
}
Some(Either::Left(Err(e)) | Either::Right(Err(e))) => return Err(e),
Some(
Either::Left(Ok(Message::Watermark(_)))
| Either::Right(Ok(Message::Watermark(_))),
) => {
// TODO: https://github.com/risingwavelabs/risingwave/issues/6042
Some(Either::Left(Ok(Message::Watermark(w)))) => {
yield InternalMessage::WaterMark(w);
}
Some(Either::Right(Ok(Message::Watermark(_)))) => {
// ignore right side watermark
}
None => return Ok(()),
}
@@ -381,6 +403,8 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> TemporalJoinExecutor
self.right.schema().len(),
);

let left_to_output: HashMap<usize, usize> = HashMap::from_iter(left_map.iter().cloned());

let right_stream_key_indices = self.right.pk_indices().to_vec();

let null_matched = K::Bitmap::from_bool_vec(self.null_safe);
Expand All @@ -398,6 +422,10 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> TemporalJoinExecutor
.with_label_values(&[&table_id_str, &actor_id_str])
.set(self.right_table.cache.len() as i64);
match msg? {
InternalMessage::WaterMark(watermark) => {
let output_watermark_col_idx = *left_to_output.get(&watermark.col_idx).unwrap();
yield Message::Watermark(watermark.with_idx(output_watermark_col_idx));
}
InternalMessage::Chunk(chunk) => {
// Compact chunk, otherwise the following keys and chunk rows might fail to zip.
let chunk = chunk.compact();
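
The WaterMark arm added above only needs to translate the column index, because the join passes left-side columns through unchanged: the watermark's position in the join output is looked up via left_to_output and the watermark is re-emitted at that index. A minimal sketch of the remapping with a simplified stand-in Watermark type (not RisingWave's real struct, which also carries the value and data type):

use std::collections::HashMap;

// Simplified stand-in for illustration; only the column index is modeled here.
#[derive(Debug, Clone, PartialEq)]
struct Watermark {
    col_idx: usize,
}

impl Watermark {
    fn with_idx(mut self, idx: usize) -> Self {
        self.col_idx = idx;
        self
    }
}

// Remap a watermark on a left-input column to the matching output column,
// mirroring the left_to_output lookup in the executor. Returns None when the
// column is not part of the join output and the watermark cannot be forwarded.
fn remap_left_watermark(
    left_to_output: &HashMap<usize, usize>,
    watermark: Watermark,
) -> Option<Watermark> {
    left_to_output
        .get(&watermark.col_idx)
        .copied()
        .map(|out_idx| watermark.with_idx(out_idx))
}

fn main() {
    // Hypothetical mapping: left columns 0..=3 appear, in order, at output 0..=3.
    let left_to_output: HashMap<usize, usize> = (0..4).map(|i| (i, i)).collect();
    let wm = Watermark { col_idx: 3 }; // e.g. a watermark defined on v1
    assert_eq!(
        remap_left_watermark(&left_to_output, wm),
        Some(Watermark { col_idx: 3 })
    );
}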
