Skip to content

Commit

Permalink
fix(optimizer): materialize's dist should satisfy pk (risingwavelab…
Browse files Browse the repository at this point in the history
…s#1926)

* fix(optimizer): materialize's dist should satisfy pk

* more detail

* fix ut

* revert risingwavelabs#1653

* fix

* fix
  • Loading branch information
st1page authored Apr 19, 2022
1 parent 7656f8a commit 754815e
Show file tree
Hide file tree
Showing 9 changed files with 223 additions and 151 deletions.
9 changes: 8 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::catalog::column_catalog::ColumnCatalog;
use crate::catalog::table_catalog::TableCatalog;
use crate::catalog::{gen_row_id_column_name, is_row_id_column_name, ColumnId};
use crate::optimizer::plan_node::{PlanBase, PlanNode};
use crate::optimizer::property::Order;
use crate::optimizer::property::{Distribution, Order};

/// Materializes a stream.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -105,6 +105,13 @@ impl StreamMaterialize {
user_order_by: Order,
user_cols: FixedBitSet,
) -> Result<Self> {
// ensure the same pk will not shuffle to different node
let input = match input.distribution() {
Distribution::Single => input,
_ => Distribution::HashShard(input.pk_indices().to_vec())
.enforce_if_not_satisfies(input, Order::any()),
};

let base = Self::derive_plan_base(&input)?;
let schema = &base.schema;
let pk_indices = &base.pk_indices;
Expand Down
9 changes: 2 additions & 7 deletions src/frontend/src/optimizer/plan_node/stream_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,8 @@ impl StreamTableScan {
type_name: "".to_string(),
})
.collect(),
distribution_keys: self
.base
.dist
.dist_column_indices()
.iter()
.map(|idx| *idx as i32)
.collect_vec(),
// TODO: add the distribution key from tableCatalog
distribution_keys: vec![],
// Will fill when resolving chain node.
parallel_info: None,
};
Expand Down
6 changes: 4 additions & 2 deletions src/frontend/src/optimizer/property/distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,13 @@ impl Distribution {
matches!(self, Distribution::Any)
}

