diff --git a/src/frontend/planner_test/tests/testdata/watermark.yaml b/src/frontend/planner_test/tests/testdata/watermark.yaml index 0d15aefde3636..f94fa39dddd30 100644 --- a/src/frontend/planner_test/tests/testdata/watermark.yaml +++ b/src/frontend/planner_test/tests/testdata/watermark.yaml @@ -52,26 +52,26 @@ └─StreamAppendOnlyHashAgg { group_key: [t.ts, t.v1], aggs: [count(t.v2), count], output_watermarks: [t.ts] } └─StreamExchange { dist: HashShard(t.ts, t.v1) } └─StreamTableScan { table: t, columns: [t.ts, t.v1, t.v2, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } -- name: hash join +- name: inner window join sql: | create table t1 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only; create table t2 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only; select t1.ts as t1_ts, t2.ts as ts2, t1.v1 as t1_v1, t1.v2 as t1_v2, t2.v1 as t2_v1, t2.v2 as t2_v2 from t1, t2 where t1.ts = t2.ts; stream_plan: | StreamMaterialize { columns: [t1_ts, ts2, t1_v1, t1_v2, t2_v1, t2_v2, t1._row_id(hidden), t2._row_id(hidden)], stream_key: [t1._row_id, t2._row_id, t1_ts], pk_columns: [t1._row_id, t2._row_id, t1_ts], pk_conflict: "NoCheck", watermark_columns: [t1_ts, ts2] } - └─StreamAppendOnlyHashJoin { type: Inner, predicate: t1.ts = t2.ts, output_watermarks: [t1.ts, t2.ts], output: [t1.ts, t2.ts, t1.v1, t1.v2, t2.v1, t2.v2, t1._row_id, t2._row_id] } + └─StreamWindowJoin { type: Inner, predicate: t1.ts = t2.ts, output_watermarks: [t1.ts, t2.ts], output: [t1.ts, t2.ts, t1.v1, t1.v2, t2.v1, t2.v2, t1._row_id, t2._row_id] } ├─StreamExchange { dist: HashShard(t1.ts) } | └─StreamTableScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } └─StreamExchange { dist: HashShard(t2.ts) } └─StreamTableScan { table: t2, columns: [t2.ts, t2.v1, t2.v2, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) } -- name: left semi hash join +- name: left semi window join sql: | create table t1 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only; create table t2 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only; select t1.ts as t1_ts, t1.v1 as t1_v1, t1.v2 as t1_v2 from t1 where exists (select * from t2 where t1.ts = t2.ts); stream_plan: | StreamMaterialize { columns: [t1_ts, t1_v1, t1_v2, t1._row_id(hidden)], stream_key: [t1._row_id, t1_ts], pk_columns: [t1._row_id, t1_ts], pk_conflict: "NoCheck", watermark_columns: [t1_ts] } - └─StreamHashJoin { type: LeftSemi, predicate: t1.ts = t2.ts, output_watermarks: [t1.ts], output: all } + └─StreamWindowJoin { type: LeftSemi, predicate: t1.ts = t2.ts, output_watermarks: [t1.ts], output: all } ├─StreamExchange { dist: HashShard(t1.ts) } | └─StreamTableScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } └─StreamExchange { dist: HashShard(t2.ts) } diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs index 866d9fe8e87b1..db191c34cee77 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs @@ -283,7 +283,18 @@ impl StreamHashJoin { impl fmt::Display for StreamHashJoin { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let mut builder = if self.clean_left_state_conjunction_idx.is_some() + let (ljk, rjk) = self + .eq_join_predicate + .eq_indexes() + .first() + .cloned() + .expect("first join key"); + + let mut builder = if self.left().watermark_columns().contains(ljk) + && self.right().watermark_columns().contains(rjk) + { + f.debug_struct("StreamWindowJoin") + } else if self.clean_left_state_conjunction_idx.is_some() && self.clean_right_state_conjunction_idx.is_some() { f.debug_struct("StreamIntervalJoin")