diff --git a/src/frontend/src/optimizer/plan_node/generic/agg.rs b/src/frontend/src/optimizer/plan_node/generic/agg.rs index 750d15de651c..a0322817b4ae 100644 --- a/src/frontend/src/optimizer/plan_node/generic/agg.rs +++ b/src/frontend/src/optimizer/plan_node/generic/agg.rs @@ -27,6 +27,7 @@ use super::super::utils::TableCatalogBuilder; use super::{stream, GenericPlanNode, GenericPlanRef}; use crate::expr::{Expr, ExprRewriter, InputRef, InputRefDisplay}; use crate::optimizer::optimizer_context::OptimizerContextRef; +use crate::optimizer::property::FunctionalDependencySet; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::utils::{ ColIndexMapping, ColIndexMappingRewriteExt, Condition, ConditionDisplay, IndexRewriter, @@ -46,12 +47,31 @@ pub struct Agg { pub input: PlanRef, } -impl Agg { +impl Agg { pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) { self.agg_calls.iter_mut().for_each(|call| { call.filter = call.filter.clone().rewrite_expr(r); }); } + + fn output_len(&self) -> usize { + self.group_key.len() + self.agg_calls.len() + } + + /// get the Mapping of columnIndex from input column index to output column index,if a input + /// column corresponds more than one out columns, mapping to any one + pub fn o2i_col_mapping(&self) -> ColIndexMapping { + let mut map = vec![None; self.output_len()]; + for (i, key) in self.group_key.iter().enumerate() { + map[i] = Some(*key); + } + ColIndexMapping::with_target_size(map, self.input.schema().len()) + } + + /// get the Mapping of columnIndex from input column index to out column index + pub fn i2o_col_mapping(&self) -> ColIndexMapping { + self.o2i_col_mapping().inverse() + } } impl GenericPlanNode for Agg { @@ -80,6 +100,21 @@ impl GenericPlanNode for Agg { fn ctx(&self) -> OptimizerContextRef { self.input.ctx() } + + fn functional_dependency(&self) -> FunctionalDependencySet { + let output_len = self.output_len(); + let _input_len = self.input.schema().len(); + let mut fd_set = + FunctionalDependencySet::with_key(output_len, &(0..self.group_key.len()).collect_vec()); + // take group keys from input_columns, then grow the target size to column_cnt + let i2o = self.i2o_col_mapping(); + for fd in self.input.functional_dependency().as_dependencies() { + if let Some(fd) = i2o.rewrite_functional_dependency(fd) { + fd_set.add_functional_dependency(fd); + } + } + fd_set + } } pub enum AggCallState { @@ -318,24 +353,6 @@ impl Agg { .collect() } - /// get the Mapping of columnIndex from input column index to output column index,if a input - /// column corresponds more than one out columns, mapping to any one - pub fn o2i_col_mapping(&self) -> ColIndexMapping { - let input_len = self.input.schema().len(); - let agg_cal_num = self.agg_calls.len(); - let group_key = &self.group_key; - let mut map = vec![None; agg_cal_num + group_key.len()]; - for (i, key) in group_key.iter().enumerate() { - map[i] = Some(*key); - } - ColIndexMapping::with_target_size(map, input_len) - } - - /// get the Mapping of columnIndex from input column index to out column index - pub fn i2o_col_mapping(&self) -> ColIndexMapping { - self.o2i_col_mapping().inverse() - } - pub fn infer_result_table( &self, me: &impl GenericPlanRef, diff --git a/src/frontend/src/optimizer/plan_node/generic/expand.rs b/src/frontend/src/optimizer/plan_node/generic/expand.rs index 736ed8c68dcc..020d9d027748 100644 --- a/src/frontend/src/optimizer/plan_node/generic/expand.rs +++ b/src/frontend/src/optimizer/plan_node/generic/expand.rs @@ -15,9 +15,11 @@ use itertools::Itertools; use risingwave_common::catalog::{Field, FieldDisplay, Schema}; use risingwave_common::types::DataType; +use risingwave_common::util::column_index_mapping::ColIndexMapping; use super::{GenericPlanNode, GenericPlanRef}; use crate::optimizer::optimizer_context::OptimizerContextRef; +use crate::optimizer::property::FunctionalDependencySet; /// [`Expand`] expand one row multiple times according to `column_subsets` and also keep /// original columns of input. It can be used to implement distinct aggregation and group set. @@ -35,6 +37,16 @@ pub struct Expand { pub input: PlanRef, } +impl Expand { + fn output_len(&self) -> usize { + self.input.schema().len() * 2 + 1 + } + + fn flag_index(&self) -> usize { + self.output_len() - 1 + } +} + impl GenericPlanNode for Expand { fn schema(&self) -> Schema { let mut fields = self.input.schema().clone().into_fields(); @@ -59,6 +71,31 @@ impl GenericPlanNode for Expand { fn ctx(&self) -> OptimizerContextRef { self.input.ctx() } + + fn functional_dependency(&self) -> FunctionalDependencySet { + let input_fd = self + .input + .functional_dependency() + .clone() + .into_dependencies(); + let output_len = self.output_len(); + let flag_index = self.flag_index(); + + self.input + .functional_dependency() + .as_dependencies() + .iter() + .map(|_input_fd| {}) + .collect_vec(); + + let mut current_fd = FunctionalDependencySet::new(output_len); + for mut fd in input_fd { + fd.grow(output_len); + fd.set_from(flag_index, true); + current_fd.add_functional_dependency(fd); + } + current_fd + } } impl Expand { @@ -73,4 +110,16 @@ impl Expand { }) .collect_vec() } + + pub fn i2o_col_mapping(&self) -> ColIndexMapping { + let input_len = self.input.schema().len(); + let map = (0..input_len) + .map(|source| Some(source + input_len)) + .collect_vec(); + ColIndexMapping::with_target_size(map, self.output_len()) + } + + pub fn o2i_col_mapping(&self) -> ColIndexMapping { + self.i2o_col_mapping().inverse() + } } diff --git a/src/frontend/src/optimizer/plan_node/generic/filter.rs b/src/frontend/src/optimizer/plan_node/generic/filter.rs index cfa0adb44c54..f7e3629a5435 100644 --- a/src/frontend/src/optimizer/plan_node/generic/filter.rs +++ b/src/frontend/src/optimizer/plan_node/generic/filter.rs @@ -17,6 +17,7 @@ use risingwave_common::catalog::Schema; use super::{GenericPlanNode, GenericPlanRef}; use crate::expr::ExprRewriter; use crate::optimizer::optimizer_context::OptimizerContextRef; +use crate::optimizer::property::FunctionalDependencySet; use crate::utils::Condition; /// [`Filter`] iterates over its input and returns elements for which `predicate` evaluates to @@ -29,6 +30,7 @@ pub struct Filter { pub input: PlanRef, } +impl Filter {} impl GenericPlanNode for Filter { fn schema(&self) -> Schema { self.input.schema().clone() @@ -41,6 +43,21 @@ impl GenericPlanNode for Filter { fn ctx(&self) -> OptimizerContextRef { self.input.ctx() } + + fn functional_dependency(&self) -> FunctionalDependencySet { + let mut functional_dependency = self.input.functional_dependency().clone(); + for i in &self.predicate.conjunctions { + if let Some((col, _)) = i.as_eq_const() { + functional_dependency.add_constant_columns(&[col.index()]) + } else if let Some((left, right)) = i.as_eq_cond() { + functional_dependency + .add_functional_dependency_by_column_indices(&[left.index()], &[right.index()]); + functional_dependency + .add_functional_dependency_by_column_indices(&[right.index()], &[left.index()]); + } + } + functional_dependency + } } impl Filter { diff --git a/src/frontend/src/optimizer/plan_node/generic/hop_window.rs b/src/frontend/src/optimizer/plan_node/generic/hop_window.rs index d93ef973a300..05b7adcd6b5a 100644 --- a/src/frontend/src/optimizer/plan_node/generic/hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/generic/hop_window.rs @@ -26,6 +26,8 @@ use super::super::utils::IndicesDisplay; use super::{GenericPlanNode, GenericPlanRef}; use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef, InputRefDisplay, Literal}; use crate::optimizer::optimizer_context::OptimizerContextRef; +use crate::optimizer::property::FunctionalDependencySet; +use crate::utils::ColIndexMappingRewriteExt; /// [`HopWindow`] implements Hop Table Function. #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -95,6 +97,24 @@ impl GenericPlanNode for HopWindow { fn ctx(&self) -> OptimizerContextRef { self.input.ctx() } + + fn functional_dependency(&self) -> FunctionalDependencySet { + let mut fd_set = self + .i2o_col_mapping() + .rewrite_functional_dependency_set(self.input.functional_dependency().clone()); + let (start_idx_in_output, end_idx_in_output) = { + let internal2output = self.internal2output_col_mapping(); + ( + internal2output.try_map(self.internal_window_start_col_idx()), + internal2output.try_map(self.internal_window_end_col_idx()), + ) + }; + if let Some(start_idx) = start_idx_in_output && let Some(end_idx) = end_idx_in_output { + fd_set.add_functional_dependency_by_column_indices(&[start_idx], &[end_idx]); + fd_set.add_functional_dependency_by_column_indices(&[end_idx], &[start_idx]); + } + fd_set + } } impl HopWindow { @@ -113,7 +133,7 @@ impl HopWindow { } pub fn internal_window_end_col_idx(&self) -> usize { - self.internal_window_start_col_idx() + 1 + self.input.schema().len() + 1 } pub fn o2i_col_mapping(&self) -> ColIndexMapping { @@ -127,7 +147,7 @@ impl HopWindow { } pub fn internal_column_num(&self) -> usize { - self.internal_window_start_col_idx() + 2 + self.input.schema().len() + 2 } pub fn output2internal_col_mapping(&self) -> ColIndexMapping { @@ -139,17 +159,11 @@ impl HopWindow { } pub fn input2internal_col_mapping(&self) -> ColIndexMapping { - ColIndexMapping::identity_or_none( - self.internal_window_start_col_idx(), - self.internal_column_num(), - ) + ColIndexMapping::identity_or_none(self.input.schema().len(), self.internal_column_num()) } pub fn internal2input_col_mapping(&self) -> ColIndexMapping { - ColIndexMapping::identity_or_none( - self.internal_column_num(), - self.internal_window_start_col_idx(), - ) + ColIndexMapping::identity_or_none(self.internal_column_num(), self.input.schema().len()) } pub fn derive_window_start_and_end_exprs(&self) -> Result<(Vec, Vec)> { diff --git a/src/frontend/src/optimizer/plan_node/generic/join.rs b/src/frontend/src/optimizer/plan_node/generic/join.rs index b8cf9e132fed..ae5dedf213c6 100644 --- a/src/frontend/src/optimizer/plan_node/generic/join.rs +++ b/src/frontend/src/optimizer/plan_node/generic/join.rs @@ -19,7 +19,8 @@ use risingwave_pb::plan_common::JoinType; use super::{EqJoinPredicate, GenericPlanNode, GenericPlanRef}; use crate::expr::ExprRewriter; use crate::optimizer::optimizer_context::OptimizerContextRef; -use crate::utils::{ColIndexMapping, Condition}; +use crate::optimizer::property::FunctionalDependencySet; +use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition}; /// [`Join`] combines two relations according to some condition. /// @@ -141,6 +142,62 @@ impl GenericPlanNode for Join { fn ctx(&self) -> OptimizerContextRef { self.left.ctx() } + + fn functional_dependency(&self) -> FunctionalDependencySet { + let left_len = self.left.schema().len(); + let right_len = self.right.schema().len(); + let left_fd_set = self.left.functional_dependency().clone(); + let right_fd_set = self.right.functional_dependency().clone(); + + let full_out_col_num = self.internal_column_num(); + + let get_new_left_fd_set = |left_fd_set: FunctionalDependencySet| { + ColIndexMapping::with_shift_offset(left_len, 0) + .composite(&ColIndexMapping::identity(full_out_col_num)) + .rewrite_functional_dependency_set(left_fd_set) + }; + let get_new_right_fd_set = |right_fd_set: FunctionalDependencySet| { + ColIndexMapping::with_shift_offset(right_len, left_len.try_into().unwrap()) + .rewrite_functional_dependency_set(right_fd_set) + }; + let fd_set: FunctionalDependencySet = match self.join_type { + JoinType::Inner => { + let mut fd_set = FunctionalDependencySet::new(full_out_col_num); + for i in &self.on.conjunctions { + if let Some((col, _)) = i.as_eq_const() { + fd_set.add_constant_columns(&[col.index()]) + } else if let Some((left, right)) = i.as_eq_cond() { + fd_set.add_functional_dependency_by_column_indices( + &[left.index()], + &[right.index()], + ); + fd_set.add_functional_dependency_by_column_indices( + &[right.index()], + &[left.index()], + ); + } + } + get_new_left_fd_set(left_fd_set) + .into_dependencies() + .into_iter() + .chain( + get_new_right_fd_set(right_fd_set) + .into_dependencies() + .into_iter(), + ) + .for_each(|fd| fd_set.add_functional_dependency(fd)); + fd_set + } + JoinType::LeftOuter => get_new_left_fd_set(left_fd_set), + JoinType::RightOuter => get_new_right_fd_set(right_fd_set), + JoinType::FullOuter => FunctionalDependencySet::new(full_out_col_num), + JoinType::LeftSemi | JoinType::LeftAnti => left_fd_set, + JoinType::RightSemi | JoinType::RightAnti => right_fd_set, + JoinType::Unspecified => unreachable!(), + }; + ColIndexMapping::with_remaining_columns(&self.output_indices, full_out_col_num) + .rewrite_functional_dependency_set(fd_set) + } } impl Join { diff --git a/src/frontend/src/optimizer/plan_node/generic/mod.rs b/src/frontend/src/optimizer/plan_node/generic/mod.rs index 19f0d9a497e6..79cf6771e0e1 100644 --- a/src/frontend/src/optimizer/plan_node/generic/mod.rs +++ b/src/frontend/src/optimizer/plan_node/generic/mod.rs @@ -16,6 +16,7 @@ use risingwave_common::catalog::Schema; use super::{stream, EqJoinPredicate}; use crate::optimizer::optimizer_context::OptimizerContextRef; +use crate::optimizer::property::FunctionalDependencySet; pub mod dynamic_filter; pub use dynamic_filter::*; @@ -47,10 +48,20 @@ pub use share::*; pub trait GenericPlanRef { fn schema(&self) -> &Schema; fn logical_pk(&self) -> &[usize]; + fn functional_dependency(&self) -> &FunctionalDependencySet; fn ctx(&self) -> OptimizerContextRef; } pub trait GenericPlanNode { + /// return (schema, `logical_pk`, fds) + fn logical_properties(&self) -> (Schema, Option>, FunctionalDependencySet) { + ( + self.schema(), + self.logical_pk(), + self.functional_dependency(), + ) + } + fn functional_dependency(&self) -> FunctionalDependencySet; fn schema(&self) -> Schema; fn logical_pk(&self) -> Option>; fn ctx(&self) -> OptimizerContextRef; diff --git a/src/frontend/src/optimizer/plan_node/generic/project.rs b/src/frontend/src/optimizer/plan_node/generic/project.rs index b9da2bedbf1f..0955ebe4fffe 100644 --- a/src/frontend/src/optimizer/plan_node/generic/project.rs +++ b/src/frontend/src/optimizer/plan_node/generic/project.rs @@ -24,7 +24,8 @@ use risingwave_common::util::iter_util::ZipEqFast; use super::{GenericPlanNode, GenericPlanRef}; use crate::expr::{assert_input_ref, Expr, ExprDisplay, ExprImpl, ExprRewriter, InputRef}; use crate::optimizer::optimizer_context::OptimizerContextRef; -use crate::utils::ColIndexMapping; +use crate::optimizer::property::FunctionalDependencySet; +use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt}; fn check_expr_type(expr: &ExprImpl) -> std::result::Result<(), &'static str> { if expr.has_subquery() { @@ -109,6 +110,11 @@ impl GenericPlanNode for Project { fn ctx(&self) -> OptimizerContextRef { self.input.ctx() } + + fn functional_dependency(&self) -> FunctionalDependencySet { + let i2o = self.i2o_col_mapping(); + i2o.rewrite_functional_dependency_set(self.input.functional_dependency().clone()) + } } impl Project { diff --git a/src/frontend/src/optimizer/plan_node/generic/project_set.rs b/src/frontend/src/optimizer/plan_node/generic/project_set.rs index cf1aa1679538..924351dd6e9d 100644 --- a/src/frontend/src/optimizer/plan_node/generic/project_set.rs +++ b/src/frontend/src/optimizer/plan_node/generic/project_set.rs @@ -18,7 +18,8 @@ use risingwave_common::types::DataType; use super::{GenericPlanNode, GenericPlanRef}; use crate::expr::{Expr, ExprDisplay, ExprImpl, ExprRewriter}; use crate::optimizer::optimizer_context::OptimizerContextRef; -use crate::utils::ColIndexMapping; +use crate::optimizer::property::FunctionalDependencySet; +use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt}; /// [`ProjectSet`] projects one row multiple times according to `select_list`. /// @@ -86,6 +87,11 @@ impl GenericPlanNode for ProjectSet { fn ctx(&self) -> OptimizerContextRef { self.input.ctx() } + + fn functional_dependency(&self) -> FunctionalDependencySet { + let i2o = self.i2o_col_mapping(); + i2o.rewrite_functional_dependency_set(self.input.functional_dependency().clone()) + } } impl ProjectSet { diff --git a/src/frontend/src/optimizer/plan_node/generic/scan.rs b/src/frontend/src/optimizer/plan_node/generic/scan.rs index ac7cdf4b3f88..8bbea69222e8 100644 --- a/src/frontend/src/optimizer/plan_node/generic/scan.rs +++ b/src/frontend/src/optimizer/plan_node/generic/scan.rs @@ -15,16 +15,19 @@ use std::collections::HashMap; use std::rc::Rc; +use derivative::Derivative; use risingwave_common::catalog::{ColumnDesc, Field, Schema, TableDesc}; use super::GenericPlanNode; use crate::catalog::{ColumnId, IndexCatalog}; use crate::expr::ExprRewriter; use crate::optimizer::optimizer_context::OptimizerContextRef; +use crate::optimizer::property::FunctionalDependencySet; use crate::utils::Condition; /// [`Scan`] returns contents of a table or other equivalent object -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, Derivative)] +#[derivative(PartialEq, Eq, Hash)] pub struct Scan { pub table_name: String, pub is_sys_table: bool, @@ -40,6 +43,9 @@ pub struct Scan { /// Help RowSeqScan executor use a better chunk size pub chunk_size: Option, pub for_system_time_as_of_now: bool, + #[derivative(PartialEq = "ignore")] + #[derivative(Hash = "ignore")] + pub ctx: OptimizerContextRef, } impl Scan { @@ -75,7 +81,16 @@ impl GenericPlanNode for Scan { } fn ctx(&self) -> OptimizerContextRef { - unimplemented!() + self.ctx.clone() + } + + fn functional_dependency(&self) -> FunctionalDependencySet { + let pk_indices = self.logical_pk(); + let col_num = self.output_col_idx.len(); + match &pk_indices { + Some(pk_indices) => FunctionalDependencySet::with_key(col_num, pk_indices), + None => FunctionalDependencySet::new(col_num), + } } } diff --git a/src/frontend/src/optimizer/plan_node/generic/share.rs b/src/frontend/src/optimizer/plan_node/generic/share.rs index a4292e5e98b7..387d296fc621 100644 --- a/src/frontend/src/optimizer/plan_node/generic/share.rs +++ b/src/frontend/src/optimizer/plan_node/generic/share.rs @@ -18,6 +18,7 @@ use std::hash::Hash; use risingwave_common::catalog::Schema; use super::{GenericPlanNode, GenericPlanRef}; +use crate::optimizer::property::FunctionalDependencySet; use crate::OptimizerContextRef; #[derive(Debug, Clone, PartialEq, Eq)] @@ -43,4 +44,8 @@ impl GenericPlanNode for Share { fn ctx(&self) -> OptimizerContextRef { self.input.borrow().ctx() } + + fn functional_dependency(&self) -> FunctionalDependencySet { + self.input.borrow().functional_dependency().clone() + } } diff --git a/src/frontend/src/optimizer/plan_node/generic/source.rs b/src/frontend/src/optimizer/plan_node/generic/source.rs index 3efbe4e60ea8..56d80b3ae644 100644 --- a/src/frontend/src/optimizer/plan_node/generic/source.rs +++ b/src/frontend/src/optimizer/plan_node/generic/source.rs @@ -11,10 +11,10 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - use std::collections::HashMap; use std::rc::Rc; +use derivative::Derivative; use risingwave_common::catalog::{ColumnDesc, Field, Schema}; use risingwave_common::types::DataType; use risingwave_common::util::sort_util::OrderType; @@ -24,10 +24,12 @@ use super::GenericPlanNode; use crate::catalog::source_catalog::SourceCatalog; use crate::catalog::ColumnId; use crate::optimizer::optimizer_context::OptimizerContextRef; +use crate::optimizer::property::FunctionalDependencySet; use crate::{TableCatalog, WithOptions}; /// [`Source`] returns contents of a table or other equivalent object -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, Derivative)] +#[derivative(PartialEq, Eq, Hash)] pub struct Source { /// If there is an external stream source, `catalog` will be `Some`. Otherwise, it is `None`. pub catalog: Option>, @@ -40,6 +42,9 @@ pub struct Source { pub gen_row_id: bool, /// True if it is a source created when creating table with a source. pub for_table: bool, + #[derivative(PartialEq = "ignore")] + #[derivative(Hash = "ignore")] + pub ctx: OptimizerContextRef, } impl GenericPlanNode for Source { @@ -60,7 +65,17 @@ impl GenericPlanNode for Source { } fn ctx(&self) -> OptimizerContextRef { - unimplemented!() + self.ctx.clone() + } + + fn functional_dependency(&self) -> FunctionalDependencySet { + let pk_indices = self.logical_pk(); + match pk_indices { + Some(pk_indices) => { + FunctionalDependencySet::with_key(self.column_descs.len(), &pk_indices) + } + None => FunctionalDependencySet::new(self.column_descs.len()), + } } } diff --git a/src/frontend/src/optimizer/plan_node/generic/top_n.rs b/src/frontend/src/optimizer/plan_node/generic/top_n.rs index 2c8d34833154..a356ef937528 100644 --- a/src/frontend/src/optimizer/plan_node/generic/top_n.rs +++ b/src/frontend/src/optimizer/plan_node/generic/top_n.rs @@ -20,7 +20,7 @@ use risingwave_common::util::sort_util::OrderType; use super::super::utils::TableCatalogBuilder; use super::{stream, GenericPlanNode, GenericPlanRef}; use crate::optimizer::optimizer_context::OptimizerContextRef; -use crate::optimizer::property::Order; +use crate::optimizer::property::{FunctionalDependencySet, Order}; use crate::TableCatalog; /// `TopN` sorts the input data and fetches up to `limit` rows from `offset` #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -98,4 +98,8 @@ impl GenericPlanNode for TopN { fn ctx(&self) -> OptimizerContextRef { self.input.ctx() } + + fn functional_dependency(&self) -> FunctionalDependencySet { + self.input.functional_dependency().clone() + } } diff --git a/src/frontend/src/optimizer/plan_node/generic/union.rs b/src/frontend/src/optimizer/plan_node/generic/union.rs index 578d04fe68e4..56dbce2141f3 100644 --- a/src/frontend/src/optimizer/plan_node/generic/union.rs +++ b/src/frontend/src/optimizer/plan_node/generic/union.rs @@ -18,6 +18,7 @@ use risingwave_common::catalog::Schema; use super::{GenericPlanNode, GenericPlanRef}; use crate::optimizer::optimizer_context::OptimizerContextRef; +use crate::optimizer::property::FunctionalDependencySet; /// `Union` returns the union of the rows of its inputs. /// If `all` is false, it needs to eliminate duplicates. @@ -55,6 +56,10 @@ impl GenericPlanNode for Union { fn ctx(&self) -> OptimizerContextRef { self.inputs[0].ctx() } + + fn functional_dependency(&self) -> FunctionalDependencySet { + FunctionalDependencySet::new(self.inputs[0].schema().len()) + } } impl Union { diff --git a/src/frontend/src/optimizer/plan_node/logical_agg.rs b/src/frontend/src/optimizer/plan_node/logical_agg.rs index b16806cf7655..03ac112e9178 100644 --- a/src/frontend/src/optimizer/plan_node/logical_agg.rs +++ b/src/frontend/src/optimizer/plan_node/logical_agg.rs @@ -22,9 +22,7 @@ use risingwave_common::types::{DataType, Datum, OrderedF64, ScalarImpl}; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_expr::expr::AggKind; -use super::generic::{ - self, AggCallState, GenericPlanNode, GenericPlanRef, PlanAggCall, ProjectBuilder, -}; +use super::generic::{self, AggCallState, GenericPlanRef, PlanAggCall, ProjectBuilder}; use super::{ BatchHashAgg, BatchSimpleAgg, ColPrunable, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamGlobalSimpleAgg, StreamHashAgg, @@ -39,7 +37,7 @@ use crate::optimizer::plan_node::{ gen_filter_and_pushdown, BatchSortAgg, ColumnPruningContext, LogicalProject, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, }; -use crate::optimizer::property::{Distribution, FunctionalDependencySet, Order, RequiredDist}; +use crate::optimizer::property::{Distribution, Order, RequiredDist}; use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition, Substitute}; /// `LogicalAgg` groups input data by their group key and computes aggregation functions. @@ -879,27 +877,12 @@ impl ExprRewriter for LogicalAggBuilder { impl LogicalAgg { pub fn new(agg_calls: Vec, group_key: Vec, input: PlanRef) -> Self { - let ctx = input.ctx(); let core = generic::Agg { agg_calls, group_key, input, }; - let schema = core.schema(); - let pk_indices = core.logical_pk(); - let functional_dependency = Self::derive_fd( - schema.len(), - core.input.schema().len(), - core.input.functional_dependency(), - &core.group_key, - ); - - let base = PlanBase::new_logical( - ctx, - schema, - pk_indices.unwrap_or_default(), - functional_dependency, - ); + let base = PlanBase::new_logical_with_core(&core); Self { base, core } } @@ -914,26 +897,6 @@ impl LogicalAgg { self.core.i2o_col_mapping() } - fn derive_fd( - column_cnt: usize, - input_len: usize, - input_fd_set: &FunctionalDependencySet, - group_key: &[usize], - ) -> FunctionalDependencySet { - let mut fd_set = - FunctionalDependencySet::with_key(column_cnt, &(0..group_key.len()).collect_vec()); - // take group keys from input_columns, then grow the target size to column_cnt - let i2o = ColIndexMapping::with_remaining_columns(group_key, input_len).composite( - &ColIndexMapping::identity_or_none(group_key.len(), column_cnt), - ); - for fd in input_fd_set.as_dependencies() { - if let Some(fd) = i2o.rewrite_functional_dependency(fd) { - fd_set.add_functional_dependency(fd); - } - } - fd_set - } - /// `create` will analyze select exprs, group exprs and having, and construct a plan like /// /// ```text diff --git a/src/frontend/src/optimizer/plan_node/logical_expand.rs b/src/frontend/src/optimizer/plan_node/logical_expand.rs index 02a431f77288..a387b2d14516 100644 --- a/src/frontend/src/optimizer/plan_node/logical_expand.rs +++ b/src/frontend/src/optimizer/plan_node/logical_expand.rs @@ -18,7 +18,6 @@ use itertools::Itertools; use risingwave_common::catalog::FieldDisplay; use risingwave_common::error::Result; -use super::generic::GenericPlanNode; use super::{ gen_filter_and_pushdown, generic, BatchExpand, ColPrunable, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamExpand, ToBatch, ToStream, @@ -26,7 +25,6 @@ use super::{ use crate::optimizer::plan_node::{ ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, }; -use crate::optimizer::property::FunctionalDependencySet; use crate::utils::{ColIndexMapping, Condition}; /// [`LogicalExpand`] expands one row multiple times according to `column_subsets` and also keeps @@ -53,29 +51,7 @@ impl LogicalExpand { column_subsets, input, }; - - let ctx = core.ctx(); - let schema = core.schema(); - let pk_indices = core.logical_pk(); - - // TODO(Wenzhuo): change fd according to expand's new definition. - let flag_index = schema.len() - 1; // assume that `flag` is the last column - let functional_dependency = { - let input_fd = core - .input - .functional_dependency() - .clone() - .into_dependencies(); - let mut current_fd = FunctionalDependencySet::new(schema.len()); - for mut fd in input_fd { - fd.grow(schema.len()); - fd.set_from(flag_index, true); - current_fd.add_functional_dependency(fd); - } - current_fd - }; - - let base = PlanBase::new_logical(ctx, schema, pk_indices.unwrap(), functional_dependency); + let base = PlanBase::new_logical_with_core(&core); LogicalExpand { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/logical_filter.rs b/src/frontend/src/optimizer/plan_node/logical_filter.rs index 2d38c6d37a5a..35fa7094fc55 100644 --- a/src/frontend/src/optimizer/plan_node/logical_filter.rs +++ b/src/frontend/src/optimizer/plan_node/logical_filter.rs @@ -20,9 +20,8 @@ use risingwave_common::bail; use risingwave_common::error::Result; use risingwave_common::types::DataType; -use super::generic::{self, GenericPlanNode}; use super::{ - ColPrunable, CollectInputRef, ExprRewritable, LogicalProject, PlanBase, PlanRef, + generic, ColPrunable, CollectInputRef, ExprRewritable, LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, }; use crate::expr::{assert_input_ref, ExprImpl, ExprRewriter, ExprType, FunctionCall, InputRef}; @@ -44,30 +43,12 @@ pub struct LogicalFilter { impl LogicalFilter { pub fn new(input: PlanRef, predicate: Condition) -> Self { - let ctx = input.ctx(); + let _ctx = input.ctx(); for cond in &predicate.conjunctions { assert_input_ref!(cond, input.schema().fields().len()); } - let mut functional_dependency = input.functional_dependency().clone(); - for i in &predicate.conjunctions { - if let Some((col, _)) = i.as_eq_const() { - functional_dependency.add_constant_columns(&[col.index()]) - } else if let Some((left, right)) = i.as_eq_cond() { - functional_dependency - .add_functional_dependency_by_column_indices(&[left.index()], &[right.index()]); - functional_dependency - .add_functional_dependency_by_column_indices(&[right.index()], &[left.index()]); - } - } let core = generic::Filter { predicate, input }; - let schema = core.schema(); - let pk_indices = core.logical_pk(); - let base = PlanBase::new_logical( - ctx, - schema, - pk_indices.unwrap_or_default(), - functional_dependency, - ); + let base = PlanBase::new_logical_with_core(&core); LogicalFilter { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs index d95de4674897..86daeb1030be 100644 --- a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs @@ -16,9 +16,8 @@ use std::fmt; use fixedbitset::FixedBitSet; use itertools::Itertools; -use risingwave_common::catalog::{Field, Schema}; use risingwave_common::error::Result; -use risingwave_common::types::{DataType, IntervalUnit}; +use risingwave_common::types::IntervalUnit; use super::generic::GenericPlanNode; use super::{ @@ -52,24 +51,6 @@ impl LogicalHopWindow { // if output_indices is not specified, use default output_indices let output_indices = output_indices.unwrap_or_else(|| (0..input.schema().len() + 2).collect_vec()); - let output_type = DataType::window_of(&time_col.data_type).unwrap(); - let original_schema: Schema = input - .schema() - .clone() - .into_fields() - .into_iter() - .chain([ - Field::with_name(output_type.clone(), "window_start"), - Field::with_name(output_type, "window_end"), - ]) - .collect(); - let window_start_index = output_indices - .iter() - .position(|&idx| idx == input.schema().len()); - let window_end_index = output_indices - .iter() - .position(|&idx| idx == input.schema().len() + 1); - let core = generic::HopWindow { input, time_col, @@ -78,23 +59,9 @@ impl LogicalHopWindow { output_indices, }; - let schema = core.schema(); - let pk_indices = core.logical_pk(); + let _schema = core.schema(); + let _pk_indices = core.logical_pk(); let ctx = core.ctx(); - let functional_dependency = { - let mut fd_set = - ColIndexMapping::identity_or_none(core.input.schema().len(), original_schema.len()) - .composite(&ColIndexMapping::with_remaining_columns( - &core.output_indices, - original_schema.len(), - )) - .rewrite_functional_dependency_set(core.input.functional_dependency().clone()); - if let Some(start_idx) = window_start_index && let Some(end_idx) = window_end_index { - fd_set.add_functional_dependency_by_column_indices(&[start_idx], &[end_idx]); - fd_set.add_functional_dependency_by_column_indices(&[end_idx], &[start_idx]); - } - fd_set - }; // NOTE(st1page): add join keys in the pk_indices a work around before we really have stream // key. @@ -107,9 +74,9 @@ impl LogicalHopWindow { let base = PlanBase::new_logical( ctx, - schema, - pk_indices.unwrap_or_default(), - functional_dependency, + core.schema(), + core.logical_pk().unwrap_or_default(), + core.functional_dependency(), ); LogicalHopWindow { base, core } diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs index aa209043f1d4..a3bc88cdb701 100644 --- a/src/frontend/src/optimizer/plan_node/logical_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_join.rs @@ -23,7 +23,6 @@ use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_pb::plan_common::JoinType; use risingwave_pb::stream_plan::ChainType; -use super::generic::GenericPlanNode; use super::{ generic, ColPrunable, CollectInputRef, ExprRewritable, LogicalProject, PlanBase, PlanRef, PlanTreeNodeBinary, PredicatePushdown, StreamHashJoin, StreamProject, ToBatch, ToStream, @@ -40,7 +39,7 @@ use crate::optimizer::plan_node::{ StreamDynamicFilter, StreamFilter, StreamTableScan, StreamTemporalJoin, ToStreamContext, }; use crate::optimizer::plan_visitor::{MaxOneRowVisitor, PlanVisitor}; -use crate::optimizer::property::{Distribution, FunctionalDependencySet, Order, RequiredDist}; +use crate::optimizer::property::{Distribution, Order, RequiredDist}; use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition, ConditionDisplay}; /// `LogicalJoin` combines two relations according to some condition. @@ -125,28 +124,7 @@ impl LogicalJoin { } pub fn with_core(core: generic::Join) -> Self { - let ctx = core.ctx(); - let schema = core.schema(); - let pk_indices = core.logical_pk(); - - // NOTE(st1page) over - let functional_dependency = Self::derive_fd(&core); - - // NOTE(st1page): add join keys in the pk_indices a work around before we really have stream - // key. - // let pk_indices = match pk_indices { - // Some(pk_indices) if functional_dependency.is_key(&pk_indices) => { - // functional_dependency.minimize_key(&pk_indices) - // } - // _ => pk_indices.unwrap_or_default(), - // }; - - let base = PlanBase::new_logical( - ctx, - schema, - pk_indices.unwrap_or_default(), - functional_dependency, - ); + let base = PlanBase::new_logical_with_core(&core); LogicalJoin { base, core } } @@ -185,62 +163,6 @@ impl LogicalJoin { ColIndexMapping::new(self.output_indices().iter().map(|x| Some(*x)).collect()) } - fn derive_fd(core: &generic::Join) -> FunctionalDependencySet { - let left_len = core.left.schema().len(); - let right_len = core.right.schema().len(); - let left_fd_set = core.left.functional_dependency().clone(); - let right_fd_set = core.right.functional_dependency().clone(); - - let full_out_col_num = core.internal_column_num(); - - let get_new_left_fd_set = |left_fd_set: FunctionalDependencySet| { - ColIndexMapping::with_shift_offset(left_len, 0) - .composite(&ColIndexMapping::identity(full_out_col_num)) - .rewrite_functional_dependency_set(left_fd_set) - }; - let get_new_right_fd_set = |right_fd_set: FunctionalDependencySet| { - ColIndexMapping::with_shift_offset(right_len, left_len.try_into().unwrap()) - .rewrite_functional_dependency_set(right_fd_set) - }; - let fd_set: FunctionalDependencySet = match core.join_type { - JoinType::Inner => { - let mut fd_set = FunctionalDependencySet::new(full_out_col_num); - for i in &core.on.conjunctions { - if let Some((col, _)) = i.as_eq_const() { - fd_set.add_constant_columns(&[col.index()]) - } else if let Some((left, right)) = i.as_eq_cond() { - fd_set.add_functional_dependency_by_column_indices( - &[left.index()], - &[right.index()], - ); - fd_set.add_functional_dependency_by_column_indices( - &[right.index()], - &[left.index()], - ); - } - } - get_new_left_fd_set(left_fd_set) - .into_dependencies() - .into_iter() - .chain( - get_new_right_fd_set(right_fd_set) - .into_dependencies() - .into_iter(), - ) - .for_each(|fd| fd_set.add_functional_dependency(fd)); - fd_set - } - JoinType::LeftOuter => get_new_left_fd_set(left_fd_set), - JoinType::RightOuter => get_new_right_fd_set(right_fd_set), - JoinType::FullOuter => FunctionalDependencySet::new(full_out_col_num), - JoinType::LeftSemi | JoinType::LeftAnti => left_fd_set, - JoinType::RightSemi | JoinType::RightAnti => right_fd_set, - JoinType::Unspecified => unreachable!(), - }; - ColIndexMapping::with_remaining_columns(&core.output_indices, full_out_col_num) - .rewrite_functional_dependency_set(fd_set) - } - /// Get a reference to the logical join's on. pub fn on(&self) -> &Condition { &self.core.on diff --git a/src/frontend/src/optimizer/plan_node/logical_project.rs b/src/frontend/src/optimizer/plan_node/logical_project.rs index 5cef94330ad3..a46757accb62 100644 --- a/src/frontend/src/optimizer/plan_node/logical_project.rs +++ b/src/frontend/src/optimizer/plan_node/logical_project.rs @@ -18,9 +18,8 @@ use fixedbitset::FixedBitSet; use itertools::Itertools; use risingwave_common::error::Result; -use super::generic::{self, GenericPlanNode, Project}; use super::{ - gen_filter_and_pushdown, BatchProject, ColPrunable, ExprRewritable, PlanBase, PlanRef, + gen_filter_and_pushdown, generic, BatchProject, ColPrunable, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamProject, ToBatch, ToStream, }; use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor, InputRef}; @@ -29,7 +28,7 @@ use crate::optimizer::plan_node::{ CollectInputRef, ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, }; -use crate::optimizer::property::{Distribution, FunctionalDependencySet, Order, RequiredDist}; +use crate::optimizer::property::{Distribution, Order, RequiredDist}; use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition, Substitute}; /// `LogicalProject` computes a set of expressions from its input relation. @@ -50,18 +49,7 @@ impl LogicalProject { } pub fn with_core(core: generic::Project) -> Self { - let ctx = core.input.ctx(); - - let schema = core.schema(); - let pk_indices = core.logical_pk(); - let functional_dependency = Self::derive_fd(&core, core.input.functional_dependency()); - - let base = PlanBase::new_logical( - ctx, - schema, - pk_indices.unwrap_or_default(), - functional_dependency, - ); + let base = PlanBase::new_logical_with_core(&core); LogicalProject { base, core } } @@ -93,20 +81,6 @@ impl LogicalProject { Self::with_core(generic::Project::with_out_col_idx(input, out_fields)) } - fn derive_fd( - core: &Project, - input_fd_set: &FunctionalDependencySet, - ) -> FunctionalDependencySet { - let i2o = core.i2o_col_mapping(); - let mut fd_set = FunctionalDependencySet::new(core.exprs.len()); - for fd in input_fd_set.as_dependencies() { - if let Some(fd) = i2o.rewrite_functional_dependency(fd) { - fd_set.add_functional_dependency(fd); - } - } - fd_set - } - pub fn exprs(&self) -> &Vec { &self.core.exprs } diff --git a/src/frontend/src/optimizer/plan_node/logical_project_set.rs b/src/frontend/src/optimizer/plan_node/logical_project_set.rs index 1f2e32dabaeb..52cb7bda9e3e 100644 --- a/src/frontend/src/optimizer/plan_node/logical_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/logical_project_set.rs @@ -21,11 +21,10 @@ use super::{ PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamProjectSet, ToBatch, ToStream, }; use crate::expr::{Expr, ExprImpl, ExprRewriter, FunctionCall, InputRef, TableFunction}; -use crate::optimizer::plan_node::generic::GenericPlanNode; use crate::optimizer::plan_node::{ ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, }; -use crate::optimizer::property::{FunctionalDependencySet, Order}; +use crate::optimizer::property::Order; use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition}; /// `LogicalProjectSet` projects one row multiple times according to `select_list`. @@ -50,13 +49,7 @@ impl LogicalProjectSet { ); let core = generic::ProjectSet { select_list, input }; - - let ctx = core.ctx(); - let schema = core.schema(); - let pk_indices = core.logical_pk(); - let functional_dependency = Self::derive_fd(&core, core.input.functional_dependency()); - - let base = PlanBase::new_logical(ctx, schema, pk_indices.unwrap(), functional_dependency); + let base = PlanBase::new_logical_with_core(&core); LogicalProjectSet { base, core } } @@ -174,14 +167,6 @@ impl LogicalProjectSet { } } - fn derive_fd( - core: &generic::ProjectSet, - input_fd_set: &FunctionalDependencySet, - ) -> FunctionalDependencySet { - let i2o = core.i2o_col_mapping(); - i2o.rewrite_functional_dependency_set(input_fd_set.clone()) - } - pub fn select_list(&self) -> &Vec { &self.core.select_list } diff --git a/src/frontend/src/optimizer/plan_node/logical_scan.rs b/src/frontend/src/optimizer/plan_node/logical_scan.rs index 17f2cb3e6572..1a4050f90a30 100644 --- a/src/frontend/src/optimizer/plan_node/logical_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_scan.rs @@ -22,7 +22,7 @@ use risingwave_common::catalog::{ColumnDesc, Field, Schema, TableDesc}; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::util::sort_util::ColumnOrder; -use super::generic::{GenericPlanNode, GenericPlanRef}; +use super::generic::GenericPlanRef; use super::{ generic, BatchFilter, BatchProject, ColPrunable, ExprRewritable, PlanBase, PlanRef, PredicatePushdown, StreamTableScan, ToBatch, ToStream, @@ -36,7 +36,7 @@ use crate::optimizer::plan_node::{ BatchSeqScan, ColumnPruningContext, LogicalFilter, LogicalProject, LogicalValues, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, }; -use crate::optimizer::property::{FunctionalDependencySet, Order}; +use crate::optimizer::property::Order; use crate::optimizer::rule::IndexSelectionRule; use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition, ConditionDisplay}; @@ -89,21 +89,10 @@ impl LogicalScan { predicate, chunk_size: None, for_system_time_as_of_now, + ctx, }; - let schema = core.schema(); - let pk_indices = core.logical_pk(); - - let functional_dependency = match &pk_indices { - Some(pk_indices) => FunctionalDependencySet::with_key(schema.len(), pk_indices), - None => FunctionalDependencySet::new(schema.len()), - }; - let base = PlanBase::new_logical( - ctx, - schema, - pk_indices.unwrap_or_default(), - functional_dependency, - ); + let base = PlanBase::new_logical_with_core(&core); Self { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/logical_share.rs b/src/frontend/src/optimizer/plan_node/logical_share.rs index 33869e52db0e..99d358242363 100644 --- a/src/frontend/src/optimizer/plan_node/logical_share.rs +++ b/src/frontend/src/optimizer/plan_node/logical_share.rs @@ -18,10 +18,9 @@ use std::fmt; use risingwave_common::error::ErrorCode::NotImplemented; use risingwave_common::error::Result; -use super::generic::{self, GenericPlanNode}; use super::{ - ColPrunable, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, ToBatch, - ToStream, + generic, ColPrunable, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, + ToBatch, ToStream, }; use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::{ @@ -55,19 +54,12 @@ pub struct LogicalShare { impl LogicalShare { pub fn new(input: PlanRef) -> Self { - let ctx = input.ctx(); - let functional_dependency = input.functional_dependency().clone(); + let _ctx = input.ctx(); + let _functional_dependency = input.functional_dependency().clone(); let core = generic::Share { input: RefCell::new(input), }; - let schema = core.schema(); - let pk_indices = core.logical_pk(); - let base = PlanBase::new_logical( - ctx, - schema, - pk_indices.unwrap_or_default(), - functional_dependency, - ); + let base = PlanBase::new_logical_with_core(&core); LogicalShare { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index 24f8ddec34cb..5300db3ca261 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -22,7 +22,6 @@ use risingwave_common::catalog::{ColumnDesc, Schema}; use risingwave_common::error::Result; use risingwave_connector::source::DataType; -use super::generic::GenericPlanNode; use super::stream_watermark_filter::StreamWatermarkFilter; use super::{ generic, BatchSource, ColPrunable, ExprRewritable, LogicalFilter, LogicalProject, PlanBase, @@ -35,7 +34,6 @@ use crate::optimizer::optimizer_context::OptimizerContextRef; use crate::optimizer::plan_node::{ ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, }; -use crate::optimizer::property::FunctionalDependencySet; use crate::utils::{ColIndexMapping, Condition}; use crate::TableCatalog; @@ -72,20 +70,10 @@ impl LogicalSource { row_id_index, gen_row_id, for_table, + ctx, }; - let schema = core.schema(); - let pk_indices = core.logical_pk(); - - let (functional_dependency, pk_indices) = match pk_indices { - Some(pk_indices) => ( - FunctionalDependencySet::with_key(schema.len(), &pk_indices), - pk_indices, - ), - None => (FunctionalDependencySet::new(schema.len()), vec![]), - }; - - let base = PlanBase::new_logical(ctx, schema, pk_indices, functional_dependency); + let base = PlanBase::new_logical_with_core(&core); let kafka_timestamp_range = (Bound::Unbounded, Bound::Unbounded); LogicalSource { diff --git a/src/frontend/src/optimizer/plan_node/logical_union.rs b/src/frontend/src/optimizer/plan_node/logical_union.rs index 1dc2326602e2..ce3d52ac1bc8 100644 --- a/src/frontend/src/optimizer/plan_node/logical_union.rs +++ b/src/frontend/src/optimizer/plan_node/logical_union.rs @@ -20,13 +20,13 @@ use risingwave_common::types::{DataType, Scalar}; use super::{ColPrunable, ExprRewritable, PlanBase, PlanRef, PredicatePushdown, ToBatch, ToStream}; use crate::expr::{ExprImpl, InputRef, Literal}; -use crate::optimizer::plan_node::generic::{GenericPlanNode, GenericPlanRef}; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::stream_union::StreamUnion; use crate::optimizer::plan_node::{ generic, BatchHashAgg, BatchUnion, ColumnPruningContext, LogicalAgg, LogicalProject, PlanTreeNode, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, }; -use crate::optimizer::property::{FunctionalDependencySet, RequiredDist}; +use crate::optimizer::property::RequiredDist; use crate::utils::{ColIndexMapping, Condition}; /// `LogicalUnion` returns the union of the rows of its inputs. @@ -50,16 +50,7 @@ impl LogicalUnion { inputs, source_col, }; - let ctx = core.ctx(); - let pk_indices = core.logical_pk(); - let schema = core.schema(); - let functional_dependency = FunctionalDependencySet::new(schema.len()); - let base = PlanBase::new_logical( - ctx, - schema, - pk_indices.unwrap_or_default(), - functional_dependency, - ); + let base = PlanBase::new_logical_with_core(&core); LogicalUnion { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index a902c3a2a013..f31ba853bc2c 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -397,6 +397,10 @@ impl GenericPlanRef for PlanRef { fn ctx(&self) -> OptimizerContextRef { self.plan_base().ctx() } + + fn functional_dependency(&self) -> &FunctionalDependencySet { + self.plan_base().functional_dependency() + } } /// In order to let expression display id started from 1 for explaining, hidden column names and diff --git a/src/frontend/src/optimizer/plan_node/plan_base.rs b/src/frontend/src/optimizer/plan_node/plan_base.rs index 174640e613f6..30749ed0ce6e 100644 --- a/src/frontend/src/optimizer/plan_node/plan_base.rs +++ b/src/frontend/src/optimizer/plan_node/plan_base.rs @@ -17,6 +17,7 @@ use fixedbitset::FixedBitSet; use paste::paste; use risingwave_common::catalog::Schema; +use super::generic::GenericPlanNode; use super::*; use crate::for_all_plan_nodes; use crate::optimizer::optimizer_context::OptimizerContextRef; @@ -63,6 +64,10 @@ impl generic::GenericPlanRef for PlanBase { fn ctx(&self) -> OptimizerContextRef { self.ctx.clone() } + + fn functional_dependency(&self) -> &FunctionalDependencySet { + &self.functional_dependency + } } impl stream::StreamPlanRef for PlanBase { @@ -97,6 +102,15 @@ impl PlanBase { } } + pub fn new_logical_with_core(node: &impl GenericPlanNode) -> Self { + Self::new_logical( + node.ctx(), + node.schema(), + node.logical_pk().unwrap_or_default(), + node.functional_dependency(), + ) + } + pub fn new_stream( ctx: OptimizerContextRef, schema: Schema, diff --git a/src/frontend/src/optimizer/plan_node/stream.rs b/src/frontend/src/optimizer/plan_node/stream.rs index 66d41d239600..0222cd9e82d0 100644 --- a/src/frontend/src/optimizer/plan_node/stream.rs +++ b/src/frontend/src/optimizer/plan_node/stream.rs @@ -29,7 +29,7 @@ use super::{generic, EqJoinPredicate, PlanNodeId}; use crate::expr::{Expr, ExprImpl}; use crate::optimizer::optimizer_context::OptimizerContextRef; use crate::optimizer::plan_node::plan_tree_node_v2::PlanTreeNodeV2; -use crate::optimizer::property::Distribution; +use crate::optimizer::property::{Distribution, FunctionalDependencySet}; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::TableCatalog; @@ -99,6 +99,10 @@ impl generic::GenericPlanRef for PlanRef { fn ctx(&self) -> OptimizerContextRef { self.0.ctx.clone() } + + fn functional_dependency(&self) -> &FunctionalDependencySet { + self.0.functional_dependency() + } } impl generic::GenericPlanRef for PlanBase { @@ -113,6 +117,10 @@ impl generic::GenericPlanRef for PlanBase { fn ctx(&self) -> OptimizerContextRef { self.ctx.clone() } + + fn functional_dependency(&self) -> &FunctionalDependencySet { + todo!() + } } impl StreamPlanRef for PlanBase { diff --git a/src/frontend/src/optimizer/plan_node/stream_derive.rs b/src/frontend/src/optimizer/plan_node/stream_derive.rs index 63a918f8ed89..fdc092cfa951 100644 --- a/src/frontend/src/optimizer/plan_node/stream_derive.rs +++ b/src/frontend/src/optimizer/plan_node/stream_derive.rs @@ -17,7 +17,7 @@ use risingwave_common::catalog::Schema; use super::generic::GenericPlanNode; use super::stream::*; use crate::optimizer::optimizer_context::OptimizerContextRef; -use crate::optimizer::property::Distribution; +use crate::optimizer::property::{Distribution, FunctionalDependencySet}; use crate::utils::ColIndexMappingRewriteExt; impl GenericPlanNode for DynamicFilter { @@ -32,8 +32,11 @@ impl GenericPlanNode for DynamicFilter { fn ctx(&self) -> OptimizerContextRef { todo!("new plan node derivation") } -} + fn functional_dependency(&self) -> FunctionalDependencySet { + todo!("new plan node derivation") + } +} impl StreamPlanNode for DynamicFilter { fn distribution(&self) -> Distribution { todo!() @@ -56,6 +59,10 @@ impl GenericPlanNode for Exchange { fn ctx(&self) -> OptimizerContextRef { todo!("new plan node derivation") } + + fn functional_dependency(&self) -> FunctionalDependencySet { + todo!("new plan node derivation") + } } impl StreamPlanNode for Exchange { @@ -80,6 +87,10 @@ impl GenericPlanNode for DeltaJoin { fn ctx(&self) -> OptimizerContextRef { self.core.ctx() } + + fn functional_dependency(&self) -> FunctionalDependencySet { + self.core.functional_dependency() + } } impl StreamPlanNode for DeltaJoin { @@ -104,6 +115,10 @@ impl GenericPlanNode for Expand { fn ctx(&self) -> OptimizerContextRef { self.core.ctx() } + + fn functional_dependency(&self) -> FunctionalDependencySet { + self.core.functional_dependency() + } } impl StreamPlanNode for Expand { @@ -128,6 +143,10 @@ impl GenericPlanNode for Filter { fn ctx(&self) -> OptimizerContextRef { self.core.ctx() } + + fn functional_dependency(&self) -> FunctionalDependencySet { + self.core.functional_dependency() + } } impl StreamPlanNode for Filter { @@ -152,6 +171,10 @@ impl GenericPlanNode for GlobalSimpleAgg { fn ctx(&self) -> OptimizerContextRef { self.core.ctx() } + + fn functional_dependency(&self) -> FunctionalDependencySet { + self.core.functional_dependency() + } } impl StreamPlanNode for GlobalSimpleAgg { @@ -176,6 +199,10 @@ impl GenericPlanNode for GroupTopN { fn ctx(&self) -> OptimizerContextRef { self.core.ctx() } + + fn functional_dependency(&self) -> FunctionalDependencySet { + self.core.functional_dependency() + } } impl StreamPlanNode for GroupTopN { @@ -200,6 +227,10 @@ impl GenericPlanNode for HashAgg { fn ctx(&self) -> OptimizerContextRef { self.core.ctx() } + + fn functional_dependency(&self) -> FunctionalDependencySet { + self.core.functional_dependency() + } } impl StreamPlanNode for HashAgg { @@ -224,6 +255,10 @@ impl GenericPlanNode for HashJoin { fn ctx(&self) -> OptimizerContextRef { self.core.ctx() } + + fn functional_dependency(&self) -> FunctionalDependencySet { + self.core.functional_dependency() + } } impl StreamPlanNode for HashJoin { @@ -248,6 +283,10 @@ impl GenericPlanNode for HopWindow { fn ctx(&self) -> OptimizerContextRef { self.core.ctx() } + + fn functional_dependency(&self) -> FunctionalDependencySet { + self.core.functional_dependency() + } } impl StreamPlanNode for HopWindow { @@ -272,6 +311,10 @@ impl GenericPlanNode for IndexScan { fn ctx(&self) -> OptimizerContextRef { self.core.ctx() } + + fn functional_dependency(&self) -> FunctionalDependencySet { + self.core.functional_dependency() + } } impl StreamPlanNode for IndexScan { @@ -296,6 +339,10 @@ impl GenericPlanNode for LocalSimpleAgg { fn ctx(&self) -> OptimizerContextRef { self.core.ctx() } + + fn functional_dependency(&self) -> FunctionalDependencySet { + self.core.functional_dependency() + } } impl StreamPlanNode for LocalSimpleAgg { @@ -320,6 +367,10 @@ impl GenericPlanNode for Materialize { fn ctx(&self) -> OptimizerContextRef { todo!("new plan node derivation") } + + fn functional_dependency(&self) -> FunctionalDependencySet { + todo!("new plan node derivation") + } } impl StreamPlanNode for Materialize { @@ -344,6 +395,10 @@ impl GenericPlanNode for ProjectSet { fn ctx(&self) -> OptimizerContextRef { self.core.ctx() } + + fn functional_dependency(&self) -> FunctionalDependencySet { + self.core.functional_dependency() + } } impl StreamPlanNode for ProjectSet { @@ -368,6 +423,10 @@ impl GenericPlanNode for Project { fn ctx(&self) -> OptimizerContextRef { self.core.ctx() } + + fn functional_dependency(&self) -> FunctionalDependencySet { + self.core.functional_dependency() + } } impl StreamPlanNode for Project { @@ -394,6 +453,10 @@ impl GenericPlanNode for Sink { fn ctx(&self) -> OptimizerContextRef { todo!("new plan node derivation") } + + fn functional_dependency(&self) -> FunctionalDependencySet { + todo!("new plan node derivation") + } } impl StreamPlanNode for Sink { @@ -418,6 +481,10 @@ impl GenericPlanNode for Source { fn ctx(&self) -> OptimizerContextRef { self.core.ctx() } + + fn functional_dependency(&self) -> FunctionalDependencySet { + self.core.functional_dependency() + } } impl StreamPlanNode for Source { @@ -442,6 +509,10 @@ impl GenericPlanNode for TableScan { fn ctx(&self) -> OptimizerContextRef { self.core.ctx() } + + fn functional_dependency(&self) -> FunctionalDependencySet { + self.core.functional_dependency() + } } impl StreamPlanNode for TableScan { @@ -466,6 +537,10 @@ impl GenericPlanNode for TopN { fn ctx(&self) -> OptimizerContextRef { self.core.ctx() } + + fn functional_dependency(&self) -> FunctionalDependencySet { + self.core.functional_dependency() + } } impl StreamPlanNode for TopN {