diff --git a/src/frontend/src/optimizer/plan_node/batch_limit.rs b/src/frontend/src/optimizer/plan_node/batch_limit.rs index 9b087bc6ce49..fce347b27e48 100644 --- a/src/frontend/src/optimizer/plan_node/batch_limit.rs +++ b/src/frontend/src/optimizer/plan_node/batch_limit.rs @@ -19,8 +19,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::LimitNode; use super::{ - ExprRewritable, LogicalLimit, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, - ToDistributedBatch, + generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, }; use crate::optimizer::plan_node::ToLocalBatch; use crate::optimizer::property::{Order, RequiredDist}; @@ -29,25 +28,23 @@ use crate::optimizer::property::{Order, RequiredDist}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchLimit { pub base: PlanBase, - logical: LogicalLimit, + logical: generic::Limit, } impl BatchLimit { - pub fn new(logical: LogicalLimit) -> Self { - let ctx = logical.base.ctx.clone(); - let base = PlanBase::new_batch( - ctx, - logical.schema().clone(), - logical.input().distribution().clone(), - logical.input().order().clone(), + pub fn new(logical: generic::Limit) -> Self { + let base = PlanBase::new_batch_from_logical( + &logical, + logical.input.distribution().clone(), + logical.input.order().clone(), ); BatchLimit { base, logical } } fn two_phase_limit(&self, input: PlanRef) -> Result { - let new_limit = self.logical.limit() + self.logical.offset(); + let new_limit = self.logical.limit + self.logical.offset; let new_offset = 0; - let logical_partial_limit = LogicalLimit::new(input, new_limit, new_offset); + let logical_partial_limit = generic::Limit::new(input, new_limit, new_offset); let batch_partial_limit = Self::new(logical_partial_limit); let any_order = Order::any(); @@ -74,22 +71,19 @@ impl BatchLimit { impl fmt::Display for BatchLimit { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "BatchLimit {{ limit: {limit}, offset: {offset} }}", - limit = self.logical.limit, - offset = self.logical.offset - ) + self.logical.fmt_with_name(f, "BatchLimit") } } impl PlanTreeNodeUnary for BatchLimit { fn input(&self) -> PlanRef { - self.logical.input() + self.logical.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - Self::new(self.logical.clone_with_input(input)) + let mut core = self.logical.clone(); + core.input = input; + Self::new(core) } } impl_plan_tree_node_for_unary! {BatchLimit} @@ -102,8 +96,8 @@ impl ToDistributedBatch for BatchLimit { impl ToBatchPb for BatchLimit { fn to_batch_prost_body(&self) -> NodeBody { NodeBody::Limit(LimitNode { - limit: self.logical.limit(), - offset: self.logical.offset(), + limit: self.logical.limit, + offset: self.logical.offset, }) } } diff --git a/src/frontend/src/optimizer/plan_node/batch_topn.rs b/src/frontend/src/optimizer/plan_node/batch_topn.rs index f6f61410889f..b8b5ba710e46 100644 --- a/src/frontend/src/optimizer/plan_node/batch_topn.rs +++ b/src/frontend/src/optimizer/plan_node/batch_topn.rs @@ -16,13 +16,13 @@ use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::TopNNode; -use super::generic::Limit; +use super::generic::TopNLimit; use super::utils::impl_distill_by_unit; use super::{ generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, }; use crate::optimizer::plan_node::batch::BatchPlanRef; -use crate::optimizer::plan_node::{BatchLimit, LogicalLimit, ToLocalBatch}; +use crate::optimizer::plan_node::{BatchLimit, ToLocalBatch}; use crate::optimizer::property::{Order, RequiredDist}; /// `BatchTopN` implements [`super::LogicalTopN`] to find the top N elements with a heap @@ -45,13 +45,13 @@ impl BatchTopN { } fn two_phase_topn(&self, input: PlanRef) -> Result { - let new_limit = Limit::new( + let new_limit = TopNLimit::new( self.logical.limit_attr.limit() + self.logical.offset, self.logical.limit_attr.with_ties(), ); let new_offset = 0; let partial_input: PlanRef = if input.order().satisfies(&self.logical.order) { - let logical_partial_limit = LogicalLimit::new(input, new_limit.limit(), new_offset); + let logical_partial_limit = generic::Limit::new(input, new_limit.limit(), new_offset); let batch_partial_limit = BatchLimit::new(logical_partial_limit); batch_partial_limit.into() } else { diff --git a/src/frontend/src/optimizer/plan_node/generic/limit.rs b/src/frontend/src/optimizer/plan_node/generic/limit.rs new file mode 100644 index 000000000000..9e5da8be3429 --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/generic/limit.rs @@ -0,0 +1,76 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +use std::fmt; +use std::hash::Hash; + +use pretty_xmlish::Pretty; +use risingwave_common::catalog::Schema; + +use super::{DistillUnit, GenericPlanNode, GenericPlanRef}; +use crate::optimizer::property::FunctionalDependencySet; +use crate::OptimizerContextRef; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct Limit { + pub input: PlanRef, + pub limit: u64, + pub offset: u64, +} + +impl GenericPlanNode for Limit { + fn ctx(&self) -> OptimizerContextRef { + self.input.ctx() + } + + fn schema(&self) -> Schema { + self.input.schema().clone() + } + + fn functional_dependency(&self) -> FunctionalDependencySet { + self.input.functional_dependency().clone() + } + + fn logical_pk(&self) -> Option> { + Some(self.input.logical_pk().to_vec()) + } +} +impl Limit { + pub(crate) fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result { + write!( + f, + "{} {{ limit: {}, offset: {} }}", + name, self.limit, self.offset + ) + } + + pub fn new(input: PlanRef, limit: u64, offset: u64) -> Self { + Limit { + input, + limit, + offset, + } + } +} + +impl DistillUnit for Limit { + fn distill_with_name<'a>(&self, name: &'a str) -> Pretty<'a> { + Pretty::childless_record( + name, + vec![ + ("limit", Pretty::debug(&self.limit)), + ("offset", Pretty::debug(&self.offset)), + ], + ) + } +} diff --git a/src/frontend/src/optimizer/plan_node/generic/mod.rs b/src/frontend/src/optimizer/plan_node/generic/mod.rs index 141235efcca8..4bafdcbbd7e1 100644 --- a/src/frontend/src/optimizer/plan_node/generic/mod.rs +++ b/src/frontend/src/optimizer/plan_node/generic/mod.rs @@ -61,6 +61,8 @@ mod delete; pub use delete::*; mod insert; pub use insert::*; +mod limit; +pub use limit::*; pub trait DistillUnit { fn distill_with_name<'a>(&self, name: &'a str) -> Pretty<'a>; diff --git a/src/frontend/src/optimizer/plan_node/generic/top_n.rs b/src/frontend/src/optimizer/plan_node/generic/top_n.rs index e47c467c02b0..3aa78c098fcf 100644 --- a/src/frontend/src/optimizer/plan_node/generic/top_n.rs +++ b/src/frontend/src/optimizer/plan_node/generic/top_n.rs @@ -28,7 +28,7 @@ use crate::TableCatalog; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct TopN { pub input: PlanRef, - pub limit_attr: Limit, + pub limit_attr: TopNLimit, pub offset: u64, pub order: Order, pub group_key: Vec, @@ -89,7 +89,7 @@ impl TopN { impl TopN { pub fn with_group( input: PlanRef, - limit_attr: Limit, + limit_attr: TopNLimit, offset: u64, order: Order, group_key: Vec, @@ -107,7 +107,7 @@ impl TopN { } } - pub fn without_group(input: PlanRef, limit_attr: Limit, offset: u64, order: Order) -> Self { + pub fn without_group(input: PlanRef, limit_attr: TopNLimit, offset: u64, order: Order) -> Self { if limit_attr.with_ties() { assert!(offset == 0, "WITH TIES is not supported with OFFSET"); } @@ -194,7 +194,7 @@ impl GenericPlanNode for TopN { /// [`Limit`] is used to specify the number of records to return. #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] -pub enum Limit { +pub enum TopNLimit { /// The number of records returned is exactly the same as the number after `LIMIT` in the SQL /// query. Simple(u64), @@ -204,7 +204,7 @@ pub enum Limit { WithTies(u64), } -impl Limit { +impl TopNLimit { pub fn new(limit: u64, with_ties: bool) -> Self { if with_ties { Self::WithTies(limit) @@ -215,15 +215,15 @@ impl Limit { pub fn limit(&self) -> u64 { match self { - Limit::Simple(limit) => *limit, - Limit::WithTies(limit) => *limit, + TopNLimit::Simple(limit) => *limit, + TopNLimit::WithTies(limit) => *limit, } } pub fn with_ties(&self) -> bool { match self { - Limit::Simple(_) => false, - Limit::WithTies(_) => true, + TopNLimit::Simple(_) => false, + TopNLimit::WithTies(_) => true, } } @@ -231,8 +231,8 @@ impl Limit { /// `WITH TIES` satisfies this condition. pub fn max_one_row(&self) -> bool { match self { - Limit::Simple(limit) => *limit == 1, - Limit::WithTies(_) => false, + TopNLimit::Simple(limit) => *limit == 1, + TopNLimit::WithTies(_) => false, } } } diff --git a/src/frontend/src/optimizer/plan_node/logical_dedup.rs b/src/frontend/src/optimizer/plan_node/logical_dedup.rs index 6be7020aa57f..f070d51847fe 100644 --- a/src/frontend/src/optimizer/plan_node/logical_dedup.rs +++ b/src/frontend/src/optimizer/plan_node/logical_dedup.rs @@ -17,7 +17,7 @@ use itertools::Itertools; use risingwave_common::error::Result; use risingwave_common::util::column_index_mapping::ColIndexMapping; -use super::generic::Limit; +use super::generic::TopNLimit; use super::utils::impl_distill_by_unit; use super::{ gen_filter_and_pushdown, generic, BatchGroupTopN, ColPrunable, ColumnPruningContext, @@ -116,7 +116,7 @@ impl ToStream for LogicalDedup { // If the input is not append-only, we use a `StreamGroupTopN` with the limit being 1. let logical_top_n = generic::TopN::with_group( input, - Limit::new(1, false), + TopNLimit::new(1, false), 0, Order::default(), self.dedup_cols().to_vec(), @@ -131,7 +131,7 @@ impl ToBatch for LogicalDedup { let input = self.input().to_batch()?; let logical_top_n = generic::TopN::with_group( input, - Limit::new(1, false), + TopNLimit::new(1, false), 0, Order::default(), self.dedup_cols().to_vec(), diff --git a/src/frontend/src/optimizer/plan_node/logical_limit.rs b/src/frontend/src/optimizer/plan_node/logical_limit.rs index 707c1be88be3..1be42a3811ce 100644 --- a/src/frontend/src/optimizer/plan_node/logical_limit.rs +++ b/src/frontend/src/optimizer/plan_node/logical_limit.rs @@ -12,12 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; - use risingwave_common::error::{ErrorCode, Result, RwError}; +use super::utils::impl_distill_by_unit; use super::{ - gen_filter_and_pushdown, BatchLimit, ColPrunable, ExprRewritable, PlanBase, PlanRef, + gen_filter_and_pushdown, generic, BatchLimit, ColPrunable, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, }; use crate::optimizer::plan_node::{ @@ -29,47 +28,38 @@ use crate::utils::{ColIndexMapping, Condition}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct LogicalLimit { pub base: PlanBase, - input: PlanRef, - pub(super) limit: u64, - pub(super) offset: u64, + pub(super) core: generic::Limit, } impl LogicalLimit { - pub fn new(input: PlanRef, limit: u64, offset: u64) -> Self { - let ctx = input.ctx(); - let schema = input.schema().clone(); - let pk_indices = input.logical_pk().to_vec(); - let functional_dependency = input.functional_dependency().clone(); - let base = PlanBase::new_logical(ctx, schema, pk_indices, functional_dependency); - LogicalLimit { - base, - input, - limit, - offset, - } + pub fn new(core: generic::Limit) -> Self { + let base = PlanBase::new_logical_with_core(&core); + LogicalLimit { base, core } } /// the function will check if the cond is bool expression pub fn create(input: PlanRef, limit: u64, offset: u64) -> PlanRef { - Self::new(input, limit, offset).into() + Self::new(generic::Limit::new(input, limit, offset)).into() } pub fn limit(&self) -> u64 { - self.limit + self.core.limit } pub fn offset(&self) -> u64 { - self.offset + self.core.offset } } impl PlanTreeNodeUnary for LogicalLimit { fn input(&self) -> PlanRef { - self.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - Self::new(input, self.limit, self.offset) + let mut core = self.core.clone(); + core.input = input; + Self::new(core) } #[must_use] @@ -78,23 +68,15 @@ impl PlanTreeNodeUnary for LogicalLimit { input: PlanRef, input_col_change: ColIndexMapping, ) -> (Self, ColIndexMapping) { - (Self::new(input, self.limit, self.offset), input_col_change) + (self.clone_with_input(input), input_col_change) } } impl_plan_tree_node_for_unary! {LogicalLimit} -impl fmt::Display for LogicalLimit { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "LogicalLimit {{ limit: {}, offset: {} }}", - self.limit, self.offset - ) - } -} +impl_distill_by_unit!(LogicalLimit, core, "LogicalLimit"); impl ColPrunable for LogicalLimit { fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef { - let new_input = self.input.prune_col(required_cols, ctx); + let new_input = self.input().prune_col(required_cols, ctx); self.clone_with_input(new_input).into() } } @@ -115,7 +97,8 @@ impl PredicatePushdown for LogicalLimit { impl ToBatch for LogicalLimit { fn to_batch(&self) -> Result { let new_input = self.input().to_batch()?; - let new_logical = self.clone_with_input(new_input); + let mut new_logical = self.core.clone(); + new_logical.input = new_input; Ok(BatchLimit::new(new_logical).into()) } } @@ -132,7 +115,7 @@ impl ToStream for LogicalLimit { &self, ctx: &mut RewriteStreamContext, ) -> Result<(PlanRef, ColIndexMapping)> { - let (input, input_col_change) = self.input.logical_rewrite_for_stream(ctx)?; + let (input, input_col_change) = self.input().logical_rewrite_for_stream(ctx)?; let (filter, out_col_change) = self.rewrite_with_input(input, input_col_change); Ok((filter.into(), out_col_change)) } diff --git a/src/frontend/src/optimizer/plan_node/logical_topn.rs b/src/frontend/src/optimizer/plan_node/logical_topn.rs index f09e405b43ca..a8c20374cadf 100644 --- a/src/frontend/src/optimizer/plan_node/logical_topn.rs +++ b/src/frontend/src/optimizer/plan_node/logical_topn.rs @@ -17,7 +17,7 @@ use itertools::Itertools; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::util::sort_util::ColumnOrder; -use super::generic::Limit; +use super::generic::TopNLimit; use super::utils::impl_distill_by_unit; use super::{ gen_filter_and_pushdown, generic, BatchGroupTopN, ColPrunable, ExprRewritable, PlanBase, @@ -56,7 +56,7 @@ impl LogicalTopN { order: Order, group_key: Vec, ) -> Self { - let limit_attr = Limit::new(limit, with_ties); + let limit_attr = TopNLimit::new(limit, with_ties); let core = generic::TopN::with_group(input, limit_attr, offset, order, group_key); core.into() } @@ -79,7 +79,7 @@ impl LogicalTopN { Ok(Self::new(input, limit, offset, with_ties, order, group_key).into()) } - pub fn limit_attr(&self) -> Limit { + pub fn limit_attr(&self) -> TopNLimit { self.core.limit_attr } @@ -154,7 +154,7 @@ impl LogicalTopN { ); let vnode_col_idx = exprs.len() - 1; let project = StreamProject::new(generic::Project::new(exprs.clone(), stream_input)); - let limit_attr = Limit::new( + let limit_attr = TopNLimit::new( self.limit_attr().limit() + self.offset(), self.limit_attr().with_ties(), ); diff --git a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs index 188abde91ec0..595f3f8ffd86 100644 --- a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs +++ b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs @@ -19,7 +19,7 @@ use itertools::Itertools; use risingwave_common::catalog::FieldDisplay; use risingwave_pb::stream_plan::stream_node::PbNodeBody; -use super::generic::Limit; +use super::generic::TopNLimit; use super::{generic, ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::property::{Order, OrderDisplay}; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -68,7 +68,7 @@ impl StreamGroupTopN { } } - pub fn limit_attr(&self) -> Limit { + pub fn limit_attr(&self) -> TopNLimit { self.logical.limit_attr } diff --git a/src/frontend/src/optimizer/plan_node/stream_topn.rs b/src/frontend/src/optimizer/plan_node/stream_topn.rs index 2d420b46ee10..fba1df109684 100644 --- a/src/frontend/src/optimizer/plan_node/stream_topn.rs +++ b/src/frontend/src/optimizer/plan_node/stream_topn.rs @@ -17,7 +17,7 @@ use std::fmt; use fixedbitset::FixedBitSet; use risingwave_pb::stream_plan::stream_node::PbNodeBody; -use super::generic::Limit; +use super::generic::TopNLimit; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::property::{Distribution, Order}; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -45,7 +45,7 @@ impl StreamTopN { StreamTopN { base, logical } } - pub fn limit_attr(&self) -> Limit { + pub fn limit_attr(&self) -> TopNLimit { self.logical.limit_attr } diff --git a/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs b/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs index c5950a4a00a4..d075c696404a 100644 --- a/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs +++ b/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs @@ -19,7 +19,7 @@ use risingwave_pb::plan_common::JoinType; use super::{DefaultBehavior, DefaultValue, PlanVisitor}; use crate::catalog::system_catalog::pg_catalog::PG_NAMESPACE_TABLE_NAME; -use crate::optimizer::plan_node::generic::Limit; +use crate::optimizer::plan_node::generic::TopNLimit; use crate::optimizer::plan_node::{self, PlanTreeNode, PlanTreeNodeBinary, PlanTreeNodeUnary}; use crate::optimizer::plan_visitor::PlanRef; use crate::optimizer::property::Cardinality; @@ -61,8 +61,8 @@ impl PlanVisitor for CardinalityVisitor { let input = self.visit(plan.input()); match plan.limit_attr() { - Limit::Simple(limit) => input.sub(plan.offset() as usize).min(limit as usize), - Limit::WithTies(limit) => { + TopNLimit::Simple(limit) => input.sub(plan.offset() as usize).min(limit as usize), + TopNLimit::WithTies(limit) => { assert_eq!(plan.offset(), 0, "ties with offset is not supported yet"); input.min((limit as usize)..) }