Skip to content

Commit

Permalink
feat(frontend):support pull_up_hop rule (risingwavelabs#8954)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME authored Apr 5, 2023
1 parent 731f211 commit db072cb
Show file tree
Hide file tree
Showing 9 changed files with 390 additions and 42 deletions.
58 changes: 30 additions & 28 deletions src/frontend/planner_test/tests/testdata/subquery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -217,35 +217,37 @@
└─LogicalProject { exprs: [CorrelatedInputRef { index: 0, correlated_id: 1 } as $expr1] }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
optimized_logical_plan_for_batch: |
LogicalJoin { type: LeftSemi, on: IsNotDistinctFrom(auction.date_time, auction.date_time), output: all }
├─LogicalHopWindow { time_col: auction.date_time, slide: 00:00:01, size: 01:00:00, output: all }
| └─LogicalScan { table: auction, columns: [auction.date_time], predicate: IsNotNull(auction.date_time) }
└─LogicalProject { exprs: [auction.date_time] }
└─LogicalProject { exprs: [auction.date_time, auction.date_time] }
└─LogicalAgg { group_key: [auction.date_time], aggs: [] }
└─LogicalJoin { type: Inner, on: true, output: all }
├─LogicalAgg { group_key: [auction.date_time], aggs: [] }
| └─LogicalHopWindow { time_col: auction.date_time, slide: 00:00:01, size: 01:00:00, output: [auction.date_time] }
| └─LogicalScan { table: auction, columns: [auction.date_time], predicate: IsNotNull(auction.date_time) }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
LogicalHopWindow { time_col: auction.date_time, slide: 00:00:01, size: 01:00:00, output: all }
└─LogicalFilter { predicate: IsNotNull(auction.date_time) }
└─LogicalJoin { type: LeftSemi, on: IsNotDistinctFrom(auction.date_time, auction.date_time), output: all }
├─LogicalScan { table: auction, columns: [auction.date_time], predicate: IsNotNull(auction.date_time) }
└─LogicalProject { exprs: [auction.date_time] }
└─LogicalProject { exprs: [auction.date_time, auction.date_time] }
└─LogicalAgg { group_key: [auction.date_time], aggs: [] }
└─LogicalJoin { type: Inner, on: true, output: all }
├─LogicalAgg { group_key: [auction.date_time], aggs: [] }
| └─LogicalHopWindow { time_col: auction.date_time, slide: 00:00:01, size: 01:00:00, output: [auction.date_time] }
| └─LogicalScan { table: auction, columns: [auction.date_time], predicate: IsNotNull(auction.date_time) }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchHashJoin { type: LeftSemi, predicate: auction.date_time IS NOT DISTINCT FROM auction.date_time, output: all }
├─BatchHopWindow { time_col: auction.date_time, slide: 00:00:01, size: 01:00:00, output: all }
| └─BatchExchange { order: [], dist: HashShard(auction.date_time) }
| └─BatchFilter { predicate: IsNotNull(auction.date_time) }
| └─BatchScan { table: auction, columns: [auction.date_time], distribution: SomeShard }
└─BatchProject { exprs: [auction.date_time] }
└─BatchHashAgg { group_key: [auction.date_time], aggs: [] }
└─BatchExchange { order: [], dist: HashShard(auction.date_time) }
└─BatchNestedLoopJoin { type: Inner, predicate: true, output: all }
├─BatchExchange { order: [], dist: Single }
| └─BatchHashAgg { group_key: [auction.date_time], aggs: [] }
| └─BatchHopWindow { time_col: auction.date_time, slide: 00:00:01, size: 01:00:00, output: [auction.date_time] }
| └─BatchExchange { order: [], dist: HashShard(auction.date_time) }
| └─BatchFilter { predicate: IsNotNull(auction.date_time) }
| └─BatchScan { table: auction, columns: [auction.date_time], distribution: SomeShard }
└─BatchValues { rows: [[]] }
BatchHopWindow { time_col: auction.date_time, slide: 00:00:01, size: 01:00:00, output: all }
└─BatchExchange { order: [], dist: Single }
└─BatchFilter { predicate: IsNotNull(auction.date_time) }
└─BatchHashJoin { type: LeftSemi, predicate: auction.date_time IS NOT DISTINCT FROM auction.date_time, output: all }
├─BatchExchange { order: [], dist: HashShard(auction.date_time) }
| └─BatchFilter { predicate: IsNotNull(auction.date_time) }
| └─BatchScan { table: auction, columns: [auction.date_time], distribution: SomeShard }
└─BatchProject { exprs: [auction.date_time] }
└─BatchHashAgg { group_key: [auction.date_time], aggs: [] }
└─BatchExchange { order: [], dist: HashShard(auction.date_time) }
└─BatchNestedLoopJoin { type: Inner, predicate: true, output: all }
├─BatchExchange { order: [], dist: Single }
| └─BatchHashAgg { group_key: [auction.date_time], aggs: [] }
| └─BatchHopWindow { time_col: auction.date_time, slide: 00:00:01, size: 01:00:00, output: [auction.date_time] }
| └─BatchExchange { order: [], dist: HashShard(auction.date_time) }
| └─BatchFilter { predicate: IsNotNull(auction.date_time) }
| └─BatchScan { table: auction, columns: [auction.date_time], distribution: SomeShard }
└─BatchValues { rows: [[]] }
stream_error: |-
Not supported: streaming nested-loop join
HINT: The non-equal join in the query requires a nested-loop join executor, which could be very expensive to run. Consider rewriting the query to use dynamic filter as a substitute if possible.
Expand Down
84 changes: 84 additions & 0 deletions src/frontend/planner_test/tests/testdata/time_window.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,87 @@
batch_plan: |
BatchProject { exprs: [*VALUES*_0.column_0, TumbleStart(*VALUES*_0.column_0, '00:00:10':Interval) as $expr1, (TumbleStart(*VALUES*_0.column_0, '00:00:10':Interval) + '00:00:10':Interval) as $expr2] }
└─BatchValues { rows: [['2020-01-01 12:00:00':Timestamp]] }
- sql: |
create table t1 ( c1 varchar, c2 int, c3 timestamp);
create table t2 ( d1 int, d2 varchar, d3 timestamp);
create index idx_t1 on t1(c2) include (c1, c2, c3);
select * from t2 inner join hop(t1, c3, interval '10 minute',interval '20 minute') on t2.d1 = t1.c2;
logical_plan: |
LogicalProject { exprs: [t2.d1, t2.d2, t2.d3, t1.c1, t1.c2, t1.c3, window_start, window_end] }
└─LogicalJoin { type: Inner, on: (t2.d1 = t1.c2), output: all }
├─LogicalScan { table: t2, columns: [t2.d1, t2.d2, t2.d3, t2._row_id] }
└─LogicalHopWindow { time_col: t1.c3, slide: 00:10:00, size: 00:20:00, output: all }
└─LogicalFilter { predicate: IsNotNull(t1.c3) }
└─LogicalScan { table: t1, columns: [t1.c1, t1.c2, t1.c3, t1._row_id] }
optimized_logical_plan_for_batch: |
LogicalHopWindow { time_col: t1.c3, slide: 00:10:00, size: 00:20:00, output: all }
└─LogicalFilter { predicate: IsNotNull(t1.c3) }
└─LogicalJoin { type: Inner, on: (t2.d1 = t1.c2), output: all }
├─LogicalScan { table: t2, columns: [t2.d1, t2.d2, t2.d3] }
└─LogicalScan { table: t1, columns: [t1.c1, t1.c2, t1.c3], predicate: IsNotNull(t1.c3) }
batch_plan: |
BatchHopWindow { time_col: idx_t1.c3, slide: 00:10:00, size: 00:20:00, output: all }
└─BatchExchange { order: [], dist: Single }
└─BatchFilter { predicate: IsNotNull(idx_t1.c3) }
└─BatchLookupJoin { type: Inner, predicate: t2.d1 = idx_t1.c2 AND IsNotNull(idx_t1.c3), output: all }
└─BatchExchange { order: [], dist: UpstreamHashShard(t2.d1) }
└─BatchScan { table: t2, columns: [t2.d1, t2.d2, t2.d3], distribution: SomeShard }
- sql: |
create table t1 ( c1 varchar, c2 int, c3 timestamp);
create table t2 ( d1 int, d2 varchar, d3 timestamp);
create index idx_t1 on t1(c2) include (c1, c2, c3);
select * from hop(t2, d3, interval '10 minute',interval '20 minute') inner join hop(t1, c3, interval '10 minute',interval '20 minute') on t2.d1 = t1.c2;
logical_plan: |
LogicalProject { exprs: [t2.d1, t2.d2, t2.d3, window_start, window_end, t1.c1, t1.c2, t1.c3, window_start, window_end] }
└─LogicalJoin { type: Inner, on: (t2.d1 = t1.c2), output: all }
├─LogicalHopWindow { time_col: t2.d3, slide: 00:10:00, size: 00:20:00, output: all }
| └─LogicalFilter { predicate: IsNotNull(t2.d3) }
| └─LogicalScan { table: t2, columns: [t2.d1, t2.d2, t2.d3, t2._row_id] }
└─LogicalHopWindow { time_col: t1.c3, slide: 00:10:00, size: 00:20:00, output: all }
└─LogicalFilter { predicate: IsNotNull(t1.c3) }
└─LogicalScan { table: t1, columns: [t1.c1, t1.c2, t1.c3, t1._row_id] }
optimized_logical_plan_for_batch: |
LogicalHopWindow { time_col: t1.c3, slide: 00:10:00, size: 00:20:00, output: [t2.d1, t2.d2, t2.d3, window_start, window_end, t1.c1, t1.c2, t1.c3, window_start, window_end] }
└─LogicalFilter { predicate: IsNotNull(t1.c3) }
└─LogicalHopWindow { time_col: t2.d3, slide: 00:10:00, size: 00:20:00, output: all }
└─LogicalFilter { predicate: IsNotNull(t2.d3) }
└─LogicalJoin { type: Inner, on: (t2.d1 = t1.c2), output: all }
├─LogicalScan { table: t2, columns: [t2.d1, t2.d2, t2.d3], predicate: IsNotNull(t2.d3) }
└─LogicalScan { table: t1, columns: [t1.c1, t1.c2, t1.c3], predicate: IsNotNull(t1.c3) }
batch_plan: |
BatchHopWindow { time_col: idx_t1.c3, slide: 00:10:00, size: 00:20:00, output: [t2.d1, t2.d2, t2.d3, window_start, window_end, idx_t1.c1, idx_t1.c2, idx_t1.c3, window_start, window_end] }
└─BatchExchange { order: [], dist: Single }
└─BatchFilter { predicate: IsNotNull(idx_t1.c3) }
└─BatchHopWindow { time_col: t2.d3, slide: 00:10:00, size: 00:20:00, output: all }
└─BatchFilter { predicate: IsNotNull(t2.d3) }
└─BatchLookupJoin { type: Inner, predicate: t2.d1 = idx_t1.c2 AND IsNotNull(idx_t1.c3), output: all }
└─BatchExchange { order: [], dist: UpstreamHashShard(t2.d1) }
└─BatchFilter { predicate: IsNotNull(t2.d3) }
└─BatchScan { table: t2, columns: [t2.d1, t2.d2, t2.d3], distribution: SomeShard }
- sql: |
CREATE TABLE auction (id BIGINT, item_name CHARACTER VARYING, description CHARACTER VARYING, initial_bid BIGINT, reserve BIGINT, date_time TIMESTAMP, expires TIMESTAMP, seller BIGINT, category BIGINT, extra CHARACTER VARYING, PRIMARY KEY (id));
CREATE TABLE nation (n_nationkey INT, n_name CHARACTER VARYING, n_regionkey INT, n_comment CHARACTER VARYING, PRIMARY KEY (n_nationkey));
CREATE TABLE alltypes2 (c1 BOOLEAN, c2 SMALLINT, c3 INT, c4 BIGINT, c5 REAL, c6 DOUBLE, c7 NUMERIC, c8 DATE, c9 CHARACTER VARYING, c10 TIME, c11 TIMESTAMP, c13 INTERVAL, c14 STRUCT<a INT>, c15 INT[], c16 CHARACTER VARYING[]);
SELECT ((CASE WHEN (hop_0.c2 = (32)) THEN TIMESTAMP '2022-07-29 15:06:36' WHEN hop_0.c1 THEN t_2.expires WHEN false THEN (t_2.expires + (INTERVAL '-18')) ELSE t_2.expires END) + (INTERVAL '87')) AS col_0, ((INT '618') % ((311))) AS col_1, (SMALLINT '35') AS col_2, ((coalesce(NULL, NULL, NULL, NULL, NULL, (INT '0'), NULL, NULL, NULL, NULL)) - hop_0.c7) AS col_3 FROM hop(alltypes2, alltypes2.c11, INTERVAL '3600', INTERVAL '144000') AS hop_0, nation AS t_1 JOIN auction AS t_2 ON t_1.n_name = t_2.item_name AND true WHERE (t_2.seller > ((CAST(((SMALLINT '62')) IN (hop_0.c2, hop_0.c2, hop_0.c2, (- hop_0.c2), hop_0.c2, hop_0.c2, hop_0.c2, (SMALLINT '-32768')) AS INT) | t_1.n_nationkey) + hop_0.c7)) GROUP BY t_2.description, hop_0.c7, t_2.category, t_2.item_name, hop_0.c15, hop_0.c2, hop_0.c1, hop_0.c9, t_2.expires, hop_0.c6;
logical_plan: |
LogicalProject { exprs: [(Case((alltypes2.c2 = 32:Int32), '2022-07-29 15:06:36':Timestamp, alltypes2.c1, auction.expires, false:Boolean, (auction.expires + '-00:00:18':Interval), auction.expires) + '00:01:27':Interval) as $expr1, (618:Int32 % 311:Int32) as $expr2, 35:Int16, (Coalesce(null:Int32, null:Int32, null:Int32, null:Int32, null:Int32, 0:Int32, null:Int32, null:Int32, null:Int32, null:Int32) - alltypes2.c7) as $expr3] }
└─LogicalAgg { group_key: [auction.description, alltypes2.c7, auction.category, auction.item_name, alltypes2.c15, alltypes2.c2, alltypes2.c1, alltypes2.c9, auction.expires, alltypes2.c6], aggs: [] }
└─LogicalProject { exprs: [auction.description, alltypes2.c7, auction.category, auction.item_name, alltypes2.c15, alltypes2.c2, alltypes2.c1, alltypes2.c9, auction.expires, alltypes2.c6] }
└─LogicalFilter { predicate: (auction.seller > (((((((((In(62:Int16, -32768:Int16) OR (62:Int16 = alltypes2.c2)) OR (62:Int16 = alltypes2.c2)) OR (62:Int16 = alltypes2.c2)) OR (62:Int16 = Neg(alltypes2.c2))) OR (62:Int16 = alltypes2.c2)) OR (62:Int16 = alltypes2.c2)) OR (62:Int16 = alltypes2.c2))::Int32 | nation.n_nationkey) + alltypes2.c7)) }
└─LogicalJoin { type: Inner, on: true, output: all }
├─LogicalHopWindow { time_col: alltypes2.c11, slide: 01:00:00, size: 40:00:00, output: all }
| └─LogicalFilter { predicate: IsNotNull(alltypes2.c11) }
| └─LogicalScan { table: alltypes2, columns: [alltypes2.c1, alltypes2.c2, alltypes2.c3, alltypes2.c4, alltypes2.c5, alltypes2.c6, alltypes2.c7, alltypes2.c8, alltypes2.c9, alltypes2.c10, alltypes2.c11, alltypes2.c13, alltypes2.c14, alltypes2.c15, alltypes2.c16, alltypes2._row_id] }
└─LogicalJoin { type: Inner, on: (nation.n_name = auction.item_name), output: all }
├─LogicalScan { table: nation, columns: [nation.n_nationkey, nation.n_name, nation.n_regionkey, nation.n_comment] }
└─LogicalScan { table: auction, columns: [auction.id, auction.item_name, auction.description, auction.initial_bid, auction.reserve, auction.date_time, auction.expires, auction.seller, auction.category, auction.extra] }
optimized_logical_plan_for_batch: |
LogicalProject { exprs: [(Case((alltypes2.c2 = 32:Int32), '2022-07-29 15:06:36':Timestamp, alltypes2.c1, auction.expires, false:Boolean, (auction.expires + '-00:00:18':Interval), auction.expires) + '00:01:27':Interval) as $expr1, (618:Int32 % 311:Int32) as $expr2, 35:Int16, (Coalesce(null:Int32, null:Int32, null:Int32, null:Int32, null:Int32, 0:Int32, null:Int32, null:Int32, null:Int32, null:Int32) - alltypes2.c7) as $expr3] }
└─LogicalAgg { group_key: [auction.description, alltypes2.c7, auction.category, auction.item_name, alltypes2.c15, alltypes2.c2, alltypes2.c1, alltypes2.c9, auction.expires, alltypes2.c6], aggs: [] }
└─LogicalHopWindow { time_col: alltypes2.c11, slide: 01:00:00, size: 40:00:00, output: [auction.description, alltypes2.c7, auction.category, auction.item_name, alltypes2.c15, alltypes2.c2, alltypes2.c1, alltypes2.c9, auction.expires, alltypes2.c6] }
└─LogicalFilter { predicate: IsNotNull(alltypes2.c11) }
└─LogicalJoin { type: Inner, on: (auction.seller > (((((((((In(62:Int16, -32768:Int16) OR (62:Int16 = alltypes2.c2)) OR (62:Int16 = alltypes2.c2)) OR (62:Int16 = alltypes2.c2)) OR (62:Int16 = Neg(alltypes2.c2))) OR (62:Int16 = alltypes2.c2)) OR (62:Int16 = alltypes2.c2)) OR (62:Int16 = alltypes2.c2))::Int32 | nation.n_nationkey) + alltypes2.c7)), output: all }
├─LogicalJoin { type: Inner, on: (nation.n_name = auction.item_name), output: [nation.n_nationkey, auction.item_name, auction.description, auction.expires, auction.seller, auction.category] }
| ├─LogicalScan { table: nation, columns: [nation.n_nationkey, nation.n_name] }
| └─LogicalScan { table: auction, columns: [auction.item_name, auction.description, auction.expires, auction.seller, auction.category] }
└─LogicalScan { table: alltypes2, columns: [alltypes2.c1, alltypes2.c2, alltypes2.c6, alltypes2.c7, alltypes2.c9, alltypes2.c11, alltypes2.c15], predicate: IsNotNull(alltypes2.c11) }
7 changes: 7 additions & 0 deletions src/frontend/src/expr/input_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ impl InputRef {
self.index = (self.index as isize + offset) as usize;
}

pub fn clone_with_offset(&self, offset: isize) -> Self {
Self {
index: (self.index as isize + offset) as usize,
data_type: self.data_type.clone(),
}
}

/// Convert to protobuf.
pub fn to_proto(&self) -> PbInputRef {
PbInputRef {
Expand Down
8 changes: 8 additions & 0 deletions src/frontend/src/optimizer/logical_optimization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,12 @@ lazy_static! {
vec![AlwaysFalseFilterRule::create()],
ApplyOrder::TopDown,
);

static ref PULL_UP_HOP: OptimizationStage = OptimizationStage::new(
"Pull up hop",
vec![PullUpHopRule::create()],
ApplyOrder::BottomUp,
);
}

impl LogicalOptimizer {
Expand Down Expand Up @@ -481,6 +487,8 @@ impl LogicalOptimizer {

plan = plan.optimize_by_rules(&PROJECT_REMOVE);

plan = plan.optimize_by_rules(&PULL_UP_HOP);

plan = plan.optimize_by_rules(&CONVERT_WINDOW_AGG);

if has_logical_over_agg(plan.clone()) {
Expand Down
22 changes: 20 additions & 2 deletions src/frontend/src/optimizer/plan_node/logical_hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ pub struct LogicalHopWindow {
}

impl LogicalHopWindow {
/// Hop windows will add `windows_start` and `windows_end` columns at the end.
/// Take care to modify the code referring it if above rule changes.
pub const ADDITION_COLUMN_LEN: usize = 2;

/// just used in optimizer and the function will not check if the `time_col`'s value is NULL
/// compared with `LogicalHopWindow::create`
fn new(
Expand Down Expand Up @@ -122,6 +126,16 @@ impl LogicalHopWindow {
self.core.internal_window_end_col_idx()
}

pub fn output_window_start_col_idx(&self) -> Option<usize> {
self.internal2output_col_mapping()
.try_map(self.internal_window_start_col_idx())
}

pub fn output_window_end_col_idx(&self) -> Option<usize> {
self.internal2output_col_mapping()
.try_map(self.internal_window_end_col_idx())
}

pub fn o2i_col_mapping(&self) -> ColIndexMapping {
self.core.o2i_col_mapping()
}
Expand All @@ -134,11 +148,15 @@ impl LogicalHopWindow {
self.core.internal_column_num()
}

fn output2internal_col_mapping(&self) -> ColIndexMapping {
pub fn output2internal_col_mapping(&self) -> ColIndexMapping {
self.core.output2internal_col_mapping()
}

fn clone_with_output_indices(&self, output_indices: Vec<usize>) -> Self {
pub fn internal2output_col_mapping(&self) -> ColIndexMapping {
self.core.internal2output_col_mapping()
}

pub fn clone_with_output_indices(&self, output_indices: Vec<usize>) -> Self {
Self::new(
self.input(),
self.core.time_col.clone(),
Expand Down
22 changes: 22 additions & 0 deletions src/frontend/src/optimizer/plan_node/logical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,28 @@ impl LogicalJoin {
&self.core.on
}

/// Collect all input ref in the on condition. And separate them into left and right.
pub fn input_idx_on_condition(&self) -> (Vec<usize>, Vec<usize>) {
let input_refs = self
.core
.on
.collect_input_refs(self.core.left.schema().len() + self.core.right.schema().len());
let index_group = input_refs
.ones()
.group_by(|i| *i < self.core.left.schema().len());
let left_index = index_group
.into_iter()
.next()
.map_or(vec![], |group| group.1.collect_vec());
let right_index = index_group.into_iter().next().map_or(vec![], |group| {
group
.1
.map(|i| i - self.core.left.schema().len())
.collect_vec()
});
(left_index, right_index)
}

/// Get the join type of the logical join.
pub fn join_type(&self) -> JoinType {
self.core.join_type
Expand Down
Loading

0 comments on commit db072cb

Please sign in to comment.