diff --git a/src/common/src/catalog/schema.rs b/src/common/src/catalog/schema.rs index 14a4d9d874de..e942bfcf4d47 100644 --- a/src/common/src/catalog/schema.rs +++ b/src/common/src/catalog/schema.rs @@ -140,6 +140,10 @@ impl Schema { self.fields().iter().map(|f| f.name.clone()).collect() } + pub fn names_str(&self) -> Vec<&str> { + self.fields().iter().map(|f| f.name.as_str()).collect() + } + pub fn data_types(&self) -> Vec { self.fields .iter() diff --git a/src/frontend/src/optimizer/plan_node/batch_source.rs b/src/frontend/src/optimizer/plan_node/batch_source.rs index bca07506c2e8..c6f34d5fd5fb 100644 --- a/src/frontend/src/optimizer/plan_node/batch_source.rs +++ b/src/frontend/src/optimizer/plan_node/batch_source.rs @@ -20,7 +20,7 @@ use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::SourceNode; -use super::utils::Distill; +use super::utils::{column_names_pretty, Distill}; use super::{ generic, ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch, ToLocalBatch, }; @@ -47,11 +47,7 @@ impl BatchSource { } pub fn column_names(&self) -> Vec<&str> { - self.schema() - .fields() - .iter() - .map(|f| f.name.as_str()) - .collect() + self.schema().names_str() } pub fn source_catalog(&self) -> Option> { @@ -87,17 +83,10 @@ impl fmt::Display for BatchSource { impl Distill for BatchSource { fn distill<'a>(&self) -> Pretty<'a> { - let columns = self - .column_names() - .into_iter() - .map(|s| Pretty::from(s.to_owned())) - .collect(); + let src = Pretty::from(self.source_catalog().unwrap().name.clone()); let fields = vec![ - ( - "source", - Pretty::from(self.source_catalog().unwrap().name.clone()), - ), - ("columns", Pretty::Array(columns)), + ("source", src), + ("columns", column_names_pretty(self.schema())), ("filter", Pretty::debug(&self.kafka_timestamp_range_value())), ]; Pretty::childless_record("BatchSource", fields) diff --git a/src/frontend/src/optimizer/plan_node/logical_now.rs b/src/frontend/src/optimizer/plan_node/logical_now.rs index 983788ca0544..3ddfbd4ca712 100644 --- a/src/frontend/src/optimizer/plan_node/logical_now.rs +++ b/src/frontend/src/optimizer/plan_node/logical_now.rs @@ -26,6 +26,7 @@ use super::{ ColPrunable, ColumnPruningContext, ExprRewritable, LogicalFilter, PlanBase, PlanRef, PredicatePushdown, RewriteStreamContext, StreamNow, ToBatch, ToStream, ToStreamContext, }; +use crate::optimizer::plan_node::utils::column_names_pretty; use crate::optimizer::property::FunctionalDependencySet; use crate::utils::ColIndexMapping; use crate::OptimizerContextRef; @@ -51,11 +52,7 @@ impl LogicalNow { impl Distill for LogicalNow { fn distill<'a>(&self) -> Pretty<'a> { let vec = if self.base.ctx.is_explain_verbose() { - let disp = Pretty::debug(&IndicesDisplay { - indices: &(0..self.schema().fields.len()).collect_vec(), - input_schema: self.schema(), - }); - vec![("output", disp)] + vec![("output", column_names_pretty(self.schema()))] } else { vec![] }; diff --git a/src/frontend/src/optimizer/plan_node/logical_share.rs b/src/frontend/src/optimizer/plan_node/logical_share.rs index ac771e704fa5..44c15bc37c68 100644 --- a/src/frontend/src/optimizer/plan_node/logical_share.rs +++ b/src/frontend/src/optimizer/plan_node/logical_share.rs @@ -15,9 +15,11 @@ use std::cell::RefCell; use std::fmt; +use pretty_xmlish::Pretty; use risingwave_common::error::ErrorCode::NotImplemented; use risingwave_common::error::Result; +use super::utils::Distill; use super::{ generic, ColPrunable, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, @@ -74,6 +76,10 @@ impl LogicalShare { ) -> fmt::Result { write!(f, "{} {{ id = {} }}", name, &base.id.0) } + + pub(super) fn pretty_fields<'a>(base: &PlanBase, name: &'a str) -> Pretty<'a> { + Pretty::childless_record(name, vec![("id", Pretty::debug(&base.id.0))]) + } } impl PlanTreeNodeUnary for LogicalShare { @@ -108,6 +114,11 @@ impl fmt::Display for LogicalShare { Self::fmt_with_name(&self.base, f, "LogicalShare") } } +impl Distill for LogicalShare { + fn distill<'a>(&self) -> Pretty<'a> { + Self::pretty_fields(&self.base, "LogicalShare") + } +} impl ColPrunable for LogicalShare { fn prune_col(&self, _required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index 3ad1530464ee..94d0feb2e036 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -19,6 +19,7 @@ use std::ops::Bound::{Excluded, Included, Unbounded}; use std::rc::Rc; use itertools::Itertools; +use pretty_xmlish::Pretty; use risingwave_common::catalog::{ColumnCatalog, Schema}; use risingwave_common::error::Result; use risingwave_connector::source::DataType; @@ -26,6 +27,7 @@ use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::plan_common::GeneratedColumnDesc; use super::stream_watermark_filter::StreamWatermarkFilter; +use super::utils::Distill; use super::{ generic, BatchProject, BatchSource, ColPrunable, ExprRewritable, LogicalFilter, LogicalProject, PlanBase, PlanRef, PredicatePushdown, StreamProject, StreamRowIdGen, StreamSource, ToBatch, @@ -34,6 +36,7 @@ use super::{ use crate::catalog::source_catalog::SourceCatalog; use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprType, InputRef}; use crate::optimizer::optimizer_context::OptimizerContextRef; +use crate::optimizer::plan_node::utils::column_names_pretty; use crate::optimizer::plan_node::{ ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, }; @@ -170,14 +173,6 @@ impl LogicalSource { }) } - pub(super) fn column_names(&self) -> Vec { - self.schema() - .fields() - .iter() - .map(|f| f.name.clone()) - .collect() - } - pub fn source_catalog(&self) -> Option> { self.core.catalog.clone() } @@ -245,7 +240,7 @@ impl fmt::Display for LogicalSource { f, "LogicalSource {{ source: {}, columns: [{}], time_range: [{:?}] }}", catalog.name, - self.column_names().join(", "), + self.schema().names_str().join(", "), self.core.kafka_timestamp_range, ) } else { @@ -253,6 +248,22 @@ impl fmt::Display for LogicalSource { } } } +impl Distill for LogicalSource { + fn distill<'a>(&self) -> Pretty<'a> { + let fields = if let Some(catalog) = self.source_catalog() { + let src = Pretty::from(catalog.name.clone()); + let time = Pretty::debug(&self.core.kafka_timestamp_range); + vec![ + ("source", src), + ("columns", column_names_pretty(self.schema())), + ("time_range", time), + ] + } else { + vec![] + }; + Pretty::childless_record("LogicalSource", fields) + } +} impl ColPrunable for LogicalSource { fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs index cf5ed46b9d5f..42ebee7c3458 100644 --- a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs @@ -15,13 +15,14 @@ use std::fmt; use std::ops::BitAnd; +use pretty_xmlish::Pretty; use risingwave_common::catalog::ColumnDesc; use risingwave_pb::plan_common::JoinType; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{ArrangementInfo, DeltaIndexJoinNode}; use super::generic::{self}; -use super::utils::formatter_debug_plan_node; +use super::utils::{formatter_debug_plan_node, Distill}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, StreamNode}; use crate::expr::{Expr, ExprRewriter}; use crate::optimizer::plan_node::stream::StreamPlanRef; @@ -114,6 +115,30 @@ impl fmt::Display for StreamDeltaJoin { builder.finish() } } +impl Distill for StreamDeltaJoin { + fn distill<'a>(&self) -> Pretty<'a> { + let verbose = self.base.ctx.is_explain_verbose(); + let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 }); + vec.push(("type", Pretty::debug(&self.logical.join_type))); + + let concat_schema = self.logical.concat_schema(); + vec.push(( + "predicate", + Pretty::debug(&EqJoinPredicateDisplay { + eq_join_predicate: self.eq_join_predicate(), + input_schema: &concat_schema, + }), + )); + + if verbose { + let data = IndicesDisplay::from_join(&self.logical, &concat_schema) + .map_or_else(|| Pretty::from("all"), |id| Pretty::display(&id)); + vec.push(("output", data)); + } + + Pretty::childless_record("StreamDeltaJoin", vec) + } +} impl PlanTreeNodeBinary for StreamDeltaJoin { fn left(&self) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/stream_dml.rs b/src/frontend/src/optimizer/plan_node/stream_dml.rs index 2416b5bb0b8c..319c73319189 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dml.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dml.rs @@ -15,9 +15,11 @@ use std::fmt; use fixedbitset::FixedBitSet; +use pretty_xmlish::Pretty; use risingwave_common::catalog::{ColumnDesc, INITIAL_TABLE_VERSION_ID}; use risingwave_pb::stream_plan::stream_node::PbNodeBody; +use super::utils::Distill; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -48,10 +50,10 @@ impl StreamDml { } } - fn column_names(&self) -> Vec { + fn column_names(&self) -> Vec<&str> { self.column_descs .iter() - .map(|column_desc| column_desc.name.clone()) + .map(|column_desc| column_desc.name.as_str()) .collect() } } @@ -65,6 +67,17 @@ impl fmt::Display for StreamDml { ) } } +impl Distill for StreamDml { + fn distill<'a>(&self) -> Pretty<'a> { + let col = self + .column_names() + .iter() + .map(|n| Pretty::from(n.to_string())) + .collect(); + let col = Pretty::Array(col); + Pretty::childless_record("StreamDml", vec![("columns", col)]) + } +} impl PlanTreeNodeUnary for StreamDml { fn input(&self) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/stream_share.rs b/src/frontend/src/optimizer/plan_node/stream_share.rs index 51d772399eb6..48d0227fef0c 100644 --- a/src/frontend/src/optimizer/plan_node/stream_share.rs +++ b/src/frontend/src/optimizer/plan_node/stream_share.rs @@ -14,9 +14,11 @@ use std::fmt; +use pretty_xmlish::Pretty; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::PbStreamNode; +use super::utils::Distill; use super::{generic, ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamExchange, StreamNode}; use crate::optimizer::plan_node::{LogicalShare, PlanBase, PlanTreeNode}; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -49,6 +51,11 @@ impl fmt::Display for StreamShare { LogicalShare::fmt_with_name(&self.base, f, "StreamShare") } } +impl Distill for StreamShare { + fn distill<'a>(&self) -> Pretty<'a> { + LogicalShare::pretty_fields(&self.base, "StreamShare") + } +} impl PlanTreeNodeUnary for StreamShare { fn input(&self) -> PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/stream_source.rs b/src/frontend/src/optimizer/plan_node/stream_source.rs index 64fb027f27dd..a1cbf862311c 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source.rs @@ -17,12 +17,14 @@ use std::rc::Rc; use fixedbitset::FixedBitSet; use itertools::Itertools; +use pretty_xmlish::Pretty; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::{PbStreamSource, SourceNode}; -use super::utils::formatter_debug_plan_node; +use super::utils::{formatter_debug_plan_node, Distill}; use super::{generic, ExprRewritable, PlanBase, StreamNode}; use crate::catalog::source_catalog::SourceCatalog; +use crate::optimizer::plan_node::utils::column_names_pretty; use crate::optimizer::property::Distribution; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -56,14 +58,6 @@ impl StreamSource { pub fn source_catalog(&self) -> Option> { self.logical.catalog.clone() } - - pub fn column_names(&self) -> Vec { - self.schema() - .fields() - .iter() - .map(|f| f.name.clone()) - .collect() - } } impl_plan_tree_node_for_leaf! { StreamSource } @@ -74,11 +68,25 @@ impl fmt::Display for StreamSource { if let Some(catalog) = self.source_catalog() { builder .field("source", &catalog.name) - .field("columns", &self.column_names()); + .field("columns", &self.schema().names_str()); } builder.finish() } } +impl Distill for StreamSource { + fn distill<'a>(&self) -> Pretty<'a> { + let fields = if let Some(catalog) = self.source_catalog() { + let src = Pretty::from(catalog.name.clone()); + vec![ + ("source", src), + ("columns", column_names_pretty(self.schema())), + ] + } else { + vec![] + }; + Pretty::childless_record("StreamSource", fields) + } +} impl StreamNode for StreamSource { fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody { diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index 79a67f1a7a59..2f15e27b6042 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -185,6 +185,14 @@ macro_rules! impl_distill_by_unit { } pub(crate) use impl_distill_by_unit; +pub fn column_names_pretty<'a>(schema: &Schema) -> Pretty<'a> { + let columns = (schema.fields.iter()) + .map(|f| f.name.clone()) + .map(Pretty::from) + .collect(); + Pretty::Array(columns) +} + #[derive(Clone, Copy)] pub struct IndicesDisplay<'a> { pub indices: &'a [usize],