From 98ea76a6b92001c29ee3cac8f6843c669636f824 Mon Sep 17 00:00:00 2001 From: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Date: Fri, 24 Mar 2023 16:57:19 +0800 Subject: [PATCH] feat(optimizer): Add `StreamProjectMergeRule` (#8753) Co-authored-by: lmatz --- src/frontend/planner_test/src/lib.rs | 41 ++++- .../planner_test/tests/testdata/agg.yaml | 25 ++- .../tests/testdata/bushy_join.yaml | 109 ++++++------ .../planner_test/tests/testdata/nexmark.yaml | 102 +++++++---- .../tests/testdata/nexmark_source.yaml | 90 +++++----- .../planner_test/tests/testdata/tpch.yaml | 164 +++++++++--------- src/frontend/src/optimizer/mod.rs | 111 +++++++----- src/frontend/src/optimizer/rule/mod.rs | 2 + src/frontend/src/optimizer/rule/stream/mod.rs | 1 + .../rule/stream/stream_project_merge_rule.rs | 46 +++++ 10 files changed, 405 insertions(+), 286 deletions(-) create mode 100644 src/frontend/src/optimizer/rule/stream/stream_project_merge_rule.rs diff --git a/src/frontend/planner_test/src/lib.rs b/src/frontend/planner_test/src/lib.rs index 6bcf508359bca..a13f99e699231 100644 --- a/src/frontend/planner_test/src/lib.rs +++ b/src/frontend/planner_test/src/lib.rs @@ -19,7 +19,7 @@ mod resolve_id; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::path::Path; use std::sync::Arc; @@ -33,7 +33,7 @@ use risingwave_frontend::session::SessionImpl; use risingwave_frontend::test_utils::{create_proto_file, get_explain_output, LocalFrontend}; use risingwave_frontend::{ build_graph, explain_stream_graph, Binder, Explain, FrontendOpts, OptimizerContext, - OptimizerContextRef, PlanRef, Planner, + OptimizerContextRef, PlanRef, Planner, WithOptions, }; use risingwave_sqlparser::ast::{ExplainOptions, ObjectName, Statement}; use risingwave_sqlparser::parser::Parser; @@ -83,6 +83,10 @@ pub struct TestCase { /// Batch plan for local execution `.gen_batch_local_plan()` pub batch_local_plan: Option, + /// Create sink plan (assumes blackhole sink) + /// TODO: Other sinks + pub sink_plan: Option, + /// Create MV plan `.gen_create_mv_plan()` pub stream_plan: Option, @@ -152,6 +156,9 @@ pub struct TestCaseResult { /// Batch plan for local execution `.gen_batch_local_plan()` pub batch_local_plan: Option, + /// Generate sink plan + pub sink_plan: Option, + /// Create MV plan `.gen_create_mv_plan()` pub stream_plan: Option, @@ -176,6 +183,9 @@ pub struct TestCaseResult { /// Error of `.gen_stream_plan()` pub stream_error: Option, + /// Error of `.gen_sink_plan()` + pub sink_error: Option, + /// The result of an `EXPLAIN` statement. /// /// This field is used when `sql` is an `EXPLAIN` statement. @@ -209,6 +219,7 @@ impl TestCaseResult { batch_plan: self.batch_plan, batch_local_plan: self.batch_local_plan, stream_plan: self.stream_plan, + sink_plan: self.sink_plan, batch_plan_proto: self.batch_plan_proto, planner_error: self.planner_error, optimizer_error: self.optimizer_error, @@ -640,6 +651,30 @@ impl TestCase { } } + 'sink: { + if self.sink_plan.is_some() { + let sink_name = "sink_test"; + let mut options = HashMap::new(); + options.insert("connector".to_string(), "blackhole".to_string()); + options.insert("type".to_string(), "append-only".to_string()); + let options = WithOptions::new(options); + match logical_plan.gen_sink_plan( + sink_name.to_string(), + format!("CREATE SINK {sink_name} AS {}", stmt), + options, + ) { + Ok(sink_plan) => { + ret.sink_plan = Some(explain_plan(&sink_plan.into())); + break 'sink; + } + Err(err) => { + ret.sink_error = Some(err.to_string()); + break 'sink; + } + } + } + } + Ok(ret) } } @@ -696,7 +731,7 @@ fn check_result(expected: &TestCase, actual: &TestCaseResult) -> Result<()> { &expected.explain_output, &actual.explain_output, )?; - + check_option_plan_eq("sink_plan", &expected.sink_plan, &actual.sink_plan)?; Ok(()) } diff --git a/src/frontend/planner_test/tests/testdata/agg.yaml b/src/frontend/planner_test/tests/testdata/agg.yaml index 2faa95ab79834..7124471263394 100644 --- a/src/frontend/planner_test/tests/testdata/agg.yaml +++ b/src/frontend/planner_test/tests/testdata/agg.yaml @@ -577,9 +577,8 @@ └─StreamGlobalSimpleAgg { aggs: [max(max($expr1) filter((t.a < t.b) AND ((t.a + t.b) < 100:Int32) AND ((t.a * t.b) <> ((t.a + t.b) - 1:Int32)))), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [$expr2], aggs: [max($expr1) filter((t.a < t.b) AND ((t.a + t.b) < 100:Int32) AND ((t.a * t.b) <> ((t.a + t.b) - 1:Int32))), count] } - └─StreamProject { exprs: [t.a, t.b, $expr1, t._row_id, Vnode(t._row_id) as $expr2] } - └─StreamProject { exprs: [t.a, t.b, (t.a * t.b) as $expr1, t._row_id] } - └─StreamTableScan { table: t, columns: [t.a, t.b, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProject { exprs: [t.a, t.b, (t.a * t.b) as $expr1, t._row_id, Vnode(t._row_id) as $expr2] } + └─StreamTableScan { table: t, columns: [t.a, t.b, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } - name: avg filter clause + group by sql: | create table t(a int, b int); @@ -1139,9 +1138,8 @@ └─StreamExchange { dist: HashShard(lineitem.l_commitdate) } └─StreamHashAgg { group_key: [lineitem.l_commitdate, $expr1], aggs: [max(lineitem.l_commitdate), count] } └─StreamProject { exprs: [lineitem.l_tax, lineitem.l_shipinstruct, lineitem.l_orderkey, lineitem.l_commitdate, Vnode(lineitem.l_orderkey) as $expr1] } - └─StreamProject { exprs: [lineitem.l_tax, lineitem.l_shipinstruct, lineitem.l_orderkey, lineitem.l_commitdate] } - └─StreamHashAgg { group_key: [lineitem.l_tax, lineitem.l_shipinstruct, lineitem.l_orderkey, lineitem.l_commitdate], aggs: [count] } - └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_tax, lineitem.l_commitdate, lineitem.l_shipinstruct], pk: [lineitem.l_orderkey], dist: UpstreamHashShard(lineitem.l_orderkey) } + └─StreamHashAgg { group_key: [lineitem.l_tax, lineitem.l_shipinstruct, lineitem.l_orderkey, lineitem.l_commitdate], aggs: [count] } + └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_tax, lineitem.l_commitdate, lineitem.l_shipinstruct], pk: [lineitem.l_orderkey], dist: UpstreamHashShard(lineitem.l_orderkey) } - name: two phase agg on hop window input should use two phase agg sql: | SET QUERY_MODE TO DISTRIBUTED; @@ -1180,14 +1178,13 @@ └─StreamExchange { dist: HashShard(window_start) } └─StreamHashAgg { group_key: [window_start, $expr2], aggs: [max(sum0(count)), count] } └─StreamProject { exprs: [bid.auction, window_start, sum0(count), Vnode(bid.auction, window_start) as $expr2] } - └─StreamProject { exprs: [bid.auction, window_start, sum0(count)] } - └─StreamHashAgg { group_key: [bid.auction, window_start], aggs: [sum0(count), count] } - └─StreamExchange { dist: HashShard(bid.auction, window_start) } - └─StreamHashAgg { group_key: [bid.auction, window_start, $expr1], aggs: [count] } - └─StreamProject { exprs: [bid.auction, window_start, bid._row_id, Vnode(bid._row_id) as $expr1] } - └─StreamHopWindow { time_col: bid.date_time, slide: 00:00:02, size: 00:00:10, output: [bid.auction, window_start, bid._row_id] } - └─StreamFilter { predicate: IsNotNull(bid.date_time) } - └─StreamTableScan { table: bid, columns: [bid.date_time, bid.auction, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } + └─StreamHashAgg { group_key: [bid.auction, window_start], aggs: [sum0(count), count] } + └─StreamExchange { dist: HashShard(bid.auction, window_start) } + └─StreamHashAgg { group_key: [bid.auction, window_start, $expr1], aggs: [count] } + └─StreamProject { exprs: [bid.auction, window_start, bid._row_id, Vnode(bid._row_id) as $expr1] } + └─StreamHopWindow { time_col: bid.date_time, slide: 00:00:02, size: 00:00:10, output: [bid.auction, window_start, bid._row_id] } + └─StreamFilter { predicate: IsNotNull(bid.date_time) } + └─StreamTableScan { table: bid, columns: [bid.date_time, bid.auction, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } - name: two phase agg with stream SomeShard (via index) but pk satisfies output dist should use shuffle agg sql: | SET QUERY_MODE TO DISTRIBUTED; diff --git a/src/frontend/planner_test/tests/testdata/bushy_join.yaml b/src/frontend/planner_test/tests/testdata/bushy_join.yaml index 2678b03ecd3f1..75c16b72cfd50 100644 --- a/src/frontend/planner_test/tests/testdata/bushy_join.yaml +++ b/src/frontend/planner_test/tests/testdata/bushy_join.yaml @@ -267,23 +267,22 @@ └─StreamExchange { dist: Single } └─StreamGroupTopN { order: "[sum($expr1) DESC, orders.o_orderdate ASC]", limit: 10, offset: 0, group_key: [4] } └─StreamProject { exprs: [lineitem.l_orderkey, sum($expr1), orders.o_orderdate, orders.o_shippriority, Vnode(lineitem.l_orderkey) as $expr2] } - └─StreamProject { exprs: [lineitem.l_orderkey, sum($expr1), orders.o_orderdate, orders.o_shippriority] } - └─StreamHashAgg { group_key: [lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority], aggs: [sum($expr1), count] } - └─StreamProject { exprs: [lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority, (lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)) as $expr1, lineitem.l_linenumber, customer.c_custkey, orders.o_orderkey] } - └─StreamHashJoin { type: Inner, predicate: lineitem.l_orderkey = orders.o_orderkey, output: [orders.o_orderdate, orders.o_shippriority, lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber, customer.c_custkey, orders.o_orderkey] } - ├─StreamExchange { dist: HashShard(lineitem.l_orderkey) } - | └─StreamProject { exprs: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber] } - | └─StreamFilter { predicate: (lineitem.l_shipdate > '1995-03-29':Date) } - | └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber, lineitem.l_shipdate], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } - └─StreamExchange { dist: HashShard(orders.o_orderkey) } - └─StreamHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey, output: [orders.o_orderkey, orders.o_orderdate, orders.o_shippriority, customer.c_custkey] } - ├─StreamExchange { dist: HashShard(customer.c_custkey) } - | └─StreamProject { exprs: [customer.c_custkey] } - | └─StreamFilter { predicate: (customer.c_mktsegment = 'FURNITURE':Varchar) } - | └─StreamTableScan { table: customer, columns: [customer.c_custkey, customer.c_mktsegment], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } - └─StreamExchange { dist: HashShard(orders.o_custkey) } - └─StreamFilter { predicate: (orders.o_orderdate < '1995-03-29':Date) } - └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_custkey, orders.o_orderdate, orders.o_shippriority], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } + └─StreamHashAgg { group_key: [lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority], aggs: [sum($expr1), count] } + └─StreamProject { exprs: [lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority, (lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)) as $expr1, lineitem.l_linenumber, customer.c_custkey, orders.o_orderkey] } + └─StreamHashJoin { type: Inner, predicate: lineitem.l_orderkey = orders.o_orderkey, output: [orders.o_orderdate, orders.o_shippriority, lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber, customer.c_custkey, orders.o_orderkey] } + ├─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + | └─StreamProject { exprs: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber] } + | └─StreamFilter { predicate: (lineitem.l_shipdate > '1995-03-29':Date) } + | └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber, lineitem.l_shipdate], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + └─StreamExchange { dist: HashShard(orders.o_orderkey) } + └─StreamHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey, output: [orders.o_orderkey, orders.o_orderdate, orders.o_shippriority, customer.c_custkey] } + ├─StreamExchange { dist: HashShard(customer.c_custkey) } + | └─StreamProject { exprs: [customer.c_custkey] } + | └─StreamFilter { predicate: (customer.c_mktsegment = 'FURNITURE':Varchar) } + | └─StreamTableScan { table: customer, columns: [customer.c_custkey, customer.c_mktsegment], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } + └─StreamExchange { dist: HashShard(orders.o_custkey) } + └─StreamFilter { predicate: (orders.o_orderdate < '1995-03-29':Date) } + └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_custkey, orders.o_orderdate, orders.o_shippriority], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } with_config_map: RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' - id: tpch_q4 @@ -678,26 +677,25 @@ └─StreamExchange { dist: Single } └─StreamGroupTopN { order: "[sum($expr1) DESC]", limit: 20, offset: 0, group_key: [8] } └─StreamProject { exprs: [customer.c_custkey, customer.c_name, sum($expr1), customer.c_acctbal, nation.n_name, customer.c_address, customer.c_phone, customer.c_comment, Vnode(customer.c_custkey) as $expr2] } - └─StreamProject { exprs: [customer.c_custkey, customer.c_name, sum($expr1), customer.c_acctbal, nation.n_name, customer.c_address, customer.c_phone, customer.c_comment] } - └─StreamHashAgg { group_key: [customer.c_custkey, customer.c_name, customer.c_acctbal, customer.c_phone, nation.n_name, customer.c_address, customer.c_comment], aggs: [sum($expr1), count] } - └─StreamProject { exprs: [customer.c_custkey, customer.c_name, customer.c_acctbal, customer.c_phone, nation.n_name, customer.c_address, customer.c_comment, (lineitem.l_extendedprice * (1.00:Decimal - lineitem.l_discount)) as $expr1, nation.n_nationkey, lineitem.l_orderkey, lineitem.l_linenumber, orders.o_orderkey] } - └─StreamHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey, output: [customer.c_custkey, customer.c_name, customer.c_address, customer.c_phone, customer.c_acctbal, customer.c_comment, lineitem.l_extendedprice, lineitem.l_discount, nation.n_name, nation.n_nationkey, lineitem.l_orderkey, lineitem.l_linenumber, orders.o_orderkey] } - ├─StreamExchange { dist: HashShard(customer.c_custkey) } - | └─StreamHashJoin { type: Inner, predicate: nation.n_nationkey = customer.c_nationkey, output: [nation.n_name, customer.c_custkey, customer.c_name, customer.c_address, customer.c_phone, customer.c_acctbal, customer.c_comment, nation.n_nationkey] } - | ├─StreamExchange { dist: HashShard(nation.n_nationkey) } - | | └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name], pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } - | └─StreamExchange { dist: HashShard(customer.c_nationkey) } - | └─StreamTableScan { table: customer, columns: [customer.c_custkey, customer.c_name, customer.c_address, customer.c_nationkey, customer.c_phone, customer.c_acctbal, customer.c_comment], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } - └─StreamExchange { dist: HashShard(orders.o_custkey) } - └─StreamHashJoin { type: Inner, predicate: lineitem.l_orderkey = orders.o_orderkey, output: [lineitem.l_extendedprice, lineitem.l_discount, orders.o_custkey, lineitem.l_orderkey, lineitem.l_linenumber, orders.o_orderkey] } - ├─StreamExchange { dist: HashShard(lineitem.l_orderkey) } - | └─StreamProject { exprs: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber] } - | └─StreamFilter { predicate: (lineitem.l_returnflag = 'R':Varchar) } - | └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber, lineitem.l_returnflag], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } - └─StreamExchange { dist: HashShard(orders.o_orderkey) } - └─StreamProject { exprs: [orders.o_orderkey, orders.o_custkey] } - └─StreamFilter { predicate: (orders.o_orderdate >= '1994-01-01':Date) AND (orders.o_orderdate < '1994-04-01 00:00:00':Timestamp) } - └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_custkey, orders.o_orderdate], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } + └─StreamHashAgg { group_key: [customer.c_custkey, customer.c_name, customer.c_acctbal, customer.c_phone, nation.n_name, customer.c_address, customer.c_comment], aggs: [sum($expr1), count] } + └─StreamProject { exprs: [customer.c_custkey, customer.c_name, customer.c_acctbal, customer.c_phone, nation.n_name, customer.c_address, customer.c_comment, (lineitem.l_extendedprice * (1.00:Decimal - lineitem.l_discount)) as $expr1, nation.n_nationkey, lineitem.l_orderkey, lineitem.l_linenumber, orders.o_orderkey] } + └─StreamHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey, output: [customer.c_custkey, customer.c_name, customer.c_address, customer.c_phone, customer.c_acctbal, customer.c_comment, lineitem.l_extendedprice, lineitem.l_discount, nation.n_name, nation.n_nationkey, lineitem.l_orderkey, lineitem.l_linenumber, orders.o_orderkey] } + ├─StreamExchange { dist: HashShard(customer.c_custkey) } + | └─StreamHashJoin { type: Inner, predicate: nation.n_nationkey = customer.c_nationkey, output: [nation.n_name, customer.c_custkey, customer.c_name, customer.c_address, customer.c_phone, customer.c_acctbal, customer.c_comment, nation.n_nationkey] } + | ├─StreamExchange { dist: HashShard(nation.n_nationkey) } + | | └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name], pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } + | └─StreamExchange { dist: HashShard(customer.c_nationkey) } + | └─StreamTableScan { table: customer, columns: [customer.c_custkey, customer.c_name, customer.c_address, customer.c_nationkey, customer.c_phone, customer.c_acctbal, customer.c_comment], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } + └─StreamExchange { dist: HashShard(orders.o_custkey) } + └─StreamHashJoin { type: Inner, predicate: lineitem.l_orderkey = orders.o_orderkey, output: [lineitem.l_extendedprice, lineitem.l_discount, orders.o_custkey, lineitem.l_orderkey, lineitem.l_linenumber, orders.o_orderkey] } + ├─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + | └─StreamProject { exprs: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber] } + | └─StreamFilter { predicate: (lineitem.l_returnflag = 'R':Varchar) } + | └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber, lineitem.l_returnflag], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + └─StreamExchange { dist: HashShard(orders.o_orderkey) } + └─StreamProject { exprs: [orders.o_orderkey, orders.o_custkey] } + └─StreamFilter { predicate: (orders.o_orderdate >= '1994-01-01':Date) AND (orders.o_orderdate < '1994-04-01 00:00:00':Timestamp) } + └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_custkey, orders.o_orderdate], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } with_config_map: RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' - id: tpch_q11 @@ -1104,25 +1102,24 @@ └─StreamExchange { dist: Single } └─StreamGroupTopN { order: "[orders.o_totalprice DESC, orders.o_orderdate ASC]", limit: 100, offset: 0, group_key: [6] } └─StreamProject { exprs: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, sum(lineitem.l_quantity), Vnode(orders.o_orderkey) as $expr1] } - └─StreamProject { exprs: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, sum(lineitem.l_quantity)] } - └─StreamHashAgg { group_key: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice], aggs: [sum(lineitem.l_quantity), count] } - └─StreamHashJoin { type: LeftSemi, predicate: orders.o_orderkey = lineitem.l_orderkey, output: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, lineitem.l_quantity, lineitem.l_orderkey, lineitem.l_linenumber] } - ├─StreamExchange { dist: HashShard(orders.o_orderkey) } - | └─StreamHashJoin { type: Inner, predicate: lineitem.l_orderkey = orders.o_orderkey, output: [customer.c_custkey, customer.c_name, orders.o_orderkey, orders.o_totalprice, orders.o_orderdate, lineitem.l_quantity, lineitem.l_orderkey, lineitem.l_linenumber] } - | ├─StreamExchange { dist: HashShard(lineitem.l_orderkey) } - | | └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_quantity, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } - | └─StreamExchange { dist: HashShard(orders.o_orderkey) } - | └─StreamHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey, output: [customer.c_custkey, customer.c_name, orders.o_orderkey, orders.o_totalprice, orders.o_orderdate] } - | ├─StreamExchange { dist: HashShard(customer.c_custkey) } - | | └─StreamTableScan { table: customer, columns: [customer.c_custkey, customer.c_name], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } - | └─StreamExchange { dist: HashShard(orders.o_custkey) } - | └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_custkey, orders.o_totalprice, orders.o_orderdate], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } - └─StreamProject { exprs: [lineitem.l_orderkey] } - └─StreamFilter { predicate: (sum(lineitem.l_quantity) > 1:Int32) } - └─StreamProject { exprs: [lineitem.l_orderkey, sum(lineitem.l_quantity)] } - └─StreamHashAgg { group_key: [lineitem.l_orderkey], aggs: [sum(lineitem.l_quantity), count] } - └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } - └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_quantity, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + └─StreamHashAgg { group_key: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice], aggs: [sum(lineitem.l_quantity), count] } + └─StreamHashJoin { type: LeftSemi, predicate: orders.o_orderkey = lineitem.l_orderkey, output: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, lineitem.l_quantity, lineitem.l_orderkey, lineitem.l_linenumber] } + ├─StreamExchange { dist: HashShard(orders.o_orderkey) } + | └─StreamHashJoin { type: Inner, predicate: lineitem.l_orderkey = orders.o_orderkey, output: [customer.c_custkey, customer.c_name, orders.o_orderkey, orders.o_totalprice, orders.o_orderdate, lineitem.l_quantity, lineitem.l_orderkey, lineitem.l_linenumber] } + | ├─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + | | └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_quantity, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + | └─StreamExchange { dist: HashShard(orders.o_orderkey) } + | └─StreamHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey, output: [customer.c_custkey, customer.c_name, orders.o_orderkey, orders.o_totalprice, orders.o_orderdate] } + | ├─StreamExchange { dist: HashShard(customer.c_custkey) } + | | └─StreamTableScan { table: customer, columns: [customer.c_custkey, customer.c_name], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } + | └─StreamExchange { dist: HashShard(orders.o_custkey) } + | └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_custkey, orders.o_totalprice, orders.o_orderdate], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } + └─StreamProject { exprs: [lineitem.l_orderkey] } + └─StreamFilter { predicate: (sum(lineitem.l_quantity) > 1:Int32) } + └─StreamProject { exprs: [lineitem.l_orderkey, sum(lineitem.l_quantity)] } + └─StreamHashAgg { group_key: [lineitem.l_orderkey], aggs: [sum(lineitem.l_quantity), count] } + └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_quantity, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } with_config_map: RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' - id: tpch_q19 diff --git a/src/frontend/planner_test/tests/testdata/nexmark.yaml b/src/frontend/planner_test/tests/testdata/nexmark.yaml index 4bc4c030104a2..b12d864776bde 100644 --- a/src/frontend/planner_test/tests/testdata/nexmark.yaml +++ b/src/frontend/planner_test/tests/testdata/nexmark.yaml @@ -42,6 +42,11 @@ batch_plan: | BatchExchange { order: [], dist: Single } └─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time], distribution: SomeShard } + sink_plan: | + StreamSink { type: append-only, columns: [auction, bidder, price, date_time] } + └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time] } + └─StreamExchange { dist: Single } + └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } stream_plan: | StreamMaterialize { columns: [auction, bidder, price, date_time, bid._row_id(hidden)], pk_columns: [bid._row_id], pk_conflict: "no check" } └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } @@ -74,6 +79,12 @@ BatchExchange { order: [], dist: Single } └─BatchProject { exprs: [bid.auction, bid.bidder, (0.908:Decimal * bid.price) as $expr1, bid.date_time] } └─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time], distribution: SomeShard } + sink_plan: | + StreamSink { type: append-only, columns: [auction, bidder, price, date_time] } + └─StreamProject { exprs: [bid.auction, bid.bidder, $expr1, bid.date_time] } + └─StreamExchange { dist: Single } + └─StreamProject { exprs: [bid.auction, bid.bidder, (0.908:Decimal * bid.price) as $expr1, bid.date_time, bid._row_id] } + └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } stream_plan: | StreamMaterialize { columns: [auction, bidder, price, date_time, bid._row_id(hidden)], pk_columns: [bid._row_id], pk_conflict: "no check" } └─StreamProject { exprs: [bid.auction, bid.bidder, (0.908:Decimal * bid.price) as $expr1, bid.date_time, bid._row_id] } @@ -102,6 +113,12 @@ BatchExchange { order: [], dist: Single } └─BatchFilter { predicate: (((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR (bid.auction = 2001:Int32)) OR (bid.auction = 2019:Int32)) OR (bid.auction = 2087:Int32)) } └─BatchScan { table: bid, columns: [bid.auction, bid.price], distribution: SomeShard } + sink_plan: | + StreamSink { type: append-only, columns: [auction, price] } + └─StreamProject { exprs: [bid.auction, bid.price] } + └─StreamExchange { dist: Single } + └─StreamFilter { predicate: (((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR (bid.auction = 2001:Int32)) OR (bid.auction = 2019:Int32)) OR (bid.auction = 2087:Int32)) } + └─StreamTableScan { table: bid, columns: [bid.auction, bid.price, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } stream_plan: | StreamMaterialize { columns: [auction, price, bid._row_id(hidden)], pk_columns: [bid._row_id], pk_conflict: "no check" } └─StreamFilter { predicate: (((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR (bid.auction = 2001:Int32)) OR (bid.auction = 2019:Int32)) OR (bid.auction = 2087:Int32)) } @@ -796,6 +813,12 @@ BatchExchange { order: [], dist: Single } └─BatchProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, ToChar(bid.date_time, 'YYYY-MM-DD':Varchar) as $expr1, ToChar(bid.date_time, 'HH:MI':Varchar) as $expr2] } └─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time], distribution: SomeShard } + sink_plan: | + StreamSink { type: append-only, columns: [auction, bidder, price, date_time, date, time] } + └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, $expr1, $expr2] } + └─StreamExchange { dist: Single } + └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, ToChar(bid.date_time, 'YYYY-MM-DD':Varchar) as $expr1, ToChar(bid.date_time, 'HH:MI':Varchar) as $expr2, bid._row_id] } + └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } stream_plan: | StreamMaterialize { columns: [auction, bidder, price, date_time, date, time, bid._row_id(hidden)], pk_columns: [bid._row_id], pk_conflict: "no check" } └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, ToChar(bid.date_time, 'YYYY-MM-DD':Varchar) as $expr1, ToChar(bid.date_time, 'HH:MI':Varchar) as $expr2, bid._row_id] } @@ -889,6 +912,13 @@ └─BatchProject { exprs: [bid.auction, bid.bidder, (0.908:Decimal * bid.price) as $expr1, Case(((Extract('HOUR':Varchar, bid.date_time) >= 8:Int32) AND (Extract('HOUR':Varchar, bid.date_time) <= 18:Int32)), 'dayTime':Varchar, ((Extract('HOUR':Varchar, bid.date_time) <= 6:Int32) OR (Extract('HOUR':Varchar, bid.date_time) >= 20:Int32)), 'nightTime':Varchar, 'otherTime':Varchar) as $expr2, bid.date_time, bid.extra] } └─BatchFilter { predicate: ((0.908:Decimal * bid.price) > 1000000:Int32) AND ((0.908:Decimal * bid.price) < 50000000:Int32) } └─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid.extra], distribution: SomeShard } + sink_plan: | + StreamSink { type: append-only, columns: [auction, bidder, price, bidtimetype, date_time, extra] } + └─StreamProject { exprs: [bid.auction, bid.bidder, $expr1, $expr2, bid.date_time, bid.extra] } + └─StreamExchange { dist: Single } + └─StreamProject { exprs: [bid.auction, bid.bidder, (0.908:Decimal * bid.price) as $expr1, Case(((Extract('HOUR':Varchar, bid.date_time) >= 8:Int32) AND (Extract('HOUR':Varchar, bid.date_time) <= 18:Int32)), 'dayTime':Varchar, ((Extract('HOUR':Varchar, bid.date_time) <= 6:Int32) OR (Extract('HOUR':Varchar, bid.date_time) >= 20:Int32)), 'nightTime':Varchar, 'otherTime':Varchar) as $expr2, bid.date_time, bid.extra, bid._row_id] } + └─StreamFilter { predicate: ((0.908:Decimal * bid.price) > 1000000:Int32) AND ((0.908:Decimal * bid.price) < 50000000:Int32) } + └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid.extra, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } stream_plan: | StreamMaterialize { columns: [auction, bidder, price, bidtimetype, date_time, extra, bid._row_id(hidden)], pk_columns: [bid._row_id], pk_conflict: "no check" } └─StreamProject { exprs: [bid.auction, bid.bidder, (0.908:Decimal * bid.price) as $expr1, Case(((Extract('HOUR':Varchar, bid.date_time) >= 8:Int32) AND (Extract('HOUR':Varchar, bid.date_time) <= 18:Int32)), 'dayTime':Varchar, ((Extract('HOUR':Varchar, bid.date_time) <= 6:Int32) OR (Extract('HOUR':Varchar, bid.date_time) >= 20:Int32)), 'nightTime':Varchar, 'otherTime':Varchar) as $expr2, bid.date_time, bid.extra, bid._row_id] } @@ -1259,6 +1289,12 @@ BatchExchange { order: [], dist: Single } └─BatchProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, SplitPart(bid.url, '/':Varchar, 4:Int32) as $expr1, SplitPart(bid.url, '/':Varchar, 5:Int32) as $expr2, SplitPart(bid.url, '/':Varchar, 6:Int32) as $expr3] } └─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url], distribution: SomeShard } + sink_plan: | + StreamSink { type: append-only, columns: [auction, bidder, price, channel, dir1, dir2, dir3] } + └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, $expr1, $expr2, $expr3] } + └─StreamExchange { dist: Single } + └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, SplitPart(bid.url, '/':Varchar, 4:Int32) as $expr1, SplitPart(bid.url, '/':Varchar, 5:Int32) as $expr2, SplitPart(bid.url, '/':Varchar, 6:Int32) as $expr3, bid._row_id] } + └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } stream_plan: | StreamMaterialize { columns: [auction, bidder, price, channel, dir1, dir2, dir3, bid._row_id(hidden)], pk_columns: [bid._row_id], pk_conflict: "no check" } └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, SplitPart(bid.url, '/':Varchar, 4:Int32) as $expr1, SplitPart(bid.url, '/':Varchar, 5:Int32) as $expr2, SplitPart(bid.url, '/':Varchar, 6:Int32) as $expr3, bid._row_id] } @@ -1751,13 +1787,12 @@ └─StreamExchange { dist: Single } └─StreamGroupTopN { order: "[count(bid.auction) DESC]", limit: 1000, offset: 0, group_key: [3] } └─StreamProject { exprs: [auction.id, auction.item_name, count(bid.auction), Vnode(auction.id) as $expr1] } - └─StreamProject { exprs: [auction.id, auction.item_name, count(bid.auction)] } - └─StreamHashAgg { group_key: [auction.id, auction.item_name], aggs: [count(bid.auction), count] } - └─StreamHashJoin { type: Inner, predicate: auction.id = bid.auction, output: all } - ├─StreamExchange { dist: HashShard(auction.id) } - | └─StreamTableScan { table: auction, columns: [auction.id, auction.item_name], pk: [auction.id], dist: UpstreamHashShard(auction.id) } - └─StreamExchange { dist: HashShard(bid.auction) } - └─StreamTableScan { table: bid, columns: [bid.auction, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } + └─StreamHashAgg { group_key: [auction.id, auction.item_name], aggs: [count(bid.auction), count] } + └─StreamHashJoin { type: Inner, predicate: auction.id = bid.auction, output: all } + ├─StreamExchange { dist: HashShard(auction.id) } + | └─StreamTableScan { table: auction, columns: [auction.id, auction.item_name], pk: [auction.id], dist: UpstreamHashShard(auction.id) } + └─StreamExchange { dist: HashShard(bid.auction) } + └─StreamTableScan { table: bid, columns: [bid.auction, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], pk_columns: [auction_id, auction_item_name], order_descs: [bid_count, auction_id, auction_item_name], pk_conflict: "no check" } @@ -1769,11 +1804,10 @@ Fragment 1 StreamGroupTopN { order: "[count(bid.auction) DESC]", limit: 1000, offset: 0, group_key: [3] } { state table: 1 } └── StreamProject { exprs: [auction.id, auction.item_name, count(bid.auction), Vnode(auction.id) as $expr1] } - └── StreamProject { exprs: [auction.id, auction.item_name, count(bid.auction)] } - └── StreamHashAgg { group_key: [auction.id, auction.item_name], aggs: [count(bid.auction), count] } { result table: 2, state tables: [], distinct tables: [] } - └── StreamHashJoin { type: Inner, predicate: auction.id = bid.auction, output: all } { left table: 3, right table: 5, left degree table: 4, right degree table: 6 } - ├── StreamExchange Hash([0]) from 2 - └── StreamExchange Hash([0]) from 3 + └── StreamHashAgg { group_key: [auction.id, auction.item_name], aggs: [count(bid.auction), count] } { result table: 2, state tables: [], distinct tables: [] } + └── StreamHashJoin { type: Inner, predicate: auction.id = bid.auction, output: all } { left table: 3, right table: 5, left degree table: 4, right degree table: 6 } + ├── StreamExchange Hash([0]) from 2 + └── StreamExchange Hash([0]) from 3 Fragment 2 Chain { table: auction, columns: [auction.id, auction.item_name], pk: [auction.id], dist: UpstreamHashShard(auction.id) } @@ -1852,15 +1886,14 @@ └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [$expr1], aggs: [min(max(bid.price)), count] } └─StreamProject { exprs: [auction.id, max(bid.price), Vnode(auction.id) as $expr1] } - └─StreamProject { exprs: [auction.id, max(bid.price)] } - └─StreamHashAgg { group_key: [auction.id], aggs: [max(bid.price), count] } - └─StreamProject { exprs: [auction.id, bid.price, bid._row_id] } - └─StreamFilter { predicate: (bid.date_time >= auction.date_time) AND (bid.date_time <= auction.expires) } - └─StreamHashJoin { type: Inner, predicate: auction.id = bid.auction, output: all } - ├─StreamExchange { dist: HashShard(auction.id) } - | └─StreamTableScan { table: auction, columns: [auction.id, auction.date_time, auction.expires], pk: [auction.id], dist: UpstreamHashShard(auction.id) } - └─StreamExchange { dist: HashShard(bid.auction) } - └─StreamTableScan { table: bid, columns: [bid.auction, bid.price, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } + └─StreamHashAgg { group_key: [auction.id], aggs: [max(bid.price), count] } + └─StreamProject { exprs: [auction.id, bid.price, bid._row_id] } + └─StreamFilter { predicate: (bid.date_time >= auction.date_time) AND (bid.date_time <= auction.expires) } + └─StreamHashJoin { type: Inner, predicate: auction.id = bid.auction, output: all } + ├─StreamExchange { dist: HashShard(auction.id) } + | └─StreamTableScan { table: auction, columns: [auction.id, auction.date_time, auction.expires], pk: [auction.id], dist: UpstreamHashShard(auction.id) } + └─StreamExchange { dist: HashShard(bid.auction) } + └─StreamTableScan { table: bid, columns: [bid.auction, bid.price, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [min_final], pk_columns: [], pk_conflict: "no check" } @@ -1878,20 +1911,19 @@ ├── state tables: [ 2 ] ├── distinct tables: [] └── StreamProject { exprs: [auction.id, max(bid.price), Vnode(auction.id) as $expr1] } - └── StreamProject { exprs: [auction.id, max(bid.price)] } - └── StreamHashAgg { group_key: [auction.id], aggs: [max(bid.price), count] } - ├── result table: 5 - ├── state tables: [ 4 ] - ├── distinct tables: [] - └── StreamProject { exprs: [auction.id, bid.price, bid._row_id] } - └── StreamFilter { predicate: (bid.date_time >= auction.date_time) AND (bid.date_time <= auction.expires) } - └── StreamHashJoin { type: Inner, predicate: auction.id = bid.auction, output: all } - ├── left table: 6 - ├── right table: 8 - ├── left degree table: 7 - ├── right degree table: 9 - ├── StreamExchange Hash([0]) from 2 - └── StreamExchange Hash([0]) from 3 + └── StreamHashAgg { group_key: [auction.id], aggs: [max(bid.price), count] } + ├── result table: 5 + ├── state tables: [ 4 ] + ├── distinct tables: [] + └── StreamProject { exprs: [auction.id, bid.price, bid._row_id] } + └── StreamFilter { predicate: (bid.date_time >= auction.date_time) AND (bid.date_time <= auction.expires) } + └── StreamHashJoin { type: Inner, predicate: auction.id = bid.auction, output: all } + ├── left table: 6 + ├── right table: 8 + ├── left degree table: 7 + ├── right degree table: 9 + ├── StreamExchange Hash([0]) from 2 + └── StreamExchange Hash([0]) from 3 Fragment 2 Chain { table: auction, columns: [auction.id, auction.date_time, auction.expires], pk: [auction.id], dist: UpstreamHashShard(auction.id) } diff --git a/src/frontend/planner_test/tests/testdata/nexmark_source.yaml b/src/frontend/planner_test/tests/testdata/nexmark_source.yaml index 24aa0c56419a2..c4f4c599e63f9 100644 --- a/src/frontend/planner_test/tests/testdata/nexmark_source.yaml +++ b/src/frontend/planner_test/tests/testdata/nexmark_source.yaml @@ -1777,17 +1777,16 @@ └─StreamExchange { dist: Single } └─StreamGroupTopN { order: "[count(auction) DESC]", limit: 1000, offset: 0, group_key: [3] } └─StreamProject { exprs: [id, item_name, count(auction), Vnode(id) as $expr1] } - └─StreamProject { exprs: [id, item_name, count(auction)] } - └─StreamAppendOnlyHashAgg { group_key: [id, item_name], aggs: [count(auction), count] } - └─StreamAppendOnlyHashJoin { type: Inner, predicate: id = auction, output: [id, item_name, auction, _row_id, _row_id] } - ├─StreamExchange { dist: HashShard(id) } - | └─StreamProject { exprs: [id, item_name, _row_id] } - | └─StreamRowIdGen { row_id_index: 10 } - | └─StreamSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "extra", "_row_id"] } - └─StreamExchange { dist: HashShard(auction) } - └─StreamProject { exprs: [auction, _row_id] } - └─StreamRowIdGen { row_id_index: 7 } - └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } + └─StreamAppendOnlyHashAgg { group_key: [id, item_name], aggs: [count(auction), count] } + └─StreamAppendOnlyHashJoin { type: Inner, predicate: id = auction, output: [id, item_name, auction, _row_id, _row_id] } + ├─StreamExchange { dist: HashShard(id) } + | └─StreamProject { exprs: [id, item_name, _row_id] } + | └─StreamRowIdGen { row_id_index: 10 } + | └─StreamSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "extra", "_row_id"] } + └─StreamExchange { dist: HashShard(auction) } + └─StreamProject { exprs: [auction, _row_id] } + └─StreamRowIdGen { row_id_index: 7 } + └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], pk_columns: [auction_id, auction_item_name], order_descs: [bid_count, auction_id, auction_item_name], pk_conflict: "no check" } @@ -1799,15 +1798,14 @@ Fragment 1 StreamGroupTopN { order: "[count(auction) DESC]", limit: 1000, offset: 0, group_key: [3] } { state table: 1 } └── StreamProject { exprs: [id, item_name, count(auction), Vnode(id) as $expr1] } - └── StreamProject { exprs: [id, item_name, count(auction)] } - └── StreamAppendOnlyHashAgg { group_key: [id, item_name], aggs: [count(auction), count] } { result table: 2, state tables: [], distinct tables: [] } - └── StreamAppendOnlyHashJoin { type: Inner, predicate: id = auction, output: [id, item_name, auction, _row_id, _row_id] } - ├── left table: 3 - ├── right table: 5 - ├── left degree table: 4 - ├── right degree table: 6 - ├── StreamExchange Hash([0]) from 2 - └── StreamExchange Hash([0]) from 3 + └── StreamAppendOnlyHashAgg { group_key: [id, item_name], aggs: [count(auction), count] } { result table: 2, state tables: [], distinct tables: [] } + └── StreamAppendOnlyHashJoin { type: Inner, predicate: id = auction, output: [id, item_name, auction, _row_id, _row_id] } + ├── left table: 3 + ├── right table: 5 + ├── left degree table: 4 + ├── right degree table: 6 + ├── StreamExchange Hash([0]) from 2 + └── StreamExchange Hash([0]) from 3 Fragment 2 StreamProject { exprs: [id, item_name, _row_id] } @@ -1888,19 +1886,18 @@ └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [$expr1], aggs: [min(max(price)), count] } └─StreamProject { exprs: [id, max(price), Vnode(id) as $expr1] } - └─StreamProject { exprs: [id, max(price)] } - └─StreamAppendOnlyHashAgg { group_key: [id], aggs: [max(price), count] } - └─StreamProject { exprs: [id, price, _row_id, _row_id] } - └─StreamFilter { predicate: (date_time >= date_time) AND (date_time <= expires) } - └─StreamAppendOnlyHashJoin { type: Inner, predicate: id = auction, output: all } - ├─StreamExchange { dist: HashShard(id) } - | └─StreamProject { exprs: [id, date_time, expires, _row_id] } - | └─StreamRowIdGen { row_id_index: 10 } - | └─StreamSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "extra", "_row_id"] } - └─StreamExchange { dist: HashShard(auction) } - └─StreamProject { exprs: [auction, price, date_time, _row_id] } - └─StreamRowIdGen { row_id_index: 7 } - └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } + └─StreamAppendOnlyHashAgg { group_key: [id], aggs: [max(price), count] } + └─StreamProject { exprs: [id, price, _row_id, _row_id] } + └─StreamFilter { predicate: (date_time >= date_time) AND (date_time <= expires) } + └─StreamAppendOnlyHashJoin { type: Inner, predicate: id = auction, output: all } + ├─StreamExchange { dist: HashShard(id) } + | └─StreamProject { exprs: [id, date_time, expires, _row_id] } + | └─StreamRowIdGen { row_id_index: 10 } + | └─StreamSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "extra", "_row_id"] } + └─StreamExchange { dist: HashShard(auction) } + └─StreamProject { exprs: [auction, price, date_time, _row_id] } + └─StreamRowIdGen { row_id_index: 7 } + └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [min_final], pk_columns: [], pk_conflict: "no check" } @@ -1918,20 +1915,19 @@ ├── state tables: [ 2 ] ├── distinct tables: [] └── StreamProject { exprs: [id, max(price), Vnode(id) as $expr1] } - └── StreamProject { exprs: [id, max(price)] } - └── StreamAppendOnlyHashAgg { group_key: [id], aggs: [max(price), count] } - ├── result table: 4 - ├── state tables: [] - ├── distinct tables: [] - └── StreamProject { exprs: [id, price, _row_id, _row_id] } - └── StreamFilter { predicate: (date_time >= date_time) AND (date_time <= expires) } - └── StreamAppendOnlyHashJoin { type: Inner, predicate: id = auction, output: all } - ├── left table: 5 - ├── right table: 7 - ├── left degree table: 6 - ├── right degree table: 8 - ├── StreamExchange Hash([0]) from 2 - └── StreamExchange Hash([0]) from 3 + └── StreamAppendOnlyHashAgg { group_key: [id], aggs: [max(price), count] } + ├── result table: 4 + ├── state tables: [] + ├── distinct tables: [] + └── StreamProject { exprs: [id, price, _row_id, _row_id] } + └── StreamFilter { predicate: (date_time >= date_time) AND (date_time <= expires) } + └── StreamAppendOnlyHashJoin { type: Inner, predicate: id = auction, output: all } + ├── left table: 5 + ├── right table: 7 + ├── left degree table: 6 + ├── right degree table: 8 + ├── StreamExchange Hash([0]) from 2 + └── StreamExchange Hash([0]) from 3 Fragment 2 StreamProject { exprs: [id, date_time, expires, _row_id] } diff --git a/src/frontend/planner_test/tests/testdata/tpch.yaml b/src/frontend/planner_test/tests/testdata/tpch.yaml index d408ddebabd6a..80bcfa2ee2308 100644 --- a/src/frontend/planner_test/tests/testdata/tpch.yaml +++ b/src/frontend/planner_test/tests/testdata/tpch.yaml @@ -627,24 +627,23 @@ └─StreamExchange { dist: Single } └─StreamGroupTopN { order: "[sum($expr1) DESC, orders.o_orderdate ASC]", limit: 10, offset: 0, group_key: [4] } └─StreamProject { exprs: [lineitem.l_orderkey, sum($expr1), orders.o_orderdate, orders.o_shippriority, Vnode(lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority) as $expr2] } - └─StreamProject { exprs: [lineitem.l_orderkey, sum($expr1), orders.o_orderdate, orders.o_shippriority] } - └─StreamHashAgg { group_key: [lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority], aggs: [sum($expr1), count] } - └─StreamExchange { dist: HashShard(lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority) } - └─StreamProject { exprs: [lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority, (lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)) as $expr1, customer.c_custkey, orders.o_orderkey, lineitem.l_linenumber] } - └─StreamHashJoin { type: Inner, predicate: orders.o_orderkey = lineitem.l_orderkey, output: [orders.o_orderdate, orders.o_shippriority, lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, customer.c_custkey, orders.o_orderkey, lineitem.l_linenumber] } - ├─StreamExchange { dist: HashShard(orders.o_orderkey) } - | └─StreamHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey, output: [orders.o_orderkey, orders.o_orderdate, orders.o_shippriority, customer.c_custkey] } - | ├─StreamExchange { dist: HashShard(customer.c_custkey) } - | | └─StreamProject { exprs: [customer.c_custkey] } - | | └─StreamFilter { predicate: (customer.c_mktsegment = 'FURNITURE':Varchar) } - | | └─StreamTableScan { table: customer, columns: [customer.c_custkey, customer.c_mktsegment], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } - | └─StreamExchange { dist: HashShard(orders.o_custkey) } - | └─StreamFilter { predicate: (orders.o_orderdate < '1995-03-29':Date) } - | └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_custkey, orders.o_orderdate, orders.o_shippriority], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } - └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } - └─StreamProject { exprs: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber] } - └─StreamFilter { predicate: (lineitem.l_shipdate > '1995-03-29':Date) } - └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber, lineitem.l_shipdate], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + └─StreamHashAgg { group_key: [lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority], aggs: [sum($expr1), count] } + └─StreamExchange { dist: HashShard(lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority) } + └─StreamProject { exprs: [lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority, (lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)) as $expr1, customer.c_custkey, orders.o_orderkey, lineitem.l_linenumber] } + └─StreamHashJoin { type: Inner, predicate: orders.o_orderkey = lineitem.l_orderkey, output: [orders.o_orderdate, orders.o_shippriority, lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, customer.c_custkey, orders.o_orderkey, lineitem.l_linenumber] } + ├─StreamExchange { dist: HashShard(orders.o_orderkey) } + | └─StreamHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey, output: [orders.o_orderkey, orders.o_orderdate, orders.o_shippriority, customer.c_custkey] } + | ├─StreamExchange { dist: HashShard(customer.c_custkey) } + | | └─StreamProject { exprs: [customer.c_custkey] } + | | └─StreamFilter { predicate: (customer.c_mktsegment = 'FURNITURE':Varchar) } + | | └─StreamTableScan { table: customer, columns: [customer.c_custkey, customer.c_mktsegment], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } + | └─StreamExchange { dist: HashShard(orders.o_custkey) } + | └─StreamFilter { predicate: (orders.o_orderdate < '1995-03-29':Date) } + | └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_custkey, orders.o_orderdate, orders.o_shippriority], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } + └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + └─StreamProject { exprs: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber] } + └─StreamFilter { predicate: (lineitem.l_shipdate > '1995-03-29':Date) } + └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber, lineitem.l_shipdate], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [l_orderkey, revenue, o_orderdate, o_shippriority], pk_columns: [l_orderkey, o_orderdate, o_shippriority], order_descs: [revenue, o_orderdate, l_orderkey, o_shippriority], pk_conflict: "no check" } @@ -656,9 +655,8 @@ Fragment 1 StreamGroupTopN { order: "[sum($expr1) DESC, orders.o_orderdate ASC]", limit: 10, offset: 0, group_key: [4] } { state table: 1 } └── StreamProject { exprs: [lineitem.l_orderkey, sum($expr1), orders.o_orderdate, orders.o_shippriority, Vnode(lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority) as $expr2] } - └── StreamProject { exprs: [lineitem.l_orderkey, sum($expr1), orders.o_orderdate, orders.o_shippriority] } - └── StreamHashAgg { group_key: [lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority], aggs: [sum($expr1), count] } { result table: 2, state tables: [], distinct tables: [] } - └── StreamExchange Hash([0, 1, 2]) from 2 + └── StreamHashAgg { group_key: [lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority], aggs: [sum($expr1), count] } { result table: 2, state tables: [], distinct tables: [] } + └── StreamExchange Hash([0, 1, 2]) from 2 Fragment 2 StreamProject { exprs: [lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority, (lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)) as $expr1, customer.c_custkey, orders.o_orderkey, lineitem.l_linenumber] } @@ -2059,27 +2057,26 @@ └─StreamExchange { dist: Single } └─StreamGroupTopN { order: "[sum($expr1) DESC]", limit: 20, offset: 0, group_key: [8] } └─StreamProject { exprs: [customer.c_custkey, customer.c_name, sum($expr1), customer.c_acctbal, nation.n_name, customer.c_address, customer.c_phone, customer.c_comment, Vnode(customer.c_custkey, customer.c_name, customer.c_acctbal, customer.c_phone, nation.n_name, customer.c_address, customer.c_comment) as $expr2] } - └─StreamProject { exprs: [customer.c_custkey, customer.c_name, sum($expr1), customer.c_acctbal, nation.n_name, customer.c_address, customer.c_phone, customer.c_comment] } - └─StreamHashAgg { group_key: [customer.c_custkey, customer.c_name, customer.c_acctbal, customer.c_phone, nation.n_name, customer.c_address, customer.c_comment], aggs: [sum($expr1), count] } - └─StreamExchange { dist: HashShard(customer.c_custkey, customer.c_name, customer.c_acctbal, customer.c_phone, nation.n_name, customer.c_address, customer.c_comment) } - └─StreamProject { exprs: [customer.c_custkey, customer.c_name, customer.c_acctbal, customer.c_phone, nation.n_name, customer.c_address, customer.c_comment, (lineitem.l_extendedprice * (1.00:Decimal - lineitem.l_discount)) as $expr1, orders.o_orderkey, nation.n_nationkey, customer.c_nationkey, lineitem.l_orderkey, lineitem.l_linenumber] } - └─StreamHashJoin { type: Inner, predicate: orders.o_orderkey = lineitem.l_orderkey, output: [customer.c_custkey, customer.c_name, customer.c_address, customer.c_phone, customer.c_acctbal, customer.c_comment, lineitem.l_extendedprice, lineitem.l_discount, nation.n_name, orders.o_orderkey, nation.n_nationkey, customer.c_nationkey, lineitem.l_orderkey, lineitem.l_linenumber] } - ├─StreamExchange { dist: HashShard(orders.o_orderkey) } - | └─StreamHashJoin { type: Inner, predicate: customer.c_nationkey = nation.n_nationkey, output: [customer.c_custkey, customer.c_name, customer.c_address, customer.c_phone, customer.c_acctbal, customer.c_comment, orders.o_orderkey, nation.n_name, customer.c_nationkey, nation.n_nationkey] } - | ├─StreamExchange { dist: HashShard(customer.c_nationkey) } - | | └─StreamHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey, output: [customer.c_custkey, customer.c_name, customer.c_address, customer.c_nationkey, customer.c_phone, customer.c_acctbal, customer.c_comment, orders.o_orderkey] } - | | ├─StreamExchange { dist: HashShard(customer.c_custkey) } - | | | └─StreamTableScan { table: customer, columns: [customer.c_custkey, customer.c_name, customer.c_address, customer.c_nationkey, customer.c_phone, customer.c_acctbal, customer.c_comment], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } - | | └─StreamExchange { dist: HashShard(orders.o_custkey) } - | | └─StreamProject { exprs: [orders.o_orderkey, orders.o_custkey] } - | | └─StreamFilter { predicate: (orders.o_orderdate >= '1994-01-01':Date) AND (orders.o_orderdate < '1994-04-01 00:00:00':Timestamp) } - | | └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_custkey, orders.o_orderdate], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } - | └─StreamExchange { dist: HashShard(nation.n_nationkey) } - | └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name], pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } - └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } - └─StreamProject { exprs: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber] } - └─StreamFilter { predicate: (lineitem.l_returnflag = 'R':Varchar) } - └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber, lineitem.l_returnflag], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + └─StreamHashAgg { group_key: [customer.c_custkey, customer.c_name, customer.c_acctbal, customer.c_phone, nation.n_name, customer.c_address, customer.c_comment], aggs: [sum($expr1), count] } + └─StreamExchange { dist: HashShard(customer.c_custkey, customer.c_name, customer.c_acctbal, customer.c_phone, nation.n_name, customer.c_address, customer.c_comment) } + └─StreamProject { exprs: [customer.c_custkey, customer.c_name, customer.c_acctbal, customer.c_phone, nation.n_name, customer.c_address, customer.c_comment, (lineitem.l_extendedprice * (1.00:Decimal - lineitem.l_discount)) as $expr1, orders.o_orderkey, nation.n_nationkey, customer.c_nationkey, lineitem.l_orderkey, lineitem.l_linenumber] } + └─StreamHashJoin { type: Inner, predicate: orders.o_orderkey = lineitem.l_orderkey, output: [customer.c_custkey, customer.c_name, customer.c_address, customer.c_phone, customer.c_acctbal, customer.c_comment, lineitem.l_extendedprice, lineitem.l_discount, nation.n_name, orders.o_orderkey, nation.n_nationkey, customer.c_nationkey, lineitem.l_orderkey, lineitem.l_linenumber] } + ├─StreamExchange { dist: HashShard(orders.o_orderkey) } + | └─StreamHashJoin { type: Inner, predicate: customer.c_nationkey = nation.n_nationkey, output: [customer.c_custkey, customer.c_name, customer.c_address, customer.c_phone, customer.c_acctbal, customer.c_comment, orders.o_orderkey, nation.n_name, customer.c_nationkey, nation.n_nationkey] } + | ├─StreamExchange { dist: HashShard(customer.c_nationkey) } + | | └─StreamHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey, output: [customer.c_custkey, customer.c_name, customer.c_address, customer.c_nationkey, customer.c_phone, customer.c_acctbal, customer.c_comment, orders.o_orderkey] } + | | ├─StreamExchange { dist: HashShard(customer.c_custkey) } + | | | └─StreamTableScan { table: customer, columns: [customer.c_custkey, customer.c_name, customer.c_address, customer.c_nationkey, customer.c_phone, customer.c_acctbal, customer.c_comment], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } + | | └─StreamExchange { dist: HashShard(orders.o_custkey) } + | | └─StreamProject { exprs: [orders.o_orderkey, orders.o_custkey] } + | | └─StreamFilter { predicate: (orders.o_orderdate >= '1994-01-01':Date) AND (orders.o_orderdate < '1994-04-01 00:00:00':Timestamp) } + | | └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_custkey, orders.o_orderdate], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } + | └─StreamExchange { dist: HashShard(nation.n_nationkey) } + | └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name], pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } + └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + └─StreamProject { exprs: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber] } + └─StreamFilter { predicate: (lineitem.l_returnflag = 'R':Varchar) } + └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber, lineitem.l_returnflag], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [c_custkey, c_name, revenue, c_acctbal, n_name, c_address, c_phone, c_comment], pk_columns: [c_custkey, c_name, c_acctbal, c_phone, n_name, c_address, c_comment], order_descs: [revenue, c_custkey, c_name, c_acctbal, c_phone, n_name, c_address, c_comment], pk_conflict: "no check" } @@ -2091,9 +2088,8 @@ Fragment 1 StreamGroupTopN { order: "[sum($expr1) DESC]", limit: 20, offset: 0, group_key: [8] } { state table: 1 } └── StreamProject { exprs: [customer.c_custkey, customer.c_name, sum($expr1), customer.c_acctbal, nation.n_name, customer.c_address, customer.c_phone, customer.c_comment, Vnode(customer.c_custkey, customer.c_name, customer.c_acctbal, customer.c_phone, nation.n_name, customer.c_address, customer.c_comment) as $expr2] } - └── StreamProject { exprs: [customer.c_custkey, customer.c_name, sum($expr1), customer.c_acctbal, nation.n_name, customer.c_address, customer.c_phone, customer.c_comment] } - └── StreamHashAgg { group_key: [customer.c_custkey, customer.c_name, customer.c_acctbal, customer.c_phone, nation.n_name, customer.c_address, customer.c_comment], aggs: [sum($expr1), count] } { result table: 2, state tables: [], distinct tables: [] } - └── StreamExchange Hash([0, 1, 2, 3, 4, 5, 6]) from 2 + └── StreamHashAgg { group_key: [customer.c_custkey, customer.c_name, customer.c_acctbal, customer.c_phone, nation.n_name, customer.c_address, customer.c_comment], aggs: [sum($expr1), count] } { result table: 2, state tables: [], distinct tables: [] } + └── StreamExchange Hash([0, 1, 2, 3, 4, 5, 6]) from 2 Fragment 2 StreamProject { exprs: [customer.c_custkey, customer.c_name, customer.c_acctbal, customer.c_phone, nation.n_name, customer.c_address, customer.c_comment, (lineitem.l_extendedprice * (1.00:Decimal - lineitem.l_discount)) as $expr1, orders.o_orderkey, nation.n_nationkey, customer.c_nationkey, lineitem.l_orderkey, lineitem.l_linenumber] } @@ -3377,24 +3373,23 @@ └─StreamExchange { dist: Single } └─StreamGroupTopN { order: "[orders.o_totalprice DESC, orders.o_orderdate ASC]", limit: 100, offset: 0, group_key: [6] } └─StreamProject { exprs: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, sum(lineitem.l_quantity), Vnode(orders.o_orderkey) as $expr1] } - └─StreamProject { exprs: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, sum(lineitem.l_quantity)] } - └─StreamHashAgg { group_key: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice], aggs: [sum(lineitem.l_quantity), count] } - └─StreamHashJoin { type: LeftSemi, predicate: orders.o_orderkey = lineitem.l_orderkey, output: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, lineitem.l_quantity, lineitem.l_orderkey, lineitem.l_linenumber] } - ├─StreamHashJoin { type: Inner, predicate: orders.o_orderkey = lineitem.l_orderkey, output: [customer.c_custkey, customer.c_name, orders.o_orderkey, orders.o_totalprice, orders.o_orderdate, lineitem.l_quantity, lineitem.l_orderkey, lineitem.l_linenumber] } - | ├─StreamExchange { dist: HashShard(orders.o_orderkey) } - | | └─StreamHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey, output: [customer.c_custkey, customer.c_name, orders.o_orderkey, orders.o_totalprice, orders.o_orderdate] } - | | ├─StreamExchange { dist: HashShard(customer.c_custkey) } - | | | └─StreamTableScan { table: customer, columns: [customer.c_custkey, customer.c_name], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } - | | └─StreamExchange { dist: HashShard(orders.o_custkey) } - | | └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_custkey, orders.o_totalprice, orders.o_orderdate], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } - | └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } - | └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_quantity, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } - └─StreamProject { exprs: [lineitem.l_orderkey] } - └─StreamFilter { predicate: (sum(lineitem.l_quantity) > 1:Int32) } - └─StreamProject { exprs: [lineitem.l_orderkey, sum(lineitem.l_quantity)] } - └─StreamHashAgg { group_key: [lineitem.l_orderkey], aggs: [sum(lineitem.l_quantity), count] } - └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } - └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_quantity, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + └─StreamHashAgg { group_key: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice], aggs: [sum(lineitem.l_quantity), count] } + └─StreamHashJoin { type: LeftSemi, predicate: orders.o_orderkey = lineitem.l_orderkey, output: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, lineitem.l_quantity, lineitem.l_orderkey, lineitem.l_linenumber] } + ├─StreamHashJoin { type: Inner, predicate: orders.o_orderkey = lineitem.l_orderkey, output: [customer.c_custkey, customer.c_name, orders.o_orderkey, orders.o_totalprice, orders.o_orderdate, lineitem.l_quantity, lineitem.l_orderkey, lineitem.l_linenumber] } + | ├─StreamExchange { dist: HashShard(orders.o_orderkey) } + | | └─StreamHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey, output: [customer.c_custkey, customer.c_name, orders.o_orderkey, orders.o_totalprice, orders.o_orderdate] } + | | ├─StreamExchange { dist: HashShard(customer.c_custkey) } + | | | └─StreamTableScan { table: customer, columns: [customer.c_custkey, customer.c_name], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } + | | └─StreamExchange { dist: HashShard(orders.o_custkey) } + | | └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_custkey, orders.o_totalprice, orders.o_orderdate], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } + | └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + | └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_quantity, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + └─StreamProject { exprs: [lineitem.l_orderkey] } + └─StreamFilter { predicate: (sum(lineitem.l_quantity) > 1:Int32) } + └─StreamProject { exprs: [lineitem.l_orderkey, sum(lineitem.l_quantity)] } + └─StreamHashAgg { group_key: [lineitem.l_orderkey], aggs: [sum(lineitem.l_quantity), count] } + └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_quantity, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice, quantity], pk_columns: [c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice], order_descs: [o_totalprice, o_orderdate, c_name, c_custkey, o_orderkey], pk_conflict: "no check" } @@ -3406,28 +3401,31 @@ Fragment 1 StreamGroupTopN { order: "[orders.o_totalprice DESC, orders.o_orderdate ASC]", limit: 100, offset: 0, group_key: [6] } { state table: 1 } └── StreamProject { exprs: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, sum(lineitem.l_quantity), Vnode(orders.o_orderkey) as $expr1] } - └── StreamProject { exprs: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, sum(lineitem.l_quantity)] } - └── StreamHashAgg { group_key: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice], aggs: [sum(lineitem.l_quantity), count] } { result table: 2, state tables: [], distinct tables: [] } - └── StreamHashJoin { type: LeftSemi, predicate: orders.o_orderkey = lineitem.l_orderkey, output: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, lineitem.l_quantity, lineitem.l_orderkey, lineitem.l_linenumber] } - ├── left table: 3 - ├── right table: 5 - ├── left degree table: 4 - ├── right degree table: 6 - ├── StreamHashJoin { type: Inner, predicate: orders.o_orderkey = lineitem.l_orderkey, output: [customer.c_custkey, customer.c_name, orders.o_orderkey, orders.o_totalprice, orders.o_orderdate, lineitem.l_quantity, lineitem.l_orderkey, lineitem.l_linenumber] } - │ ├── left table: 7 - │ ├── right table: 9 - │ ├── left degree table: 8 - │ ├── right degree table: 10 - │ ├── StreamExchange Hash([2]) from 2 - │ └── StreamExchange Hash([0]) from 5 - └── StreamProject { exprs: [lineitem.l_orderkey] } - └── StreamFilter { predicate: (sum(lineitem.l_quantity) > 1:Int32) } - └── StreamProject { exprs: [lineitem.l_orderkey, sum(lineitem.l_quantity)] } - └── StreamHashAgg { group_key: [lineitem.l_orderkey], aggs: [sum(lineitem.l_quantity), count] } { result table: 15, state tables: [], distinct tables: [] } - └── StreamExchange Hash([0]) from 6 + └── StreamHashAgg { group_key: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice], aggs: [sum(lineitem.l_quantity), count] } { result table: 2, state tables: [], distinct tables: [] } + └── StreamHashJoin { type: LeftSemi, predicate: orders.o_orderkey = lineitem.l_orderkey, output: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, lineitem.l_quantity, lineitem.l_orderkey, lineitem.l_linenumber] } + ├── left table: 3 + ├── right table: 5 + ├── left degree table: 4 + ├── right degree table: 6 + ├── StreamHashJoin { type: Inner, predicate: orders.o_orderkey = lineitem.l_orderkey, output: [customer.c_custkey, customer.c_name, orders.o_orderkey, orders.o_totalprice, orders.o_orderdate, lineitem.l_quantity, lineitem.l_orderkey, lineitem.l_linenumber] } + │ ├── left table: 7 + │ ├── right table: 9 + │ ├── left degree table: 8 + │ ├── right degree table: 10 + │ ├── StreamExchange Hash([2]) from 2 + │ └── StreamExchange Hash([0]) from 5 + └── StreamProject { exprs: [lineitem.l_orderkey] } + └── StreamFilter { predicate: (sum(lineitem.l_quantity) > 1:Int32) } + └── StreamProject { exprs: [lineitem.l_orderkey, sum(lineitem.l_quantity)] } + └── StreamHashAgg { group_key: [lineitem.l_orderkey], aggs: [sum(lineitem.l_quantity), count] } { result table: 15, state tables: [], distinct tables: [] } + └── StreamExchange Hash([0]) from 6 Fragment 2 - StreamHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey, output: [customer.c_custkey, customer.c_name, orders.o_orderkey, orders.o_totalprice, orders.o_orderdate] } { left table: 11, right table: 13, left degree table: 12, right degree table: 14 } + StreamHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey, output: [customer.c_custkey, customer.c_name, orders.o_orderkey, orders.o_totalprice, orders.o_orderdate] } + ├── left table: 11 + ├── right table: 13 + ├── left degree table: 12 + ├── right degree table: 14 ├── StreamExchange Hash([0]) from 3 └── StreamExchange Hash([1]) from 4 diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index 91dac433c14ce..e9999c4bfb8e8 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -263,12 +263,70 @@ impl PlanRoot { Ok(plan) } + /// Generate optimized stream plan + fn gen_optimized_stream_plan(&mut self) -> Result { + let ctx = self.plan.ctx(); + let _explain_trace = ctx.is_explain_trace(); + + let mut plan = self.gen_stream_plan()?; + + plan = plan.optimize_by_rules(&OptimizationStage::new( + "Add identity project between exchange and share", + vec![AvoidExchangeShareRule::create()], + ApplyOrder::BottomUp, + )); + + plan = plan.optimize_by_rules(&OptimizationStage::new( + "Merge StreamProject", + vec![StreamProjectMergeRule::create()], + ApplyOrder::BottomUp, + )); + + if ctx.session_ctx().config().get_streaming_enable_delta_join() { + // TODO: make it a logical optimization. + // Rewrite joins with index to delta join + plan = plan.optimize_by_rules(&OptimizationStage::new( + "To IndexDeltaJoin", + vec![IndexDeltaJoinRule::create()], + ApplyOrder::BottomUp, + )); + } + + // Inline session timezone + plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?; + + if ctx.is_explain_trace() { + ctx.trace("Inline session timezone:"); + ctx.trace(plan.explain_to_string().unwrap()); + } + + // Const eval of exprs at the last minute + plan = const_eval_exprs(plan)?; + + if ctx.is_explain_trace() { + ctx.trace("Const eval exprs:"); + ctx.trace(plan.explain_to_string().unwrap()); + } + + #[cfg(debug_assertions)] + InputRefValidator.validate(plan.clone()); + + if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) { + return Err(ErrorCode::NotSupported( + "exist dangling temporal scan".to_string(), + "please check your temporal join syntax e.g. consider removing the right outer join if it is being used.".to_string(), + ).into()); + } + + Ok(plan) + } + /// Generate create index or create materialize view plan. fn gen_stream_plan(&mut self) -> Result { let ctx = self.plan.ctx(); let explain_trace = ctx.is_explain_trace(); - let mut plan = match self.plan.convention() { + let plan = match self.plan.convention() { Convention::Logical => { let plan = self.gen_optimized_logical_plan_for_stream()?; @@ -322,49 +380,6 @@ impl PlanRoot { ctx.trace("To Stream Plan:"); ctx.trace(plan.explain_to_string().unwrap()); } - - plan = plan.optimize_by_rules(&OptimizationStage::new( - "Add identity project between exchange and share", - vec![AvoidExchangeShareRule::create()], - ApplyOrder::BottomUp, - )); - - if ctx.session_ctx().config().get_streaming_enable_delta_join() { - // TODO: make it a logical optimization. - // Rewrite joins with index to delta join - plan = plan.optimize_by_rules(&OptimizationStage::new( - "To IndexDeltaJoin", - vec![IndexDeltaJoinRule::create()], - ApplyOrder::BottomUp, - )); - } - - // Inline session timezone - plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?; - - if ctx.is_explain_trace() { - ctx.trace("Inline session timezone:"); - ctx.trace(plan.explain_to_string().unwrap()); - } - - // Const eval of exprs at the last minute - plan = const_eval_exprs(plan)?; - - if ctx.is_explain_trace() { - ctx.trace("Const eval exprs:"); - ctx.trace(plan.explain_to_string().unwrap()); - } - - #[cfg(debug_assertions)] - InputRefValidator.validate(plan.clone()); - - if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) { - return Err(ErrorCode::NotSupported( - "exist dangling temporal scan".to_string(), - "please check your temporal join syntax e.g. consider removing the right outer join if it is being used.".to_string(), - ).into()); - } - Ok(plan) } @@ -380,7 +395,7 @@ impl PlanRoot { watermark_descs: Vec, version: Option, ) -> Result { - let mut stream_plan = self.gen_stream_plan()?; + let mut stream_plan = self.gen_optimized_stream_plan()?; // Add DML node. stream_plan = StreamDml::new( @@ -423,7 +438,7 @@ impl PlanRoot { mv_name: String, definition: String, ) -> Result { - let stream_plan = self.gen_stream_plan()?; + let stream_plan = self.gen_optimized_stream_plan()?; StreamMaterialize::create( stream_plan, @@ -443,7 +458,7 @@ impl PlanRoot { index_name: String, definition: String, ) -> Result { - let stream_plan = self.gen_stream_plan()?; + let stream_plan = self.gen_optimized_stream_plan()?; StreamMaterialize::create( stream_plan, @@ -464,7 +479,7 @@ impl PlanRoot { definition: String, properties: WithOptions, ) -> Result { - let mut stream_plan = self.gen_stream_plan()?; + let mut stream_plan = self.gen_optimized_stream_plan()?; // Add a project node if there is hidden column(s). let input_fields = stream_plan.schema().fields(); diff --git a/src/frontend/src/optimizer/rule/mod.rs b/src/frontend/src/optimizer/rule/mod.rs index 9437648db1395..39014a35c9e93 100644 --- a/src/frontend/src/optimizer/rule/mod.rs +++ b/src/frontend/src/optimizer/rule/mod.rs @@ -86,6 +86,7 @@ pub use top_n_on_index_rule::*; mod stream; pub use stream::bushy_tree_join_ordering_rule::*; pub use stream::filter_with_now_to_join_rule::*; +pub use stream::stream_project_merge_rule::*; mod trivial_project_to_values_rule; pub use trivial_project_to_values_rule::*; mod union_input_values_merge_rule; @@ -141,6 +142,7 @@ macro_rules! for_all_rules { , { MinMaxOnIndexRule } , { AlwaysFalseFilterRule } , { BushyTreeJoinOrderingRule } + , { StreamProjectMergeRule } } }; } diff --git a/src/frontend/src/optimizer/rule/stream/mod.rs b/src/frontend/src/optimizer/rule/stream/mod.rs index f1440e913ed9a..9054131250df9 100644 --- a/src/frontend/src/optimizer/rule/stream/mod.rs +++ b/src/frontend/src/optimizer/rule/stream/mod.rs @@ -14,3 +14,4 @@ pub(crate) mod bushy_tree_join_ordering_rule; pub(crate) mod filter_with_now_to_join_rule; +pub(crate) mod stream_project_merge_rule; diff --git a/src/frontend/src/optimizer/rule/stream/stream_project_merge_rule.rs b/src/frontend/src/optimizer/rule/stream/stream_project_merge_rule.rs new file mode 100644 index 0000000000000..ff55dbcf58a67 --- /dev/null +++ b/src/frontend/src/optimizer/rule/stream/stream_project_merge_rule.rs @@ -0,0 +1,46 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::expr::ExprRewriter; +use crate::optimizer::plan_node::{LogicalProject, PlanTreeNodeUnary, StreamProject}; +use crate::optimizer::{BoxedRule, PlanRef, Rule}; +use crate::utils::Substitute; + +/// Merge contiguous [`StreamProject`] nodes. +pub struct StreamProjectMergeRule {} +impl Rule for StreamProjectMergeRule { + fn apply(&self, plan: PlanRef) -> Option { + let outer_project = plan.as_stream_project()?; + let input = outer_project.input(); + let inner_project = input.as_stream_project()?; + + let mut subst = Substitute { + mapping: inner_project.exprs().clone(), + }; + let exprs = outer_project + .exprs() + .iter() + .cloned() + .map(|expr| subst.rewrite_expr(expr)) + .collect(); + let logical_project = LogicalProject::new(inner_project.input(), exprs); + Some(StreamProject::new(logical_project).into()) + } +} + +impl StreamProjectMergeRule { + pub fn create() -> BoxedRule { + Box::new(StreamProjectMergeRule {}) + } +}