From bbf4268ff5d396496326400bf9db6b15233b66fb Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Thu, 24 Mar 2022 13:48:28 +0800 Subject: [PATCH 1/3] open rewrite logical for stream --- rust/frontend/src/handler/create_mv.rs | 2 +- rust/frontend/src/optimizer/mod.rs | 19 ++++++++++++++++--- rust/frontend/test_runner/src/lib.rs | 2 +- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/rust/frontend/src/handler/create_mv.rs b/rust/frontend/src/handler/create_mv.rs index d3bd4b8407afc..a505cc70728ca 100644 --- a/rust/frontend/src/handler/create_mv.rs +++ b/rust/frontend/src/handler/create_mv.rs @@ -65,7 +65,7 @@ pub fn gen_create_mv_plan( let column_descs = bound_query.gen_create_mv_column_desc(); - let logical = planner.plan_query(bound_query)?; + let mut logical = planner.plan_query(bound_query)?; let plan = logical.gen_create_mv_plan(order, column_descs.iter().map(|x| x.column_id).collect()); diff --git a/rust/frontend/src/optimizer/mod.rs b/rust/frontend/src/optimizer/mod.rs index b3070f96eb593..c54b49d9cf843 100644 --- a/rust/frontend/src/optimizer/mod.rs +++ b/rust/frontend/src/optimizer/mod.rs @@ -154,9 +154,22 @@ impl PlanRoot { } /// optimize and generate a create materialize view plan - pub fn gen_create_mv_plan(&self, order: Vec, column_ids: Vec) -> PlanRef { - let plan = self.gen_optimized_logical_plan(); - + pub fn gen_create_mv_plan( + &mut self, + order: Vec, + column_ids: Vec, + ) -> PlanRef { + let mut plan = self.gen_optimized_logical_plan(); + plan = { + let (plan, mut out_col_change) = plan.logical_rewrite_for_stream(); + self.required_dist = out_col_change.rewrite_distribution(self.required_dist.clone()); + self.required_order = out_col_change.rewrite_order(self.required_order.clone()); + self.out_fields = out_col_change.rewrite_bitset(&self.out_fields); + self.schema = plan.schema().clone(); + plan + }; + // Convert to physical plan node + let plan = plan.to_stream_with_dist_required(&self.required_dist); // Ignore the required_dist and required_order, as they are provided by user now. // TODO: need more thinking and refactor. diff --git a/rust/frontend/test_runner/src/lib.rs b/rust/frontend/test_runner/src/lib.rs index c0d7b35a1ebeb..5273e1957d6a4 100644 --- a/rust/frontend/test_runner/src/lib.rs +++ b/rust/frontend/test_runner/src/lib.rs @@ -206,7 +206,7 @@ impl TestCase { column_descs = None; } - let logical_plan = match Planner::new(context).plan(bound) { + let mut logical_plan = match Planner::new(context).plan(bound) { Ok(logical_plan) => { if self.logical_plan.is_some() { ret.logical_plan = Some(explain_plan(&logical_plan.clone().as_subplan())); From 17b0cfa982b923a8dcac0fbfb2975a849df18b27 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Thu, 24 Mar 2022 13:51:38 +0800 Subject: [PATCH 2/3] ix --- rust/frontend/src/optimizer/mod.rs | 2 -- rust/frontend/src/optimizer/plan_node/logical_project.rs | 3 +-- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/rust/frontend/src/optimizer/mod.rs b/rust/frontend/src/optimizer/mod.rs index c54b49d9cf843..84d385c223169 100644 --- a/rust/frontend/src/optimizer/mod.rs +++ b/rust/frontend/src/optimizer/mod.rs @@ -168,8 +168,6 @@ impl PlanRoot { self.schema = plan.schema().clone(); plan }; - // Convert to physical plan node - let plan = plan.to_stream_with_dist_required(&self.required_dist); // Ignore the required_dist and required_order, as they are provided by user now. // TODO: need more thinking and refactor. diff --git a/rust/frontend/src/optimizer/plan_node/logical_project.rs b/rust/frontend/src/optimizer/plan_node/logical_project.rs index b3d7d5562df14..d967432264d70 100644 --- a/rust/frontend/src/optimizer/plan_node/logical_project.rs +++ b/rust/frontend/src/optimizer/plan_node/logical_project.rs @@ -265,7 +265,6 @@ impl ToStream for LogicalProject { let (input, input_col_change) = self.input.logical_rewrite_for_stream(); let (proj, out_col_change) = self.rewrite_with_input(input.clone(), input_col_change); let input_pk = input.pk_indices(); - assert!(!input_pk.is_empty()); let i2o = Self::i2o_col_mapping(input.schema().len(), proj.exprs()); let col_need_to_add = input_pk.iter().cloned().filter(|i| i2o.try_map(*i) == None); let input_schema = input.schema(); @@ -273,7 +272,7 @@ impl ToStream for LogicalProject { .exprs() .iter() .cloned() - .zip_eq(self.expr_alias().iter().cloned()) + .zip_eq(proj.expr_alias().iter().cloned()) .map(|(a, b)| (a, b)) .chain(col_need_to_add.map(|idx| { ( From cd3da1228d9249ff9db6ad08abfe3d5e4d03bd8a Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Thu, 24 Mar 2022 13:55:43 +0800 Subject: [PATCH 3/3] fix ut --- .../tests/testdata/basic_query_1.yaml | 8 +-- .../tests/testdata/basic_query_2.yaml | 8 +-- .../tests/testdata/stream_proto.yaml | 68 ++++++++++++------- .../test_runner/tests/testdata/tpch.yaml | 10 +-- 4 files changed, 53 insertions(+), 41 deletions(-) diff --git a/rust/frontend/test_runner/tests/testdata/basic_query_1.yaml b/rust/frontend/test_runner/tests/testdata/basic_query_1.yaml index 0a7572d55f4a0..11078600a3edc 100644 --- a/rust/frontend/test_runner/tests/testdata/basic_query_1.yaml +++ b/rust/frontend/test_runner/tests/testdata/basic_query_1.yaml @@ -52,10 +52,6 @@ BatchExchange { order: [], dist: Single } BatchProject { exprs: [IsNull(IsNotNull(IsFalse(IsNotFalse(IsTrue(IsNotTrue(false:Boolean))))))], expr_alias: [ ] } BatchScan { table: t, columns: [] } - stream_plan: | - StreamMaterialize { table_id: 0, column_order: [], column_id: [#0], pk_indices: [] } - StreamProject { exprs: [IsNull(IsNotNull(IsFalse(IsNotFalse(IsTrue(IsNotTrue(false:Boolean))))))], expr_alias: [ ] } - StreamTableScan { table: t, columns: [], pk_indices: [0] } - sql: | create table t (v1 int); select (case when v1=1 then 1 when v1=2 then 2 else 0.0 end) from t; @@ -64,8 +60,8 @@ BatchProject { exprs: [Case(($0 = 1:Int32), 1:Int32::Decimal, ($0 = 2:Int32), 2:Int32::Decimal, Normalized(0.0):Decimal)], expr_alias: [ ] } BatchScan { table: t, columns: [v1] } stream_plan: | - StreamMaterialize { table_id: 0, column_order: [], column_id: [#0], pk_indices: [] } - StreamProject { exprs: [Case(($0 = 1:Int32), 1:Int32::Decimal, ($0 = 2:Int32), 2:Int32::Decimal, Normalized(0.0):Decimal)], expr_alias: [ ] } + StreamMaterialize { table_id: 0, column_order: [$1 ASC], column_id: [#0], pk_indices: [1] } + StreamProject { exprs: [Case(($0 = 1:Int32), 1:Int32::Decimal, ($0 = 2:Int32), 2:Int32::Decimal, Normalized(0.0):Decimal), $0], expr_alias: [ , ] } StreamTableScan { table: t, columns: [v1], pk_indices: [0] } - sql: | select length(trim(trailing '1' from '12'))+length(trim(leading '2' from '23'))+length(trim(both '3' from '34')); diff --git a/rust/frontend/test_runner/tests/testdata/basic_query_2.yaml b/rust/frontend/test_runner/tests/testdata/basic_query_2.yaml index 5fa0029435db9..5c5863ea2f481 100644 --- a/rust/frontend/test_runner/tests/testdata/basic_query_2.yaml +++ b/rust/frontend/test_runner/tests/testdata/basic_query_2.yaml @@ -86,8 +86,8 @@ BatchExchange { order: [], dist: HashShard([2]) } BatchScan { table: t2, columns: [v1, v2] } stream_plan: | - StreamMaterialize { table_id: 0, column_order: [], column_id: [#0, #1], pk_indices: [] } - StreamProject { exprs: [$1, $3], expr_alias: [v2, v2] } + StreamMaterialize { table_id: 0, column_order: [$2 ASC, $3 ASC], column_id: [#0, #1], pk_indices: [2, 3] } + StreamProject { exprs: [$1, $3, $0, $2], expr_alias: [v2, v2, , ] } StreamHashJoin { type: Inner, predicate: $0 = $2 } StreamExchange { dist: HashShard([0]) } StreamTableScan { table: t1, columns: [v1, v2], pk_indices: [0] } @@ -126,8 +126,8 @@ BatchProject { exprs: [$0, $1, $2], expr_alias: [ , , ] } BatchScan { table: t, columns: [v1, v2, v3] } stream_plan: | - StreamMaterialize { table_id: 0, column_order: [], column_id: [#0], pk_indices: [] } - StreamProject { exprs: [($0 + ($1 * $2))], expr_alias: [ ] } + StreamMaterialize { table_id: 0, column_order: [$1 ASC, $2 ASC, $3 ASC], column_id: [#0], pk_indices: [1, 2, 3] } + StreamProject { exprs: [($0 + ($1 * $2)), $0, $1, $2], expr_alias: [ , , , ] } StreamSimpleAgg { aggs: [min($0), max($1), count($2)] } StreamProject { exprs: [$0, $1, $2], expr_alias: [ , , ] } StreamTableScan { table: t, columns: [v1, v2, v3], pk_indices: [0] } diff --git a/rust/frontend/test_runner/tests/testdata/stream_proto.yaml b/rust/frontend/test_runner/tests/testdata/stream_proto.yaml index 8cc0171740eee..8ee956f2e1afe 100644 --- a/rust/frontend/test_runner/tests/testdata/stream_proto.yaml +++ b/rust/frontend/test_runner/tests/testdata/stream_proto.yaml @@ -4,16 +4,16 @@ select * from t where v1<1; stream_plan_proto: | --- - operatorId: "14" + operatorId: "17" input: - - operatorId: "13" + - operatorId: "16" input: - - operatorId: "11" + - operatorId: "14" input: - - operatorId: "9" + - operatorId: "12" input: - mergeNode: {} - - operatorId: "8" + - operatorId: "11" pkIndices: - 0 batchPlanNode: @@ -87,14 +87,14 @@ select * from t; stream_plan_proto: | --- - operatorId: "9" + operatorId: "11" input: - - operatorId: "8" + - operatorId: "10" input: - - operatorId: "6" + - operatorId: "8" input: - mergeNode: {} - - operatorId: "5" + - operatorId: "7" pkIndices: - 0 batchPlanNode: @@ -146,14 +146,14 @@ select v1 from t; stream_plan_proto: | --- - operatorId: "9" + operatorId: "11" input: - - operatorId: "8" + - operatorId: "10" input: - - operatorId: "6" + - operatorId: "8" input: - mergeNode: {} - - operatorId: "5" + - operatorId: "7" pkIndices: - 0 batchPlanNode: @@ -194,18 +194,18 @@ select sum(v1) from t; stream_plan_proto: | --- - operatorId: "19" + operatorId: "24" input: - - operatorId: "18" + - operatorId: "23" input: - - operatorId: "16" + - operatorId: "21" input: - - operatorId: "14" + - operatorId: "19" input: - - operatorId: "12" + - operatorId: "17" input: - mergeNode: {} - - operatorId: "11" + - operatorId: "16" pkIndices: - 0 batchPlanNode: @@ -268,20 +268,20 @@ select sum(v1) from t group by v2; stream_plan_proto: | --- - operatorId: "21" + operatorId: "26" input: - - operatorId: "20" + - operatorId: "25" input: - - operatorId: "18" + - operatorId: "23" input: - - operatorId: "16" + - operatorId: "21" input: - - operatorId: "14" + - operatorId: "19" input: - - operatorId: "13" + - operatorId: "18" input: - mergeNode: {} - - operatorId: "12" + - operatorId: "17" pkIndices: - 0 batchPlanNode: @@ -348,6 +348,8 @@ returnType: typeName: INT64 isNullable: true + pkIndices: + - 1 projectNode: selectList: - exprType: INPUT_REF @@ -356,6 +358,20 @@ isNullable: true inputRef: columnIdx: 1 + - exprType: INPUT_REF + returnType: + typeName: INT32 + isNullable: true + inputRef: {} + pkIndices: + - 1 materializeNode: + columnOrders: + - orderType: ASCENDING + inputRef: + columnIdx: 1 + returnType: + typeName: INT32 + isNullable: true columnIds: - 0 diff --git a/rust/frontend/test_runner/tests/testdata/tpch.yaml b/rust/frontend/test_runner/tests/testdata/tpch.yaml index 09f6a7d544e07..7d3466dbb115b 100644 --- a/rust/frontend/test_runner/tests/testdata/tpch.yaml +++ b/rust/frontend/test_runner/tests/testdata/tpch.yaml @@ -134,10 +134,10 @@ StreamMaterialize { table_id: 0, column_order: [$1 DESC, $2 ASC, $0 ASC, $3 ASC], column_id: [#0, #1, #2, #3], pk_indices: [0, 2, 3] } StreamProject { exprs: [$0, $3, $1, $2], expr_alias: [l_orderkey, revenue, o_orderdate, o_shippriority] } StreamHashAgg { group_keys: [$0, $1, $2], aggs: [sum($3)] } - StreamProject { exprs: [$3, $1, $2, ($4 * (1:Int32 - $5))], expr_alias: [ , , , ] } + StreamProject { exprs: [$4, $1, $2, ($5 * (1:Int32 - $6)), $3, $0], expr_alias: [ , , , , , ] } StreamExchange { dist: HashShard([0, 1, 2]) } - StreamHashJoin { type: Inner, predicate: $0 = $3 } - StreamProject { exprs: [$1, $3, $4], expr_alias: [ , , ] } + StreamHashJoin { type: Inner, predicate: $0 = $4 } + StreamProject { exprs: [$1, $3, $4, $0], expr_alias: [ , , , ] } StreamExchange { dist: HashShard([0]) } StreamHashJoin { type: Inner, predicate: $0 = $2 } StreamProject { exprs: [$0], expr_alias: [ ] } @@ -147,7 +147,7 @@ StreamExchange { dist: HashShard([2]) } StreamFilter { predicate: } StreamTableScan { table: orders, columns: [o_orderkey, o_custkey, o_orderdate, o_shippriority], pk_indices: [0] } - StreamExchange { dist: HashShard([3]) } + StreamExchange { dist: HashShard([4]) } StreamFilter { predicate: } StreamTableScan { table: lineitem, columns: [l_orderkey, l_extendedprice, l_discount], pk_indices: [0] } - id: tpch_q6 @@ -175,6 +175,6 @@ StreamMaterialize { table_id: 0, column_order: [$0 ASC], column_id: [#0], pk_indices: [0] } StreamProject { exprs: [$0], expr_alias: [revenue] } StreamSimpleAgg { aggs: [sum($0)] } - StreamProject { exprs: [($1 * $2)], expr_alias: [ ] } + StreamProject { exprs: [($1 * $2), $0], expr_alias: [ , ] } StreamFilter { predicate: ($0 < 24:Int32) } StreamTableScan { table: lineitem, columns: [l_quantity, l_extendedprice, l_discount], pk_indices: [0] }