Skip to content

Commit

Permalink
fix(watermark): avoid panic in watermark derivation (close risingwave…
Browse files Browse the repository at this point in the history
  • Loading branch information
soundOfDestiny authored Mar 21, 2023
1 parent 870ba34 commit 694c446
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 2 deletions.
12 changes: 12 additions & 0 deletions src/frontend/planner_test/tests/testdata/watermark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,18 @@
| └─StreamTableScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(t2.ts) }
└─StreamTableScan { table: t2, columns: [t2.ts, t2.v1, t2.v2, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
- name: left semi hash join
sql: |
create table t1 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;
create table t2 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;
select t1.ts as t1_ts, t1.v1 as t1_v1, t1.v2 as t1_v2 from t1 where exists (select * from t2 where t1.ts = t2.ts);
stream_plan: |
StreamMaterialize { columns: [t1_ts, t1_v1, t1_v2, t1._row_id(hidden)], pk_columns: [t1._row_id, t1_ts], pk_conflict: "no check", watermark_columns: [t1_ts] }
└─StreamHashJoin { type: LeftSemi, predicate: t1.ts = t2.ts, output_watermarks: [t1.ts], output: all }
├─StreamExchange { dist: HashShard(t1.ts) }
| └─StreamTableScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(t2.ts) }
└─StreamTableScan { table: t2, columns: [t2.ts, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
- name: union all
sql: |
create table t1 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;
Expand Down
8 changes: 6 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,12 @@ impl StreamHashJoin {
if logical.left().watermark_columns().contains(left_key)
&& logical.right().watermark_columns().contains(right_key)
{
watermark_columns.insert(l2i.map(left_key));
watermark_columns.insert(r2i.map(right_key));
if let Some(internal) = l2i.try_map(left_key) {
watermark_columns.insert(internal);
}
if let Some(internal) = r2i.try_map(right_key) {
watermark_columns.insert(internal);
}
}
}
logical.i2o_col_mapping().rewrite_bitset(&watermark_columns)
Expand Down

0 comments on commit 694c446

Please sign in to comment.