Skip to content

Commit

Permalink
feat(optimizer): filter elim (risingwavelabs#1430)
Browse files Browse the repository at this point in the history
* add filter if need

* filter only if need

* fix

* fix

* rename

* clippy fix
  • Loading branch information
st1page authored Mar 30, 2022
1 parent 069b40d commit 43dc7b3
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 34 deletions.
14 changes: 11 additions & 3 deletions rust/frontend/src/optimizer/plan_node/logical_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::fmt;

use fixedbitset::FixedBitSet;
use risingwave_common::error::Result;

use super::{
ColPrunable, CollectInputRef, LogicalProject, PlanBase, PlanNode, PlanRef, PlanTreeNodeUnary,
Expand Down Expand Up @@ -52,10 +51,19 @@ impl LogicalFilter {
}
}

/// Create a `LogicalFilter` unless the predicate is always true
pub fn create(input: PlanRef, predicate: Condition) -> PlanRef {
if predicate.always_true() {
input
} else {
LogicalFilter::new(input, predicate).into()
}
}

/// the function will check if the predicate is bool expression
pub fn create(input: PlanRef, predicate: ExprImpl) -> Result<PlanRef> {
pub fn create_with_expr(input: PlanRef, predicate: ExprImpl) -> PlanRef {
let predicate = Condition::with_expr(predicate);
Ok(Self::new(input, predicate).into())
Self::new(input, predicate).into()
}

/// Get the predicate of the logical join.
Expand Down
11 changes: 4 additions & 7 deletions rust/frontend/src/optimizer/rule/filter_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,10 @@ impl Rule for FilterAggRule {
let pushed_predicate = pushed_predicate.rewrite_expr(&mut subst);

let input = agg.input();
let pushed_filter = LogicalFilter::new(input, pushed_predicate);
let new_agg = agg.clone_with_input(pushed_filter.into()).into();
if agg_call_pred.always_true() {
Some(new_agg)
} else {
Some(LogicalFilter::new(new_agg, agg_call_pred).into())
}
let pushed_filter = LogicalFilter::create(input, pushed_predicate);
let new_agg = agg.clone_with_input(pushed_filter).into();

Some(LogicalFilter::create(new_agg, agg_call_pred))
}
}

Expand Down
10 changes: 3 additions & 7 deletions rust/frontend/src/optimizer/rule/filter_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,22 +84,18 @@ impl Rule for FilterJoinRule {
let right_predicate = right_from_filter.and_then(|c1| right_from_on.map(|c2| c1.and(c2)));

let new_left: PlanRef = if let Some(predicate) = left_predicate {
LogicalFilter::new(join.left(), predicate).into()
LogicalFilter::create(join.left(), predicate)
} else {
join.left()
};
let new_right: PlanRef = if let Some(predicate) = right_predicate {
LogicalFilter::new(join.right(), predicate).into()
LogicalFilter::create(join.right(), predicate)
} else {
join.right()
};
let new_join = LogicalJoin::new(new_left, new_right, join_type, new_on);

if new_filter_predicate.always_true() {
Some(new_join.into())
} else {
Some(LogicalFilter::new(new_join.into(), new_filter_predicate).into())
}
Some(LogicalFilter::create(new_join.into(), new_filter_predicate))
}
}

Expand Down
4 changes: 2 additions & 2 deletions rust/frontend/src/optimizer/rule/filter_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ impl Rule for FilterProjectRule {
let predicate = filter.predicate().clone().rewrite_expr(&mut subst);

let input = project.input();
let pushed_filter = LogicalFilter::new(input, predicate);
Some(project.clone_with_input(pushed_filter.into()).into())
let pushed_filter = LogicalFilter::create(input, predicate);
Some(project.clone_with_input(pushed_filter).into())
}
}

