Skip to content

Commit

Permalink
add some planner test
Browse files Browse the repository at this point in the history
  • Loading branch information
st1page committed Mar 13, 2023
1 parent 3dc808a commit 7cc05b0
Showing 1 changed file with 74 additions and 0 deletions.
74 changes: 74 additions & 0 deletions src/frontend/planner_test/tests/testdata/temporal_join.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@
| └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version.id2) }
└─StreamTableScan { table: version, columns: [version.id2, version.a2], pk: [version.id2], dist: UpstreamHashShard(version.id2) }
- name: implicit join with temporal tables
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2));
select id1, a1, id2, a2 from stream, version FOR SYSTEM_TIME AS OF NOW() where id1 = id2 AND a2 < 10;
stream_plan: |
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], pk_columns: [stream._row_id, id2, id1], pk_conflict: "no check" }
└─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] }
├─StreamExchange { dist: HashShard(stream.id1) }
| └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version.id2) }
└─StreamTableScan { table: version, columns: [version.id2, version.a2], pk: [version.id2], dist: UpstreamHashShard(version.id2) }
- name: Multi join key for temporal join
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
Expand Down Expand Up @@ -78,3 +90,65 @@
stream_error: |-
Not supported: exist dangling temporal scan
HINT: please check your temporal join syntax e.g. consider removing the right outer join if it is being used.
- name: multi-way temporal join with the same key
sql: |
create table stream(k int, a1 int, b1 int) APPEND ONLY;
create table version1(k int, x1 int, y2 int, primary key (k));
create table version2(k int, x2 int, y2 int, primary key (k));
select stream.k, x1, x2, a1, b1
from stream
join version1 FOR SYSTEM_TIME AS OF NOW() on stream.k = version1.k
join version2 FOR SYSTEM_TIME AS OF NOW() on stream.k = version2.k where a1 < 10;
stream_plan: |
StreamMaterialize { columns: [k, x1, x2, a1, b1, stream._row_id(hidden), version1.k(hidden), version2.k(hidden)], pk_columns: [stream._row_id, version1.k, k, version2.k], pk_conflict: "no check" }
└─StreamTemporalJoin { type: Inner, predicate: stream.k = version2.k, output: [stream.k, version1.x1, version2.x2, stream.a1, stream.b1, stream._row_id, version1.k, version2.k] }
├─StreamTemporalJoin { type: Inner, predicate: stream.k = version1.k, output: [stream.k, stream.a1, stream.b1, version1.x1, stream._row_id, version1.k] }
| ├─StreamExchange { dist: HashShard(stream.k) }
| | └─StreamFilter { predicate: (stream.a1 < 10:Int32) }
| | └─StreamTableScan { table: stream, columns: [stream.k, stream.a1, stream.b1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
| └─StreamNoShuffleExchange { dist: UpstreamHashShard(version1.k) }
| └─StreamTableScan { table: version1, columns: [version1.k, version1.x1], pk: [version1.k], dist: UpstreamHashShard(version1.k) }
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version2.k) }
└─StreamTableScan { table: version2, columns: [version2.k, version2.x2], pk: [version2.k], dist: UpstreamHashShard(version2.k) }
- name: multi-way temporal join with different keys
sql: |
create table stream(id1 int, id2 int, a1 int, b1 int) APPEND ONLY;
create table version1(id1 int, x1 int, y2 int, primary key (id1));
create table version2(id2 int, x2 int, y2 int, primary key (id2));
select stream.id1, x1, stream.id2, x2, a1, b1
from stream
join version1 FOR SYSTEM_TIME AS OF NOW() on stream.id1 = version1.id1
join version2 FOR SYSTEM_TIME AS OF NOW() on stream.id2 = version2.id2 where a1 < 10;
stream_plan: |
StreamMaterialize { columns: [id1, x1, id2, x2, a1, b1, stream._row_id(hidden), version1.id1(hidden), version2.id2(hidden)], pk_columns: [stream._row_id, version1.id1, id1, version2.id2, id2], pk_conflict: "no check" }
└─StreamTemporalJoin { type: Inner, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version1.id1, version2.id2] }
├─StreamExchange { dist: HashShard(stream.id2) }
| └─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] }
| ├─StreamExchange { dist: HashShard(stream.id1) }
| | └─StreamFilter { predicate: (stream.a1 < 10:Int32) }
| | └─StreamTableScan { table: stream, columns: [stream.id1, stream.id2, stream.a1, stream.b1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
| └─StreamNoShuffleExchange { dist: UpstreamHashShard(version1.id1) }
| └─StreamTableScan { table: version1, columns: [version1.id1, version1.x1], pk: [version1.id1], dist: UpstreamHashShard(version1.id1) }
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version2.id2) }
└─StreamTableScan { table: version2, columns: [version2.id2, version2.x2], pk: [version2.id2], dist: UpstreamHashShard(version2.id2) }
- name: multi-way temporal join with different keys
sql: |
create table stream(id1 int, id2 int, a1 int, b1 int) APPEND ONLY;
create table version1(id1 int, x1 int, y2 int, primary key (id1));
create table version2(id2 int, x2 int, y2 int, primary key (id2));
select stream.id1, x1, stream.id2, x2, a1, b1
from stream
join version1 FOR SYSTEM_TIME AS OF NOW() on stream.id1 = version1.id1
join version2 FOR SYSTEM_TIME AS OF NOW() on stream.id2 = version2.id2 where a1 < 10;
stream_plan: |
StreamMaterialize { columns: [id1, x1, id2, x2, a1, b1, stream._row_id(hidden), version1.id1(hidden), version2.id2(hidden)], pk_columns: [stream._row_id, version1.id1, id1, version2.id2, id2], pk_conflict: "no check" }
└─StreamTemporalJoin { type: Inner, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version1.id1, version2.id2] }
├─StreamExchange { dist: HashShard(stream.id2) }
| └─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] }
| ├─StreamExchange { dist: HashShard(stream.id1) }
| | └─StreamFilter { predicate: (stream.a1 < 10:Int32) }
| | └─StreamTableScan { table: stream, columns: [stream.id1, stream.id2, stream.a1, stream.b1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
| └─StreamNoShuffleExchange { dist: UpstreamHashShard(version1.id1) }
| └─StreamTableScan { table: version1, columns: [version1.id1, version1.x1], pk: [version1.id1], dist: UpstreamHashShard(version1.id1) }
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version2.id2) }
└─StreamTableScan { table: version2, columns: [version2.id2, version2.x2], pk: [version2.id2], dist: UpstreamHashShard(version2.id2) }

0 comments on commit 7cc05b0

Please sign in to comment.