diff --git a/src/frontend/planner_test/tests/testdata/subquery.yaml b/src/frontend/planner_test/tests/testdata/subquery.yaml index 8c73032d523c..4271a945f376 100644 --- a/src/frontend/planner_test/tests/testdata/subquery.yaml +++ b/src/frontend/planner_test/tests/testdata/subquery.yaml @@ -217,35 +217,37 @@ └─LogicalProject { exprs: [CorrelatedInputRef { index: 0, correlated_id: 1 } as $expr1] } └─LogicalValues { rows: [[]], schema: Schema { fields: [] } } optimized_logical_plan_for_batch: | - LogicalJoin { type: LeftSemi, on: IsNotDistinctFrom(auction.date_time, auction.date_time), output: all } - ├─LogicalHopWindow { time_col: auction.date_time, slide: 00:00:01, size: 01:00:00, output: all } - | └─LogicalScan { table: auction, columns: [auction.date_time], predicate: IsNotNull(auction.date_time) } - └─LogicalProject { exprs: [auction.date_time] } - └─LogicalProject { exprs: [auction.date_time, auction.date_time] } - └─LogicalAgg { group_key: [auction.date_time], aggs: [] } - └─LogicalJoin { type: Inner, on: true, output: all } - ├─LogicalAgg { group_key: [auction.date_time], aggs: [] } - | └─LogicalHopWindow { time_col: auction.date_time, slide: 00:00:01, size: 01:00:00, output: [auction.date_time] } - | └─LogicalScan { table: auction, columns: [auction.date_time], predicate: IsNotNull(auction.date_time) } - └─LogicalValues { rows: [[]], schema: Schema { fields: [] } } + LogicalHopWindow { time_col: auction.date_time, slide: 00:00:01, size: 01:00:00, output: all } + └─LogicalFilter { predicate: IsNotNull(auction.date_time) } + └─LogicalJoin { type: LeftSemi, on: IsNotDistinctFrom(auction.date_time, auction.date_time), output: all } + ├─LogicalScan { table: auction, columns: [auction.date_time], predicate: IsNotNull(auction.date_time) } + └─LogicalProject { exprs: [auction.date_time] } + └─LogicalProject { exprs: [auction.date_time, auction.date_time] } + └─LogicalAgg { group_key: [auction.date_time], aggs: [] } + └─LogicalJoin { type: Inner, on: true, output: all } + ├─LogicalAgg { group_key: [auction.date_time], aggs: [] } + | └─LogicalHopWindow { time_col: auction.date_time, slide: 00:00:01, size: 01:00:00, output: [auction.date_time] } + | └─LogicalScan { table: auction, columns: [auction.date_time], predicate: IsNotNull(auction.date_time) } + └─LogicalValues { rows: [[]], schema: Schema { fields: [] } } batch_plan: | - BatchExchange { order: [], dist: Single } - └─BatchHashJoin { type: LeftSemi, predicate: auction.date_time IS NOT DISTINCT FROM auction.date_time, output: all } - ├─BatchHopWindow { time_col: auction.date_time, slide: 00:00:01, size: 01:00:00, output: all } - | └─BatchExchange { order: [], dist: HashShard(auction.date_time) } - | └─BatchFilter { predicate: IsNotNull(auction.date_time) } - | └─BatchScan { table: auction, columns: [auction.date_time], distribution: SomeShard } - └─BatchProject { exprs: [auction.date_time] } - └─BatchHashAgg { group_key: [auction.date_time], aggs: [] } - └─BatchExchange { order: [], dist: HashShard(auction.date_time) } - └─BatchNestedLoopJoin { type: Inner, predicate: true, output: all } - ├─BatchExchange { order: [], dist: Single } - | └─BatchHashAgg { group_key: [auction.date_time], aggs: [] } - | └─BatchHopWindow { time_col: auction.date_time, slide: 00:00:01, size: 01:00:00, output: [auction.date_time] } - | └─BatchExchange { order: [], dist: HashShard(auction.date_time) } - | └─BatchFilter { predicate: IsNotNull(auction.date_time) } - | └─BatchScan { table: auction, columns: [auction.date_time], distribution: SomeShard } - └─BatchValues { rows: [[]] } + BatchHopWindow { time_col: auction.date_time, slide: 00:00:01, size: 01:00:00, output: all } + └─BatchExchange { order: [], dist: Single } + └─BatchFilter { predicate: IsNotNull(auction.date_time) } + └─BatchHashJoin { type: LeftSemi, predicate: auction.date_time IS NOT DISTINCT FROM auction.date_time, output: all } + ├─BatchExchange { order: [], dist: HashShard(auction.date_time) } + | └─BatchFilter { predicate: IsNotNull(auction.date_time) } + | └─BatchScan { table: auction, columns: [auction.date_time], distribution: SomeShard } + └─BatchProject { exprs: [auction.date_time] } + └─BatchHashAgg { group_key: [auction.date_time], aggs: [] } + └─BatchExchange { order: [], dist: HashShard(auction.date_time) } + └─BatchNestedLoopJoin { type: Inner, predicate: true, output: all } + ├─BatchExchange { order: [], dist: Single } + | └─BatchHashAgg { group_key: [auction.date_time], aggs: [] } + | └─BatchHopWindow { time_col: auction.date_time, slide: 00:00:01, size: 01:00:00, output: [auction.date_time] } + | └─BatchExchange { order: [], dist: HashShard(auction.date_time) } + | └─BatchFilter { predicate: IsNotNull(auction.date_time) } + | └─BatchScan { table: auction, columns: [auction.date_time], distribution: SomeShard } + └─BatchValues { rows: [[]] } stream_error: |- Not supported: streaming nested-loop join HINT: The non-equal join in the query requires a nested-loop join executor, which could be very expensive to run. Consider rewriting the query to use dynamic filter as a substitute if possible. diff --git a/src/frontend/planner_test/tests/testdata/time_window.yaml b/src/frontend/planner_test/tests/testdata/time_window.yaml index f16bd8310347..7ac771c1aeb0 100644 --- a/src/frontend/planner_test/tests/testdata/time_window.yaml +++ b/src/frontend/planner_test/tests/testdata/time_window.yaml @@ -210,3 +210,87 @@ batch_plan: | BatchProject { exprs: [*VALUES*_0.column_0, TumbleStart(*VALUES*_0.column_0, '00:00:10':Interval) as $expr1, (TumbleStart(*VALUES*_0.column_0, '00:00:10':Interval) + '00:00:10':Interval) as $expr2] } └─BatchValues { rows: [['2020-01-01 12:00:00':Timestamp]] } +- sql: | + create table t1 ( c1 varchar, c2 int, c3 timestamp); + create table t2 ( d1 int, d2 varchar, d3 timestamp); + create index idx_t1 on t1(c2) include (c1, c2, c3); + select * from t2 inner join hop(t1, c3, interval '10 minute',interval '20 minute') on t2.d1 = t1.c2; + logical_plan: | + LogicalProject { exprs: [t2.d1, t2.d2, t2.d3, t1.c1, t1.c2, t1.c3, window_start, window_end] } + └─LogicalJoin { type: Inner, on: (t2.d1 = t1.c2), output: all } + ├─LogicalScan { table: t2, columns: [t2.d1, t2.d2, t2.d3, t2._row_id] } + └─LogicalHopWindow { time_col: t1.c3, slide: 00:10:00, size: 00:20:00, output: all } + └─LogicalFilter { predicate: IsNotNull(t1.c3) } + └─LogicalScan { table: t1, columns: [t1.c1, t1.c2, t1.c3, t1._row_id] } + optimized_logical_plan_for_batch: | + LogicalHopWindow { time_col: t1.c3, slide: 00:10:00, size: 00:20:00, output: all } + └─LogicalFilter { predicate: IsNotNull(t1.c3) } + └─LogicalJoin { type: Inner, on: (t2.d1 = t1.c2), output: all } + ├─LogicalScan { table: t2, columns: [t2.d1, t2.d2, t2.d3] } + └─LogicalScan { table: t1, columns: [t1.c1, t1.c2, t1.c3], predicate: IsNotNull(t1.c3) } + batch_plan: | + BatchHopWindow { time_col: idx_t1.c3, slide: 00:10:00, size: 00:20:00, output: all } + └─BatchExchange { order: [], dist: Single } + └─BatchFilter { predicate: IsNotNull(idx_t1.c3) } + └─BatchLookupJoin { type: Inner, predicate: t2.d1 = idx_t1.c2 AND IsNotNull(idx_t1.c3), output: all } + └─BatchExchange { order: [], dist: UpstreamHashShard(t2.d1) } + └─BatchScan { table: t2, columns: [t2.d1, t2.d2, t2.d3], distribution: SomeShard } +- sql: | + create table t1 ( c1 varchar, c2 int, c3 timestamp); + create table t2 ( d1 int, d2 varchar, d3 timestamp); + create index idx_t1 on t1(c2) include (c1, c2, c3); + select * from hop(t2, d3, interval '10 minute',interval '20 minute') inner join hop(t1, c3, interval '10 minute',interval '20 minute') on t2.d1 = t1.c2; + logical_plan: | + LogicalProject { exprs: [t2.d1, t2.d2, t2.d3, window_start, window_end, t1.c1, t1.c2, t1.c3, window_start, window_end] } + └─LogicalJoin { type: Inner, on: (t2.d1 = t1.c2), output: all } + ├─LogicalHopWindow { time_col: t2.d3, slide: 00:10:00, size: 00:20:00, output: all } + | └─LogicalFilter { predicate: IsNotNull(t2.d3) } + | └─LogicalScan { table: t2, columns: [t2.d1, t2.d2, t2.d3, t2._row_id] } + └─LogicalHopWindow { time_col: t1.c3, slide: 00:10:00, size: 00:20:00, output: all } + └─LogicalFilter { predicate: IsNotNull(t1.c3) } + └─LogicalScan { table: t1, columns: [t1.c1, t1.c2, t1.c3, t1._row_id] } + optimized_logical_plan_for_batch: | + LogicalHopWindow { time_col: t1.c3, slide: 00:10:00, size: 00:20:00, output: [t2.d1, t2.d2, t2.d3, window_start, window_end, t1.c1, t1.c2, t1.c3, window_start, window_end] } + └─LogicalFilter { predicate: IsNotNull(t1.c3) } + └─LogicalHopWindow { time_col: t2.d3, slide: 00:10:00, size: 00:20:00, output: all } + └─LogicalFilter { predicate: IsNotNull(t2.d3) } + └─LogicalJoin { type: Inner, on: (t2.d1 = t1.c2), output: all } + ├─LogicalScan { table: t2, columns: [t2.d1, t2.d2, t2.d3], predicate: IsNotNull(t2.d3) } + └─LogicalScan { table: t1, columns: [t1.c1, t1.c2, t1.c3], predicate: IsNotNull(t1.c3) } + batch_plan: | + BatchHopWindow { time_col: idx_t1.c3, slide: 00:10:00, size: 00:20:00, output: [t2.d1, t2.d2, t2.d3, window_start, window_end, idx_t1.c1, idx_t1.c2, idx_t1.c3, window_start, window_end] } + └─BatchExchange { order: [], dist: Single } + └─BatchFilter { predicate: IsNotNull(idx_t1.c3) } + └─BatchHopWindow { time_col: t2.d3, slide: 00:10:00, size: 00:20:00, output: all } + └─BatchFilter { predicate: IsNotNull(t2.d3) } + └─BatchLookupJoin { type: Inner, predicate: t2.d1 = idx_t1.c2 AND IsNotNull(idx_t1.c3), output: all } + └─BatchExchange { order: [], dist: UpstreamHashShard(t2.d1) } + └─BatchFilter { predicate: IsNotNull(t2.d3) } + └─BatchScan { table: t2, columns: [t2.d1, t2.d2, t2.d3], distribution: SomeShard } +- sql: | + CREATE TABLE auction (id BIGINT, item_name CHARACTER VARYING, description CHARACTER VARYING, initial_bid BIGINT, reserve BIGINT, date_time TIMESTAMP, expires TIMESTAMP, seller BIGINT, category BIGINT, extra CHARACTER VARYING, PRIMARY KEY (id)); + CREATE TABLE nation (n_nationkey INT, n_name CHARACTER VARYING, n_regionkey INT, n_comment CHARACTER VARYING, PRIMARY KEY (n_nationkey)); + CREATE TABLE alltypes2 (c1 BOOLEAN, c2 SMALLINT, c3 INT, c4 BIGINT, c5 REAL, c6 DOUBLE, c7 NUMERIC, c8 DATE, c9 CHARACTER VARYING, c10 TIME, c11 TIMESTAMP, c13 INTERVAL, c14 STRUCT, c15 INT[], c16 CHARACTER VARYING[]); + SELECT ((CASE WHEN (hop_0.c2 = (32)) THEN TIMESTAMP '2022-07-29 15:06:36' WHEN hop_0.c1 THEN t_2.expires WHEN false THEN (t_2.expires + (INTERVAL '-18')) ELSE t_2.expires END) + (INTERVAL '87')) AS col_0, ((INT '618') % ((311))) AS col_1, (SMALLINT '35') AS col_2, ((coalesce(NULL, NULL, NULL, NULL, NULL, (INT '0'), NULL, NULL, NULL, NULL)) - hop_0.c7) AS col_3 FROM hop(alltypes2, alltypes2.c11, INTERVAL '3600', INTERVAL '144000') AS hop_0, nation AS t_1 JOIN auction AS t_2 ON t_1.n_name = t_2.item_name AND true WHERE (t_2.seller > ((CAST(((SMALLINT '62')) IN (hop_0.c2, hop_0.c2, hop_0.c2, (- hop_0.c2), hop_0.c2, hop_0.c2, hop_0.c2, (SMALLINT '-32768')) AS INT) | t_1.n_nationkey) + hop_0.c7)) GROUP BY t_2.description, hop_0.c7, t_2.category, t_2.item_name, hop_0.c15, hop_0.c2, hop_0.c1, hop_0.c9, t_2.expires, hop_0.c6; + logical_plan: | + LogicalProject { exprs: [(Case((alltypes2.c2 = 32:Int32), '2022-07-29 15:06:36':Timestamp, alltypes2.c1, auction.expires, false:Boolean, (auction.expires + '-00:00:18':Interval), auction.expires) + '00:01:27':Interval) as $expr1, (618:Int32 % 311:Int32) as $expr2, 35:Int16, (Coalesce(null:Int32, null:Int32, null:Int32, null:Int32, null:Int32, 0:Int32, null:Int32, null:Int32, null:Int32, null:Int32) - alltypes2.c7) as $expr3] } + └─LogicalAgg { group_key: [auction.description, alltypes2.c7, auction.category, auction.item_name, alltypes2.c15, alltypes2.c2, alltypes2.c1, alltypes2.c9, auction.expires, alltypes2.c6], aggs: [] } + └─LogicalProject { exprs: [auction.description, alltypes2.c7, auction.category, auction.item_name, alltypes2.c15, alltypes2.c2, alltypes2.c1, alltypes2.c9, auction.expires, alltypes2.c6] } + └─LogicalFilter { predicate: (auction.seller > (((((((((In(62:Int16, -32768:Int16) OR (62:Int16 = alltypes2.c2)) OR (62:Int16 = alltypes2.c2)) OR (62:Int16 = alltypes2.c2)) OR (62:Int16 = Neg(alltypes2.c2))) OR (62:Int16 = alltypes2.c2)) OR (62:Int16 = alltypes2.c2)) OR (62:Int16 = alltypes2.c2))::Int32 | nation.n_nationkey) + alltypes2.c7)) } + └─LogicalJoin { type: Inner, on: true, output: all } + ├─LogicalHopWindow { time_col: alltypes2.c11, slide: 01:00:00, size: 40:00:00, output: all } + | └─LogicalFilter { predicate: IsNotNull(alltypes2.c11) } + | └─LogicalScan { table: alltypes2, columns: [alltypes2.c1, alltypes2.c2, alltypes2.c3, alltypes2.c4, alltypes2.c5, alltypes2.c6, alltypes2.c7, alltypes2.c8, alltypes2.c9, alltypes2.c10, alltypes2.c11, alltypes2.c13, alltypes2.c14, alltypes2.c15, alltypes2.c16, alltypes2._row_id] } + └─LogicalJoin { type: Inner, on: (nation.n_name = auction.item_name), output: all } + ├─LogicalScan { table: nation, columns: [nation.n_nationkey, nation.n_name, nation.n_regionkey, nation.n_comment] } + └─LogicalScan { table: auction, columns: [auction.id, auction.item_name, auction.description, auction.initial_bid, auction.reserve, auction.date_time, auction.expires, auction.seller, auction.category, auction.extra] } + optimized_logical_plan_for_batch: | + LogicalProject { exprs: [(Case((alltypes2.c2 = 32:Int32), '2022-07-29 15:06:36':Timestamp, alltypes2.c1, auction.expires, false:Boolean, (auction.expires + '-00:00:18':Interval), auction.expires) + '00:01:27':Interval) as $expr1, (618:Int32 % 311:Int32) as $expr2, 35:Int16, (Coalesce(null:Int32, null:Int32, null:Int32, null:Int32, null:Int32, 0:Int32, null:Int32, null:Int32, null:Int32, null:Int32) - alltypes2.c7) as $expr3] } + └─LogicalAgg { group_key: [auction.description, alltypes2.c7, auction.category, auction.item_name, alltypes2.c15, alltypes2.c2, alltypes2.c1, alltypes2.c9, auction.expires, alltypes2.c6], aggs: [] } + └─LogicalHopWindow { time_col: alltypes2.c11, slide: 01:00:00, size: 40:00:00, output: [auction.description, alltypes2.c7, auction.category, auction.item_name, alltypes2.c15, alltypes2.c2, alltypes2.c1, alltypes2.c9, auction.expires, alltypes2.c6] } + └─LogicalFilter { predicate: IsNotNull(alltypes2.c11) } + └─LogicalJoin { type: Inner, on: (auction.seller > (((((((((In(62:Int16, -32768:Int16) OR (62:Int16 = alltypes2.c2)) OR (62:Int16 = alltypes2.c2)) OR (62:Int16 = alltypes2.c2)) OR (62:Int16 = Neg(alltypes2.c2))) OR (62:Int16 = alltypes2.c2)) OR (62:Int16 = alltypes2.c2)) OR (62:Int16 = alltypes2.c2))::Int32 | nation.n_nationkey) + alltypes2.c7)), output: all } + ├─LogicalJoin { type: Inner, on: (nation.n_name = auction.item_name), output: [nation.n_nationkey, auction.item_name, auction.description, auction.expires, auction.seller, auction.category] } + | ├─LogicalScan { table: nation, columns: [nation.n_nationkey, nation.n_name] } + | └─LogicalScan { table: auction, columns: [auction.item_name, auction.description, auction.expires, auction.seller, auction.category] } + └─LogicalScan { table: alltypes2, columns: [alltypes2.c1, alltypes2.c2, alltypes2.c6, alltypes2.c7, alltypes2.c9, alltypes2.c11, alltypes2.c15], predicate: IsNotNull(alltypes2.c11) } diff --git a/src/frontend/src/expr/input_ref.rs b/src/frontend/src/expr/input_ref.rs index 58732392b6f3..d7ef606700c1 100644 --- a/src/frontend/src/expr/input_ref.rs +++ b/src/frontend/src/expr/input_ref.rs @@ -107,6 +107,13 @@ impl InputRef { self.index = (self.index as isize + offset) as usize; } + pub fn clone_with_offset(&self, offset: isize) -> Self { + Self { + index: (self.index as isize + offset) as usize, + data_type: self.data_type.clone(), + } + } + /// Convert to protobuf. pub fn to_proto(&self) -> PbInputRef { PbInputRef { diff --git a/src/frontend/src/optimizer/logical_optimization.rs b/src/frontend/src/optimizer/logical_optimization.rs index 97843dde6155..a907f87e6048 100644 --- a/src/frontend/src/optimizer/logical_optimization.rs +++ b/src/frontend/src/optimizer/logical_optimization.rs @@ -255,6 +255,12 @@ lazy_static! { vec![AlwaysFalseFilterRule::create()], ApplyOrder::TopDown, ); + + static ref PULL_UP_HOP: OptimizationStage = OptimizationStage::new( + "Pull up hop", + vec![PullUpHopRule::create()], + ApplyOrder::BottomUp, + ); } impl LogicalOptimizer { @@ -481,6 +487,8 @@ impl LogicalOptimizer { plan = plan.optimize_by_rules(&PROJECT_REMOVE); + plan = plan.optimize_by_rules(&PULL_UP_HOP); + plan = plan.optimize_by_rules(&CONVERT_WINDOW_AGG); if has_logical_over_agg(plan.clone()) { diff --git a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs index b6bc0f867ff0..d9babaab86e1 100644 --- a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs @@ -39,6 +39,10 @@ pub struct LogicalHopWindow { } impl LogicalHopWindow { + /// Hop windows will add `windows_start` and `windows_end` columns at the end. + /// Take care to modify the code referring it if above rule changes. + pub const ADDITION_COLUMN_LEN: usize = 2; + /// just used in optimizer and the function will not check if the `time_col`'s value is NULL /// compared with `LogicalHopWindow::create` fn new( @@ -122,6 +126,16 @@ impl LogicalHopWindow { self.core.internal_window_end_col_idx() } + pub fn output_window_start_col_idx(&self) -> Option { + self.internal2output_col_mapping() + .try_map(self.internal_window_start_col_idx()) + } + + pub fn output_window_end_col_idx(&self) -> Option { + self.internal2output_col_mapping() + .try_map(self.internal_window_end_col_idx()) + } + pub fn o2i_col_mapping(&self) -> ColIndexMapping { self.core.o2i_col_mapping() } @@ -134,11 +148,15 @@ impl LogicalHopWindow { self.core.internal_column_num() } - fn output2internal_col_mapping(&self) -> ColIndexMapping { + pub fn output2internal_col_mapping(&self) -> ColIndexMapping { self.core.output2internal_col_mapping() } - fn clone_with_output_indices(&self, output_indices: Vec) -> Self { + pub fn internal2output_col_mapping(&self) -> ColIndexMapping { + self.core.internal2output_col_mapping() + } + + pub fn clone_with_output_indices(&self, output_indices: Vec) -> Self { Self::new( self.input(), self.core.time_col.clone(), diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs index 17f07a6db6ba..36cb1103e6bc 100644 --- a/src/frontend/src/optimizer/plan_node/logical_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_join.rs @@ -142,6 +142,28 @@ impl LogicalJoin { &self.core.on } + /// Collect all input ref in the on condition. And separate them into left and right. + pub fn input_idx_on_condition(&self) -> (Vec, Vec) { + let input_refs = self + .core + .on + .collect_input_refs(self.core.left.schema().len() + self.core.right.schema().len()); + let index_group = input_refs + .ones() + .group_by(|i| *i < self.core.left.schema().len()); + let left_index = index_group + .into_iter() + .next() + .map_or(vec![], |group| group.1.collect_vec()); + let right_index = index_group.into_iter().next().map_or(vec![], |group| { + group + .1 + .map(|i| i - self.core.left.schema().len()) + .collect_vec() + }); + (left_index, right_index) + } + /// Get the join type of the logical join. pub fn join_type(&self) -> JoinType { self.core.join_type diff --git a/src/frontend/src/optimizer/rule/join_project_transpose_rule.rs b/src/frontend/src/optimizer/rule/join_project_transpose_rule.rs index dd071a76bf5c..e79520dda509 100644 --- a/src/frontend/src/optimizer/rule/join_project_transpose_rule.rs +++ b/src/frontend/src/optimizer/rule/join_project_transpose_rule.rs @@ -37,18 +37,8 @@ impl Rule for JoinProjectTransposeRule { let (left, right, on, join_type, _) = join.clone().decompose(); - let (left_input_index_on_condition, right_input_index_on_condition) = { - let input_refs = on.collect_input_refs(left.schema().len() + right.schema().len()); - let index_group = input_refs.ones().group_by(|i| *i < left.schema().len()); - let left_index = index_group - .into_iter() - .next() - .map_or(vec![], |group| group.1.collect_vec()); - let right_index = index_group.into_iter().next().map_or(vec![], |group| { - group.1.map(|i| i - left.schema().len()).collect_vec() - }); - (left_index, right_index) - }; + let (left_input_index_on_condition, right_input_index_on_condition) = + join.input_idx_on_condition(); let full_output_len = left.schema().len() + right.schema().len(); let right_output_len = right.schema().len(); diff --git a/src/frontend/src/optimizer/rule/mod.rs b/src/frontend/src/optimizer/rule/mod.rs index 5adf649661f2..a1796bd0a814 100644 --- a/src/frontend/src/optimizer/rule/mod.rs +++ b/src/frontend/src/optimizer/rule/mod.rs @@ -101,6 +101,8 @@ mod always_false_filter_rule; pub use always_false_filter_rule::*; mod join_project_transpose_rule; pub use join_project_transpose_rule::*; +mod pull_up_hop_rule; +pub use pull_up_hop_rule::*; mod apply_offset_rewriter; use apply_offset_rewriter::ApplyOffsetRewriter; @@ -146,6 +148,7 @@ macro_rules! for_all_rules { , { BushyTreeJoinOrderingRule } , { StreamProjectMergeRule } , { JoinProjectTransposeRule } + , { PullUpHopRule } } }; } diff --git a/src/frontend/src/optimizer/rule/pull_up_hop_rule.rs b/src/frontend/src/optimizer/rule/pull_up_hop_rule.rs new file mode 100644 index 000000000000..d29599bb5fe1 --- /dev/null +++ b/src/frontend/src/optimizer/rule/pull_up_hop_rule.rs @@ -0,0 +1,214 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::util::column_index_mapping::ColIndexMapping; +use risingwave_pb::plan_common::JoinType; + +use super::{BoxedRule, Rule}; +use crate::optimizer::plan_node::{LogicalHopWindow, LogicalJoin}; +use crate::utils::IndexRewriter; + +pub struct PullUpHopRule {} + +impl Rule for PullUpHopRule { + fn apply(&self, plan: crate::PlanRef) -> Option { + let join = plan.as_logical_join()?; + + let (left, right, on, join_type, mut output_index) = join.clone().decompose(); + + let (left_input_index_on_condition, right_input_index_on_condition) = + join.input_idx_on_condition(); + + let (left_output_pos, right_output_pos) = { + let mut left_output_pos = vec![]; + let mut right_output_pos = vec![]; + output_index.iter_mut().enumerate().for_each(|(pos, idx)| { + if *idx < left.schema().len() { + left_output_pos.push(pos); + } else { + right_output_pos.push(pos); + // make right output index start from 0. We can identify left and right output + // index by the output_pos. + *idx -= left.schema().len(); + } + }); + (left_output_pos, right_output_pos) + }; + + let mut old_i2new_i = ColIndexMapping::empty(0, 0); + + let mut pull_up_left = false; + let mut pull_up_right = false; + + let (new_left, + left_time_col, + left_window_slide, + left_window_size, + left_window_offset, + ) = if let Some(hop) = left.as_logical_hop_window() && left_input_index_on_condition.iter().all(|&index| hop.output_window_start_col_idx().map_or(true, |v|index!=v) && hop.output_window_end_col_idx().map_or(true, |v|index!=v)) && join_type != JoinType::RightAnti && join_type != JoinType::RightSemi && join_type != JoinType::RightOuter && join_type != JoinType::FullOuter { + let (input, + time_col, + window_slide, + window_size, + window_offset, + _) = hop.clone().into_parts(); + + old_i2new_i = old_i2new_i.union(&join.i2l_col_mapping_ignore_join_type().composite(&hop.o2i_col_mapping())); + left_output_pos.iter().for_each(|&pos| { + output_index[pos] = hop.output2internal_col_mapping().map(output_index[pos]); + }); + pull_up_left = true; + (input,Some(time_col),Some(window_slide),Some(window_size),Some(window_offset)) + } else { + old_i2new_i = old_i2new_i.union(&join.i2l_col_mapping_ignore_join_type()); + + (left,None,None,None,None) + }; + + let (new_right, + right_time_col, + right_window_slide, + right_window_size, + right_window_offset + ) = if let Some(hop) = right.as_logical_hop_window() && right_input_index_on_condition.iter().all(|&index| hop.output_window_start_col_idx().map_or(true, |v|index!=v) && hop.output_window_end_col_idx().map_or(true, |v|index!=v)) && join_type != JoinType::LeftAnti && join_type != JoinType::LeftSemi && join_type != JoinType::LeftOuter && join_type != JoinType::FullOuter { + let (input, + time_col, + window_slide, + window_size, + window_offset, + _) = hop.clone().into_parts(); + + old_i2new_i = old_i2new_i.union(&join.i2r_col_mapping_ignore_join_type().composite(&hop.o2i_col_mapping()).clone_with_offset(new_left.schema().len())); + + right_output_pos.iter().for_each(|&pos| { + output_index[pos] = hop.output2internal_col_mapping().map(output_index[pos]); + }); + pull_up_right = true; + (input,Some(time_col),Some(window_slide),Some(window_size),Some(window_offset)) + } else { + old_i2new_i = old_i2new_i.union(&join.i2r_col_mapping_ignore_join_type().clone_with_offset(new_left.schema().len())); + + (right,None,None,None,None) + }; + + if !pull_up_left && !pull_up_right { + return None; + } + + let new_output_index = { + let new_right_output_len = + if join_type == JoinType::LeftSemi || join_type == JoinType::LeftAnti { + 0 + } else { + new_right.schema().len() + }; + let new_left_output_len = + if join_type == JoinType::RightSemi || join_type == JoinType::RightAnti { + 0 + } else { + new_left.schema().len() + }; + + // The left output index can separate into two parts: + // `left_other_column | left_window_start | letf_window_end` + // The right output index can separate into two parts: + // `right_other_column | right_window_start | right_window_end` + // + // If we pull up left, the column index will be changed to: + // `left_other_column | right_column | left_window_start | letf_window_end`, + // we need to update the index of left window start and left window end. + // + // If we pull up right and left, the column index will be changed to: + // `left_other_column | right_other_column | left_window_start | letf_window_end | + // right_window_tart | right_window_end |`, we need to update the index of + // left window start and left window end and right window start and right window end. + if pull_up_left { + left_output_pos.iter().for_each(|&pos| { + if output_index[pos] >= new_left_output_len { + output_index[pos] += new_right_output_len; + } + }); + } + if pull_up_right && pull_up_left { + right_output_pos.iter().for_each(|&pos| { + if output_index[pos] < new_right_output_len { + output_index[pos] += new_left_output_len; + } else { + output_index[pos] += + new_left_output_len + LogicalHopWindow::ADDITION_COLUMN_LEN; + } + }); + } else { + right_output_pos.iter().for_each(|&pos| { + output_index[pos] += new_left_output_len; + }); + } + output_index + }; + let new_left_len = new_left.schema().len(); + let new_cond = on.rewrite_expr(&mut IndexRewriter::new(old_i2new_i)); + let new_join = LogicalJoin::new(new_left, new_right, join_type, new_cond); + + let new_hop = if pull_up_left && pull_up_right { + let left_hop = LogicalHopWindow::create( + new_join.into(), + left_time_col.unwrap(), + left_window_slide.unwrap(), + left_window_size.unwrap(), + left_window_offset.unwrap(), + ); + LogicalHopWindow::create( + left_hop, + right_time_col + .unwrap() + .clone_with_offset(new_left_len as isize), + right_window_slide.unwrap(), + right_window_size.unwrap(), + right_window_offset.unwrap(), + ) + } else if pull_up_left { + LogicalHopWindow::create( + new_join.into(), + left_time_col.unwrap(), + left_window_slide.unwrap(), + left_window_size.unwrap(), + left_window_offset.unwrap(), + ) + } else { + LogicalHopWindow::create( + new_join.into(), + right_time_col + .unwrap() + .clone_with_offset(new_left_len as isize), + right_window_slide.unwrap(), + right_window_size.unwrap(), + right_window_offset.unwrap(), + ) + }; + + Some( + new_hop + .as_logical_hop_window() + .unwrap() + .clone_with_output_indices(new_output_index) + .into(), + ) + } +} + +impl PullUpHopRule { + pub fn create() -> BoxedRule { + Box::new(PullUpHopRule {}) + } +}