From 477e30de470a2066d20efb57fe8e3f49a31d51d9 Mon Sep 17 00:00:00 2001 From: Dylan Date: Thu, 2 Feb 2023 17:44:31 +0800 Subject: [PATCH] feat(streaming): support delta join on primary table (#7662) - Support delta join on primary table, because primary table is also an index as well. Approved-By: st1page --- .../{ => delta_join}/delta_join_snapshot.slt | 5 +- .../delta_join_snapshot_no_index.slt | 47 ++++++ .../{ => delta_join}/delta_join_upstream.slt | 5 +- .../delta_join_upstream_no_index.slt | 47 ++++++ .../tests/testdata/delta_join.yaml | 29 +++- .../tests/testdata/distribution_derive.yaml | 155 ++++++++++++------ .../optimizer/plan_node/stream_delta_join.rs | 23 ++- .../optimizer/plan_node/stream_index_scan.rs | 34 ++-- .../optimizer/plan_node/stream_table_scan.rs | 12 +- .../optimizer/rule/index_delta_join_rule.rs | 29 +++- 10 files changed, 308 insertions(+), 78 deletions(-) rename e2e_test/streaming/{ => delta_join}/delta_join_snapshot.slt (92%) create mode 100644 e2e_test/streaming/delta_join/delta_join_snapshot_no_index.slt rename e2e_test/streaming/{ => delta_join}/delta_join_upstream.slt (92%) create mode 100644 e2e_test/streaming/delta_join/delta_join_upstream_no_index.slt diff --git a/e2e_test/streaming/delta_join_snapshot.slt b/e2e_test/streaming/delta_join/delta_join_snapshot.slt similarity index 92% rename from e2e_test/streaming/delta_join_snapshot.slt rename to e2e_test/streaming/delta_join/delta_join_snapshot.slt index 12a269710708..1f973608aacb 100644 --- a/e2e_test/streaming/delta_join_snapshot.slt +++ b/e2e_test/streaming/delta_join/delta_join_snapshot.slt @@ -32,7 +32,7 @@ statement ok set streaming_parallelism = 0; statement ok -create materialized view v as select * from a join b on a.a1 = b.b1 ; +create materialized view v as select * from a join b on a.a1 = b.b1; query IIII rowsort select * from v order by a1, a2, b1, b2; @@ -50,3 +50,6 @@ drop table a; statement ok drop table b; + +statement ok +set rw_streaming_enable_delta_join = false; diff --git a/e2e_test/streaming/delta_join/delta_join_snapshot_no_index.slt b/e2e_test/streaming/delta_join/delta_join_snapshot_no_index.slt new file mode 100644 index 000000000000..a22ceff6423a --- /dev/null +++ b/e2e_test/streaming/delta_join/delta_join_snapshot_no_index.slt @@ -0,0 +1,47 @@ +statement ok +set rw_implicit_flush = true; + +statement ok +set rw_streaming_enable_delta_join = true; + +statement ok +set streaming_parallelism = 2; + +statement ok +create table a (a1 int primary key, a2 int); + +statement ok +set streaming_parallelism = 3; + +statement ok +create table b (b1 int primary key, b2 int); + +statement ok +insert into A values (1,2), (11, 22); + +statement ok +insert into B values (1,4), (11, 44); + +statement ok +set streaming_parallelism = 0; + +statement ok +create materialized view v as select * from a join b on a.a1 = b.b1; + +query IIII rowsort +select * from v order by a1, a2, b1, b2; +---- +1 2 1 4 +11 22 11 44 + +statement ok +drop materialized view v; + +statement ok +drop table a; + +statement ok +drop table b; + +statement ok +set rw_streaming_enable_delta_join = false; diff --git a/e2e_test/streaming/delta_join_upstream.slt b/e2e_test/streaming/delta_join/delta_join_upstream.slt similarity index 92% rename from e2e_test/streaming/delta_join_upstream.slt rename to e2e_test/streaming/delta_join/delta_join_upstream.slt index 466fad7dd092..090e001d4885 100644 --- a/e2e_test/streaming/delta_join_upstream.slt +++ b/e2e_test/streaming/delta_join/delta_join_upstream.slt @@ -26,7 +26,7 @@ statement ok set streaming_parallelism = 0; statement ok -create materialized view v as select * from a join b on a.a1 = b.b1 ; +create materialized view v as select * from a join b on a.a1 = b.b1; statement ok insert into A values (1,2), (1,3); @@ -50,3 +50,6 @@ drop table a; statement ok drop table b; + +statement ok +set rw_streaming_enable_delta_join = false; diff --git a/e2e_test/streaming/delta_join/delta_join_upstream_no_index.slt b/e2e_test/streaming/delta_join/delta_join_upstream_no_index.slt new file mode 100644 index 000000000000..956bdc31682f --- /dev/null +++ b/e2e_test/streaming/delta_join/delta_join_upstream_no_index.slt @@ -0,0 +1,47 @@ +statement ok +set rw_implicit_flush = true; + +statement ok +set rw_streaming_enable_delta_join = true; + +statement ok +set streaming_parallelism = 2; + +statement ok +create table a (a1 int primary key, a2 int); + +statement ok +set streaming_parallelism = 3; + +statement ok +create table b (b1 int primary key, b2 int); + +statement ok +set streaming_parallelism = 0; + +statement ok +create materialized view v as select * from a join b on a.a1 = b.b1; + +statement ok +insert into A values (1,2), (11, 22); + +statement ok +insert into B values (1,4), (11, 44); + +query IIII rowsort +select * from v order by a1, a2, b1, b2; +---- +1 2 1 4 +11 22 11 44 + +statement ok +drop materialized view v; + +statement ok +drop table a; + +statement ok +drop table b; + +statement ok +set rw_streaming_enable_delta_join = false; diff --git a/src/frontend/planner_test/tests/testdata/delta_join.yaml b/src/frontend/planner_test/tests/testdata/delta_join.yaml index 4a5299967647..331210d86a96 100644 --- a/src/frontend/planner_test/tests/testdata/delta_join.yaml +++ b/src/frontend/planner_test/tests/testdata/delta_join.yaml @@ -11,5 +11,30 @@ StreamMaterialize { columns: [a1, a2, b1, b2, i_a1.a._row_id(hidden), i_b1.b._row_id(hidden)], pk_columns: [i_a1.a._row_id, i_b1.b._row_id, a1, b1] } └─StreamExchange { dist: HashShard(i_a1.a1, i_b1.b1, i_a1.a._row_id, i_b1.b._row_id) } └─StreamDeltaJoin { type: Inner, predicate: i_a1.a1 = i_b1.b1, output: [i_a1.a1, i_a1.a2, i_b1.b1, i_b1.b2, i_a1.a._row_id, i_b1.b._row_id] } - ├─StreamIndexScan { index: "i_a1", columns: [i_a1.a1, i_a1.a2, i_a1.a._row_id], pk: [i_a1.a._row_id], dist: UpstreamHashShard(i_a1.a1) } - └─StreamIndexScan { index: "i_b1", columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], pk: [i_b1.b._row_id], dist: UpstreamHashShard(i_b1.b1) } + ├─StreamIndexScan { index: i_a1, columns: [i_a1.a1, i_a1.a2, i_a1.a._row_id], pk: [i_a1.a._row_id], dist: UpstreamHashShard(i_a1.a1) } + └─StreamIndexScan { index: i_b1, columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], pk: [i_b1.b._row_id], dist: UpstreamHashShard(i_b1.b1) } +- sql: | + set rw_streaming_enable_delta_join = true; + create table a (a1 int primary key, a2 int); + create table b (b1 int, b2 int); + create index i_b1 on b(b1); + /* should generate delta join plan, and stream index scan */ + select * from a join b on a.a1 = b.b1 ; + stream_plan: | + StreamMaterialize { columns: [a1, a2, b1, b2, i_b1.b._row_id(hidden)], pk_columns: [a1, i_b1.b._row_id, b1] } + └─StreamExchange { dist: HashShard(a.a1, i_b1.b1, i_b1.b._row_id) } + └─StreamDeltaJoin { type: Inner, predicate: a.a1 = i_b1.b1, output: all } + ├─StreamTableScan { table: a, columns: [a.a1, a.a2], pk: [a.a1], dist: UpstreamHashShard(a.a1) } + └─StreamIndexScan { index: i_b1, columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], pk: [i_b1.b._row_id], dist: UpstreamHashShard(i_b1.b1) } +- sql: | + set rw_streaming_enable_delta_join = true; + create table a (a1 int primary key, a2 int); + create table b (b1 int primary key, b2 int); + /* should generate delta join plan, and stream index scan */ + select * from a join b on a.a1 = b.b1 ; + stream_plan: | + StreamMaterialize { columns: [a1, a2, b1, b2], pk_columns: [a1, b1] } + └─StreamExchange { dist: HashShard(a.a1, b.b1) } + └─StreamDeltaJoin { type: Inner, predicate: a.a1 = b.b1, output: all } + ├─StreamTableScan { table: a, columns: [a.a1, a.a2], pk: [a.a1], dist: UpstreamHashShard(a.a1) } + └─StreamTableScan { table: b, columns: [b.b1, b.b2], pk: [b.b1], dist: UpstreamHashShard(b.b1) } diff --git a/src/frontend/planner_test/tests/testdata/distribution_derive.yaml b/src/frontend/planner_test/tests/testdata/distribution_derive.yaml index 22d819a25f98..c8c348511352 100644 --- a/src/frontend/planner_test/tests/testdata/distribution_derive.yaml +++ b/src/frontend/planner_test/tests/testdata/distribution_derive.yaml @@ -20,8 +20,40 @@ StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1, bk1.k1] } └─StreamExchange { dist: HashShard(ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1) } └─StreamDeltaJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1] } - ├─StreamIndexScan { index: "ak1", columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) } - └─StreamIndexScan { index: "bk1", columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) } + ├─StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) } + └─StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) } + stream_dist_plan: | + Fragment 0 + StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1, bk1.k1] } + materialized table: 4294967294 + StreamExchange Hash([2, 3, 4, 5]) from 1 + + Fragment 1 + Union + StreamExchange Hash([2, 4, 3, 5]) from 4 + StreamExchange Hash([2, 4, 3, 5]) from 5 + + Fragment 2 + StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) } + Upstream + BatchPlanNode + + Fragment 3 + StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) } + Upstream + BatchPlanNode + + Fragment 4 + Lookup + StreamExchange Hash([0]) from 3 + StreamExchange NoShuffle from 2 + + Fragment 5 + Lookup + StreamExchange Hash([0]) from 2 + StreamExchange NoShuffle from 3 + + Table 4294967294 { columns: [v, bv, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1], primary key: [$2 ASC, $4 ASC, $3 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [2, 3, 4, 5] } - id: Ak1_join_B_onk1 before: - create_tables @@ -32,36 +64,43 @@ └─BatchExchange { order: [], dist: UpstreamHashShard(ak1.k1) } └─BatchScan { table: ak1, columns: [ak1.k1, ak1.v], distribution: UpstreamHashShard(ak1.k1) } stream_plan: | - StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), b._row_id(hidden), b.k1(hidden)], pk_columns: [ak1.a._row_id, b._row_id, ak1.k1, b.k1] } - └─StreamHashJoin { type: Inner, predicate: ak1.k1 = b.k1, output: [ak1.v, b.v, ak1.a._row_id, ak1.k1, b._row_id, b.k1] } - ├─StreamExchange { dist: HashShard(ak1.k1) } - | └─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) } - └─StreamExchange { dist: HashShard(b.k1) } - └─StreamTableScan { table: b, columns: [b.k1, b.v, b._row_id], pk: [b._row_id], dist: UpstreamHashShard(b._row_id) } + StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1, bk1.k1] } + └─StreamExchange { dist: HashShard(ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1) } + └─StreamDeltaJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1] } + ├─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) } + └─StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) } stream_dist_plan: | Fragment 0 - StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), b._row_id(hidden), b.k1(hidden)], pk_columns: [ak1.a._row_id, b._row_id, ak1.k1, b.k1] } + StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1, bk1.k1] } materialized table: 4294967294 - StreamHashJoin { type: Inner, predicate: ak1.k1 = b.k1, output: [ak1.v, b.v, ak1.a._row_id, ak1.k1, b._row_id, b.k1] } - left table: 0, right table 2, left degree table: 1, right degree table: 3, - StreamExchange Hash([0]) from 1 - StreamExchange Hash([0]) from 2 + StreamExchange Hash([2, 3, 4, 5]) from 1 Fragment 1 + Union + StreamExchange Hash([2, 4, 3, 5]) from 4 + StreamExchange Hash([2, 4, 3, 5]) from 5 + + Fragment 2 Chain { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) } Upstream BatchPlanNode - Fragment 2 - Chain { table: b, columns: [b.k1, b.v, b._row_id], pk: [b._row_id], dist: UpstreamHashShard(b._row_id) } + Fragment 3 + StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) } Upstream BatchPlanNode - Table 0 { columns: [ak1_k1, ak1_v, ak1_a__row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0] } - Table 1 { columns: [ak1_k1, ak1_a__row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] } - Table 2 { columns: [b_k1, b_v, b__row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0] } - Table 3 { columns: [b_k1, b__row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] } - Table 4294967294 { columns: [v, bv, ak1.a._row_id, ak1.k1, b._row_id, b.k1], primary key: [$2 ASC, $4 ASC, $3 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [3] } + Fragment 4 + Lookup + StreamExchange Hash([0]) from 3 + StreamExchange NoShuffle from 2 + + Fragment 5 + Lookup + StreamExchange Hash([0]) from 2 + StreamExchange NoShuffle from 3 + + Table 4294967294 { columns: [v, bv, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1], primary key: [$2 ASC, $4 ASC, $3 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [2, 3, 4, 5] } - id: A_join_Bk1_onk1 before: - create_tables @@ -72,36 +111,43 @@ └─BatchExchange { order: [], dist: UpstreamHashShard(a.k1) } └─BatchScan { table: a, columns: [a.k1, a.v], distribution: SomeShard } stream_plan: | - StreamMaterialize { columns: [v, bv, a._row_id(hidden), a.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [a._row_id, bk1.b._row_id, a.k1, bk1.k1] } - └─StreamHashJoin { type: Inner, predicate: a.k1 = bk1.k1, output: [a.v, bk1.v, a._row_id, a.k1, bk1.b._row_id, bk1.k1] } - ├─StreamExchange { dist: HashShard(a.k1) } - | └─StreamTableScan { table: a, columns: [a.k1, a.v, a._row_id], pk: [a._row_id], dist: UpstreamHashShard(a._row_id) } - └─StreamExchange { dist: HashShard(bk1.k1) } + StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1, bk1.k1] } + └─StreamExchange { dist: HashShard(ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1) } + └─StreamDeltaJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1] } + ├─StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) } └─StreamTableScan { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) } stream_dist_plan: | Fragment 0 - StreamMaterialize { columns: [v, bv, a._row_id(hidden), a.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [a._row_id, bk1.b._row_id, a.k1, bk1.k1] } + StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1, bk1.k1] } materialized table: 4294967294 - StreamHashJoin { type: Inner, predicate: a.k1 = bk1.k1, output: [a.v, bk1.v, a._row_id, a.k1, bk1.b._row_id, bk1.k1] } - left table: 0, right table 2, left degree table: 1, right degree table: 3, - StreamExchange Hash([0]) from 1 - StreamExchange Hash([0]) from 2 + StreamExchange Hash([2, 3, 4, 5]) from 1 Fragment 1 - Chain { table: a, columns: [a.k1, a.v, a._row_id], pk: [a._row_id], dist: UpstreamHashShard(a._row_id) } + Union + StreamExchange Hash([2, 4, 3, 5]) from 4 + StreamExchange Hash([2, 4, 3, 5]) from 5 + + Fragment 2 + StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) } Upstream BatchPlanNode - Fragment 2 + Fragment 3 Chain { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) } Upstream BatchPlanNode - Table 0 { columns: [a_k1, a_v, a__row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0] } - Table 1 { columns: [a_k1, a__row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] } - Table 2 { columns: [bk1_k1, bk1_v, bk1_b__row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0] } - Table 3 { columns: [bk1_k1, bk1_b__row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] } - Table 4294967294 { columns: [v, bv, a._row_id, a.k1, bk1.b._row_id, bk1.k1], primary key: [$2 ASC, $4 ASC, $3 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [3] } + Fragment 4 + Lookup + StreamExchange Hash([0]) from 3 + StreamExchange NoShuffle from 2 + + Fragment 5 + Lookup + StreamExchange Hash([0]) from 2 + StreamExchange NoShuffle from 3 + + Table 4294967294 { columns: [v, bv, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1], primary key: [$2 ASC, $4 ASC, $3 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [2, 3, 4, 5] } - id: Ak1_join_Bk1_onk1 before: - create_tables @@ -113,35 +159,42 @@ └─BatchScan { table: ak1, columns: [ak1.k1, ak1.v], distribution: UpstreamHashShard(ak1.k1) } stream_plan: | StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1, bk1.k1] } - └─StreamHashJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1] } - ├─StreamExchange { dist: HashShard(ak1.k1) } - | └─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) } - └─StreamExchange { dist: HashShard(bk1.k1) } + └─StreamExchange { dist: HashShard(ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1) } + └─StreamDeltaJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1] } + ├─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) } └─StreamTableScan { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) } stream_dist_plan: | Fragment 0 StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden), bk1.k1(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1, bk1.k1] } materialized table: 4294967294 - StreamHashJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1] } - left table: 0, right table 2, left degree table: 1, right degree table: 3, - StreamExchange Hash([0]) from 1 - StreamExchange Hash([0]) from 2 + StreamExchange Hash([2, 3, 4, 5]) from 1 Fragment 1 + Union + StreamExchange Hash([2, 4, 3, 5]) from 4 + StreamExchange Hash([2, 4, 3, 5]) from 5 + + Fragment 2 Chain { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) } Upstream BatchPlanNode - Fragment 2 + Fragment 3 Chain { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) } Upstream BatchPlanNode - Table 0 { columns: [ak1_k1, ak1_v, ak1_a__row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0] } - Table 1 { columns: [ak1_k1, ak1_a__row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] } - Table 2 { columns: [bk1_k1, bk1_v, bk1_b__row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0] } - Table 3 { columns: [bk1_k1, bk1_b__row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] } - Table 4294967294 { columns: [v, bv, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1], primary key: [$2 ASC, $4 ASC, $3 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [3] } + Fragment 4 + Lookup + StreamExchange Hash([0]) from 3 + StreamExchange NoShuffle from 2 + + Fragment 5 + Lookup + StreamExchange Hash([0]) from 2 + StreamExchange NoShuffle from 3 + + Table 4294967294 { columns: [v, bv, ak1.a._row_id, ak1.k1, bk1.b._row_id, bk1.k1], primary key: [$2 ASC, $4 ASC, $3 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [2, 3, 4, 5] } - id: aggk1_from_A before: - create_tables diff --git a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs index 67f20767bcbc..7eff29860ced 100644 --- a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs @@ -154,10 +154,23 @@ impl StreamNode for StreamDeltaJoin { fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> NodeBody { let left = self.left(); let right = self.right(); - let left_table = left.as_stream_index_scan().unwrap(); - let right_table = right.as_stream_index_scan().unwrap(); - let left_table_desc = left_table.logical().table_desc(); - let right_table_desc = right_table.logical().table_desc(); + + let left_table = if let Some(stream_index_scan) = left.as_stream_index_scan() { + stream_index_scan.logical() + } else if let Some(stream_table_scan) = left.as_stream_table_scan() { + stream_table_scan.logical() + } else { + unreachable!(); + }; + let left_table_desc = left_table.table_desc(); + let right_table = if let Some(stream_index_scan) = right.as_stream_index_scan() { + stream_index_scan.logical() + } else if let Some(stream_table_scan) = right.as_stream_table_scan() { + stream_table_scan.logical() + } else { + unreachable!(); + }; + let right_table_desc = right_table.table_desc(); // TODO: add a separate delta join node in proto, or move fragmenter to frontend so that we // don't need an intermediate representation. @@ -192,7 +205,6 @@ impl StreamNode for StreamDeltaJoin { arrange_key_orders: left_table_desc.arrange_key_orders_prost(), // TODO: remove it column_descs: left_table - .logical() .column_descs() .iter() .map(ColumnDesc::to_protobuf) @@ -204,7 +216,6 @@ impl StreamNode for StreamDeltaJoin { arrange_key_orders: right_table_desc.arrange_key_orders_prost(), // TODO: remove it column_descs: right_table - .logical() .column_descs() .iter() .map(ColumnDesc::to_protobuf) diff --git a/src/frontend/src/optimizer/plan_node/stream_index_scan.rs b/src/frontend/src/optimizer/plan_node/stream_index_scan.rs index 7b3ccf350ded..e53edf901aca 100644 --- a/src/frontend/src/optimizer/plan_node/stream_index_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_index_scan.rs @@ -89,17 +89,14 @@ impl fmt::Display for StreamIndexScan { let verbose = self.base.ctx.is_explain_verbose(); let mut builder = f.debug_struct("StreamIndexScan"); - builder.field("index", &self.logical.table_name()).field( - "columns", - &format_args!( - "[{}]", - match verbose { - false => self.logical.column_names(), - true => self.logical.column_names_with_table_prefix(), - } - .join(", ") - ), - ); + let v = match verbose { + false => self.logical.column_names(), + true => self.logical.column_names_with_table_prefix(), + } + .join(", "); + builder + .field("index", &format_args!("{}", self.logical.table_name())) + .field("columns", &format_args!("[{}]", v)); if verbose { builder.field( @@ -151,6 +148,21 @@ impl StreamIndexScan { // The merge node should be empty ProstStreamPlan { node_body: Some(ProstStreamNode::Merge(Default::default())), + identity: "Upstream".into(), + fields: self + .logical + .table_desc() + .columns + .iter() + .map(|c| risingwave_common::catalog::Field::from(c).to_prost()) + .collect(), + stream_key: self + .logical + .table_desc() + .stream_key + .iter() + .map(|i| *i as _) + .collect(), ..Default::default() }, ProstStreamPlan { diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs index 63460b9eafc2..831ecf3ed2ed 100644 --- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -36,10 +36,15 @@ pub struct StreamTableScan { pub base: PlanBase, logical: LogicalScan, batch_plan_id: PlanNodeId, + chain_type: ChainType, } impl StreamTableScan { pub fn new(logical: LogicalScan) -> Self { + Self::new_with_chain_type(logical, ChainType::Backfill) + } + + pub fn new_with_chain_type(logical: LogicalScan, chain_type: ChainType) -> Self { let ctx = logical.base.ctx.clone(); let batch_plan_id = ctx.next_plan_node_id(); @@ -69,6 +74,7 @@ impl StreamTableScan { base, logical, batch_plan_id, + chain_type, } } @@ -93,6 +99,10 @@ impl StreamTableScan { chain_type, ) } + + pub fn chain_type(&self) -> ChainType { + self.chain_type + } } impl_plan_tree_node_for_leaf! { StreamTableScan } @@ -191,7 +201,7 @@ impl StreamTableScan { node_body: Some(ProstStreamNode::Chain(ChainNode { table_id: self.logical.table_desc().table_id.table_id, same_worker_node: false, - chain_type: ChainType::Backfill as i32, + chain_type: self.chain_type as i32, // The fields from upstream upstream_fields: self .logical diff --git a/src/frontend/src/optimizer/rule/index_delta_join_rule.rs b/src/frontend/src/optimizer/rule/index_delta_join_rule.rs index 49204668d53d..ed9677f68ebb 100644 --- a/src/frontend/src/optimizer/rule/index_delta_join_rule.rs +++ b/src/frontend/src/optimizer/rule/index_delta_join_rule.rs @@ -56,10 +56,6 @@ impl Rule for IndexDeltaJoinRule { table_scan: &StreamTableScan, chain_type: ChainType, ) -> Option { - if table_scan.logical().indexes().is_empty() { - return None; - } - for index in table_scan.logical().indexes() { // Only full covering index can be used in delta join if !index.full_covering() { @@ -107,7 +103,30 @@ impl Rule for IndexDeltaJoinRule { ); } - None + // Primary table is also an index. + let primary_table = table_scan.logical(); + if let Some(primary_table_distribution_key) = primary_table.distribution_key() + && primary_table_distribution_key == join_indices { + // Check join key is prefix of primary table order key + let primary_table_order_key_prefix = primary_table.table_desc().pk.iter() + .map(|x| x.column_idx) + .take(primary_table_distribution_key.len()) + .collect_vec(); + + if primary_table_order_key_prefix != join_indices { + return None; + } + + if chain_type != table_scan.chain_type() { + Some( + StreamTableScan::new_with_chain_type(table_scan.logical().clone(), chain_type).into() + ) + } else { + Some(table_scan.clone().into()) + } + } else { + None + } } // Delta join only needs to backfill one stream flow and others should be upstream only