diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs
index b54bf3c756b06..f2c7be3e8e63c 100644
--- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs
@@ -30,7 +30,7 @@ use crate::catalog::column_catalog::ColumnCatalog;
 use crate::catalog::table_catalog::TableCatalog;
 use crate::catalog::{gen_row_id_column_name, is_row_id_column_name, ColumnId};
 use crate::optimizer::plan_node::{PlanBase, PlanNode};
-use crate::optimizer::property::Order;
+use crate::optimizer::property::{Distribution, Order};
 
 /// Materializes a stream.
 #[derive(Debug, Clone)]
@@ -105,6 +105,13 @@ impl StreamMaterialize {
         user_order_by: Order,
         user_cols: FixedBitSet,
     ) -> Result<Self> {
+        // Ensure that rows with the same pk will not be shuffled to different nodes.
+        let input = match input.distribution() {
+            Distribution::Single => input,
+            _ => Distribution::HashShard(input.pk_indices().to_vec())
+                .enforce_if_not_satisfies(input, Order::any()),
+        };
+
         let base = Self::derive_plan_base(&input)?;
         let schema = &base.schema;
         let pk_indices = &base.pk_indices;
diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
index 9b254d02d8c72..a40133da31f0b 100644
--- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -100,13 +100,8 @@ impl StreamTableScan {
                     type_name: "".to_string(),
                 })
                 .collect(),
-            distribution_keys: self
-                .base
-                .dist
-                .dist_column_indices()
-                .iter()
-                .map(|idx| *idx as i32)
-                .collect_vec(),
+            // TODO: add the distribution key from the table catalog.
+            distribution_keys: vec![],
             // Will fill when resolving chain node.
             parallel_info: None,
         };
diff --git a/src/frontend/src/optimizer/property/distribution.rs b/src/frontend/src/optimizer/property/distribution.rs
index 6e0f77a772c96..caa9c194d6cae 100644
--- a/src/frontend/src/optimizer/property/distribution.rs
+++ b/src/frontend/src/optimizer/property/distribution.rs
@@ -118,11 +118,13 @@ impl Distribution {
         matches!(self, Distribution::Any)
     }
 
-    /// Get distribution column indices.
+    /// Get distribution column indices. After optimization, only `HashShard` and `Single` are
+    /// valid.
     pub fn dist_column_indices(&self) -> &[usize] {
         match self {
+            Distribution::Single => Default::default(),
             Distribution::HashShard(dists) => dists,
-            _ => &[],
+            _ => unreachable!(),
         }
     }
 }
diff --git a/src/frontend/test_runner/tests/testdata/basic_query_1.yaml b/src/frontend/test_runner/tests/testdata/basic_query_1.yaml
index 41ec3ceb4ef02..f6d8e027d940f 100644
--- a/src/frontend/test_runner/tests/testdata/basic_query_1.yaml
+++ b/src/frontend/test_runner/tests/testdata/basic_query_1.yaml
@@ -11,7 +11,8 @@
       BatchScan { table: t, columns: [v1, v2] }
   stream_plan: |
     StreamMaterialize { columns: [v1, v2, _row_id#0(hidden)], pk_columns: [_row_id#0] }
-      StreamTableScan { table: t, columns: [v1, v2, _row_id#0], pk_indices: [2] }
+      StreamExchange { dist: HashShard([2]) }
+        StreamTableScan { table: t, columns: [v1, v2, _row_id#0], pk_indices: [2] }
 - sql: |
     create table t (v1 bigint, v2 double precision);
     select t2.* from t;
@@ -32,8 +33,9 @@
       BatchScan { table: t, columns: [] }
   stream_plan: |
     StreamMaterialize { columns: [_row_id#0(hidden)], pk_columns: [_row_id#0] }
-      StreamFilter { predicate: (1:Int32 = 1:Int32) AND ((((1:Int32 > 2:Int32) AND (3:Int32 < 1:Int32)) AND (4:Int32 <> 1:Int32)) OR ((2:Int32 >= 1:Int32) AND (1:Int32 <= 2:Int32))) }
-        StreamTableScan { table: t, columns: [_row_id#0], pk_indices: [0] }
+      StreamExchange { dist: HashShard([0]) }
+        StreamFilter { predicate: (1:Int32 = 1:Int32) AND ((((1:Int32 > 2:Int32) AND (3:Int32 < 1:Int32)) AND (4:Int32 <> 1:Int32)) OR ((2:Int32 >= 1:Int32) AND (1:Int32 <= 2:Int32))) }
+          StreamTableScan { table: t, columns: [_row_id#0], pk_indices: [0] }
 - sql: |
     create table t (v1 int);
     select * from t where v1<1;
@@ -43,8 +45,9 @@
       BatchScan { table: t, columns: [v1] }
   stream_plan: |
     StreamMaterialize { columns: [v1, _row_id#0(hidden)], pk_columns: [_row_id#0] }
-      StreamFilter { predicate: ($0 < 1:Int32) }
-        StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] }
+      StreamExchange { dist: HashShard([1]) }
+        StreamFilter { predicate: ($0 < 1:Int32) }
+          StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] }
 - sql: |
     create table t ();
     select (((((false is not true) is true) is not false) is false) is not null) is null from t;
@@ -88,8 +91,9 @@
       BatchScan { table: t, columns: [v1] }
   stream_plan: |
     StreamMaterialize { columns: [expr#0, _row_id#0(hidden)], pk_columns: [_row_id#0] }
-      StreamProject { exprs: [Case(($0 = 1:Int32), 1:Int32::Decimal, ($0 = 2:Int32), 2:Int32::Decimal, 0.0:Decimal), $1], expr_alias: [ , ] }
-        StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] }
+      StreamExchange { dist: HashShard([1]) }
+        StreamProject { exprs: [Case(($0 = 1:Int32), 1:Int32::Decimal, ($0 = 2:Int32), 2:Int32::Decimal, 0.0:Decimal), $1], expr_alias: [ , ] }
+          StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] }
 - sql: |
     select length(trim(trailing '1' from '12'))+length(trim(leading '2' from '23'))+length(trim(both '3' from '34'));
   batch_plan: |
diff --git a/src/frontend/test_runner/tests/testdata/basic_query_2.yaml b/src/frontend/test_runner/tests/testdata/basic_query_2.yaml
index 1bca0a512a077..5ad02db58f33d 100644
--- a/src/frontend/test_runner/tests/testdata/basic_query_2.yaml
+++ b/src/frontend/test_runner/tests/testdata/basic_query_2.yaml
@@ -12,7 +12,8 @@
       BatchScan { table: t, columns: [v1] }
   stream_plan: |
     StreamMaterialize { columns: [v1, _row_id#0(hidden)], pk_columns: [_row_id#0] }
-      StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] }
+      StreamExchange { dist: HashShard([1]) }
+        StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] }
 - sql: |
     values(cast(1 as bigint));
   batch_plan: |
@@ -59,16 +60,17 @@
      BatchScan { table: t3, columns: [v1, v2] }
   stream_plan: |
     StreamMaterialize { columns: [t1_v1, t1_v2, t2_v1, t2_v2, t3_v1, t3_v2, _row_id#0(hidden), _row_id#1(hidden), _row_id#2(hidden)], pk_columns: [_row_id#0, _row_id#1, _row_id#2] }
-      StreamProject { exprs: [$0, $1, $3, $4, $6, $7, $2, $5, $8], expr_alias: [t1_v1, t1_v2, t2_v1, t2_v2, t3_v1, t3_v2, , , ] }
-        StreamHashJoin { type: Inner, predicate: $4 = $7 }
-          StreamExchange { dist: HashShard([4]) }
-            StreamHashJoin { type: Inner, predicate: $0 = $3 }
-              StreamExchange { dist: HashShard([0]) }
-                StreamTableScan { table: t1, columns: [v1, v2, _row_id#0], pk_indices: [2] }
-              StreamExchange { dist: HashShard([0]) }
-                StreamTableScan { table: t2, columns: [v1, v2, _row_id#0], pk_indices: [2] }
-          StreamExchange { dist: HashShard([1]) }
-            StreamTableScan { table: t3, columns: [v1, v2, _row_id#0], pk_indices: [2] }
+      StreamExchange { dist: HashShard([6, 7, 8]) }
+        StreamProject { exprs: [$0, $1, $3, $4, $6, $7, $2, $5, $8], expr_alias: [t1_v1, t1_v2, t2_v1, t2_v2, t3_v1, t3_v2, , , ] }
+          StreamHashJoin { type: Inner, predicate: $4 = $7 }
+            StreamExchange { dist: HashShard([4]) }
+              StreamHashJoin { type: Inner, predicate: $0 = $3 }
+                StreamExchange { dist: HashShard([0]) }
+                  StreamTableScan { table: t1, columns: [v1, v2, _row_id#0], pk_indices: [2] }
+                StreamExchange { dist: HashShard([0]) }
+                  StreamTableScan { table: t2, columns: [v1, v2, _row_id#0], pk_indices: [2] }
+            StreamExchange { dist: HashShard([1]) }
+              StreamTableScan { table: t3, columns: [v1, v2, _row_id#0], pk_indices: [2] }
 - sql: |
     create table t1 (v1 int not null, v2 int not null);
     create table t2 (v1 int not null, v2 int not null);
@@ -83,12 +85,13 @@
      BatchScan { table: t2, columns: [v1, v2] }
   stream_plan: |
     StreamMaterialize { columns: [t1_v2, t2_v2, _row_id#0(hidden), _row_id#1(hidden)], pk_columns: [_row_id#0, _row_id#1] }
-      StreamProject { exprs: [$1, $4, $2, $5], expr_alias: [t1_v2, t2_v2, , ] }
-        StreamHashJoin { type: Inner, predicate: $0 = $3 }
-          StreamExchange { dist: HashShard([0]) }
-            StreamTableScan { table: t1, columns: [v1, v2, _row_id#0], pk_indices: [2] }
-          StreamExchange { dist: HashShard([0]) }
-            StreamTableScan { table: t2, columns: [v1, v2, _row_id#0], pk_indices: [2] }
+      StreamExchange { dist: HashShard([2, 3]) }
+        StreamProject { exprs: [$1, $4, $2, $5], expr_alias: [t1_v2, t2_v2, , ] }
+          StreamHashJoin { type: Inner, predicate: $0 = $3 }
+            StreamExchange { dist: HashShard([0]) }
+              StreamTableScan { table: t1, columns: [v1, v2, _row_id#0], pk_indices: [2] }
+            StreamExchange { dist: HashShard([0]) }
+              StreamTableScan { table: t2, columns: [v1, v2, _row_id#0], pk_indices: [2] }
 - sql: select 1
   batch_plan: |
     BatchProject { exprs: [1:Int32], expr_alias: [ ] }
diff --git a/src/frontend/test_runner/tests/testdata/join.yaml b/src/frontend/test_runner/tests/testdata/join.yaml
index a1aa401d4966b..88947332da0fd 100644
--- a/src/frontend/test_runner/tests/testdata/join.yaml
+++ b/src/frontend/test_runner/tests/testdata/join.yaml
@@ -13,14 +13,15 @@
         LogicalScan { table: t3, columns: [_row_id#0, v5, v6] }
   stream_plan: |
     StreamMaterialize { columns: [v1, v2, _row_id#0(hidden), v3, v4, _row_id#1(hidden), v5, v6, _row_id#2(hidden)], pk_columns: [_row_id#0, _row_id#1, _row_id#2] }
-      StreamHashJoin { type: Inner, predicate: $0 = $6 }
-        StreamHashJoin { type: Inner, predicate: $0 = $3 }
+      StreamExchange { dist: HashShard([2, 5, 8]) }
+        StreamHashJoin { type: Inner, predicate: $0 = $6 }
+          StreamHashJoin { type: Inner, predicate: $0 = $3 }
+            StreamExchange { dist: HashShard([0]) }
+              StreamTableScan { table: t1, columns: [v1, v2, _row_id#0], pk_indices: [2] }
+            StreamExchange { dist: HashShard([0]) }
+              StreamTableScan { table: t2, columns: [v3, v4, _row_id#0], pk_indices: [2] }
           StreamExchange { dist: HashShard([0]) }
-            StreamTableScan { table: t1, columns: [v1, v2, _row_id#0], pk_indices: [2] }
-          StreamExchange { dist: HashShard([0]) }
-            StreamTableScan { table: t2, columns: [v3, v4, _row_id#0], pk_indices: [2] }
-        StreamExchange { dist: HashShard([0]) }
-          StreamTableScan { table: t3, columns: [v5, v6, _row_id#0], pk_indices: [2] }
+            StreamTableScan { table: t3, columns: [v5, v6, _row_id#0], pk_indices: [2] }
 - sql: |
     /* self join */
     create table t (v1 int, v2 int);
@@ -32,9 +33,10 @@
       LogicalScan { table: t, columns: [_row_id#0, v1, v2] }
   stream_plan: |
     StreamMaterialize { columns: [t1v1, t2v1, _row_id#0(hidden), _row_id#1(hidden)], pk_columns: [_row_id#0, _row_id#1] }
-      StreamProject { exprs: [$0, $2, $1, $3], expr_alias: [t1v1, t2v1, , ] }
-        StreamHashJoin { type: Inner, predicate: $0 = $2 }
-          StreamExchange { dist: HashShard([0]) }
-            StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] }
-          StreamExchange { dist: HashShard([0]) }
-            StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] }
+      StreamExchange { dist: HashShard([2, 3]) }
+        StreamProject { exprs: [$0, $2, $1, $3], expr_alias: [t1v1, t2v1, , ] }
+          StreamHashJoin { type: Inner, predicate: $0 = $2 }
+            StreamExchange { dist: HashShard([0]) }
+              StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] }
+            StreamExchange { dist: HashShard([0]) }
+              StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] }
diff --git a/src/frontend/test_runner/tests/testdata/mv_on_mv.yaml b/src/frontend/test_runner/tests/testdata/mv_on_mv.yaml
index e1aaab260d4ba..cbb19f2c0c451 100644
--- a/src/frontend/test_runner/tests/testdata/mv_on_mv.yaml
+++ b/src/frontend/test_runner/tests/testdata/mv_on_mv.yaml
@@ -11,9 +11,10 @@
     select m1.v1 as m1v1, m1.v2 as m1v2, m2.v1 as m2v1, m2.v2 as m2v2 from m1 join m2 on m1.v1 = m2.v1;
   stream_plan: |
     StreamMaterialize { columns: [m1v1, m1v2, m2v1, m2v2, _row_id#0(hidden), _row_id#1(hidden)], pk_columns: [_row_id#0, _row_id#1] }
-      StreamProject { exprs: [$0, $1, $3, $4, $2, $5], expr_alias: [m1v1, m1v2, m2v1, m2v2, , ] }
-        StreamHashJoin { type: Inner, predicate: $0 = $3 }
-          StreamExchange { dist: HashShard([0]) }
-            StreamTableScan { table: m1, columns: [v1, v2, _row_id#0], pk_indices: [2] }
-          StreamExchange { dist: HashShard([0]) }
-            StreamTableScan { table: m2, columns: [v1, v2, _row_id#0], pk_indices: [2] }
+      StreamExchange { dist: HashShard([4, 5]) }
+        StreamProject { exprs: [$0, $1, $3, $4, $2, $5], expr_alias: [m1v1, m1v2, m2v1, m2v2, , ] }
+          StreamHashJoin { type: Inner, predicate: $0 = $3 }
+            StreamExchange { dist: HashShard([0]) }
+              StreamTableScan { table: m1, columns: [v1, v2, _row_id#0], pk_indices: [2] }
+            StreamExchange { dist: HashShard([0]) }
+              StreamTableScan { table: m2, columns: [v1, v2, _row_id#0], pk_indices: [2] }
diff --git a/src/frontend/test_runner/tests/testdata/stream_proto.yaml b/src/frontend/test_runner/tests/testdata/stream_proto.yaml
index 6d6cf2fc50599..3548946d382c9 100644
--- a/src/frontend/test_runner/tests/testdata/stream_proto.yaml
+++ b/src/frontend/test_runner/tests/testdata/stream_proto.yaml
@@ -7,39 +7,70 @@ input:
 - input:
   - input:
     - input:
- - mergeNode: {} - - pkIndices: + - input: + - mergeNode: {} + - pkIndices: + - 1 + batchPlanNode: + tableRefId: + tableId: 1 + columnDescs: + - columnType: + typeName: INT32 + isNullable: true + columnId: 1 + name: v1 + - columnType: + typeName: INT64 + isNullable: true + name: "_row_id#0" + pkIndices: - 1 - batchPlanNode: + chainNode: tableRefId: tableId: 1 - columnDescs: - - columnType: - typeName: INT32 - isNullable: true - columnId: 1 - name: v1 - - columnType: + upstreamFields: + - dataType: typeName: INT64 isNullable: true name: "_row_id#0" + - dataType: + typeName: INT32 + isNullable: true + name: v1 + columnIds: + - 1 + - 0 pkIndices: - 1 - chainNode: - tableRefId: - tableId: 1 - upstreamFields: - - dataType: - typeName: INT64 - isNullable: true - name: "_row_id#0" - - dataType: - typeName: INT32 - isNullable: true - name: v1 - columnIds: - - 1 - - 0 + fields: + - dataType: + typeName: INT32 + isNullable: true + name: v1 + - dataType: + typeName: INT64 + isNullable: true + name: "_row_id#0" + filterNode: + searchCondition: + exprType: LESS_THAN + returnType: + typeName: BOOLEAN + isNullable: true + funcCall: + children: + - exprType: INPUT_REF + returnType: + typeName: INT32 + isNullable: true + inputRef: {} + - exprType: CONSTANT_VALUE + returnType: + typeName: INT32 + isNullable: true + constant: + body: AAAAAQ== pkIndices: - 1 fields: @@ -51,25 +82,11 @@ typeName: INT64 isNullable: true name: "_row_id#0" - filterNode: - searchCondition: - exprType: LESS_THAN - returnType: - typeName: BOOLEAN - isNullable: true - funcCall: - children: - - exprType: INPUT_REF - returnType: - typeName: INT32 - isNullable: true - inputRef: {} - - exprType: CONSTANT_VALUE - returnType: - typeName: INT32 - isNullable: true - constant: - body: AAAAAQ== + exchangeNode: + strategy: + type: HASH + columnIndices: + - 1 pkIndices: - 1 fields: @@ -92,6 +109,8 @@ columnIds: - 0 - 1 + distributionKeys: + - 1 --- id: 4294967294 name: test @@ -120,39 +139,56 @@ --- input: - input: - - mergeNode: {} - - pkIndices: + - input: + - mergeNode: {} + - pkIndices: + - 1 + batchPlanNode: + tableRefId: + tableId: 1 + columnDescs: + - columnType: + typeName: INT32 + isNullable: true + columnId: 1 + name: v1 + - columnType: + typeName: INT64 + isNullable: true + name: "_row_id#0" + pkIndices: - 1 - batchPlanNode: + chainNode: tableRefId: tableId: 1 - columnDescs: - - columnType: - typeName: INT32 - isNullable: true - columnId: 1 - name: v1 - - columnType: + upstreamFields: + - dataType: typeName: INT64 isNullable: true name: "_row_id#0" + - dataType: + typeName: INT32 + isNullable: true + name: v1 + columnIds: + - 1 + - 0 pkIndices: - 1 - chainNode: - tableRefId: - tableId: 1 - upstreamFields: - - dataType: - typeName: INT64 - isNullable: true - name: "_row_id#0" - - dataType: - typeName: INT32 - isNullable: true - name: v1 - columnIds: - - 1 - - 0 + fields: + - dataType: + typeName: INT32 + isNullable: true + name: v1 + - dataType: + typeName: INT64 + isNullable: true + name: "_row_id#0" + exchangeNode: + strategy: + type: HASH + columnIndices: + - 1 pkIndices: - 1 fields: @@ -175,6 +211,8 @@ columnIds: - 0 - 1 + distributionKeys: + - 1 --- id: 4294967294 name: test @@ -203,43 +241,60 @@ --- input: - input: - - mergeNode: {} - - pkIndices: + - input: + - mergeNode: {} + - pkIndices: + - 1 + batchPlanNode: + tableRefId: + tableId: 1 + columnDescs: + - columnType: + typeName: INT32 + isNullable: true + columnId: 1 + name: v1 + - columnType: + typeName: INT64 + isNullable: true + name: "_row_id#0" + pkIndices: - 1 - 
batchPlanNode: + chainNode: tableRefId: tableId: 1 - columnDescs: - - columnType: + upstreamFields: + - dataType: + typeName: INT64 + isNullable: true + name: "_row_id#0" + - dataType: typeName: INT32 isNullable: true - columnId: 1 name: v1 - - columnType: - typeName: INT64 + - dataType: + typeName: INT32 isNullable: true - name: "_row_id#0" + name: v2 + columnIds: + - 1 + - 0 pkIndices: - 1 - chainNode: - tableRefId: - tableId: 1 - upstreamFields: - - dataType: - typeName: INT64 - isNullable: true - name: "_row_id#0" - - dataType: - typeName: INT32 - isNullable: true - name: v1 - - dataType: - typeName: INT32 - isNullable: true - name: v2 - columnIds: - - 1 - - 0 + fields: + - dataType: + typeName: INT32 + isNullable: true + name: v1 + - dataType: + typeName: INT64 + isNullable: true + name: "_row_id#0" + exchangeNode: + strategy: + type: HASH + columnIndices: + - 1 pkIndices: - 1 fields: @@ -262,6 +317,8 @@ columnIds: - 0 - 1 + distributionKeys: + - 1 --- id: 4294967294 name: test diff --git a/src/frontend/test_runner/tests/testdata/struct_query.yaml b/src/frontend/test_runner/tests/testdata/struct_query.yaml index fbf869aea5052..895b6a8e8834c 100644 --- a/src/frontend/test_runner/tests/testdata/struct_query.yaml +++ b/src/frontend/test_runner/tests/testdata/struct_query.yaml @@ -6,7 +6,8 @@ BatchScan { table: mv1, columns: [country] } stream_plan: | StreamMaterialize { columns: [country, _row_id#0(hidden)], pk_columns: [_row_id#0] } - StreamTableScan { table: mv1, columns: [country, _row_id#0], pk_indices: [1] } + StreamExchange { dist: HashShard([1]) } + StreamTableScan { table: mv1, columns: [country, _row_id#0], pk_indices: [1] } create_source: row_format: protobuf name: t
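Note for reviewers: the substantive change is in `StreamMaterialize::create`, which now forces the materialize operator's input to be hash-distributed on its pk columns (unless the input is already `Single`), so that all changes for a given key are routed to the same parallel unit before being written to the materialized view. Below is a minimal, self-contained sketch of that rule; the `Distribution`, `PlanNode`, and `enforce_hash_shard_on_pk` names here are illustrative stand-ins, not the actual RisingWave API — the real code calls `Distribution::HashShard(input.pk_indices().to_vec()).enforce_if_not_satisfies(input, Order::any())` as shown in the diff.

```rust
// Sketch only: simplified stand-in types for the planner-level rule.

#[derive(Debug, Clone, PartialEq)]
enum Distribution {
    Single,
    HashShard(Vec<usize>), // hash-distributed by these column indices
    Any,
}

#[derive(Debug, Clone)]
struct PlanNode {
    name: String,
    distribution: Distribution,
    pk_indices: Vec<usize>,
}

/// Wrap `input` in an exchange hashed on its pk columns unless it is already
/// `Single` (a single partition trivially keeps each pk together) or already
/// hash-sharded on exactly those columns.
fn enforce_hash_shard_on_pk(input: PlanNode) -> PlanNode {
    let required = Distribution::HashShard(input.pk_indices.clone());
    if input.distribution == Distribution::Single || input.distribution == required {
        return input;
    }
    PlanNode {
        name: format!("Exchange(HashShard({:?})) -> {}", input.pk_indices, input.name),
        distribution: required,
        pk_indices: input.pk_indices.clone(),
    }
}

fn main() {
    // A table scan whose distribution is not yet pinned down; its pk is the
    // hidden row id at column index 2, matching `pk_indices: [2]` in the plans.
    let scan = PlanNode {
        name: "StreamTableScan(t)".to_string(),
        distribution: Distribution::Any,
        pk_indices: vec![2],
    };
    // Prints an Exchange wrapping the scan — the planner-level analogue of the
    // `StreamExchange { dist: HashShard([2]) }` lines in the updated test plans.
    println!("{:?}", enforce_hash_shard_on_pk(scan));
}
```

This is why every updated `stream_plan` in the test data now shows a `StreamExchange { dist: HashShard([...]) }` keyed on the plan's pk columns (the hidden `_row_id` columns) directly under `StreamMaterialize`, and why the dumped protos in `stream_proto.yaml` gain an `exchangeNode` and `distributionKeys` entries.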