Skip to content

Commit

Permalink
diff
Browse files Browse the repository at this point in the history
  • Loading branch information
ice1000 committed Jun 15, 2023
1 parent a4c5dbe commit a6bf4d5
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
└─LogicalScan { table: t2, columns: [t2.v2] }
stream_plan: |
StreamMaterialize { columns: [v1, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: "NoCheck" }
└─StreamDynamicFilter { predicate: (t1.v1 > max(max(t2.v2))), output: [t1.v1, t1._row_id] }
└─StreamDynamicFilter { predicate: (t1.v1 > max(max(t2.v2))), output: ["t1.v1", "t1._row_id"] }
├─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [max(max(t2.v2))] }
Expand Down Expand Up @@ -57,7 +57,7 @@
stream_plan: |
StreamMaterialize { columns: [v1, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: "NoCheck" }
└─StreamProject { exprs: [t1.v1, t1._row_id] }
└─StreamDynamicFilter { predicate: ($expr1 > max(max(t2.v2))), output: [t1.v1, $expr1, t1._row_id] }
└─StreamDynamicFilter { predicate: ($expr1 > max(max(t2.v2))), output: ["t1.v1", "$expr1", "t1._row_id"] }
├─StreamProject { exprs: [t1.v1, (t1.v1 + t1.v1) as $expr1, t1._row_id] }
| └─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: Broadcast }
Expand Down Expand Up @@ -132,7 +132,7 @@
stream_plan: |
StreamMaterialize { columns: [v1, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: "NoCheck" }
└─StreamProject { exprs: [t1.v1, t1._row_id] }
└─StreamDynamicFilter { predicate: ($expr1 > max(max(t2.v2))), output: [t1.v1, $expr1, t1._row_id] }
└─StreamDynamicFilter { predicate: ($expr1 > max(max(t2.v2))), output: ["t1.v1", "$expr1", "t1._row_id"] }
├─StreamProject { exprs: [t1.v1, t1.v1::Int64 as $expr1, t1._row_id] }
| └─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: Broadcast }
Expand All @@ -149,7 +149,7 @@
with max_v2 as (select max(v2) max from t2) select v1 from t1 where exists (select * from max_v2 where v1 > max);
stream_plan: |
StreamMaterialize { columns: [v1, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: "NoCheck" }
└─StreamDynamicFilter { predicate: (t1.v1 > max(max(t2.v2))), output: [t1.v1, t1._row_id] }
└─StreamDynamicFilter { predicate: (t1.v1 > max(max(t2.v2))), output: ["t1.v1", "t1._row_id"] }
├─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [max(max(t2.v2))] }
Expand All @@ -171,7 +171,7 @@
└─LogicalScan { table: t2, columns: [t2.v2] }
stream_plan: |
StreamMaterialize { columns: [v1, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: "NoCheck" }
└─StreamDynamicFilter { predicate: (t1.v1 > $expr2), output: [t1.v1, t1._row_id] }
└─StreamDynamicFilter { predicate: (t1.v1 > $expr2), output: ["t1.v1", "t1._row_id"] }
├─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [(2:Int32 * max(max(t2.v2))) as $expr2] }
Expand Down
20 changes: 10 additions & 10 deletions src/frontend/planner_test/tests/testdata/output/expr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -510,10 +510,10 @@
└─LogicalScan { table: t, columns: [t.v1, t._row_id] }
stream_plan: |
StreamMaterialize { columns: [v1, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: "NoCheck", watermark_columns: [v1] }
└─StreamDynamicFilter { predicate: (t.v1 >= now), output_watermarks: [t.v1], output: [t.v1, t._row_id] }
└─StreamDynamicFilter { predicate: (t.v1 >= now), output_watermarks: [t.v1], output: ["t.v1", "t._row_id"] }
├─StreamTableScan { table: t, columns: [t.v1, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamNow { output: [now] }
└─StreamNow { output: ["now"] }
- name: now expression with proj
sql: |
create table t (v1 timestamp with time zone);
Expand All @@ -524,24 +524,24 @@
└─LogicalScan { table: t, columns: [t.v1, t._row_id] }
stream_plan: |
StreamMaterialize { columns: [v1, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: "NoCheck", watermark_columns: [v1] }
└─StreamDynamicFilter { predicate: (t.v1 >= $expr1), output_watermarks: [t.v1], output: [t.v1, t._row_id] }
└─StreamDynamicFilter { predicate: (t.v1 >= $expr1), output_watermarks: [t.v1], output: ["t.v1", "t._row_id"] }
├─StreamTableScan { table: t, columns: [t.v1, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [(AtTimeZone((AtTimeZone(now, 'UTC':Varchar) - '00:00:00':Interval), 'UTC':Varchar) - '00:00:02':Interval) as $expr1], output_watermarks: [$expr1] }
└─StreamNow { output: [now] }
└─StreamNow { output: ["now"] }
- name: and of two now expression condition
sql: |
create table t (v1 timestamp with time zone, v2 timestamp with time zone);
select * from t where v1 >= now() and v2 >= now();
stream_plan: |
StreamMaterialize { columns: [v1, v2, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: "NoCheck", watermark_columns: [v2] }
└─StreamDynamicFilter { predicate: (t.v2 >= now), output_watermarks: [t.v2], output: [t.v1, t.v2, t._row_id] }
├─StreamDynamicFilter { predicate: (t.v1 >= now), output_watermarks: [t.v1], output: [t.v1, t.v2, t._row_id] }
└─StreamDynamicFilter { predicate: (t.v2 >= now), output_watermarks: [t.v2], output: ["t.v1", "t.v2", "t._row_id"] }
├─StreamDynamicFilter { predicate: (t.v1 >= now), output_watermarks: [t.v1], output: ["t.v1", "t.v2", "t._row_id"] }
| ├─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
| └─StreamExchange { dist: Broadcast }
| └─StreamNow { output: [now] }
| └─StreamNow { output: ["now"] }
└─StreamExchange { dist: Broadcast }
└─StreamNow { output: [now] }
└─StreamNow { output: ["now"] }
- name: or of two now expression condition
sql: |
create table t (v1 timestamp with time zone, v2 timestamp with time zone);
Expand All @@ -554,13 +554,13 @@
stream_plan: |
StreamMaterialize { columns: [max_time, t.v2(hidden)], stream_key: [t.v2], pk_columns: [t.v2], pk_conflict: "NoCheck", watermark_columns: [max_time] }
└─StreamProject { exprs: [max(t.v1), t.v2], output_watermarks: [max(t.v1)] }
└─StreamDynamicFilter { predicate: (max(t.v1) >= now), output_watermarks: [max(t.v1)], output: [t.v2, max(t.v1)] }
└─StreamDynamicFilter { predicate: (max(t.v1) >= now), output_watermarks: [max(t.v1)], output: ["t.v2", "max(t.v1)"] }
├─StreamProject { exprs: [t.v2, max(t.v1)] }
| └─StreamHashAgg { group_key: [t.v2], aggs: [max(t.v1), count] }
| └─StreamExchange { dist: HashShard(t.v2) }
| └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamNow { output: [now] }
└─StreamNow { output: ["now"] }
- name: forbid now in group by for stream
sql: |
create table t (v1 timestamp with time zone, v2 int);
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/planner_test/tests/testdata/output/nexmark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1517,7 +1517,7 @@
└─BatchScan { table: bid, columns: [bid.auction], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id, auction_item_name], pk_columns: [auction_id, auction_item_name], pk_conflict: "NoCheck" }
└─StreamDynamicFilter { predicate: (count(bid.auction) >= $expr1), output: [auction.id, auction.item_name, count(bid.auction)] }
└─StreamDynamicFilter { predicate: (count(bid.auction) >= $expr1), output: ["auction.id", "auction.item_name", "count(bid.auction)"] }
├─StreamProject { exprs: [auction.id, auction.item_name, count(bid.auction)] }
| └─StreamHashAgg { group_key: [auction.id, auction.item_name], aggs: [count(bid.auction), count] }
| └─StreamHashJoin { type: Inner, predicate: auction.id = bid.auction, output: all }
Expand All @@ -1536,7 +1536,7 @@
Fragment 0
StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id, auction_item_name], pk_columns: [auction_id, auction_item_name], pk_conflict: "NoCheck" }
├── materialized table: 4294967294
└── StreamDynamicFilter { predicate: (count(bid.auction) >= $expr1), output: [auction.id, auction.item_name, count(bid.auction)] } { left table: 0, right table: 1 }
└── StreamDynamicFilter { predicate: (count(bid.auction) >= $expr1), output: ["auction.id", "auction.item_name", "count(bid.auction)"] } { left table: 0, right table: 1 }
├── StreamProject { exprs: [auction.id, auction.item_name, count(bid.auction)] }
│ └── StreamHashAgg { group_key: [auction.id, auction.item_name], aggs: [count(bid.auction), count] } { result table: 2, state tables: [], distinct tables: [] }
│ └── StreamHashJoin { type: Inner, predicate: auction.id = bid.auction, output: all } { left table: 3, right table: 5, left degree table: 4, right degree table: 6 }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1488,7 +1488,7 @@
└─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"], filter: (None, None) }
stream_plan: |
StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id, auction_item_name], pk_columns: [auction_id, auction_item_name], pk_conflict: "NoCheck" }
└─StreamDynamicFilter { predicate: (count(auction) >= $expr1), output: [id, item_name, count(auction)] }
└─StreamDynamicFilter { predicate: (count(auction) >= $expr1), output: ["id", "item_name", "count(auction)"] }
├─StreamProject { exprs: [id, item_name, count(auction)] }
| └─StreamHashAgg [append_only] { group_key: [id, item_name], aggs: [count(auction), count] }
| └─StreamHashJoin [append_only] { type: Inner, predicate: id = auction, output: [id, item_name, auction, _row_id, _row_id] }
Expand All @@ -1514,7 +1514,7 @@
Fragment 0
StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id, auction_item_name], pk_columns: [auction_id, auction_item_name], pk_conflict: "NoCheck" }
├── materialized table: 4294967294
└── StreamDynamicFilter { predicate: (count(auction) >= $expr1), output: [id, item_name, count(auction)] } { left table: 0, right table: 1 }
└── StreamDynamicFilter { predicate: (count(auction) >= $expr1), output: ["id", "item_name", "count(auction)"] } { left table: 0, right table: 1 }
├── StreamProject { exprs: [id, item_name, count(auction)] }
│ └── StreamHashAgg [append_only] { group_key: [id, item_name], aggs: [count(auction), count] } { result table: 2, state tables: [], distinct tables: [] }
│ └── StreamHashJoin [append_only] { type: Inner, predicate: id = auction, output: [id, item_name, auction, _row_id, _row_id] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1796,7 +1796,7 @@
└─BatchSource { source: "nexmark", columns: ["event_type", "person", "auction", "bid", "_row_id"], filter: (None, None) }
stream_plan: |
StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id, auction_item_name], pk_columns: [auction_id, auction_item_name], pk_conflict: "NoCheck" }
└─StreamDynamicFilter { predicate: (count($expr4) >= $expr5), output: [$expr2, $expr3, count($expr4)] }
└─StreamDynamicFilter { predicate: (count($expr4) >= $expr5), output: ["$expr2", "$expr3", "count($expr4)"] }
├─StreamProject { exprs: [$expr2, $expr3, count($expr4)] }
| └─StreamHashAgg [append_only] { group_key: [$expr2, $expr3], aggs: [count($expr4), count] }
| └─StreamHashJoin [append_only] { type: Inner, predicate: $expr2 = $expr4, output: [$expr2, $expr3, $expr4, _row_id, _row_id] }
Expand Down Expand Up @@ -1841,7 +1841,7 @@
Fragment 0
StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id, auction_item_name], pk_columns: [auction_id, auction_item_name], pk_conflict: "NoCheck" }
├── materialized table: 4294967294
└── StreamDynamicFilter { predicate: (count($expr4) >= $expr5), output: [$expr2, $expr3, count($expr4)] } { left table: 0, right table: 1 }
└── StreamDynamicFilter { predicate: (count($expr4) >= $expr5), output: ["$expr2", "$expr3", "count($expr4)"] } { left table: 0, right table: 1 }
├── StreamProject { exprs: [$expr2, $expr3, count($expr4)] }
│ └── StreamHashAgg [append_only] { group_key: [$expr2, $expr3], aggs: [count($expr4), count] } { result table: 2, state tables: [], distinct tables: [] }
│ └── StreamHashJoin [append_only] { type: Inner, predicate: $expr2 = $expr4, output: [$expr2, $expr3, $expr4, _row_id, _row_id] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,11 @@
StreamMaterialize { columns: [v1, v2, t1._row_id(hidden), t2._row_id(hidden)], stream_key: [t1._row_id, t2._row_id, v1], pk_columns: [t1._row_id, t2._row_id, v1], pk_conflict: "NoCheck" }
└─StreamHashJoin { type: Inner, predicate: t1.v1 = t2.v2, output: [t1.v1, t2.v2, t1._row_id, t2._row_id] }
├─StreamExchange { dist: HashShard(t1.v1) }
| └─StreamDynamicFilter { predicate: (t1.v1 > $expr1), output_watermarks: [t1.v1], output: [t1.v1, t1._row_id] }
| └─StreamDynamicFilter { predicate: (t1.v1 > $expr1), output_watermarks: [t1.v1], output: ["t1.v1", "t1._row_id"] }
| ├─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
| └─StreamExchange { dist: Broadcast }
| └─StreamProject { exprs: [(AtTimeZone((AtTimeZone(now, 'UTC':Varchar) + '00:00:00':Interval), 'UTC':Varchar) + '01:00:00':Interval) as $expr1], output_watermarks: [$expr1] }
| └─StreamNow { output: [now] }
| └─StreamNow { output: ["now"] }
└─StreamExchange { dist: HashShard(t2.v2) }
└─StreamTableScan { table: t2, columns: [t2.v2, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
- name: now() in a complex cmp expr does not get pushed down
Expand Down Expand Up @@ -300,12 +300,12 @@
LogicalScan { table: t1, columns: [t1.v1, t1.v2], predicate: (t1.v1 > ('2021-04-01 00:00:00+00:00':Timestamptz + '00:30:00':Interval)) AND (t1.v2 > 5:Int32) }
stream_plan: |
StreamMaterialize { columns: [v1, v2, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: "NoCheck", watermark_columns: [v1] }
└─StreamDynamicFilter { predicate: (t1.v1 > $expr1), output_watermarks: [t1.v1], output: [t1.v1, t1.v2, t1._row_id] }
└─StreamDynamicFilter { predicate: (t1.v1 > $expr1), output_watermarks: [t1.v1], output: ["t1.v1", "t1.v2", "t1._row_id"] }
├─StreamFilter { predicate: (t1.v2 > 5:Int32) }
| └─StreamTableScan { table: t1, columns: [t1.v1, t1.v2, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [(AtTimeZone((AtTimeZone(now, 'UTC':Varchar) + '00:00:00':Interval), 'UTC':Varchar) + '00:30:00':Interval) as $expr1], output_watermarks: [$expr1] }
└─StreamNow { output: [now] }
└─StreamNow { output: ["now"] }
- name: eq-predicate derived condition other side pushdown in inner join
sql: |
create table t1(v1 int, v2 int);
Expand Down Expand Up @@ -347,9 +347,9 @@
StreamMaterialize { columns: [v1, v2, t1._row_id(hidden), t2._row_id(hidden)], stream_key: [t1._row_id, t2._row_id, v1], pk_columns: [t1._row_id, t2._row_id, v1], pk_conflict: "NoCheck" }
└─StreamHashJoin { type: Inner, predicate: t1.v1 = t2.v2, output: [t1.v1, t2.v2, t1._row_id, t2._row_id] }
├─StreamExchange { dist: HashShard(t1.v1) }
| └─StreamDynamicFilter { predicate: (t1.v1 > now), output_watermarks: [t1.v1], output: [t1.v1, t1._row_id] }
| └─StreamDynamicFilter { predicate: (t1.v1 > now), output_watermarks: [t1.v1], output: ["t1.v1", "t1._row_id"] }
| ├─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
| └─StreamExchange { dist: Broadcast }
| └─StreamNow { output: [now] }
| └─StreamNow { output: ["now"] }
└─StreamExchange { dist: HashShard(t2.v2) }
└─StreamTableScan { table: t2, columns: [t2.v2, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
Loading

0 comments on commit a6bf4d5

Please sign in to comment.