Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(optimizer): move fd derive into core #8540

Merged
merged 2 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 36 additions & 19 deletions src/frontend/src/optimizer/plan_node/generic/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use super::super::utils::TableCatalogBuilder;
use super::{stream, GenericPlanNode, GenericPlanRef};
use crate::expr::{Expr, ExprRewriter, InputRef, InputRefDisplay};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::{
ColIndexMapping, ColIndexMappingRewriteExt, Condition, ConditionDisplay, IndexRewriter,
Expand All @@ -46,12 +47,31 @@ pub struct Agg<PlanRef> {
pub input: PlanRef,
}

impl<PlanRef> Agg<PlanRef> {
impl<PlanRef: GenericPlanRef> Agg<PlanRef> {
pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
self.agg_calls.iter_mut().for_each(|call| {
call.filter = call.filter.clone().rewrite_expr(r);
});
}

fn output_len(&self) -> usize {
self.group_key.len() + self.agg_calls.len()
}

/// get the Mapping of columnIndex from input column index to output column index,if a input
/// column corresponds more than one out columns, mapping to any one
pub fn o2i_col_mapping(&self) -> ColIndexMapping {
let mut map = vec![None; self.output_len()];
for (i, key) in self.group_key.iter().enumerate() {
map[i] = Some(*key);
}
ColIndexMapping::with_target_size(map, self.input.schema().len())
}

/// get the Mapping of columnIndex from input column index to out column index
pub fn i2o_col_mapping(&self) -> ColIndexMapping {
self.o2i_col_mapping().inverse()
}
}

impl<PlanRef: GenericPlanRef> GenericPlanNode for Agg<PlanRef> {
Expand Down Expand Up @@ -80,6 +100,21 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Agg<PlanRef> {
fn ctx(&self) -> OptimizerContextRef {
self.input.ctx()
}

fn functional_dependency(&self) -> FunctionalDependencySet {
let output_len = self.output_len();
let _input_len = self.input.schema().len();
let mut fd_set =
FunctionalDependencySet::with_key(output_len, &(0..self.group_key.len()).collect_vec());
// take group keys from input_columns, then grow the target size to column_cnt
let i2o = self.i2o_col_mapping();
for fd in self.input.functional_dependency().as_dependencies() {
if let Some(fd) = i2o.rewrite_functional_dependency(fd) {
fd_set.add_functional_dependency(fd);
}
}
fd_set
}
}

pub enum AggCallState {
Expand Down Expand Up @@ -318,24 +353,6 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
.collect()
}

/// get the Mapping of columnIndex from input column index to output column index,if a input
/// column corresponds more than one out columns, mapping to any one
pub fn o2i_col_mapping(&self) -> ColIndexMapping {
let input_len = self.input.schema().len();
let agg_cal_num = self.agg_calls.len();
let group_key = &self.group_key;
let mut map = vec![None; agg_cal_num + group_key.len()];
for (i, key) in group_key.iter().enumerate() {
map[i] = Some(*key);
}
ColIndexMapping::with_target_size(map, input_len)
}

/// get the Mapping of columnIndex from input column index to out column index
pub fn i2o_col_mapping(&self) -> ColIndexMapping {
self.o2i_col_mapping().inverse()
}

pub fn infer_result_table(
&self,
me: &impl GenericPlanRef,
Expand Down
49 changes: 49 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
use itertools::Itertools;
use risingwave_common::catalog::{Field, FieldDisplay, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::column_index_mapping::ColIndexMapping;

use super::{GenericPlanNode, GenericPlanRef};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;

/// [`Expand`] expand one row multiple times according to `column_subsets` and also keep
/// original columns of input. It can be used to implement distinct aggregation and group set.
Expand All @@ -35,6 +37,16 @@ pub struct Expand<PlanRef> {
pub input: PlanRef,
}

impl<PlanRef: GenericPlanRef> Expand<PlanRef> {
fn output_len(&self) -> usize {
self.input.schema().len() * 2 + 1
}

fn flag_index(&self) -> usize {
self.output_len() - 1
}
}

impl<PlanRef: GenericPlanRef> GenericPlanNode for Expand<PlanRef> {
fn schema(&self) -> Schema {
let mut fields = self.input.schema().clone().into_fields();
Expand All @@ -59,6 +71,31 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Expand<PlanRef> {
fn ctx(&self) -> OptimizerContextRef {
self.input.ctx()
}

fn functional_dependency(&self) -> FunctionalDependencySet {
let input_fd = self
.input
.functional_dependency()
.clone()
.into_dependencies();
let output_len = self.output_len();
let flag_index = self.flag_index();

self.input
.functional_dependency()
.as_dependencies()
.iter()
.map(|_input_fd| {})
.collect_vec();

let mut current_fd = FunctionalDependencySet::new(output_len);
for mut fd in input_fd {
fd.grow(output_len);
fd.set_from(flag_index, true);
current_fd.add_functional_dependency(fd);
}
current_fd
}
}

impl<PlanRef: GenericPlanRef> Expand<PlanRef> {
Expand All @@ -73,4 +110,16 @@ impl<PlanRef: GenericPlanRef> Expand<PlanRef> {
})
.collect_vec()
}

pub fn i2o_col_mapping(&self) -> ColIndexMapping {
let input_len = self.input.schema().len();
let map = (0..input_len)
.map(|source| Some(source + input_len))
.collect_vec();
ColIndexMapping::with_target_size(map, self.output_len())
}

pub fn o2i_col_mapping(&self) -> ColIndexMapping {
self.i2o_col_mapping().inverse()
}
}
17 changes: 17 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use risingwave_common::catalog::Schema;
use super::{GenericPlanNode, GenericPlanRef};
use crate::expr::ExprRewriter;
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;
use crate::utils::Condition;

/// [`Filter`] iterates over its input and returns elements for which `predicate` evaluates to
Expand All @@ -29,6 +30,7 @@ pub struct Filter<PlanRef> {
pub input: PlanRef,
}

impl<PlanRef: GenericPlanRef> Filter<PlanRef> {}
impl<PlanRef: GenericPlanRef> GenericPlanNode for Filter<PlanRef> {
fn schema(&self) -> Schema {
self.input.schema().clone()
Expand All @@ -41,6 +43,21 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Filter<PlanRef> {
fn ctx(&self) -> OptimizerContextRef {
self.input.ctx()
}

fn functional_dependency(&self) -> FunctionalDependencySet {
let mut functional_dependency = self.input.functional_dependency().clone();
for i in &self.predicate.conjunctions {
if let Some((col, _)) = i.as_eq_const() {
functional_dependency.add_constant_columns(&[col.index()])
} else if let Some((left, right)) = i.as_eq_cond() {
functional_dependency
.add_functional_dependency_by_column_indices(&[left.index()], &[right.index()]);
functional_dependency
.add_functional_dependency_by_column_indices(&[right.index()], &[left.index()]);
}
}
functional_dependency
}
}

impl<PlanRef> Filter<PlanRef> {
Expand Down
34 changes: 24 additions & 10 deletions src/frontend/src/optimizer/plan_node/generic/hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use super::super::utils::IndicesDisplay;
use super::{GenericPlanNode, GenericPlanRef};
use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef, InputRefDisplay, Literal};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;
use crate::utils::ColIndexMappingRewriteExt;

/// [`HopWindow`] implements Hop Table Function.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -95,6 +97,24 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for HopWindow<PlanRef> {
fn ctx(&self) -> OptimizerContextRef {
self.input.ctx()
}

fn functional_dependency(&self) -> FunctionalDependencySet {
let mut fd_set = self
.i2o_col_mapping()
.rewrite_functional_dependency_set(self.input.functional_dependency().clone());
let (start_idx_in_output, end_idx_in_output) = {
let internal2output = self.internal2output_col_mapping();
(
internal2output.try_map(self.internal_window_start_col_idx()),
internal2output.try_map(self.internal_window_end_col_idx()),
)
};
if let Some(start_idx) = start_idx_in_output && let Some(end_idx) = end_idx_in_output {
fd_set.add_functional_dependency_by_column_indices(&[start_idx], &[end_idx]);
fd_set.add_functional_dependency_by_column_indices(&[end_idx], &[start_idx]);
}
fd_set
}
}

impl<PlanRef: GenericPlanRef> HopWindow<PlanRef> {
Expand All @@ -113,7 +133,7 @@ impl<PlanRef: GenericPlanRef> HopWindow<PlanRef> {
}

pub fn internal_window_end_col_idx(&self) -> usize {
self.internal_window_start_col_idx() + 1
self.input.schema().len() + 1
}

pub fn o2i_col_mapping(&self) -> ColIndexMapping {
Expand All @@ -127,7 +147,7 @@ impl<PlanRef: GenericPlanRef> HopWindow<PlanRef> {
}

pub fn internal_column_num(&self) -> usize {
self.internal_window_start_col_idx() + 2
self.input.schema().len() + 2
}

pub fn output2internal_col_mapping(&self) -> ColIndexMapping {
Expand All @@ -139,17 +159,11 @@ impl<PlanRef: GenericPlanRef> HopWindow<PlanRef> {
}

pub fn input2internal_col_mapping(&self) -> ColIndexMapping {
ColIndexMapping::identity_or_none(
self.internal_window_start_col_idx(),
self.internal_column_num(),
)
ColIndexMapping::identity_or_none(self.input.schema().len(), self.internal_column_num())
}

pub fn internal2input_col_mapping(&self) -> ColIndexMapping {
ColIndexMapping::identity_or_none(
self.internal_column_num(),
self.internal_window_start_col_idx(),
)
ColIndexMapping::identity_or_none(self.internal_column_num(), self.input.schema().len())
}

pub fn derive_window_start_and_end_exprs(&self) -> Result<(Vec<ExprImpl>, Vec<ExprImpl>)> {
Expand Down
59 changes: 58 additions & 1 deletion src/frontend/src/optimizer/plan_node/generic/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use risingwave_pb::plan_common::JoinType;
use super::{EqJoinPredicate, GenericPlanNode, GenericPlanRef};
use crate::expr::ExprRewriter;
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::utils::{ColIndexMapping, Condition};
use crate::optimizer::property::FunctionalDependencySet;
use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition};

/// [`Join`] combines two relations according to some condition.
///
Expand Down Expand Up @@ -141,6 +142,62 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Join<PlanRef> {
fn ctx(&self) -> OptimizerContextRef {
self.left.ctx()
}

fn functional_dependency(&self) -> FunctionalDependencySet {
let left_len = self.left.schema().len();
let right_len = self.right.schema().len();
let left_fd_set = self.left.functional_dependency().clone();
let right_fd_set = self.right.functional_dependency().clone();

let full_out_col_num = self.internal_column_num();

let get_new_left_fd_set = |left_fd_set: FunctionalDependencySet| {
ColIndexMapping::with_shift_offset(left_len, 0)
.composite(&ColIndexMapping::identity(full_out_col_num))
.rewrite_functional_dependency_set(left_fd_set)
};
let get_new_right_fd_set = |right_fd_set: FunctionalDependencySet| {
ColIndexMapping::with_shift_offset(right_len, left_len.try_into().unwrap())
.rewrite_functional_dependency_set(right_fd_set)
};
let fd_set: FunctionalDependencySet = match self.join_type {
JoinType::Inner => {
let mut fd_set = FunctionalDependencySet::new(full_out_col_num);
for i in &self.on.conjunctions {
if let Some((col, _)) = i.as_eq_const() {
fd_set.add_constant_columns(&[col.index()])
} else if let Some((left, right)) = i.as_eq_cond() {
fd_set.add_functional_dependency_by_column_indices(
&[left.index()],
&[right.index()],
);
fd_set.add_functional_dependency_by_column_indices(
&[right.index()],
&[left.index()],
);
}
}
get_new_left_fd_set(left_fd_set)
.into_dependencies()
.into_iter()
.chain(
get_new_right_fd_set(right_fd_set)
.into_dependencies()
.into_iter(),
)
.for_each(|fd| fd_set.add_functional_dependency(fd));
fd_set
}
JoinType::LeftOuter => get_new_left_fd_set(left_fd_set),
JoinType::RightOuter => get_new_right_fd_set(right_fd_set),
JoinType::FullOuter => FunctionalDependencySet::new(full_out_col_num),
JoinType::LeftSemi | JoinType::LeftAnti => left_fd_set,
JoinType::RightSemi | JoinType::RightAnti => right_fd_set,
JoinType::Unspecified => unreachable!(),
};
ColIndexMapping::with_remaining_columns(&self.output_indices, full_out_col_num)
.rewrite_functional_dependency_set(fd_set)
}
}

impl<PlanRef> Join<PlanRef> {
Expand Down
11 changes: 11 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use risingwave_common::catalog::Schema;

use super::{stream, EqJoinPredicate};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;

pub mod dynamic_filter;
pub use dynamic_filter::*;
Expand Down Expand Up @@ -47,10 +48,20 @@ pub use share::*;
pub trait GenericPlanRef {
fn schema(&self) -> &Schema;
fn logical_pk(&self) -> &[usize];
fn functional_dependency(&self) -> &FunctionalDependencySet;
fn ctx(&self) -> OptimizerContextRef;
}

pub trait GenericPlanNode {
/// return (schema, `logical_pk`, fds)
fn logical_properties(&self) -> (Schema, Option<Vec<usize>>, FunctionalDependencySet) {
(
self.schema(),
self.logical_pk(),
self.functional_dependency(),
)
}
fn functional_dependency(&self) -> FunctionalDependencySet;
fn schema(&self) -> Schema;
fn logical_pk(&self) -> Option<Vec<usize>>;
fn ctx(&self) -> OptimizerContextRef;
Expand Down
8 changes: 7 additions & 1 deletion src/frontend/src/optimizer/plan_node/generic/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use risingwave_common::util::iter_util::ZipEqFast;
use super::{GenericPlanNode, GenericPlanRef};
use crate::expr::{assert_input_ref, Expr, ExprDisplay, ExprImpl, ExprRewriter, InputRef};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::utils::ColIndexMapping;
use crate::optimizer::property::FunctionalDependencySet;
use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt};

fn check_expr_type(expr: &ExprImpl) -> std::result::Result<(), &'static str> {
if expr.has_subquery() {
Expand Down Expand Up @@ -109,6 +110,11 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Project<PlanRef> {
fn ctx(&self) -> OptimizerContextRef {
self.input.ctx()
}

fn functional_dependency(&self) -> FunctionalDependencySet {
let i2o = self.i2o_col_mapping();
i2o.rewrite_functional_dependency_set(self.input.functional_dependency().clone())
}
}

impl<PlanRef: GenericPlanRef> Project<PlanRef> {
Expand Down
Loading