From 046f1d2e107c6022f89f538938e25519c572fa1b Mon Sep 17 00:00:00 2001
From: Dylan
Date: Mon, 10 Apr 2023 15:08:50 +0800
Subject: [PATCH] feat(optimizer): support functional index selection (#9067)

---
 e2e_test/batch/basic/index.slt.part          | 20 ++++++
 .../tests/testdata/index_selection.yaml      | 38 +++++++++--
 src/frontend/src/catalog/index_catalog.rs    | 33 +++++++++-
 .../src/optimizer/plan_node/logical_scan.rs  | 20 +++++-
 .../optimizer/plan_node/stream_table_scan.rs | 14 ++--
 .../optimizer/rule/index_delta_join_rule.rs  |  1 +
 .../optimizer/rule/index_selection_rule.rs   | 66 ++++++++++++++-----
 7 files changed, 158 insertions(+), 34 deletions(-)

diff --git a/e2e_test/batch/basic/index.slt.part b/e2e_test/batch/basic/index.slt.part
index a45cd724bb451..d4ec2dceb8aa5 100644
--- a/e2e_test/batch/basic/index.slt.part
+++ b/e2e_test/batch/basic/index.slt.part
@@ -90,3 +90,23 @@ NULL 5
 
 statement ok
 drop table t1;
+
+# create functional indexes
+
+statement ok
+create table t (v1 varchar, v2 varchar);
+
+statement ok
+insert into t values ('Hello', 'World');
+
+statement ok
+create index idx on t(lower(v1));
+
+# functional index selection
+query II
+select * from t where lower(v1) = 'hello';
+----
+Hello World
+
+statement ok
+drop table t;
\ No newline at end of file
diff --git a/src/frontend/planner_test/tests/testdata/index_selection.yaml b/src/frontend/planner_test/tests/testdata/index_selection.yaml
index af3ab8c551893..f79e229c88796 100644
--- a/src/frontend/planner_test/tests/testdata/index_selection.yaml
+++ b/src/frontend/planner_test/tests/testdata/index_selection.yaml
@@ -672,10 +672,34 @@
     └─BatchExchange { order: [], dist: UpstreamHashShard(t2.d1) }
       └─BatchScan { table: t2, columns: [t2.d1, t2.d2], distribution: SomeShard }
 - sql: |
-    create table t (j jsonb);
-    explain create index idx on t(j->'k');
-  explain_output: |
-    StreamMaterialize { columns: [JSONB_ACCESS_INNER, j, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [JSONB_ACCESS_INNER, t._row_id], pk_conflict: "NoCheck" }
-    └─StreamExchange { dist: HashShard($expr1) }
-      └─StreamProject { exprs: [JsonbAccessInner(t.j, 'k':Varchar) as $expr1, t.j, t._row_id] }
-        └─StreamTableScan { table: t, columns: [j, _row_id] }
+    create table t (j jsonb, v1 int, v2 int);
+    create index idx1 on t(j->>'k1');
+    select * from t where j->>'k1' = 'abc';
+  batch_plan: |
+    BatchExchange { order: [], dist: Single }
+    └─BatchScan { table: idx1, columns: [idx1.j, idx1.v1, idx1.v2], scan_ranges: [idx1.JSONB_ACCESS_STR = Utf8("abc")], distribution: SomeShard }
+- sql: |
+    create table t (j jsonb, v1 int, v2 int);
+    create index idx1 on t(j->>'k1') include(v1);
+    select * from t where j->>'k1' = 'abc';
+  batch_plan: |
+    BatchExchange { order: [], dist: Single }
+    └─BatchLookupJoin { type: Inner, predicate: idx1.t._row_id IS NOT DISTINCT FROM t._row_id, output: [t.j, t.v1, t.v2] }
+      └─BatchExchange { order: [], dist: UpstreamHashShard(idx1.t._row_id) }
+        └─BatchScan { table: idx1, columns: [idx1.t._row_id], scan_ranges: [idx1.JSONB_ACCESS_STR = Utf8("abc")], distribution: SomeShard }
+- sql: |
+    create table t (j jsonb, v1 int, v2 int);
+    create index idx1 on t(j->>'k1') include(v1);
+    create index idx2 on t(j->>'k2') include(v2);
+    select * from t where j->>'k1' = 'abc' or j->>'k2' = 'ABC';
+  batch_plan: |
+    BatchExchange { order: [], dist: Single }
+    └─BatchLookupJoin { type: Inner, predicate: idx1.t._row_id IS NOT DISTINCT FROM t._row_id AND ((JsonbAccessStr(t.j, 'k1':Varchar) = 'abc':Varchar) OR (JsonbAccessStr(t.j, 'k2':Varchar) = 'ABC':Varchar)), output: [t.j, t.v1, t.v2] }
+      └─BatchExchange { order: [], dist: UpstreamHashShard(idx1.t._row_id) }
+        └─BatchHashAgg { group_key: [idx1.t._row_id], aggs: [] }
+          └─BatchExchange { order: [], dist: HashShard(idx1.t._row_id) }
+            └─BatchUnion { all: true }
+              ├─BatchExchange { order: [], dist: Single }
+              | └─BatchScan { table: idx1, columns: [idx1.t._row_id], scan_ranges: [idx1.JSONB_ACCESS_STR = Utf8("abc")], distribution: SomeShard }
+              └─BatchExchange { order: [], dist: Single }
+                └─BatchScan { table: idx2, columns: [idx2.t._row_id], scan_ranges: [idx2.JSONB_ACCESS_STR = Utf8("ABC")], distribution: SomeShard }
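The third planner test above is the interesting one: for an OR predicate spanning two functional indexes, the plan scans each index for its own key, unions the matching primary keys, deduplicates them (the BatchHashAgg on idx1.t._row_id), and only then joins back to the primary table, where the original predicate is re-applied. Below is a minimal, self-contained Rust sketch of that strategy for intuition only; RowId, Index and lookup_or are illustrative stand-ins rather than RisingWave types, and the re-check of the original predicate after lookup is omitted.

    use std::collections::{BTreeSet, HashMap};

    // Toy stand-ins: each functional index maps an extracted key (e.g. j->>'k1')
    // to the primary-table row ids whose rows contain it.
    type RowId = u64;
    type Index = HashMap<String, Vec<RowId>>;

    // OR across two indexes: scan both, union the row ids, deduplicate,
    // then fetch the surviving rows from the primary table (the lookup join).
    fn lookup_or(
        idx1: &Index,
        key1: &str,
        idx2: &Index,
        key2: &str,
        primary: &HashMap<RowId, String>,
    ) -> Vec<(RowId, String)> {
        let mut row_ids = BTreeSet::new(); // dedup, like the agg on _row_id
        for ids in [idx1.get(key1), idx2.get(key2)].into_iter().flatten() {
            row_ids.extend(ids.iter().copied()); // union-all of both index scans
        }
        row_ids
            .into_iter()
            .filter_map(|id| primary.get(&id).map(|row| (id, row.clone())))
            .collect()
    }

    fn main() {
        let idx1: Index = HashMap::from([("abc".to_string(), vec![1, 3])]);
        let idx2: Index = HashMap::from([("ABC".to_string(), vec![3, 7])]);
        let primary: HashMap<RowId, String> = HashMap::from([
            (1, "row 1".to_string()),
            (3, "row 3".to_string()),
            (7, "row 7".to_string()),
        ]);
        // Row id 3 matches both indexes but is looked up only once.
        println!("{:?}", lookup_or(&idx1, "abc", &idx2, "ABC", &primary));
    }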
diff --git a/src/frontend/src/catalog/index_catalog.rs b/src/frontend/src/catalog/index_catalog.rs
index 5412396d962ec..69ea91e671a60 100644
--- a/src/frontend/src/catalog/index_catalog.rs
+++ b/src/frontend/src/catalog/index_catalog.rs
@@ -12,9 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashMap};
+use std::ops::Deref;
 use std::sync::Arc;
 
+use derivative::Derivative;
 use itertools::Itertools;
 use risingwave_common::catalog::IndexId;
 use risingwave_common::util::sort_util::ColumnOrder;
@@ -22,10 +24,11 @@
 use risingwave_pb::catalog::PbIndex;
 
 use super::ColumnId;
 use crate::catalog::{DatabaseId, RelationCatalog, SchemaId, TableCatalog};
-use crate::expr::{Expr, ExprImpl};
+use crate::expr::{Expr, ExprImpl, FunctionCall};
 use crate::user::UserId;
 
-#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+#[derive(Clone, Debug, Derivative)]
+#[derivative(PartialEq, Eq, Hash)]
 pub struct IndexCatalog {
     pub id: IndexId,
 
@@ -45,6 +48,15 @@ pub struct IndexCatalog {
 
     pub secondary_to_primary_mapping: BTreeMap<usize, usize>,
 
+    /// Map function call from the primary table to the index table.
+    /// Use `HashMap` instead of `BTreeMap`, because `FunctionCall` can't be used as the key for
+    /// `BTreeMap`. BTW, the trait `std::hash::Hash` is not implemented for
+    /// `HashMap`, so we need to ignore it. It will not
+    /// affect the correctness, since it can be derived by `index_item`.
+    #[derivative(PartialEq = "ignore")]
+    #[derivative(Hash = "ignore")]
+    pub function_mapping: HashMap<FunctionCall, usize>,
+
     pub original_columns: Vec<ColumnId>,
 }
 
@@ -78,6 +90,16 @@ impl IndexCatalog {
                 .map(|(x, y)| (y, x)),
         );
 
+        let function_mapping: HashMap<FunctionCall, usize> = index_item
+            .iter()
+            .enumerate()
+            .filter_map(|(i, expr)| match expr {
+                ExprImpl::InputRef(_) => None,
+                ExprImpl::FunctionCall(func) => Some((func.deref().clone(), i)),
+                _ => unreachable!(),
+            })
+            .collect();
+
         let original_columns = index_prost
             .original_columns
             .clone()
@@ -93,6 +115,7 @@ impl IndexCatalog {
             primary_table: Arc::new(primary_table.clone()),
             primary_to_secondary_mapping,
             secondary_to_primary_mapping,
+            function_mapping,
            original_columns,
         }
     }
@@ -133,6 +156,10 @@ impl IndexCatalog {
         &self.primary_to_secondary_mapping
     }
 
+    pub fn function_mapping(&self) -> &HashMap<FunctionCall, usize> {
+        &self.function_mapping
+    }
+
     pub fn to_prost(&self, schema_id: SchemaId, database_id: DatabaseId) -> PbIndex {
         PbIndex {
             id: self.id.index_id,
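The new function_mapping field above is the hook for functional index selection: because FunctionCall is hashable and comparable, a whole index expression such as lower(v1) can key a HashMap and be matched structurally against sub-expressions of a query predicate. The following is a stripped-down sketch of how such a mapping is built from the index items; the Expr enum and function_mapping helper are toy stand-ins for the planner's ExprImpl and for the constructor logic above, not the real API.

    use std::collections::HashMap;

    // Toy expression tree standing in for the planner's expression type.
    // Eq + Hash let a whole sub-expression serve as a map key.
    #[derive(Clone, Debug, PartialEq, Eq, Hash)]
    enum Expr {
        Column(usize),           // like InputRef
        Call(String, Vec<Expr>), // like FunctionCall, e.g. lower($0)
    }

    // Build "index expression -> position in the index table", skipping plain
    // column references (those are handled by the column mapping instead).
    fn function_mapping(index_items: &[Expr]) -> HashMap<Expr, usize> {
        index_items
            .iter()
            .enumerate()
            .filter_map(|(i, e)| match e {
                Expr::Column(_) => None,
                Expr::Call(..) => Some((e.clone(), i)),
            })
            .collect()
    }

    fn main() {
        // An index on (lower(v1), v2): the functional item sits at position 0.
        let items = vec![
            Expr::Call("lower".into(), vec![Expr::Column(0)]),
            Expr::Column(1),
        ];
        let mapping = function_mapping(&items);
        assert_eq!(mapping.len(), 1);
        println!("{:?}", mapping);
    }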
diff --git a/src/frontend/src/optimizer/plan_node/logical_scan.rs b/src/frontend/src/optimizer/plan_node/logical_scan.rs
index 52798ece2bed5..ce9f5c12eb6d5 100644
--- a/src/frontend/src/optimizer/plan_node/logical_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_scan.rs
@@ -29,7 +29,8 @@ use super::{
 };
 use crate::catalog::{ColumnId, IndexCatalog};
 use crate::expr::{
-    CollectInputRef, CorrelatedInputRef, Expr, ExprImpl, ExprRewriter, ExprVisitor, InputRef,
+    CollectInputRef, CorrelatedInputRef, Expr, ExprImpl, ExprRewriter, ExprVisitor, FunctionCall,
+    InputRef,
 };
 use crate::optimizer::optimizer_context::OptimizerContextRef;
 use crate::optimizer::plan_node::{
@@ -314,6 +315,7 @@ impl LogicalScan {
                 &index.name,
                 index.index_table.table_desc().into(),
                 p2s_mapping,
+                index.function_mapping(),
             );
             Some(index_scan)
         } else {
@@ -328,6 +330,7 @@ impl LogicalScan {
         index_name: &str,
         index_table_desc: Rc<TableDesc>,
         primary_to_secondary_mapping: &BTreeMap<usize, usize>,
+        function_mapping: &HashMap<FunctionCall, usize>,
     ) -> LogicalScan {
         let new_output_col_idx = self
             .output_col_idx()
@@ -337,6 +340,7 @@ impl LogicalScan {
 
         struct Rewriter<'a> {
             primary_to_secondary_mapping: &'a BTreeMap<usize, usize>,
+            function_mapping: &'a HashMap<FunctionCall, usize>,
         }
         impl ExprRewriter for Rewriter<'_> {
             fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
@@ -349,9 +353,23 @@ impl LogicalScan {
                 )
                 .into()
             }
+
+            fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl {
+                if let Some(index) = self.function_mapping.get(&func_call) {
+                    return InputRef::new(*index, func_call.return_type()).into();
+                }
+
+                let (func_type, inputs, ret) = func_call.decompose();
+                let inputs = inputs
+                    .into_iter()
+                    .map(|expr| self.rewrite_expr(expr))
+                    .collect();
+                FunctionCall::new_unchecked(func_type, inputs, ret).into()
+            }
         }
         let mut rewriter = Rewriter {
             primary_to_secondary_mapping,
+            function_mapping,
         };
 
         let new_predicate = self.predicate().clone().rewrite_expr(&mut rewriter);
diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
index 303e87c7fef64..710096c958bdf 100644
--- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashMap};
 use std::fmt;
 use std::rc::Rc;
 
@@ -23,7 +23,7 @@ use risingwave_pb::stream_plan::{ChainType, PbStreamNode};
 
 use super::{ExprRewritable, LogicalScan, PlanBase, PlanNodeId, PlanRef, StreamNode};
 use crate::catalog::ColumnId;
-use crate::expr::ExprRewriter;
+use crate::expr::{ExprRewriter, FunctionCall};
 use crate::optimizer::plan_node::utils::IndicesDisplay;
 use crate::optimizer::property::{Distribution, DistributionDisplay};
 use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -95,11 +95,15 @@ impl StreamTableScan {
         index_name: &str,
         index_table_desc: Rc<TableDesc>,
         primary_to_secondary_mapping: &BTreeMap<usize, usize>,
+        function_mapping: &HashMap<FunctionCall, usize>,
         chain_type: ChainType,
     ) -> StreamTableScan {
-        let logical_index_scan =
-            self.logical
-                .to_index_scan(index_name, index_table_desc, primary_to_secondary_mapping);
+        let logical_index_scan = self.logical.to_index_scan(
+            index_name,
+            index_table_desc,
+            primary_to_secondary_mapping,
+            function_mapping,
+        );
         logical_index_scan
             .distribution_key()
             .expect("distribution key of stream chain must exist in output columns");
diff --git a/src/frontend/src/optimizer/rule/index_delta_join_rule.rs b/src/frontend/src/optimizer/rule/index_delta_join_rule.rs
index 1ca0e5c34e603..89baf9dc000cc 100644
--- a/src/frontend/src/optimizer/rule/index_delta_join_rule.rs
+++ b/src/frontend/src/optimizer/rule/index_delta_join_rule.rs
@@ -95,6 +95,7 @@ impl Rule for IndexDeltaJoinRule {
                         index.index_table.name.as_str(),
                         index.index_table.table_desc().into(),
                         p2s_mapping,
+                        index.function_mapping(),
                         chain_type,
                     )
                     .into(),
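The rewrite_function_call override added to the scan-to-index rewriters above works in two steps: if the whole call is one of the index's expressions, it collapses into an InputRef pointing at the corresponding index column; otherwise the call is rebuilt with recursively rewritten arguments so nested matches are still found. A self-contained sketch of the same idea on a toy expression type (not the planner's ExprImpl or FunctionCall) looks like this:

    use std::collections::HashMap;

    #[derive(Clone, Debug, PartialEq, Eq, Hash)]
    enum Expr {
        Column(usize),
        Call(String, Vec<Expr>),
        Literal(String),
    }

    // Rewrite a predicate so it refers to index columns: a call that the index
    // materializes becomes Column(position-in-index); other calls recurse into
    // their arguments; bare columns are left to the column mapping (omitted here).
    fn rewrite(expr: Expr, function_mapping: &HashMap<Expr, usize>) -> Expr {
        if let Some(&pos) = function_mapping.get(&expr) {
            return Expr::Column(pos);
        }
        match expr {
            Expr::Call(name, args) => Expr::Call(
                name,
                args.into_iter()
                    .map(|a| rewrite(a, function_mapping))
                    .collect(),
            ),
            other => other,
        }
    }

    fn main() {
        // Index on lower(v1): the predicate lower(v1) = 'hello' becomes a
        // comparison against index column 0.
        let lower_v1 = Expr::Call("lower".into(), vec![Expr::Column(0)]);
        let mapping = HashMap::from([(lower_v1.clone(), 0)]);
        let pred = Expr::Call("eq".into(), vec![lower_v1, Expr::Literal("hello".into())]);
        println!("{:?}", rewrite(pred, &mapping));
    }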
diff --git a/src/frontend/src/optimizer/rule/index_selection_rule.rs b/src/frontend/src/optimizer/rule/index_selection_rule.rs
index 86abc9c24766d..7fa0ae97eea04 100644
--- a/src/frontend/src/optimizer/rule/index_selection_rule.rs
+++ b/src/frontend/src/optimizer/rule/index_selection_rule.rs
@@ -143,8 +143,30 @@
 
 struct IndexPredicateRewriter<'a> {
     p2s_mapping: &'a BTreeMap<usize, usize>,
+    function_mapping: &'a HashMap<FunctionCall, usize>,
     offset: usize,
+    covered_by_index: bool,
 }
+
+impl<'a> IndexPredicateRewriter<'a> {
+    fn new(
+        p2s_mapping: &'a BTreeMap<usize, usize>,
+        function_mapping: &'a HashMap<FunctionCall, usize>,
+        offset: usize,
+    ) -> Self {
+        Self {
+            p2s_mapping,
+            function_mapping,
+            offset,
+            covered_by_index: true,
+        }
+    }
+
+    fn covered_by_index(&self) -> bool {
+        self.covered_by_index
+    }
+}
+
 impl ExprRewriter for IndexPredicateRewriter<'_> {
     fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
         // transform primary predicate to index predicate if it can
@@ -155,9 +177,23 @@ impl ExprRewriter for IndexPredicateRewriter<'_> {
             )
             .into()
         } else {
+            self.covered_by_index = false;
             InputRef::new(input_ref.index() + self.offset, input_ref.return_type()).into()
         }
     }
+
+    fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl {
+        if let Some(index) = self.function_mapping.get(&func_call) {
+            return InputRef::new(*index, func_call.return_type()).into();
+        }
+
+        let (func_type, inputs, ret) = func_call.decompose();
+        let inputs = inputs
+            .into_iter()
+            .map(|expr| self.rewrite_expr(expr))
+            .collect();
+        FunctionCall::new_unchecked(func_type, inputs, ret).into()
+    }
 }
 
 impl IndexSelectionRule {
@@ -171,10 +207,11 @@
         //    index_scan     primary_table_scan
         let predicate = logical_scan.predicate().clone();
         let offset = index.index_item.len();
-        let mut rewriter = IndexPredicateRewriter {
-            p2s_mapping: index.primary_to_secondary_mapping(),
+        let mut rewriter = IndexPredicateRewriter::new(
+            index.primary_to_secondary_mapping(),
+            index.function_mapping(),
             offset,
-        };
+        );
         let new_predicate = predicate.rewrite_expr(&mut rewriter);
 
         let index_scan = LogicalScan::create(
@@ -535,25 +572,18 @@
         predicate: Condition,
         ctx: OptimizerContextRef,
     ) -> Option<PlanRef> {
-        // check condition is covered by index.
-        let mut input_ref_finder = ExprInputRefFinder::default();
-        predicate.visit_expr(&mut input_ref_finder);
-
-        let p2s_mapping = index.primary_to_secondary_mapping();
-        if !input_ref_finder
-            .input_ref_index_set
-            .iter()
-            .all(|x| p2s_mapping.contains_key(x))
-        {
+        let mut rewriter = IndexPredicateRewriter::new(
+            index.primary_to_secondary_mapping(),
+            index.function_mapping(),
+            0,
+        );
+        let new_predicate = predicate.rewrite_expr(&mut rewriter);
+
+        // check condition is covered by index.
+        if !rewriter.covered_by_index() {
             return None;
         }
 
-        let mut rewriter = IndexPredicateRewriter {
-            p2s_mapping,
-            offset: 0,
-        };
-        let new_predicate = predicate.rewrite_expr(&mut rewriter);
-
         Some(
             LogicalScan::new(
                 index.index_table.name.to_string(),
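The covered_by_index flag threaded through IndexPredicateRewriter is what decides, in the last hunk above, whether an index-only scan is legal: as soon as any column reference fails to map into the index, the rewriter marks the predicate as not covered and the rule falls back to an index scan plus a lookup join on the primary table. Below is a toy coverage check that mirrors that criterion; in the patch the check is folded into the rewriting pass itself, and Expr, covered and the two mappings here are illustrative stand-ins, not RisingWave code.

    use std::collections::{BTreeMap, HashMap};

    #[derive(Clone, Debug, PartialEq, Eq, Hash)]
    enum Expr {
        Column(usize),
        Call(String, Vec<Expr>),
        Literal(String),
    }

    // Every column reference must map into the index, unless it sits inside a
    // call that the index materializes as a whole (e.g. lower(v1)).
    fn covered(
        expr: &Expr,
        col_mapping: &BTreeMap<usize, usize>,
        func_mapping: &HashMap<Expr, usize>,
    ) -> bool {
        if func_mapping.contains_key(expr) {
            return true; // the whole expression is an index column itself
        }
        match expr {
            Expr::Column(i) => col_mapping.contains_key(i),
            Expr::Call(_, args) => args.iter().all(|a| covered(a, col_mapping, func_mapping)),
            Expr::Literal(_) => true,
        }
    }

    fn main() {
        let lower_v1 = Expr::Call("lower".into(), vec![Expr::Column(0)]);
        let func_mapping = HashMap::from([(lower_v1.clone(), 0)]);
        let col_mapping = BTreeMap::new(); // the index stores no plain columns
        let pred = Expr::Call("eq".into(), vec![lower_v1, Expr::Literal("hello".into())]);
        // Covered even though column 0 has no direct mapping, because lower(v1)
        // is materialized by the index as a whole.
        assert!(covered(&pred, &col_mapping, &func_mapping));
    }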