Skip to content

Commit

Permalink
feat(streaming): support delta join on primary table (risingwavelabs#…
Browse files Browse the repository at this point in the history
…7662)

- Support delta join on primary table, because primary table is also an index as well.

Approved-By: st1page
  • Loading branch information
chenzl25 authored Feb 2, 2023
1 parent e987106 commit 477e30d
Show file tree
Hide file tree
Showing 10 changed files with 308 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ statement ok
set streaming_parallelism = 0;

statement ok
create materialized view v as select * from a join b on a.a1 = b.b1 ;
create materialized view v as select * from a join b on a.a1 = b.b1;

query IIII rowsort
select * from v order by a1, a2, b1, b2;
Expand All @@ -50,3 +50,6 @@ drop table a;

statement ok
drop table b;

statement ok
set rw_streaming_enable_delta_join = false;
47 changes: 47 additions & 0 deletions e2e_test/streaming/delta_join/delta_join_snapshot_no_index.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
statement ok
set rw_implicit_flush = true;

statement ok
set rw_streaming_enable_delta_join = true;

statement ok
set streaming_parallelism = 2;

statement ok
create table a (a1 int primary key, a2 int);

statement ok
set streaming_parallelism = 3;

statement ok
create table b (b1 int primary key, b2 int);

statement ok
insert into A values (1,2), (11, 22);

statement ok
insert into B values (1,4), (11, 44);

statement ok
set streaming_parallelism = 0;

statement ok
create materialized view v as select * from a join b on a.a1 = b.b1;

query IIII rowsort
select * from v order by a1, a2, b1, b2;
----
1 2 1 4
11 22 11 44

statement ok
drop materialized view v;

statement ok
drop table a;

statement ok
drop table b;

statement ok
set rw_streaming_enable_delta_join = false;
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ statement ok
set streaming_parallelism = 0;

statement ok
create materialized view v as select * from a join b on a.a1 = b.b1 ;
create materialized view v as select * from a join b on a.a1 = b.b1;

statement ok
insert into A values (1,2), (1,3);
Expand All @@ -50,3 +50,6 @@ drop table a;

statement ok
drop table b;

statement ok
set rw_streaming_enable_delta_join = false;
47 changes: 47 additions & 0 deletions e2e_test/streaming/delta_join/delta_join_upstream_no_index.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
statement ok
set rw_implicit_flush = true;

statement ok
set rw_streaming_enable_delta_join = true;

statement ok
set streaming_parallelism = 2;

statement ok
create table a (a1 int primary key, a2 int);

statement ok
set streaming_parallelism = 3;

statement ok
create table b (b1 int primary key, b2 int);

statement ok
set streaming_parallelism = 0;

statement ok
create materialized view v as select * from a join b on a.a1 = b.b1;

statement ok
insert into A values (1,2), (11, 22);

statement ok
insert into B values (1,4), (11, 44);

query IIII rowsort
select * from v order by a1, a2, b1, b2;
----
1 2 1 4
11 22 11 44

statement ok
drop materialized view v;

statement ok
drop table a;

statement ok
drop table b;