/// Get distribution column indices.
/// Get distribution column indices. After optimization, only `HashShard` and `Single` are
/// valid.
pub fn dist_column_indices(&self) -> &[usize] {
match self {
Distribution::Single => Default::default(),
Distribution::HashShard(dists) => dists,
_ => &[],
_ => unreachable!(),
}
}
}
Expand Down
18 changes: 11 additions & 7 deletions src/frontend/test_runner/tests/testdata/basic_query_1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
BatchScan { table: t, columns: [v1, v2] }
stream_plan: |
StreamMaterialize { columns: [v1, v2, _row_id#0(hidden)], pk_columns: [_row_id#0] }
StreamTableScan { table: t, columns: [v1, v2, _row_id#0], pk_indices: [2] }
StreamExchange { dist: HashShard([2]) }
StreamTableScan { table: t, columns: [v1, v2, _row_id#0], pk_indices: [2] }
- sql: |
create table t (v1 bigint, v2 double precision);
select t2.* from t;
Expand All @@ -32,8 +33,9 @@
BatchScan { table: t, columns: [] }
stream_plan: |
StreamMaterialize { columns: [_row_id#0(hidden)], pk_columns: [_row_id#0] }
StreamFilter { predicate: (1:Int32 = 1:Int32) AND ((((1:Int32 > 2:Int32) AND (3:Int32 < 1:Int32)) AND (4:Int32 <> 1:Int32)) OR ((2:Int32 >= 1:Int32) AND (1:Int32 <= 2:Int32))) }
StreamTableScan { table: t, columns: [_row_id#0], pk_indices: [0] }
StreamExchange { dist: HashShard([0]) }
StreamFilter { predicate: (1:Int32 = 1:Int32) AND ((((1:Int32 > 2:Int32) AND (3:Int32 < 1:Int32)) AND (4:Int32 <> 1:Int32)) OR ((2:Int32 >= 1:Int32) AND (1:Int32 <= 2:Int32))) }
StreamTableScan { table: t, columns: [_row_id#0], pk_indices: [0] }
- sql: |
create table t (v1 int);
select * from t where v1<1;
Expand All @@ -43,8 +45,9 @@
BatchScan { table: t, columns: [v1] }
stream_plan: |
StreamMaterialize { columns: [v1, _row_id#0(hidden)], pk_columns: [_row_id#0] }
StreamFilter { predicate: ($0 < 1:Int32) }
StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] }
StreamExchange { dist: HashShard([1]) }
StreamFilter { predicate: ($0 < 1:Int32) }
StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] }
- sql: |
create table t ();
select (((((false is not true) is true) is not false) is false) is not null) is null from t;
Expand Down Expand Up @@ -88,8 +91,9 @@
BatchScan { table: t, columns: [v1] }
stream_plan: |
StreamMaterialize { columns: [expr#0, _row_id#0(hidden)], pk_columns: [_row_id#0] }
StreamProject { exprs: [Case(($0 = 1:Int32), 1:Int32::Decimal, ($0 = 2:Int32), 2:Int32::Decimal, 0.0:Decimal), $1], expr_alias: [ , ] }
StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] }
StreamExchange { dist: HashShard([1]) }
StreamProject { exprs: [Case(($0 = 1:Int32), 1:Int32::Decimal, ($0 = 2:Int32), 2:Int32::Decimal, 0.0:Decimal), $1], expr_alias: [ , ] }
StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] }
- sql: |
select length(trim(trailing '1' from '12'))+length(trim(leading '2' from '23'))+length(trim(both '3' from '34'));
batch_plan: |
Expand Down
37 changes: 20 additions & 17 deletions src/frontend/test_runner/tests/testdata/basic_query_2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
BatchScan { table: t, columns: [v1] }
stream_plan: |
StreamMaterialize { columns: [v1, _row_id#0(hidden)], pk_columns: [_row_id#0] }
StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] }
StreamExchange { dist: HashShard([1]) }
StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] }
- sql: |
values(cast(1 as bigint));
batch_plan: |
Expand Down Expand Up @@ -59,16 +60,17 @@
BatchScan { table: t3, columns: [v1, v2] }
stream_plan: |
StreamMaterialize { columns: [t1_v1, t1_v2, t2_v1, t2_v2, t3_v1, t3_v2, _row_id#0(hidden), _row_id#1(hidden), _row_id#2(hidden)], pk_columns: [_row_id#0, _row_id#1, _row_id#2] }
StreamProject { exprs: [$0, $1, $3, $4, $6, $7, $2, $5, $8], expr_alias: [t1_v1, t1_v2, t2_v1, t2_v2, t3_v1, t3_v2, , , ] }
StreamHashJoin { type: Inner, predicate: $4 = $7 }
StreamExchange { dist: HashShard([4]) }
StreamHashJoin { type: Inner, predicate: $0 = $3 }
StreamExchange { dist: HashShard([0]) }
StreamTableScan { table: t1, columns: [v1, v2, _row_id#0], pk_indices: [2] }
StreamExchange { dist: HashShard([0]) }
StreamTableScan { table: t2, columns: [v1, v2, _row_id#0], pk_indices: [2] }
StreamExchange { dist: HashShard([1]) }
StreamTableScan { table: t3, columns: [v1, v2, _row_id#0], pk_indices: [2] }
StreamExchange { dist: HashShard([6, 7, 8]) }
StreamProject { exprs: [$0, $1, $3, $4, $6, $7, $2, $5, $8], expr_alias: [t1_v1, t1_v2, t2_v1, t2_v2, t3_v1, t3_v2, , , ] }
StreamHashJoin { type: Inner, predicate: $4 = $7 }
StreamExchange { dist: HashShard([4]) }
StreamHashJoin { type: Inner, predicate: $0 = $3 }
StreamExchange { dist: HashShard([0]) }
StreamTableScan { table: t1, columns: [v1, v2, _row_id#0], pk_indices: [2] }
StreamExchange { dist: HashShard([0]) }
StreamTableScan { table: t2, columns: [v1, v2, _row_id#0], pk_indices: [2] }
StreamExchange { dist: HashShard([1]) }
StreamTableScan { table: t3, columns: [v1, v2, _row_id#0], pk_indices: [2] }
- sql: |
create table t1 (v1 int not null, v2 int not null);
create table t2 (v1 int not null, v2 int not null);
Expand All @@ -83,12 +85,13 @@
BatchScan { table: t2, columns: [v1, v2] }
stream_plan: |
StreamMaterialize { columns: [t1_v2, t2_v2, _row_id#0(hidden), _row_id#1(hidden)], pk_columns: [_row_id#0, _row_id#1] }
StreamProject { exprs: [$1, $4, $2, $5], expr_alias: [t1_v2, t2_v2, , ] }
StreamHashJoin { type: Inner, predicate: $0 = $3 }
StreamExchange { dist: HashShard([0]) }
StreamTableScan { table: t1, columns: [v1, v2, _row_id#0], pk_indices: [2] }
StreamExchange { dist: HashShard([0]) }
StreamTableScan { table: t2, columns: [v1, v2, _row_id#0], pk_indices: [2] }
StreamExchange { dist: HashShard([2, 3]) }
StreamProject { exprs: [$1, $4, $2, $5], expr_alias: [t1_v2, t2_v2, , ] }
StreamHashJoin { type: Inner, predicate: $0 = $3 }
StreamExchange { dist: HashShard([0]) }
StreamTableScan { table: t1, columns: [v1, v2, _row_id#0], pk_indices: [2] }
StreamExchange { dist: HashShard([0]) }
StreamTableScan { table: t2, columns: [v1, v2, _row_id#0], pk_indices: [2] }
- sql: select 1
batch_plan: |
BatchProject { exprs: [1:Int32], expr_alias: [ ] }
Expand Down
28 changes: 15 additions & 13 deletions src/frontend/test_runner/tests/testdata/join.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
LogicalScan { table: t3, columns: [_row_id#0, v5, v6] }
stream_plan: |
StreamMaterialize { columns: [v1, v2, _row_id#0(hidden), v3, v4, _row_id#1(hidden), v5, v6, _row_id#2(hidden)], pk_columns: [_row_id#0, _row_id#1, _row_id#2] }
StreamHashJoin { type: Inner, predicate: $0 = $6 }
StreamHashJoin { type: Inner, predicate: $0 = $3 }
StreamExchange { dist: HashShard([2, 5, 8]) }
StreamHashJoin { type: Inner, predicate: $0 = $6 }
StreamHashJoin { type: Inner, predicate: $0 = $3 }
StreamExchange { dist: HashShard([0]) }
StreamTableScan { table: t1, columns: [v1, v2, _row_id#0], pk_indices: [2] }
StreamExchange { dist: HashShard([0]) }
StreamTableScan { table: t2, columns: [v3, v4, _row_id#0], pk_indices: [2] }
StreamExchange { dist: HashShard([0]) }
StreamTableScan { table: t1, columns: [v1, v2, _row_id#0], pk_indices: [2] }
StreamExchange { dist: HashShard([0]) }
StreamTableScan { table: t2, columns: [v3, v4, _row_id#0], pk_indices: [2] }
StreamExchange { dist: HashShard([0]) }
StreamTableScan { table: t3, columns: [v5, v6, _row_id#0], pk_indices: [2] }
StreamTableScan { table: t3, columns: [v5, v6, _row_id#0], pk_indices: [2] }
- sql: |
/* self join */
create table t (v1 int, v2 int);
Expand All @@ -32,9 +33,10 @@
LogicalScan { table: t, columns: [_row_id#0, v1, v2] }
stream_plan: |
StreamMaterialize { columns: [t1v1, t2v1, _row_id#0(hidden), _row_id#1(hidden)], pk_columns: [_row_id#0, _row_id#1] }
StreamProject { exprs: [$0, $2, $1, $3], expr_alias: [t1v1, t2v1, , ] }
StreamHashJoin { type: Inner, predicate: $0 = $2 }
StreamExchange { dist: HashShard([0]) }
StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] }
StreamExchange { dist: HashShard([0]) }
StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] }
StreamExchange { dist: HashShard([2, 3]) }
StreamProject { exprs: [$0, $2, $1, $3], expr_alias: [t1v1, t2v1, , ] }
StreamHashJoin { type: Inner, predicate: $0 = $2 }
StreamExchange { dist: HashShard([0]) }
StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] }
StreamExchange { dist: HashShard([0]) }
StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] }
13 changes: 7 additions & 6 deletions src/frontend/test_runner/tests/testdata/mv_on_mv.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
select m1.v1 as m1v1, m1.v2 as m1v2, m2.v1 as m2v1, m2.v2 as m2v2 from m1 join m2 on m1.v1 = m2.v1;
stream_plan: |
StreamMaterialize { columns: [m1v1, m1v2, m2v1, m2v2, _row_id#0(hidden), _row_id#1(hidden)], pk_columns: [_row_id#0, _row_id#1] }
StreamProject { exprs: [$0, $1, $3, $4, $2, $5], expr_alias: [m1v1, m1v2, m2v1, m2v2, , ] }
StreamHashJoin { type: Inner, predicate: $0 = $3 }
StreamExchange { dist: HashShard([0]) }
StreamTableScan { table: m1, columns: [v1, v2, _row_id#0], pk_indices: [2] }
StreamExchange { dist: HashShard([0]) }
StreamTableScan { table: m2, columns: [v1, v2, _row_id#0], pk_indices: [2] }
StreamExchange { dist: HashShard([4, 5]) }
StreamProject { exprs: [$0, $1, $3, $4, $2, $5], expr_alias: [m1v1, m1v2, m2v1, m2v2, , ] }
StreamHashJoin { type: Inner, predicate: $0 = $3 }
StreamExchange { dist: HashShard([0]) }
StreamTableScan { table: m1, columns: [v1, v2, _row_id#0], pk_indices: [2] }
StreamExchange { dist: HashShard([0]) }
StreamTableScan { table: m2, columns: [v1, v2, _row_id#0], pk_indices: [2] }
Loading

0 comments on commit 754815e

Please sign in to comment.