diff --git a/src/frontend/src/optimizer/plan_node/batch_delete.rs b/src/frontend/src/optimizer/plan_node/batch_delete.rs index f9c233eb61cd..a7344e98e9d2 100644 --- a/src/frontend/src/optimizer/plan_node/batch_delete.rs +++ b/src/frontend/src/optimizer/plan_node/batch_delete.rs @@ -12,12 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; - use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::DeleteNode; +use super::utils::impl_distill_by_unit; use super::{ generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, }; @@ -43,11 +42,7 @@ impl BatchDelete { } } -impl fmt::Display for BatchDelete { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.logical.fmt_with_name(f, "BatchDelete") - } -} +impl_distill_by_unit!(BatchDelete, logical, "BatchDelete"); impl PlanTreeNodeUnary for BatchDelete { fn input(&self) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/batch_exchange.rs b/src/frontend/src/optimizer/plan_node/batch_exchange.rs index d69ae041f65b..34a27d843bdc 100644 --- a/src/frontend/src/optimizer/plan_node/batch_exchange.rs +++ b/src/frontend/src/optimizer/plan_node/batch_exchange.rs @@ -14,10 +14,12 @@ use std::fmt; +use pretty_xmlish::Pretty; use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::{ExchangeNode, MergeSortExchangeNode}; +use super::utils::Distill; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch}; use crate::optimizer::plan_node::ToLocalBatch; use crate::optimizer::property::{Distribution, DistributionDisplay, Order, OrderDisplay}; @@ -41,20 +43,45 @@ impl BatchExchange { impl fmt::Display for BatchExchange { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let input_schema = self.input.schema(); write!( f, "BatchExchange {{ order: {}, dist: {} }}", OrderDisplay { order: &self.base.order, - input_schema: self.input.schema() + input_schema }, DistributionDisplay { distribution: &self.base.dist, - input_schema: self.input.schema() + input_schema } ) } } +impl Distill for BatchExchange { + fn distill<'a>(&self) -> Pretty<'a> { + let input_schema = self.input.schema(); + Pretty::childless_record( + "BatchExchange", + vec![ + ( + "order", + Pretty::display(&OrderDisplay { + order: &self.base.order, + input_schema, + }), + ), + ( + "dist", + Pretty::display(&DistributionDisplay { + distribution: &self.base.dist, + input_schema, + }), + ), + ], + ) + } +} impl PlanTreeNodeUnary for BatchExchange { fn input(&self) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/batch_expand.rs b/src/frontend/src/optimizer/plan_node/batch_expand.rs index f81b1d7e8463..72caa27858be 100644 --- a/src/frontend/src/optimizer/plan_node/batch_expand.rs +++ b/src/frontend/src/optimizer/plan_node/batch_expand.rs @@ -12,14 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; - use itertools::Itertools; use risingwave_common::error::Result; use risingwave_pb::batch_plan::expand_node::Subset; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::ExpandNode; +use super::utils::impl_distill_by_unit; use super::{generic, ExprRewritable}; use crate::optimizer::plan_node::{ PlanBase, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, ToLocalBatch, @@ -51,11 +50,7 @@ impl BatchExpand { } } -impl fmt::Display for BatchExpand { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.logical.fmt_with_name(f, "BatchExpand") - } -} +impl_distill_by_unit!(BatchExpand, logical, "BatchExpand"); impl PlanTreeNodeUnary for BatchExpand { fn input(&self) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/batch_filter.rs b/src/frontend/src/optimizer/plan_node/batch_filter.rs index 476bbe4f1fcc..aadbda9800b1 100644 --- a/src/frontend/src/optimizer/plan_node/batch_filter.rs +++ b/src/frontend/src/optimizer/plan_node/batch_filter.rs @@ -12,12 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; - use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::FilterNode; +use super::utils::impl_distill_by_unit; use super::{generic, ExprRewritable, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch}; use crate::expr::{Expr, ExprImpl, ExprRewriter}; use crate::optimizer::plan_node::{PlanBase, ToLocalBatch}; @@ -45,12 +44,7 @@ impl BatchFilter { &self.logical.predicate } } - -impl fmt::Display for BatchFilter { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.logical.fmt_with_name(f, "BatchFilter") - } -} +impl_distill_by_unit!(BatchFilter, logical, "BatchFilter"); impl PlanTreeNodeUnary for BatchFilter { fn input(&self) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/batch_group_topn.rs b/src/frontend/src/optimizer/plan_node/batch_group_topn.rs index 21e137c0da19..1d61b4e9eb37 100644 --- a/src/frontend/src/optimizer/plan_node/batch_group_topn.rs +++ b/src/frontend/src/optimizer/plan_node/batch_group_topn.rs @@ -12,12 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; - use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::GroupTopNNode; +use super::utils::impl_distill_by_unit; use super::{ generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, }; @@ -47,11 +46,7 @@ impl BatchGroupTopN { } } -impl fmt::Display for BatchGroupTopN { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.logical.fmt_with_name(f, "BatchGroupTopN") - } -} +impl_distill_by_unit!(BatchGroupTopN, logical, "BatchGroupTopN"); impl PlanTreeNodeUnary for BatchGroupTopN { fn input(&self) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/batch_hash_agg.rs b/src/frontend/src/optimizer/plan_node/batch_hash_agg.rs index bc573d339616..a05d8ff915e7 100644 --- a/src/frontend/src/optimizer/plan_node/batch_hash_agg.rs +++ b/src/frontend/src/optimizer/plan_node/batch_hash_agg.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; - use fixedbitset::FixedBitSet; use itertools::Itertools; use risingwave_common::error::Result; @@ -21,6 +19,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::HashAggNode; use super::generic::{self, GenericPlanRef, PlanAggCall}; +use super::utils::impl_distill_by_unit; use super::{ ExprRewritable, PlanBase, PlanNodeType, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, @@ -100,11 +99,7 @@ impl BatchHashAgg { } } -impl fmt::Display for BatchHashAgg { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.logical.fmt_with_name(f, "BatchHashAgg") - } -} +impl_distill_by_unit!(BatchHashAgg, logical, "BatchHashAgg"); impl PlanTreeNodeUnary for BatchHashAgg { fn input(&self) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/batch_hop_window.rs b/src/frontend/src/optimizer/plan_node/batch_hop_window.rs index fab259b0747d..c4b84b7232d1 100644 --- a/src/frontend/src/optimizer/plan_node/batch_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/batch_hop_window.rs @@ -12,12 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; - use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::HopWindowNode; +use super::utils::impl_distill_by_unit; use super::{ generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, }; @@ -58,12 +57,7 @@ impl BatchHopWindow { } } } - -impl fmt::Display for BatchHopWindow { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.logical.fmt_with_name(f, "BatchHopWindow") - } -} +impl_distill_by_unit!(BatchHopWindow, logical, "BatchHopWindow"); impl PlanTreeNodeUnary for BatchHopWindow { fn input(&self) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/batch_insert.rs b/src/frontend/src/optimizer/plan_node/batch_insert.rs index 95f88c0db19d..f954dc04c2d8 100644 --- a/src/frontend/src/optimizer/plan_node/batch_insert.rs +++ b/src/frontend/src/optimizer/plan_node/batch_insert.rs @@ -53,6 +53,7 @@ impl fmt::Display for BatchInsert { self.logical.fmt_with_name(f, "BatchInsert") } } +// impl_distill_by_unit!(BatchInsert, logical, "BatchInsert"); impl PlanTreeNodeUnary for BatchInsert { fn input(&self) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/batch_project_set.rs b/src/frontend/src/optimizer/plan_node/batch_project_set.rs index 765dfab71035..b86211aaaa21 100644 --- a/src/frontend/src/optimizer/plan_node/batch_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/batch_project_set.rs @@ -12,13 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; - use itertools::Itertools; use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::ProjectSetNode; +use super::utils::impl_distill_by_unit; use super::{generic, ExprRewritable}; use crate::expr::ExprRewriter; use crate::optimizer::plan_node::{ @@ -48,11 +47,7 @@ impl BatchProjectSet { } } -impl fmt::Display for BatchProjectSet { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.logical.fmt_with_name(f, "BatchProjectSet") - } -} +impl_distill_by_unit!(BatchProjectSet, logical, "BatchProjectSet"); impl PlanTreeNodeUnary for BatchProjectSet { fn input(&self) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs b/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs index d32a90e9247e..b9c747c20505 100644 --- a/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs @@ -12,13 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; - use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::SortAggNode; use super::generic::{self, PlanAggCall}; +use super::utils::impl_distill_by_unit; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch}; use crate::expr::ExprRewriter; use crate::optimizer::plan_node::{BatchExchange, ToLocalBatch}; @@ -50,12 +49,7 @@ impl BatchSimpleAgg { self.logical.can_two_phase_agg() && self.two_phase_agg_enabled() } } - -impl fmt::Display for BatchSimpleAgg { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.logical.fmt_with_name(f, "BatchSimpleAgg") - } -} +impl_distill_by_unit!(BatchSimpleAgg, logical, "BatchSimpleAgg"); impl PlanTreeNodeUnary for BatchSimpleAgg { fn input(&self) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/batch_sort_agg.rs b/src/frontend/src/optimizer/plan_node/batch_sort_agg.rs index 31552750a904..c81bbc7bbde8 100644 --- a/src/frontend/src/optimizer/plan_node/batch_sort_agg.rs +++ b/src/frontend/src/optimizer/plan_node/batch_sort_agg.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; - use fixedbitset::FixedBitSet; use itertools::Itertools; use risingwave_common::error::Result; @@ -22,6 +20,7 @@ use risingwave_pb::batch_plan::SortAggNode; use risingwave_pb::expr::ExprNode; use super::generic::{self, GenericPlanRef, PlanAggCall}; +use super::utils::impl_distill_by_unit; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch}; use crate::expr::{Expr, ExprImpl, ExprRewriter, InputRef}; use crate::optimizer::plan_node::ToLocalBatch; @@ -78,12 +77,7 @@ impl BatchSortAgg { &self.logical.group_key } } - -impl fmt::Display for BatchSortAgg { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.logical.fmt_with_name(f, "BatchSortAgg") - } -} +impl_distill_by_unit!(BatchSortAgg, logical, "BatchSortAgg"); impl PlanTreeNodeUnary for BatchSortAgg { fn input(&self) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/batch_topn.rs b/src/frontend/src/optimizer/plan_node/batch_topn.rs index be5ac6559720..f6f61410889f 100644 --- a/src/frontend/src/optimizer/plan_node/batch_topn.rs +++ b/src/frontend/src/optimizer/plan_node/batch_topn.rs @@ -12,13 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; - use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::TopNNode; use super::generic::Limit; +use super::utils::impl_distill_by_unit; use super::{ generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, }; @@ -79,11 +78,7 @@ impl BatchTopN { } } -impl fmt::Display for BatchTopN { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.logical.fmt_with_name(f, "BatchTopN") - } -} +impl_distill_by_unit!(BatchTopN, logical, "BatchTopN"); impl PlanTreeNodeUnary for BatchTopN { fn input(&self) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/batch_update.rs b/src/frontend/src/optimizer/plan_node/batch_update.rs index 5927785144bd..15ab0ca59ba7 100644 --- a/src/frontend/src/optimizer/plan_node/batch_update.rs +++ b/src/frontend/src/optimizer/plan_node/batch_update.rs @@ -12,13 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; - use risingwave_common::catalog::Schema; use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::UpdateNode; +use super::utils::impl_distill_by_unit; use super::{ generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, }; @@ -41,11 +40,7 @@ impl BatchUpdate { } } -impl fmt::Display for BatchUpdate { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.logical.fmt_with_name(f, "BatchUpdate") - } -} +impl_distill_by_unit!(BatchUpdate, logical, "BatchUpdate"); impl PlanTreeNodeUnary for BatchUpdate { fn input(&self) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/logical_agg.rs b/src/frontend/src/optimizer/plan_node/logical_agg.rs index f1f33367efa9..bdb587cc2292 100644 --- a/src/frontend/src/optimizer/plan_node/logical_agg.rs +++ b/src/frontend/src/optimizer/plan_node/logical_agg.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; - use fixedbitset::FixedBitSet; use itertools::Itertools; use risingwave_common::error::{ErrorCode, Result, TrackingIssue}; @@ -22,6 +20,7 @@ use risingwave_common::util::sort_util::ColumnOrder; use risingwave_expr::agg::AggKind; use super::generic::{self, Agg, GenericPlanRef, PlanAggCall, ProjectBuilder}; +use super::utils::impl_distill_by_unit; use super::{ BatchHashAgg, BatchSimpleAgg, ColPrunable, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamHashAgg, StreamProject, StreamSimpleAgg, @@ -862,11 +861,7 @@ impl PlanTreeNodeUnary for LogicalAgg { impl_plan_tree_node_for_unary! {LogicalAgg} -impl fmt::Display for LogicalAgg { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.core.fmt_with_name(f, "LogicalAgg") - } -} +impl_distill_by_unit!(LogicalAgg, core, "LogicalAgg"); impl ExprRewritable for LogicalAgg { fn has_rewritable_expr(&self) -> bool { diff --git a/src/frontend/src/optimizer/plan_node/logical_dedup.rs b/src/frontend/src/optimizer/plan_node/logical_dedup.rs index 28b20de15caa..6be7020aa57f 100644 --- a/src/frontend/src/optimizer/plan_node/logical_dedup.rs +++ b/src/frontend/src/optimizer/plan_node/logical_dedup.rs @@ -12,14 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use core::fmt; - use fixedbitset::FixedBitSet; use itertools::Itertools; use risingwave_common::error::Result; use risingwave_common::util::column_index_mapping::ColIndexMapping; use super::generic::Limit; +use super::utils::impl_distill_by_unit; use super::{ gen_filter_and_pushdown, generic, BatchGroupTopN, ColPrunable, ColumnPruningContext, ExprRewritable, LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, @@ -183,8 +182,4 @@ impl ColPrunable for LogicalDedup { } } -impl fmt::Display for LogicalDedup { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.core.fmt_with_name(f, "LogicalDedup") - } -} +impl_distill_by_unit!(LogicalDedup, core, "LogicalDedup"); diff --git a/src/frontend/src/optimizer/plan_node/logical_delete.rs b/src/frontend/src/optimizer/plan_node/logical_delete.rs index c1d4d6c7244a..e5b34c6cf4cd 100644 --- a/src/frontend/src/optimizer/plan_node/logical_delete.rs +++ b/src/frontend/src/optimizer/plan_node/logical_delete.rs @@ -12,12 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{fmt, vec}; +use std::vec; use risingwave_common::catalog::{Field, Schema, TableVersionId}; use risingwave_common::error::Result; use risingwave_common::types::DataType; +use super::utils::impl_distill_by_unit; use super::{ gen_filter_and_pushdown, generic, BatchDelete, ColPrunable, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, @@ -79,12 +80,7 @@ impl PlanTreeNodeUnary for LogicalDelete { } impl_plan_tree_node_for_unary! { LogicalDelete } - -impl fmt::Display for LogicalDelete { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.core.fmt_with_name(f, "LogicalDelete") - } -} +impl_distill_by_unit!(LogicalDelete, core, "LogicalDelete"); impl ColPrunable for LogicalDelete { fn prune_col(&self, _required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/logical_except.rs b/src/frontend/src/optimizer/plan_node/logical_except.rs index 3b77908b4ea0..11ff8b0210ee 100644 --- a/src/frontend/src/optimizer/plan_node/logical_except.rs +++ b/src/frontend/src/optimizer/plan_node/logical_except.rs @@ -12,12 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; - use itertools::Itertools; use risingwave_common::catalog::Schema; use risingwave_common::error::Result; +use super::utils::impl_distill_by_unit; use super::{ColPrunable, ExprRewritable, PlanBase, PlanRef, PredicatePushdown, ToBatch, ToStream}; use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::{ @@ -46,10 +45,6 @@ impl LogicalExcept { LogicalExcept::new(all, inputs).into() } - pub(super) fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result { - self.core.fmt_with_name(f, name) - } - pub fn all(&self) -> bool { self.core.all } @@ -65,11 +60,7 @@ impl PlanTreeNode for LogicalExcept { } } -impl fmt::Display for LogicalExcept { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.fmt_with_name(f, "LogicalExcept") - } -} +impl_distill_by_unit!(LogicalExcept, core, "LogicalExcept"); impl ColPrunable for LogicalExcept { fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/logical_expand.rs b/src/frontend/src/optimizer/plan_node/logical_expand.rs index b984bcd941f8..2cd662cf2da6 100644 --- a/src/frontend/src/optimizer/plan_node/logical_expand.rs +++ b/src/frontend/src/optimizer/plan_node/logical_expand.rs @@ -12,11 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; - use itertools::Itertools; use risingwave_common::error::Result; +use super::utils::impl_distill_by_unit; use super::{ gen_filter_and_pushdown, generic, BatchExpand, ColPrunable, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamExpand, ToBatch, ToStream, @@ -62,10 +61,6 @@ impl LogicalExpand { pub fn column_subsets(&self) -> &Vec> { &self.core.column_subsets } - - pub(super) fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result { - self.core.fmt_with_name(f, name) - } } impl PlanTreeNodeUnary for LogicalExpand { @@ -110,12 +105,7 @@ impl PlanTreeNodeUnary for LogicalExpand { } impl_plan_tree_node_for_unary! {LogicalExpand} - -impl fmt::Display for LogicalExpand { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.fmt_with_name(f, "LogicalExpand") - } -} +impl_distill_by_unit!(LogicalExpand, core, "LogicalExpand"); impl ColPrunable for LogicalExpand { fn prune_col(&self, _required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/logical_filter.rs b/src/frontend/src/optimizer/plan_node/logical_filter.rs index 843ece69388f..4ba172aea9b7 100644 --- a/src/frontend/src/optimizer/plan_node/logical_filter.rs +++ b/src/frontend/src/optimizer/plan_node/logical_filter.rs @@ -12,14 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; - use fixedbitset::FixedBitSet; use itertools::Itertools; use risingwave_common::bail; use risingwave_common::error::Result; use risingwave_common::types::DataType; +use super::utils::impl_distill_by_unit; use super::{ generic, ColPrunable, ExprRewritable, LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, @@ -117,12 +116,7 @@ impl PlanTreeNodeUnary for LogicalFilter { } impl_plan_tree_node_for_unary! {LogicalFilter} - -impl fmt::Display for LogicalFilter { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.core.fmt_with_name(f, "LogicalFilter") - } -} +impl_distill_by_unit!(LogicalFilter, core, "LogicalFilter"); impl ColPrunable for LogicalFilter { fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs index 0bd9d8d8c9f2..530b4a198875 100644 --- a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs @@ -12,14 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; - use fixedbitset::FixedBitSet; use itertools::Itertools; use risingwave_common::error::Result; use risingwave_common::types::Interval; use super::generic::GenericPlanNode; +use super::utils::impl_distill_by_unit; use super::{ gen_filter_and_pushdown, generic, BatchHopWindow, ColPrunable, ExprRewritable, LogicalFilter, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamHopWindow, ToBatch, ToStream, @@ -217,12 +216,7 @@ impl PlanTreeNodeUnary for LogicalHopWindow { } impl_plan_tree_node_for_unary! {LogicalHopWindow} - -impl fmt::Display for LogicalHopWindow { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.core.fmt_with_name(f, "LogicalHopWindow") - } -} +impl_distill_by_unit!(LogicalHopWindow, core, "LogicalHopWindow"); impl ColPrunable for LogicalHopWindow { fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/logical_intersect.rs b/src/frontend/src/optimizer/plan_node/logical_intersect.rs index 909ea90dcffb..10013e2e5794 100644 --- a/src/frontend/src/optimizer/plan_node/logical_intersect.rs +++ b/src/frontend/src/optimizer/plan_node/logical_intersect.rs @@ -18,6 +18,7 @@ use itertools::Itertools; use risingwave_common::catalog::Schema; use risingwave_common::error::Result; +use super::utils::impl_distill_by_unit; use super::{ColPrunable, ExprRewritable, PlanBase, PlanRef, PredicatePushdown, ToBatch, ToStream}; use crate::optimizer::plan_node::{ generic, ColumnPruningContext, PlanTreeNode, PredicatePushdownContext, RewriteStreamContext, @@ -45,10 +46,6 @@ impl LogicalIntersect { LogicalIntersect::new(all, inputs).into() } - pub(super) fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result { - self.core.fmt_with_name(f, name) - } - pub fn fmt_fields_with_builder(&self, builder: &mut fmt::DebugStruct<'_, '_>) { self.core.fmt_fields_with_builder(builder) } @@ -68,11 +65,7 @@ impl PlanTreeNode for LogicalIntersect { } } -impl fmt::Display for LogicalIntersect { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.fmt_with_name(f, "LogicalIntersect") - } -} +impl_distill_by_unit!(LogicalIntersect, core, "LogicalIntersect"); impl ColPrunable for LogicalIntersect { fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/logical_over_window.rs b/src/frontend/src/optimizer/plan_node/logical_over_window.rs index 9a6e5093ce72..d24cf86541bf 100644 --- a/src/frontend/src/optimizer/plan_node/logical_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_over_window.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; - use fixedbitset::FixedBitSet; use itertools::Itertools; use risingwave_common::error::{ErrorCode, Result}; @@ -23,6 +21,7 @@ use risingwave_expr::agg::AggKind; use risingwave_expr::function::window::{Frame, FrameBound, WindowFuncKind}; use super::generic::{GenericPlanRef, OverWindow, PlanWindowFunction, ProjectBuilder}; +use super::utils::impl_distill_by_unit; use super::{ gen_filter_and_pushdown, ColPrunable, ExprRewritable, LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamEowcOverWindow, StreamSort, ToBatch, ToStream, @@ -558,12 +557,7 @@ impl PlanTreeNodeUnary for LogicalOverWindow { } impl_plan_tree_node_for_unary! { LogicalOverWindow } - -impl fmt::Display for LogicalOverWindow { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.core.fmt_with_name(f, "LogicalOverWindow") - } -} +impl_distill_by_unit!(LogicalOverWindow, core, "LogicalOverWindow"); impl ColPrunable for LogicalOverWindow { fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/logical_project.rs b/src/frontend/src/optimizer/plan_node/logical_project.rs index 51147dd0bb6b..b8a3ef8a2e32 100644 --- a/src/frontend/src/optimizer/plan_node/logical_project.rs +++ b/src/frontend/src/optimizer/plan_node/logical_project.rs @@ -16,8 +16,10 @@ use std::fmt; use fixedbitset::FixedBitSet; use itertools::Itertools; +use pretty_xmlish::Pretty; use risingwave_common::error::Result; +use super::utils::Distill; use super::{ gen_filter_and_pushdown, generic, BatchProject, ColPrunable, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamProject, ToBatch, ToStream, @@ -84,10 +86,6 @@ impl LogicalProject { &self.core.exprs } - pub(super) fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result { - self.core.fmt_with_name(f, name, self.base.schema()) - } - pub fn is_identity(&self) -> bool { self.core.is_identity() } @@ -136,7 +134,16 @@ impl_plan_tree_node_for_unary! {LogicalProject} impl fmt::Display for LogicalProject { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.fmt_with_name(f, "LogicalProject") + self.core + .fmt_with_name(f, "LogicalProject", self.base.schema()) + } +} +impl Distill for LogicalProject { + fn distill<'a>(&self) -> Pretty<'a> { + Pretty::childless_record( + "LogicalProject", + self.core.fields_pretty(self.base.schema()), + ) } } diff --git a/src/frontend/src/optimizer/plan_node/logical_topn.rs b/src/frontend/src/optimizer/plan_node/logical_topn.rs index e4b27fd397bd..f09e405b43ca 100644 --- a/src/frontend/src/optimizer/plan_node/logical_topn.rs +++ b/src/frontend/src/optimizer/plan_node/logical_topn.rs @@ -12,14 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; - use fixedbitset::FixedBitSet; use itertools::Itertools; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::util::sort_util::ColumnOrder; use super::generic::Limit; +use super::utils::impl_distill_by_unit; use super::{ gen_filter_and_pushdown, generic, BatchGroupTopN, ColPrunable, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamGroupTopN, StreamProject, ToBatch, @@ -209,11 +208,7 @@ impl PlanTreeNodeUnary for LogicalTopN { } } impl_plan_tree_node_for_unary! {LogicalTopN} -impl fmt::Display for LogicalTopN { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.core.fmt_with_name(f, "LogicalTopN") - } -} +impl_distill_by_unit!(LogicalTopN, core, "LogicalTopN"); impl ColPrunable for LogicalTopN { fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/logical_update.rs b/src/frontend/src/optimizer/plan_node/logical_update.rs index 556874f2d962..abe3e6d64849 100644 --- a/src/frontend/src/optimizer/plan_node/logical_update.rs +++ b/src/frontend/src/optimizer/plan_node/logical_update.rs @@ -12,12 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{fmt, vec}; +use std::vec; use risingwave_common::catalog::{Field, Schema, TableVersionId}; use risingwave_common::error::Result; use risingwave_common::types::DataType; +use super::utils::impl_distill_by_unit; use super::{ gen_filter_and_pushdown, generic, BatchUpdate, ColPrunable, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, @@ -86,12 +87,7 @@ impl PlanTreeNodeUnary for LogicalUpdate { } impl_plan_tree_node_for_unary! { LogicalUpdate } - -impl fmt::Display for LogicalUpdate { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.core.fmt_with_name(f, "LogicalUpdate") - } -} +impl_distill_by_unit!(LogicalUpdate, core, "LogicalUpdate"); impl ExprRewritable for LogicalUpdate { fn has_rewritable_expr(&self) -> bool { diff --git a/src/frontend/src/optimizer/plan_node/stream_dedup.rs b/src/frontend/src/optimizer/plan_node/stream_dedup.rs index d9abd7b9af87..44acf722eae6 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dedup.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dedup.rs @@ -12,15 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; - use itertools::Itertools; use risingwave_common::util::sort_util::OrderType; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::DedupNode; use super::generic::{self, GenericPlanNode, GenericPlanRef}; -use super::utils::TableCatalogBuilder; +use super::utils::{impl_distill_by_unit, TableCatalogBuilder}; use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::plan_node::stream::StreamPlanRef; use crate::optimizer::plan_node::PlanRef; @@ -71,12 +69,8 @@ impl StreamDedup { } } -impl fmt::Display for StreamDedup { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - assert!(self.base.append_only()); - self.logical.fmt_with_name(f, "StreamAppendOnlyDedup") - } -} +// assert!(self.base.append_only()); +impl_distill_by_unit!(StreamDedup, logical, "StreamAppendOnlyDedup"); impl PlanTreeNodeUnary for StreamDedup { fn input(&self) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs index 13f59b02504a..dea3d0eb4988 100644 --- a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs @@ -13,14 +13,13 @@ // limitations under the License. use std::collections::HashSet; -use std::fmt; use fixedbitset::FixedBitSet; use risingwave_common::util::sort_util::OrderType; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use super::generic::{self, PlanWindowFunction}; -use super::utils::TableCatalogBuilder; +use super::utils::{impl_distill_by_unit, TableCatalogBuilder}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::plan_node::stream::StreamPlanRef; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -114,11 +113,7 @@ impl StreamEowcOverWindow { } } -impl fmt::Display for StreamEowcOverWindow { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.logical.fmt_with_name(f, "StreamEowcOverWindow") - } -} +impl_distill_by_unit!(StreamEowcOverWindow, logical, "StreamEowcOverWindow"); impl PlanTreeNodeUnary for StreamEowcOverWindow { fn input(&self) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/stream_expand.rs b/src/frontend/src/optimizer/plan_node/stream_expand.rs index dbcec9fa9349..88b5ae12df4d 100644 --- a/src/frontend/src/optimizer/plan_node/stream_expand.rs +++ b/src/frontend/src/optimizer/plan_node/stream_expand.rs @@ -12,14 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; - use fixedbitset::FixedBitSet; use risingwave_pb::stream_plan::expand_node::Subset; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::ExpandNode; use super::stream::StreamPlanRef; +use super::utils::impl_distill_by_unit; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::property::Distribution; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -66,11 +65,7 @@ impl StreamExpand { } } -impl fmt::Display for StreamExpand { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.logical.fmt_with_name(f, "StreamExpand") - } -} +impl_distill_by_unit!(StreamExpand, logical, "StreamExpand"); impl PlanTreeNodeUnary for StreamExpand { fn input(&self) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/stream_filter.rs b/src/frontend/src/optimizer/plan_node/stream_filter.rs index 5581ebfe8d27..760ac76d1e87 100644 --- a/src/frontend/src/optimizer/plan_node/stream_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_filter.rs @@ -12,12 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; - use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::FilterNode; use super::stream::StreamPlanRef; +use super::utils::impl_distill_by_unit; use super::{generic, ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::{Expr, ExprImpl, ExprRewriter}; use crate::optimizer::plan_node::PlanBase; @@ -51,11 +50,7 @@ impl StreamFilter { } } -impl fmt::Display for StreamFilter { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.logical.fmt_with_name(f, "StreamFilter") - } -} +impl_distill_by_unit!(StreamFilter, logical, "StreamFilter"); impl PlanTreeNodeUnary for StreamFilter { fn input(&self) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/stream_project_set.rs b/src/frontend/src/optimizer/plan_node/stream_project_set.rs index 0e5c19937e08..566bbde6675e 100644 --- a/src/frontend/src/optimizer/plan_node/stream_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project_set.rs @@ -12,14 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; - use fixedbitset::FixedBitSet; use itertools::Itertools; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::ProjectSetNode; use super::stream::StreamPlanRef; +use super::utils::impl_distill_by_unit; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::{try_derive_watermark, ExprRewriter}; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -60,12 +59,7 @@ impl StreamProjectSet { StreamProjectSet { base, logical } } } - -impl fmt::Display for StreamProjectSet { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.logical.fmt_with_name(f, "StreamProjectSet") - } -} +impl_distill_by_unit!(StreamProjectSet, logical, "StreamProjectSet"); impl PlanTreeNodeUnary for StreamProjectSet { fn input(&self) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs index 28a251f4949e..546687130632 100644 --- a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs @@ -12,13 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; - use fixedbitset::FixedBitSet; use itertools::Itertools; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use super::generic::{self, PlanAggCall}; +use super::utils::impl_distill_by_unit; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::ExprRewriter; use crate::optimizer::plan_node::stream::StreamPlanRef; @@ -65,12 +64,11 @@ impl StreamStatelessSimpleAgg { &self.logical.agg_calls } } - -impl fmt::Display for StreamStatelessSimpleAgg { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.logical.fmt_with_name(f, "StreamStatelessSimpleAgg") - } -} +impl_distill_by_unit!( + StreamStatelessSimpleAgg, + logical, + "StreamStatelessSimpleAgg" +); impl PlanTreeNodeUnary for StreamStatelessSimpleAgg { fn input(&self) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index fd827a86862e..04e869dc8b83 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -17,6 +17,7 @@ use std::{fmt, vec}; use fixedbitset::FixedBitSet; use itertools::Itertools; +use pretty_xmlish::Pretty; use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ConflictBehavior, Field, Schema}; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; @@ -159,6 +160,31 @@ impl TableCatalogBuilder { } } +/// See also [`super::generic::DistillUnit`]. +pub trait Distill { + fn distill<'a>(&self) -> Pretty<'a>; +} + +macro_rules! impl_distill_by_unit { + ($ty:ty, $core:ident, $name:expr) => { + use pretty_xmlish::Pretty; + use $crate::optimizer::plan_node::generic::DistillUnit; + use $crate::optimizer::plan_node::utils::Distill; + impl Distill for $ty { + fn distill<'a>(&self) -> Pretty<'a> { + self.$core.distill_with_name($name) + } + } + + impl std::fmt::Display for $ty { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.$core.fmt_with_name(f, $name) + } + } + }; +} +pub(crate) use impl_distill_by_unit; + #[derive(Clone, Copy)] pub struct IndicesDisplay<'a> { pub indices: &'a [usize],