statement ok
set rw_streaming_enable_delta_join = false;
29 changes: 27 additions & 2 deletions src/frontend/planner_test/tests/testdata/delta_join.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,30 @@
StreamMaterialize { columns: [a1, a2, b1, b2, i_a1.a._row_id(hidden), i_b1.b._row_id(hidden)], pk_columns: [i_a1.a._row_id, i_b1.b._row_id, a1, b1] }
└─StreamExchange { dist: HashShard(i_a1.a1, i_b1.b1, i_a1.a._row_id, i_b1.b._row_id) }
└─StreamDeltaJoin { type: Inner, predicate: i_a1.a1 = i_b1.b1, output: [i_a1.a1, i_a1.a2, i_b1.b1, i_b1.b2, i_a1.a._row_id, i_b1.b._row_id] }
├─StreamIndexScan { index: "i_a1", columns: [i_a1.a1, i_a1.a2, i_a1.a._row_id], pk: [i_a1.a._row_id], dist: UpstreamHashShard(i_a1.a1) }
└─StreamIndexScan { index: "i_b1", columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], pk: [i_b1.b._row_id], dist: UpstreamHashShard(i_b1.b1) }
├─StreamIndexScan { index: i_a1, columns: [i_a1.a1, i_a1.a2, i_a1.a._row_id], pk: [i_a1.a._row_id], dist: UpstreamHashShard(i_a1.a1) }
└─StreamIndexScan { index: i_b1, columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], pk: [i_b1.b._row_id], dist: UpstreamHashShard(i_b1.b1) }
- sql: |
set rw_streaming_enable_delta_join = true;
create table a (a1 int primary key, a2 int);
create table b (b1 int, b2 int);
create index i_b1 on b(b1);
/* should generate delta join plan, and stream index scan */
select * from a join b on a.a1 = b.b1 ;
stream_plan: |
StreamMaterialize { columns: [a1, a2, b1, b2, i_b1.b._row_id(hidden)], pk_columns: [a1, i_b1.b._row_id, b1] }
└─StreamExchange { dist: HashShard(a.a1, i_b1.b1, i_b1.b._row_id) }
└─StreamDeltaJoin { type: Inner, predicate: a.a1 = i_b1.b1, output: all }
├─StreamTableScan { table: a, columns: [a.a1, a.a2], pk: [a.a1], dist: UpstreamHashShard(a.a1) }
└─StreamIndexScan { index: i_b1, columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], pk: [i_b1.b._row_id], dist: UpstreamHashShard(i_b1.b1) }
- sql: |
set rw_streaming_enable_delta_join = true;
create table a (a1 int primary key, a2 int);
create table b (b1 int primary key, b2 int);
/* should generate delta join plan, and stream index scan */
select * from a join b on a.a1 = b.b1 ;
stream_plan: |
StreamMaterialize { columns: [a1, a2, b1, b2], pk_columns: [a1, b1] }
└─StreamExchange { dist: HashShard(a.a1, b.b1) }
└─StreamDeltaJoin { type: Inner, predicate: a.a1 = b.b1, output: all }
├─StreamTableScan { table: a, columns: [a.a1, a.a2], pk: [a.a1], dist: UpstreamHashShard(a.a1) }
└─StreamTableScan { table: b, columns: [b.b1, b.b2], pk: [b.b1], dist: UpstreamHashShard(b.b1) }
155 changes: 104 additions & 51 deletions src/frontend/planner_test/tests/testdata/distribution_derive.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,40 @@
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1, bk1.k1] }
└─StreamExchange { dist: HashShard(ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1) }
└─StreamDeltaJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1] }
├─StreamIndexScan { index: "ak1", columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
└─StreamIndexScan { index: "bk1", columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
├─StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
└─StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
stream_dist_plan: |
Fragment 0
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1, bk1.k1] }
materialized table: 4294967294
StreamExchange Hash([2, 3, 4, 5]) from 1
Fragment 1
Union
StreamExchange Hash([2, 4, 3, 5]) from 4
StreamExchange Hash([2, 4, 3, 5]) from 5
Fragment 2
StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
Upstream
BatchPlanNode
Fragment 3
StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
Upstream
BatchPlanNode
Fragment 4
Lookup
StreamExchange Hash([0]) from 3
StreamExchange NoShuffle from 2
Fragment 5
Lookup
StreamExchange Hash([0]) from 2
StreamExchange NoShuffle from 3
Table 4294967294 { columns: [v, bv, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1], primary key: [$2 ASC, $4 ASC, $3 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [2, 3, 4, 5] }
- id: Ak1_join_B_onk1
before:
- create_tables
Expand All @@ -32,36 +64,43 @@
└─BatchExchange { order: [], dist: UpstreamHashShard(ak1.k1) }
└─BatchScan { table: ak1, columns: [ak1.k1, ak1.v], distribution: UpstreamHashShard(ak1.k1) }
stream_plan: |
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), b._row_id(hidden), b.k1(hidden)], pk_columns: [ak1.a._row_id, b._row_id, ak1.k1, b.k1] }
└─StreamHashJoin { type: Inner, predicate: ak1.k1 = b.k1, output: [ak1.v, b.v, ak1.a._row_id, ak1.k1, b._row_id, b.k1] }
├─StreamExchange { dist: HashShard(ak1.k1) }
| └─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
└─StreamExchange { dist: HashShard(b.k1) }
└─StreamTableScan { table: b, columns: [b.k1, b.v, b._row_id], pk: [b._row_id], dist: UpstreamHashShard(b._row_id) }
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1, bk1.k1] }
└─StreamExchange { dist: HashShard(ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1) }
└─StreamDeltaJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1] }
├─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
└─StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
stream_dist_plan: |
Fragment 0
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), b._row_id(hidden), b.k1(hidden)], pk_columns: [ak1.a._row_id, b._row_id, ak1.k1, b.k1] }
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1, bk1.k1] }
materialized table: 4294967294
StreamHashJoin { type: Inner, predicate: ak1.k1 = b.k1, output: [ak1.v, b.v, ak1.a._row_id, ak1.k1, b._row_id, b.k1] }
left table: 0, right table 2, left degree table: 1, right degree table: 3,
StreamExchange Hash([0]) from 1
StreamExchange Hash([0]) from 2
StreamExchange Hash([2, 3, 4, 5]) from 1
Fragment 1
Union
StreamExchange Hash([2, 4, 3, 5]) from 4
StreamExchange Hash([2, 4, 3, 5]) from 5
Fragment 2
Chain { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
Upstream
BatchPlanNode
Fragment 2
Chain { table: b, columns: [b.k1, b.v, b._row_id], pk: [b._row_id], dist: UpstreamHashShard(b._row_id) }
Fragment 3
StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
Upstream
BatchPlanNode
Table 0 { columns: [ak1_k1, ak1_v, ak1_a__row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0] }
Table 1 { columns: [ak1_k1, ak1_a__row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
Table 2 { columns: [b_k1, b_v, b__row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0] }
Table 3 { columns: [b_k1, b__row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
Table 4294967294 { columns: [v, bv, ak1.a._row_id, ak1.k1, b._row_id, b.k1], primary key: [$2 ASC, $4 ASC, $3 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [3] }
Fragment 4
Lookup
StreamExchange Hash([0]) from 3
StreamExchange NoShuffle from 2
Fragment 5
Lookup
StreamExchange Hash([0]) from 2
StreamExchange NoShuffle from 3
Table 4294967294 { columns: [v, bv, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1], primary key: [$2 ASC, $4 ASC, $3 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [2, 3, 4, 5] }
- id: A_join_Bk1_onk1
before:
- create_tables
Expand All @@ -72,36 +111,43 @@
└─BatchExchange { order: [], dist: UpstreamHashShard(a.k1) }
└─BatchScan { table: a, columns: [a.k1, a.v], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [v, bv, a._row_id(hidden), a.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [a._row_id, bk1.b._row_id, a.k1, bk1.k1] }
└─StreamHashJoin { type: Inner, predicate: a.k1 = bk1.k1, output: [a.v, bk1.v, a._row_id, a.k1, bk1.b._row_id, bk1.k1] }
├─StreamExchange { dist: HashShard(a.k1) }
| └─StreamTableScan { table: a, columns: [a.k1, a.v, a._row_id], pk: [a._row_id], dist: UpstreamHashShard(a._row_id) }
└─StreamExchange { dist: HashShard(bk1.k1) }
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1, bk1.k1] }
└─StreamExchange { dist: HashShard(ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1) }
└─StreamDeltaJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1] }
├─StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
└─StreamTableScan { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
stream_dist_plan: |
Fragment 0
StreamMaterialize { columns: [v, bv, a._row_id(hidden), a.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [a._row_id, bk1.b._row_id, a.k1, bk1.k1] }
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1, bk1.k1] }
materialized table: 4294967294
StreamHashJoin { type: Inner, predicate: a.k1 = bk1.k1, output: [a.v, bk1.v, a._row_id, a.k1, bk1.b._row_id, bk1.k1] }
left table: 0, right table 2, left degree table: 1, right degree table: 3,
StreamExchange Hash([0]) from 1
StreamExchange Hash([0]) from 2
StreamExchange Hash([2, 3, 4, 5]) from 1
Fragment 1
Chain { table: a, columns: [a.k1, a.v, a._row_id], pk: [a._row_id], dist: UpstreamHashShard(a._row_id) }
Union
StreamExchange Hash([2, 4, 3, 5]) from 4
StreamExchange Hash([2, 4, 3, 5]) from 5
Fragment 2
StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
Upstream
BatchPlanNode
Fragment 2
Fragment 3
Chain { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
Upstream
BatchPlanNode
Table 0 { columns: [a_k1, a_v, a__row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0] }
Table 1 { columns: [a_k1, a__row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
Table 2 { columns: [bk1_k1, bk1_v, bk1_b__row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0] }
Table 3 { columns: [bk1_k1, bk1_b__row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
Table 4294967294 { columns: [v, bv, a._row_id, a.k1, bk1.b._row_id, bk1.k1], primary key: [$2 ASC, $4 ASC, $3 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [3] }
Fragment 4
Lookup
StreamExchange Hash([0]) from 3
StreamExchange NoShuffle from 2
Fragment 5
Lookup
StreamExchange Hash([0]) from 2
StreamExchange NoShuffle from 3
Table 4294967294 { columns: [v, bv, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1], primary key: [$2 ASC, $4 ASC, $3 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [2, 3, 4, 5] }
- id: Ak1_join_Bk1_onk1
before:
- create_tables
Expand All @@ -113,35 +159,42 @@
└─BatchScan { table: ak1, columns: [ak1.k1, ak1.v], distribution: UpstreamHashShard(ak1.k1) }
stream_plan: |
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1, bk1.k1] }
└─StreamHashJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1] }
├─StreamExchange { dist: HashShard(ak1.k1) }
| └─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
└─StreamExchange { dist: HashShard(bk1.k1) }
└─StreamExchange { dist: HashShard(ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1) }
└─StreamDeltaJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1] }
├─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
└─StreamTableScan { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
stream_dist_plan: |
Fragment 0
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1, bk1.k1] }
materialized table: 4294967294
StreamHashJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1] }
left table: 0, right table 2, left degree table: 1, right degree table: 3,
StreamExchange Hash([0]) from 1
StreamExchange Hash([0]) from 2
StreamExchange Hash([2, 3, 4, 5]) from 1
Fragment 1
Union
StreamExchange Hash([2, 4, 3, 5]) from 4
StreamExchange Hash([2, 4, 3, 5]) from 5
Fragment 2
Chain { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
Upstream
BatchPlanNode
Fragment 2
Fragment 3
Chain { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
Upstream
BatchPlanNode
Table 0 { columns: [ak1_k1, ak1_v, ak1_a__row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0] }
Table 1 { columns: [ak1_k1, ak1_a__row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
Table 2 { columns: [bk1_k1, bk1_v, bk1_b__row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0] }
Table 3 { columns: [bk1_k1, bk1_b__row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
Table 4294967294 { columns: [v, bv, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1], primary key: [$2 ASC, $4 ASC, $3 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [3] }
Fragment 4
Lookup
StreamExchange Hash([0]) from 3
StreamExchange NoShuffle from 2
Fragment 5
Lookup
StreamExchange Hash([0]) from 2
StreamExchange NoShuffle from 3
Table 4294967294 { columns: [v, bv, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1], primary key: [$2 ASC, $4 ASC, $3 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [2, 3, 4, 5] }
- id: aggk1_from_A
before:
- create_tables
Expand Down
Loading

0 comments on commit 477e30d

Please sign in to comment.