
feat(optimizer): turn on logical rewrite for stream #1231

Merged (3 commits, Mar 24, 2022)

rust/frontend/src/handler/create_mv.rs (2 changes: 1 addition & 1 deletion)

@@ -65,7 +65,7 @@ pub fn gen_create_mv_plan(

     let column_descs = bound_query.gen_create_mv_column_desc();

-    let logical = planner.plan_query(bound_query)?;
+    let mut logical = planner.plan_query(bound_query)?;

     let plan =
         logical.gen_create_mv_plan(order, column_descs.iter().map(|x| x.column_id).collect());

rust/frontend/src/optimizer/mod.rs (17 changes: 14 additions & 3 deletions)

@@ -154,9 +154,20 @@ impl PlanRoot
     }

     /// optimize and generate a create materialize view plan
-    pub fn gen_create_mv_plan(&self, order: Vec<FieldOrder>, column_ids: Vec<ColumnId>) -> PlanRef {
-        let plan = self.gen_optimized_logical_plan();
-
+    pub fn gen_create_mv_plan(
+        &mut self,
+        order: Vec<FieldOrder>,
+        column_ids: Vec<ColumnId>,
+    ) -> PlanRef {
+        let mut plan = self.gen_optimized_logical_plan();
+        plan = {
+            let (plan, mut out_col_change) = plan.logical_rewrite_for_stream();
+            self.required_dist = out_col_change.rewrite_distribution(self.required_dist.clone());
+            self.required_order = out_col_change.rewrite_order(self.required_order.clone());
+            self.out_fields = out_col_change.rewrite_bitset(&self.out_fields);
+            self.schema = plan.schema().clone();
+            plan
+        };
         // Ignore the required_dist and required_order, as they are provided by user now.
         // TODO: need more thinking and refactor.
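
For intuition, here is a minimal sketch of what a column-index mapping like out_col_change has to do to out_fields. The ColIndexMapping type below is a simplified assumption for illustration, not the actual type in rust/frontend; the real mapping also offers rewrite_distribution and rewrite_order, which apply the same index translation to hash-shard keys and sort keys.

// Sketch only; assumes `out_fields` is a FixedBitSet, as the call above suggests.
use fixedbitset::FixedBitSet;

/// Simplified stand-in for the mapping returned by `logical_rewrite_for_stream`:
/// `map[i]` is the new index of old output column `i`, or `None` if it was dropped.
pub struct ColIndexMapping {
    map: Vec<Option<usize>>,
    target_size: usize,
}

impl ColIndexMapping {
    pub fn try_map(&self, index: usize) -> Option<usize> {
        self.map.get(index).copied().flatten()
    }

    /// Counterpart of the `rewrite_bitset` call above: translate every
    /// required column index from the old column space into the new one.
    pub fn rewrite_bitset(&self, bitset: &FixedBitSet) -> FixedBitSet {
        let mut new_bitset = FixedBitSet::with_capacity(self.target_size);
        for old_idx in bitset.ones() {
            if let Some(new_idx) = self.try_map(old_idx) {
                new_bitset.insert(new_idx);
            }
        }
        new_bitset
    }
}

Because the stream rewrite may append or drop columns (see the plan-node change below), all derived state (required_dist, required_order, out_fields, schema) has to be remapped together, which is what the block expression above does.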

rust/frontend/src/optimizer/plan_node/logical_project.rs (3 changes: 1 addition & 2 deletions)

@@ -265,15 +265,14 @@ impl ToStream for LogicalProject
         let (input, input_col_change) = self.input.logical_rewrite_for_stream();
         let (proj, out_col_change) = self.rewrite_with_input(input.clone(), input_col_change);
         let input_pk = input.pk_indices();
-        assert!(!input_pk.is_empty());
         let i2o = Self::i2o_col_mapping(input.schema().len(), proj.exprs());
         let col_need_to_add = input_pk.iter().cloned().filter(|i| i2o.try_map(*i) == None);
         let input_schema = input.schema();
         let (exprs, expr_alias) = proj
             .exprs()
             .iter()
             .cloned()
-            .zip_eq(self.expr_alias().iter().cloned())
+            .zip_eq(proj.expr_alias().iter().cloned())
             .map(|(a, b)| (a, b))
             .chain(col_need_to_add.map(|idx| {
                 (
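
The col_need_to_add filter above implements the key invariant of this PR: after logical_rewrite_for_stream, a projection must still expose every primary-key column of its input. A self-contained sketch of that filter, with a plain closure standing in for the real i2o mapping (the names here are illustrative, not the actual plan-node API):

/// Input pk columns that no output expression already maps to; the rewrite
/// appends these as trailing `InputRef`s (the extra `$0`, `$2`, ... visible
/// in the updated test plans below).
fn pk_cols_to_append(input_pk: &[usize], i2o_try_map: impl Fn(usize) -> Option<usize>) -> Vec<usize> {
    input_pk
        .iter()
        .copied()
        .filter(|i| i2o_try_map(*i).is_none())
        .collect()
}

fn main() {
    // `select v2 from t`: the projection emits only input column 1.
    let i2o = |i: usize| (i == 1).then_some(0);
    // Input pk column 0 is not produced by any output expression, so it is appended.
    assert_eq!(pk_cols_to_append(&[0], i2o), vec![0]);
}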

rust/frontend/test_runner/src/lib.rs (2 changes: 1 addition & 1 deletion)

@@ -206,7 +206,7 @@ impl TestCase
             column_descs = None;
         }

-        let logical_plan = match Planner::new(context).plan(bound) {
+        let mut logical_plan = match Planner::new(context).plan(bound) {
             Ok(logical_plan) => {
                 if self.logical_plan.is_some() {
                     ret.logical_plan = Some(explain_plan(&logical_plan.clone().as_subplan()));

rust/frontend/test_runner/tests/testdata/basic_query_1.yaml (8 changes: 2 additions & 6 deletions)

@@ -52,10 +52,6 @@
 BatchExchange { order: [], dist: Single }
 BatchProject { exprs: [IsNull(IsNotNull(IsFalse(IsNotFalse(IsTrue(IsNotTrue(false:Boolean))))))], expr_alias: [ ] }
 BatchScan { table: t, columns: [] }
-stream_plan: |
-StreamMaterialize { table_id: 0, column_order: [], column_id: [#0], pk_indices: [] }
-StreamProject { exprs: [IsNull(IsNotNull(IsFalse(IsNotFalse(IsTrue(IsNotTrue(false:Boolean))))))], expr_alias: [ ] }
-StreamTableScan { table: t, columns: [], pk_indices: [0] }
 - sql: |
 create table t (v1 int);
 select (case when v1=1 then 1 when v1=2 then 2 else 0.0 end) from t;
@@ -64,8 +60,8 @@
 BatchProject { exprs: [Case(($0 = 1:Int32), 1:Int32::Decimal, ($0 = 2:Int32), 2:Int32::Decimal, Normalized(0.0):Decimal)], expr_alias: [ ] }
 BatchScan { table: t, columns: [v1] }
 stream_plan: |
-StreamMaterialize { table_id: 0, column_order: [], column_id: [#0], pk_indices: [] }
-StreamProject { exprs: [Case(($0 = 1:Int32), 1:Int32::Decimal, ($0 = 2:Int32), 2:Int32::Decimal, Normalized(0.0):Decimal)], expr_alias: [ ] }
+StreamMaterialize { table_id: 0, column_order: [$1 ASC], column_id: [#0], pk_indices: [1] }
+StreamProject { exprs: [Case(($0 = 1:Int32), 1:Int32::Decimal, ($0 = 2:Int32), 2:Int32::Decimal, Normalized(0.0):Decimal), $0], expr_alias: [ , ] }
 StreamTableScan { table: t, columns: [v1], pk_indices: [0] }
 - sql: |
 select length(trim(trailing '1' from '12'))+length(trim(leading '2' from '23'))+length(trim(both '3' from '34'));

Review thread on the new StreamMaterialize line:

Member (@fuyufjh, Mar 24, 2022): Maybe out of the scope of this PR: is pk_indices redundant with column_order? In other words, is it guaranteed that column_order must contain all PK columns in the same order?

Contributor (author): yes... we can refactor it later
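
A worked version of the reviewer's question, under the invariant the author confirms (column_order lists all pk columns in pk order). The types below are hypothetical stand-ins for illustration, not the real FieldOrder:

#[allow(dead_code)]
#[derive(Clone, Copy)]
enum OrderType {
    Ascending,
    Descending,
}

/// Hypothetical stand-in for one `$n ASC` / `$n DESC` entry of `column_order`.
#[allow(dead_code)]
struct ColumnOrder {
    column_idx: usize,
    order_type: OrderType,
}

/// If `column_order` always contains exactly the pk columns in pk order,
/// `pk_indices` is derivable from it, i.e. redundant:
fn pk_indices(column_order: &[ColumnOrder]) -> Vec<usize> {
    column_order.iter().map(|o| o.column_idx).collect()
}

fn main() {
    // `column_order: [$1 ASC]` in the plan above implies `pk_indices: [1]`.
    let order = [ColumnOrder { column_idx: 1, order_type: OrderType::Ascending }];
    assert_eq!(pk_indices(&order), vec![1]);
}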

rust/frontend/test_runner/tests/testdata/basic_query_2.yaml (8 changes: 4 additions & 4 deletions)

@@ -86,8 +86,8 @@
 BatchExchange { order: [], dist: HashShard([2]) }
 BatchScan { table: t2, columns: [v1, v2] }
 stream_plan: |
-StreamMaterialize { table_id: 0, column_order: [], column_id: [#0, #1], pk_indices: [] }
-StreamProject { exprs: [$1, $3], expr_alias: [v2, v2] }
+StreamMaterialize { table_id: 0, column_order: [$2 ASC, $3 ASC], column_id: [#0, #1], pk_indices: [2, 3] }
+StreamProject { exprs: [$1, $3, $0, $2], expr_alias: [v2, v2, , ] }
 StreamHashJoin { type: Inner, predicate: $0 = $2 }
 StreamExchange { dist: HashShard([0]) }
 StreamTableScan { table: t1, columns: [v1, v2], pk_indices: [0] }
@@ -126,8 +126,8 @@
 BatchProject { exprs: [$0, $1, $2], expr_alias: [ , , ] }
 BatchScan { table: t, columns: [v1, v2, v3] }
 stream_plan: |
-StreamMaterialize { table_id: 0, column_order: [], column_id: [#0], pk_indices: [] }
-StreamProject { exprs: [($0 + ($1 * $2))], expr_alias: [ ] }
+StreamMaterialize { table_id: 0, column_order: [$1 ASC, $2 ASC, $3 ASC], column_id: [#0], pk_indices: [1, 2, 3] }
+StreamProject { exprs: [($0 + ($1 * $2)), $0, $1, $2], expr_alias: [ , , , ] }
 StreamSimpleAgg { aggs: [min($0), max($1), count($2)] }
 StreamProject { exprs: [$0, $1, $2], expr_alias: [ , , ] }
 StreamTableScan { table: t, columns: [v1, v2, v3], pk_indices: [0] }

rust/frontend/test_runner/tests/testdata/stream_proto.yaml (68 changes: 42 additions & 26 deletions)

@@ -4,16 +4,16 @@
select * from t where v1<1;
stream_plan_proto: |
---
operatorId: "14"
operatorId: "17"
input:
- operatorId: "13"
- operatorId: "16"
input:
- operatorId: "11"
- operatorId: "14"
input:
- operatorId: "9"
- operatorId: "12"
input:
- mergeNode: {}
- operatorId: "8"
- operatorId: "11"
pkIndices:
- 0
batchPlanNode:
@@ -87,14 +87,14 @@
select * from t;
stream_plan_proto: |
---
operatorId: "9"
operatorId: "11"
input:
- operatorId: "8"
- operatorId: "10"
input:
- operatorId: "6"
- operatorId: "8"
input:
- mergeNode: {}
- operatorId: "5"
- operatorId: "7"
pkIndices:
- 0
batchPlanNode:
@@ -146,14 +146,14 @@
select v1 from t;
stream_plan_proto: |
---
operatorId: "9"
operatorId: "11"
input:
- operatorId: "8"
- operatorId: "10"
input:
- operatorId: "6"
- operatorId: "8"
input:
- mergeNode: {}
- operatorId: "5"
- operatorId: "7"
pkIndices:
- 0
batchPlanNode:
@@ -194,18 +194,18 @@
select sum(v1) from t;
stream_plan_proto: |
---
operatorId: "19"
operatorId: "24"
input:
- operatorId: "18"
- operatorId: "23"
input:
- operatorId: "16"
- operatorId: "21"
input:
- operatorId: "14"
- operatorId: "19"
input:
- operatorId: "12"
- operatorId: "17"
input:
- mergeNode: {}
- operatorId: "11"
- operatorId: "16"
pkIndices:
- 0
batchPlanNode:
@@ -268,20 +268,20 @@
select sum(v1) from t group by v2;
stream_plan_proto: |
---
operatorId: "21"
operatorId: "26"
input:
- operatorId: "20"
- operatorId: "25"
input:
- operatorId: "18"
- operatorId: "23"
input:
- operatorId: "16"
- operatorId: "21"
input:
- operatorId: "14"
- operatorId: "19"
input:
- operatorId: "13"
- operatorId: "18"
input:
- mergeNode: {}
- operatorId: "12"
- operatorId: "17"
pkIndices:
- 0
batchPlanNode:
@@ -348,6 +348,8 @@
 returnType:
 typeName: INT64
 isNullable: true
+pkIndices:
+- 1
 projectNode:
 selectList:
 - exprType: INPUT_REF
@@ -356,6 +358,20 @@
 isNullable: true
 inputRef:
 columnIdx: 1
+- exprType: INPUT_REF
+returnType:
+typeName: INT32
+isNullable: true
+inputRef: {}
+pkIndices:
+- 1
 materializeNode:
+columnOrders:
+- orderType: ASCENDING
+inputRef:
+columnIdx: 1
+returnType:
+typeName: INT32
+isNullable: true
 columnIds:
 - 0

rust/frontend/test_runner/tests/testdata/tpch.yaml (10 changes: 5 additions & 5 deletions)

@@ -134,10 +134,10 @@
 StreamMaterialize { table_id: 0, column_order: [$1 DESC, $2 ASC, $0 ASC, $3 ASC], column_id: [#0, #1, #2, #3], pk_indices: [0, 2, 3] }
 StreamProject { exprs: [$0, $3, $1, $2], expr_alias: [l_orderkey, revenue, o_orderdate, o_shippriority] }
 StreamHashAgg { group_keys: [$0, $1, $2], aggs: [sum($3)] }
-StreamProject { exprs: [$3, $1, $2, ($4 * (1:Int32 - $5))], expr_alias: [ , , , ] }
+StreamProject { exprs: [$4, $1, $2, ($5 * (1:Int32 - $6)), $3, $0], expr_alias: [ , , , , , ] }
 StreamExchange { dist: HashShard([0, 1, 2]) }
-StreamHashJoin { type: Inner, predicate: $0 = $3 }
-StreamProject { exprs: [$1, $3, $4], expr_alias: [ , , ] }
+StreamHashJoin { type: Inner, predicate: $0 = $4 }
+StreamProject { exprs: [$1, $3, $4, $0], expr_alias: [ , , , ] }
 StreamExchange { dist: HashShard([0]) }
 StreamHashJoin { type: Inner, predicate: $0 = $2 }
 StreamProject { exprs: [$0], expr_alias: [ ] }
@@ -147,7 +147,7 @@
 StreamExchange { dist: HashShard([2]) }
 StreamFilter { predicate: }
 StreamTableScan { table: orders, columns: [o_orderkey, o_custkey, o_orderdate, o_shippriority], pk_indices: [0] }
-StreamExchange { dist: HashShard([3]) }
+StreamExchange { dist: HashShard([4]) }
 StreamFilter { predicate: }
 StreamTableScan { table: lineitem, columns: [l_orderkey, l_extendedprice, l_discount], pk_indices: [0] }
 - id: tpch_q6
@@ -175,6 +175,6 @@
 StreamMaterialize { table_id: 0, column_order: [$0 ASC], column_id: [#0], pk_indices: [0] }
 StreamProject { exprs: [$0], expr_alias: [revenue] }
 StreamSimpleAgg { aggs: [sum($0)] }
-StreamProject { exprs: [($1 * $2)], expr_alias: [ ] }
+StreamProject { exprs: [($1 * $2), $0], expr_alias: [ , ] }
 StreamFilter { predicate: ($0 < 24:Int32) }
 StreamTableScan { table: lineitem, columns: [l_quantity, l_extendedprice, l_discount], pk_indices: [0] }