Skip to content

Commit

Permalink
feat(optimizer): support functional index selection (risingwavelabs#9067)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Apr 10, 2023
1 parent a63807d commit 046f1d2
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 34 deletions.
20 changes: 20 additions & 0 deletions e2e_test/batch/basic/index.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,23 @@ NULL 5

statement ok
drop table t1;

# create functional indexes
statement ok
create table t (v1 varchar, v2 varchar);

statement ok
insert into t values ('Hello', 'World');

statement ok
create index idx on t(lower(v1));

# functional index selection
query II
select * from t where lower(v1) = 'hello';
----
Hello World

statement ok
drop table t;
38 changes: 31 additions & 7 deletions src/frontend/planner_test/tests/testdata/index_selection.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -672,10 +672,34 @@
└─BatchExchange { order: [], dist: UpstreamHashShard(t2.d1) }
└─BatchScan { table: t2, columns: [t2.d1, t2.d2], distribution: SomeShard }
- sql: |
create table t (j jsonb);
explain create index idx on t(j->'k');
explain_output: |
StreamMaterialize { columns: [JSONB_ACCESS_INNER, j, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [JSONB_ACCESS_INNER, t._row_id], pk_conflict: "NoCheck" }
└─StreamExchange { dist: HashShard($expr1) }
└─StreamProject { exprs: [JsonbAccessInner(t.j, 'k':Varchar) as $expr1, t.j, t._row_id] }
└─StreamTableScan { table: t, columns: [j, _row_id] }
create table t (j jsonb, v1 int, v2 int);
create index idx1 on t(j->>'k1');
select * from t where j->>'k1' = 'abc';
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchScan { table: idx1, columns: [idx1.j, idx1.v1, idx1.v2], scan_ranges: [idx1.JSONB_ACCESS_STR = Utf8("abc")], distribution: SomeShard }
- sql: |
create table t (j jsonb, v1 int, v2 int);
create index idx1 on t(j->>'k1') include(v1);
select * from t where j->>'k1' = 'abc';
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchLookupJoin { type: Inner, predicate: idx1.t._row_id IS NOT DISTINCT FROM t._row_id, output: [t.j, t.v1, t.v2] }
└─BatchExchange { order: [], dist: UpstreamHashShard(idx1.t._row_id) }
└─BatchScan { table: idx1, columns: [idx1.t._row_id], scan_ranges: [idx1.JSONB_ACCESS_STR = Utf8("abc")], distribution: SomeShard }
- sql: |
create table t (j jsonb, v1 int, v2 int);
create index idx1 on t(j->>'k1') include(v1);
create index idx2 on t(j->>'k2') include(v2);
select * from t where j->>'k1' = 'abc' or j->>'k2' = 'ABC';
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchLookupJoin { type: Inner, predicate: idx1.t._row_id IS NOT DISTINCT FROM t._row_id AND ((JsonbAccessStr(t.j, 'k1':Varchar) = 'abc':Varchar) OR (JsonbAccessStr(t.j, 'k2':Varchar) = 'ABC':Varchar)), output: [t.j, t.v1, t.v2] }
└─BatchExchange { order: [], dist: UpstreamHashShard(idx1.t._row_id) }
└─BatchHashAgg { group_key: [idx1.t._row_id], aggs: [] }
└─BatchExchange { order: [], dist: HashShard(idx1.t._row_id) }
└─BatchUnion { all: true }
├─BatchExchange { order: [], dist: Single }
| └─BatchScan { table: idx1, columns: [idx1.t._row_id], scan_ranges: [idx1.JSONB_ACCESS_STR = Utf8("abc")], distribution: SomeShard }
└─BatchExchange { order: [], dist: Single }
└─BatchScan { table: idx2, columns: [idx2.t._row_id], scan_ranges: [idx2.JSONB_ACCESS_STR = Utf8("ABC")], distribution: SomeShard }
33 changes: 30 additions & 3 deletions src/frontend/src/catalog/index_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::ops::Deref;
use std::sync::Arc;

use derivative::Derivative;
use itertools::Itertools;
use risingwave_common::catalog::IndexId;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::PbIndex;

use super::ColumnId;
use crate::catalog::{DatabaseId, RelationCatalog, SchemaId, TableCatalog};
use crate::expr::{Expr, ExprImpl};
use crate::expr::{Expr, ExprImpl, FunctionCall};
use crate::user::UserId;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[derive(Clone, Debug, Derivative)]
#[derivative(PartialEq, Eq, Hash)]
pub struct IndexCatalog {
pub id: IndexId,

Expand All @@ -45,6 +48,15 @@ pub struct IndexCatalog {

pub secondary_to_primary_mapping: BTreeMap<usize, usize>,

/// Map function call from the primary table to the index table.
/// Use `HashMap` instead of `BTreeMap`, because `FunctionCall` can't be used as the key for
/// `BTreeMap`. BTW, the trait `std::hash::Hash` is not implemented for
/// `HashMap<function_call::FunctionCall, usize>`, so we need to ignore it. It will not
/// affect the correctness, since it can be derived by `index_item`.
#[derivative(PartialEq = "ignore")]
#[derivative(Hash = "ignore")]
pub function_mapping: HashMap<FunctionCall, usize>,

pub original_columns: Vec<ColumnId>,
}

Expand Down Expand Up @@ -78,6 +90,16 @@ impl IndexCatalog {
.map(|(x, y)| (y, x)),
);

let function_mapping: HashMap<FunctionCall, usize> = index_item
.iter()
.enumerate()
.filter_map(|(i, expr)| match expr {
ExprImpl::InputRef(_) => None,
ExprImpl::FunctionCall(func) => Some((func.deref().clone(), i)),
_ => unreachable!(),
})
.collect();

let original_columns = index_prost
.original_columns
.clone()
Expand All @@ -93,6 +115,7 @@ impl IndexCatalog {
primary_table: Arc::new(primary_table.clone()),
primary_to_secondary_mapping,
secondary_to_primary_mapping,
function_mapping,
original_columns,
}
}
Expand Down Expand Up @@ -133,6 +156,10 @@ impl IndexCatalog {
&self.primary_to_secondary_mapping
}

/// Returns the mapping from a functional index expression (the `FunctionCall` as
/// written against the primary table) to the position of the materialized column
/// in the index table. Derived from `index_item` when the catalog is built.
pub fn function_mapping(&self) -> &HashMap<FunctionCall, usize> {
&self.function_mapping
}

pub fn to_prost(&self, schema_id: SchemaId, database_id: DatabaseId) -> PbIndex {
PbIndex {
id: self.id.index_id,
Expand Down
20 changes: 19 additions & 1 deletion src/frontend/src/optimizer/plan_node/logical_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use super::{
};
use crate::catalog::{ColumnId, IndexCatalog};
use crate::expr::{
CollectInputRef, CorrelatedInputRef, Expr, ExprImpl, ExprRewriter, ExprVisitor, InputRef,
CollectInputRef, CorrelatedInputRef, Expr, ExprImpl, ExprRewriter, ExprVisitor, FunctionCall,
InputRef,
};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::plan_node::{
Expand Down Expand Up @@ -314,6 +315,7 @@ impl LogicalScan {
&index.name,
index.index_table.table_desc().into(),
p2s_mapping,
index.function_mapping(),
);
Some(index_scan)
} else {
Expand All @@ -328,6 +330,7 @@ impl LogicalScan {
index_name: &str,
index_table_desc: Rc<TableDesc>,
primary_to_secondary_mapping: &BTreeMap<usize, usize>,
function_mapping: &HashMap<FunctionCall, usize>,
) -> LogicalScan {
let new_output_col_idx = self
.output_col_idx()
Expand All @@ -337,6 +340,7 @@ impl LogicalScan {

struct Rewriter<'a> {
primary_to_secondary_mapping: &'a BTreeMap<usize, usize>,
function_mapping: &'a HashMap<FunctionCall, usize>,
}
impl ExprRewriter for Rewriter<'_> {
fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
Expand All @@ -349,9 +353,23 @@ impl LogicalScan {
)
.into()
}

/// If the whole `func_call` is materialized by the index (i.e. it appears in
/// `function_mapping`), replace it with an `InputRef` pointing at the index
/// column; otherwise rewrite its inputs recursively and rebuild the call.
fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl {
if let Some(index) = self.function_mapping.get(&func_call) {
return InputRef::new(*index, func_call.return_type()).into();
}

// Not covered by the index as a whole: descend into the arguments so that
// nested covered sub-expressions can still be replaced.
let (func_type, inputs, ret) = func_call.decompose();
let inputs = inputs
.into_iter()
.map(|expr| self.rewrite_expr(expr))
.collect();
// Rebuild with the original return type; `new_unchecked` skips re-type-checking.
FunctionCall::new_unchecked(func_type, inputs, ret).into()
}
}
let mut rewriter = Rewriter {
primary_to_secondary_mapping,
function_mapping,
};

let new_predicate = self.predicate().clone().rewrite_expr(&mut rewriter);
Expand Down
14 changes: 9 additions & 5 deletions src/frontend/src/optimizer/plan_node/stream_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::fmt;
use std::rc::Rc;

Expand All @@ -23,7 +23,7 @@ use risingwave_pb::stream_plan::{ChainType, PbStreamNode};

use super::{ExprRewritable, LogicalScan, PlanBase, PlanNodeId, PlanRef, StreamNode};
use crate::catalog::ColumnId;
use crate::expr::ExprRewriter;
use crate::expr::{ExprRewriter, FunctionCall};
use crate::optimizer::plan_node::utils::IndicesDisplay;
use crate::optimizer::property::{Distribution, DistributionDisplay};
use crate::stream_fragmenter::BuildFragmentGraphState;
Expand Down Expand Up @@ -95,11 +95,15 @@ impl StreamTableScan {
index_name: &str,
index_table_desc: Rc<TableDesc>,
primary_to_secondary_mapping: &BTreeMap<usize, usize>,
function_mapping: &HashMap<FunctionCall, usize>,
chain_type: ChainType,
) -> StreamTableScan {
let logical_index_scan =
self.logical
.to_index_scan(index_name, index_table_desc, primary_to_secondary_mapping);
let logical_index_scan = self.logical.to_index_scan(
index_name,
index_table_desc,
primary_to_secondary_mapping,
function_mapping,
);
logical_index_scan
.distribution_key()
.expect("distribution key of stream chain must exist in output columns");
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/rule/index_delta_join_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ impl Rule for IndexDeltaJoinRule {
index.index_table.name.as_str(),
index.index_table.table_desc().into(),
p2s_mapping,
index.function_mapping(),
chain_type,
)
.into(),
Expand Down
66 changes: 48 additions & 18 deletions src/frontend/src/optimizer/rule/index_selection_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,30 @@ impl Rule for IndexSelectionRule {

struct IndexPredicateRewriter<'a> {
// Maps a primary-table column index to its index-table column index.
p2s_mapping: &'a BTreeMap<usize, usize>,
// Maps a functional index expression to its index-table column index.
function_mapping: &'a HashMap<FunctionCall, usize>,
// Offset added to input refs that are NOT covered by the index.
offset: usize,
// Cleared when any rewritten input ref falls outside the index's coverage.
covered_by_index: bool,
}

impl<'a> IndexPredicateRewriter<'a> {
/// Creates a rewriter over the given column/function mappings. `covered_by_index`
/// starts optimistic (`true`) and is cleared during rewriting when a column
/// reference is not covered by the index.
fn new(
p2s_mapping: &'a BTreeMap<usize, usize>,
function_mapping: &'a HashMap<FunctionCall, usize>,
offset: usize,
) -> Self {
Self {
p2s_mapping,
function_mapping,
offset,
covered_by_index: true,
}
}

/// Whether every column reference rewritten so far was covered by the index.
fn covered_by_index(&self) -> bool {
self.covered_by_index
}
}

impl ExprRewriter for IndexPredicateRewriter<'_> {
fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
// transform primary predicate to index predicate if it can
Expand All @@ -155,9 +177,23 @@ impl ExprRewriter for IndexPredicateRewriter<'_> {
)
.into()
} else {
self.covered_by_index = false;
InputRef::new(input_ref.index() + self.offset, input_ref.return_type()).into()
}
}

/// Rewrites a function call from the primary table: when the index materializes
/// exactly this expression, substitute an `InputRef` to the index column;
/// otherwise recurse into the arguments and rebuild the call unchanged.
fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl {
    match self.function_mapping.get(&func_call) {
        Some(&col_idx) => InputRef::new(col_idx, func_call.return_type()).into(),
        None => {
            let (func_type, inputs, ret) = func_call.decompose();
            let rewritten_inputs = inputs
                .into_iter()
                .map(|input| self.rewrite_expr(input))
                .collect();
            FunctionCall::new_unchecked(func_type, rewritten_inputs, ret).into()
        }
    }
}
}

impl IndexSelectionRule {
Expand All @@ -171,10 +207,11 @@ impl IndexSelectionRule {
// index_scan primary_table_scan
let predicate = logical_scan.predicate().clone();
let offset = index.index_item.len();
let mut rewriter = IndexPredicateRewriter {
p2s_mapping: index.primary_to_secondary_mapping(),
let mut rewriter = IndexPredicateRewriter::new(
index.primary_to_secondary_mapping(),
index.function_mapping(),
offset,
};
);
let new_predicate = predicate.rewrite_expr(&mut rewriter);

let index_scan = LogicalScan::create(
Expand Down Expand Up @@ -535,25 +572,18 @@ impl IndexSelectionRule {
predicate: Condition,
ctx: OptimizerContextRef,
) -> Option<PlanRef> {
// check condition is covered by index.
let mut input_ref_finder = ExprInputRefFinder::default();
predicate.visit_expr(&mut input_ref_finder);
let mut rewriter = IndexPredicateRewriter::new(
index.primary_to_secondary_mapping(),
index.function_mapping(),
0,
);
let new_predicate = predicate.rewrite_expr(&mut rewriter);

let p2s_mapping = index.primary_to_secondary_mapping();
if !input_ref_finder
.input_ref_index_set
.iter()
.all(|x| p2s_mapping.contains_key(x))
{
// check condition is covered by index.
if !rewriter.covered_by_index() {
return None;
}

let mut rewriter = IndexPredicateRewriter {
p2s_mapping,
offset: 0,
};
let new_predicate = predicate.rewrite_expr(&mut rewriter);

Some(
LogicalScan::new(
index.index_table.name.to_string(),
Expand Down

0 comments on commit 046f1d2

Please sign in to comment.