Expand Down
2 changes: 1 addition & 1 deletion rust/frontend/src/planner/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl Planner {
let source_id = delete.table_source.source_id;
let scan = self.plan_base_table(delete.table)?;
let input = if let Some(expr) = delete.selection {
LogicalFilter::create(scan, expr)?
LogicalFilter::create_with_expr(scan, expr)
} else {
scan
};
Expand Down
7 changes: 3 additions & 4 deletions rust/frontend/src/planner/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl Planner {
/// [`LogicalJoin`] using [`substitute_subqueries`].
fn plan_where(&mut self, mut input: PlanRef, where_clause: ExprImpl) -> Result<PlanRef> {
if !where_clause.has_subquery() {
return LogicalFilter::create(input, where_clause);
return Ok(LogicalFilter::create_with_expr(input, where_clause));
}

let (subquery_conjunctions, not_subquery_conjunctions, others) =
Expand Down Expand Up @@ -154,13 +154,12 @@ impl Planner {
Ok(input)
} else {
let (input, others) = self.substitute_subqueries(input, others.conjunctions)?;
Ok(LogicalFilter::new(
Ok(LogicalFilter::create(
input,
Condition {
conjunctions: others,
},
)
.into())
))
}
}

Expand Down
3 changes: 1 addition & 2 deletions rust/frontend/test_runner/tests/testdata/column_pruning.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@
LogicalProject { exprs: [$0, $1], expr_alias: [ , ] }
LogicalFilter { predicate: ($2 < 1:Int32) }
LogicalScan { table: t1, columns: [v1, v2, v3] }
LogicalFilter { predicate: }
LogicalScan { table: t2, columns: [v1, v2] }
LogicalScan { table: t2, columns: [v1, v2] }
- sql: |
/* mixed */
create table t (v1 bigint, v2 double precision, v3 int);
Expand Down
12 changes: 4 additions & 8 deletions rust/frontend/test_runner/tests/testdata/tpch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,9 @@
BatchFilter { predicate: ($1 = "FURNITURE":Varchar) AND true:Boolean AND true:Boolean }
BatchScan { table: customer, columns: [c_custkey, c_mktsegment] }
BatchExchange { order: [], dist: HashShard([1]) }
BatchFilter { predicate: }
BatchScan { table: orders, columns: [o_orderkey, o_custkey, o_orderdate, o_shippriority] }
BatchScan { table: orders, columns: [o_orderkey, o_custkey, o_orderdate, o_shippriority] }
BatchExchange { order: [], dist: HashShard([0]) }
BatchFilter { predicate: }
BatchScan { table: lineitem, columns: [l_orderkey, l_extendedprice, l_discount] }
BatchScan { table: lineitem, columns: [l_orderkey, l_extendedprice, l_discount] }
stream_plan: |
StreamMaterialize { columns: [l_orderkey, revenue, o_orderdate, o_shippriority], pk_columns: [revenue, o_orderdate, l_orderkey, o_shippriority] }
StreamProject { exprs: [$0, $4, $1, $2], expr_alias: [l_orderkey, revenue, o_orderdate, o_shippriority] }
Expand All @@ -145,11 +143,9 @@
StreamFilter { predicate: ($1 = "FURNITURE":Varchar) AND true:Boolean AND true:Boolean }
StreamTableScan { table: customer, columns: [c_custkey, c_mktsegment, _row_id#0], pk_indices: [2] }
StreamExchange { dist: HashShard([1]) }
StreamFilter { predicate: }
StreamTableScan { table: orders, columns: [o_orderkey, o_custkey, o_orderdate, o_shippriority, _row_id#0], pk_indices: [4] }
StreamTableScan { table: orders, columns: [o_orderkey, o_custkey, o_orderdate, o_shippriority, _row_id#0], pk_indices: [4] }
StreamExchange { dist: HashShard([0]) }
StreamFilter { predicate: }
StreamTableScan { table: lineitem, columns: [l_orderkey, l_extendedprice, l_discount, _row_id#0], pk_indices: [3] }
StreamTableScan { table: lineitem, columns: [l_orderkey, l_extendedprice, l_discount, _row_id#0], pk_indices: [3] }
- id: tpch_q6
before:
- create_tables
Expand Down

0 comments on commit 43dc7b3

Please sign in to comment.