diff --git a/src/frontend/planner_test/tests/testdata/sysinfo_funcs.yaml b/src/frontend/planner_test/tests/testdata/sysinfo_funcs.yaml index 3d9034c7ddfc..c40ca8c25279 100644 --- a/src/frontend/planner_test/tests/testdata/sysinfo_funcs.yaml +++ b/src/frontend/planner_test/tests/testdata/sysinfo_funcs.yaml @@ -36,6 +36,6 @@ No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml - sql: | select current_timestamp; - batch_plan: |- + batch_plan: | BatchProject { exprs: [Now] } └─BatchValues { rows: [[]] } diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index f53161351b2c..c511b80f358f 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -453,11 +453,12 @@ impl PlanRoot { // Convert to physical plan node plan = plan.to_batch_with_order_required(&self.required_order)?; - // SessionTimezone substitution + // TODO: SessionTimezone substitution // Const eval of exprs at the last minute // plan = const_eval_exprs(plan)?; - // if explain_trace { + // let ctx = plan.ctx(); + // if ctx.is_explain_trace() { // ctx.trace("Const eval exprs:"); // ctx.trace(plan.explain_to_string().unwrap()); // } @@ -614,6 +615,14 @@ impl PlanRoot { ); } + // Const eval of exprs at the last minute + // plan = const_eval_exprs(plan)?; + + // if ctx.is_explain_trace() { + // ctx.trace("Const eval exprs:"); + // ctx.trace(plan.explain_to_string().unwrap()); + // } + #[cfg(debug_assertions)] InputRefValidator.validate(plan.clone()); diff --git a/src/frontend/src/optimizer/plan_node/expr_rewritable.rs b/src/frontend/src/optimizer/plan_node/expr_rewritable.rs index baeb4caa1d61..2957ac89fa1a 100644 --- a/src/frontend/src/optimizer/plan_node/expr_rewritable.rs +++ b/src/frontend/src/optimizer/plan_node/expr_rewritable.rs @@ -14,11 +14,8 @@ use std::ops::Deref; -use paste::paste; - use super::*; use crate::expr::ExprRewriter; -use crate::for_stream_plan_nodes; /// Rewrites expressions in a `PlanRef`. Due to `Share` operator, /// the `ExprRewriter` needs to be idempotent i.e. applying it more than once @@ -48,12 +45,3 @@ impl ExprRewritable for PlanRef { } } } - -macro_rules! ban_expr_rewritable { - ($( { $convention:ident, $name:ident }),*) => { - paste!{ - $(impl ExprRewritable for [<$convention $name>] {} )* - } - } -} -for_stream_plan_nodes! {ban_expr_rewritable} diff --git a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs index 7eff29860ced..272e6115ddfa 100644 --- a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs @@ -21,8 +21,8 @@ use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{ArrangementInfo, DeltaIndexJoinNode}; use super::generic::GenericPlanRef; -use super::{LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, StreamNode}; -use crate::expr::Expr; +use super::{ExprRewritable, LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, StreamNode}; +use crate::expr::{Expr, ExprRewriter}; use crate::optimizer::plan_node::stream::StreamPlanRef; use crate::optimizer::plan_node::utils::IndicesDisplay; use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay}; @@ -231,3 +231,21 @@ impl StreamNode for StreamDeltaJoin { }) } } + +impl ExprRewritable for StreamDeltaJoin { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + Self::new( + self.logical + .rewrite_exprs(r) + .as_logical_join() + .unwrap() + .clone(), + self.eq_join_predicate.rewrite_exprs(r), + ) + .into() + } +} diff --git a/src/frontend/src/optimizer/plan_node/stream_dml.rs b/src/frontend/src/optimizer/plan_node/stream_dml.rs index 2efe3d998166..a96124eb9b17 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dml.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dml.rs @@ -18,7 +18,7 @@ use fixedbitset::FixedBitSet; use risingwave_common::catalog::ColumnDesc; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; -use super::{PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; +use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::stream_fragmenter::BuildFragmentGraphState; #[derive(Clone, Debug)] @@ -88,3 +88,5 @@ impl StreamNode for StreamDml { }) } } + +impl ExprRewritable for StreamDml {} diff --git a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs index 5e6f20de78a0..186e699087b7 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs @@ -21,8 +21,8 @@ pub use risingwave_pb::expr::expr_node::Type as ExprType; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::DynamicFilterNode; -use super::generic; use super::utils::IndicesDisplay; +use super::{generic, ExprRewritable}; use crate::expr::Expr; use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::{PlanBase, PlanTreeNodeBinary, StreamNode}; @@ -150,3 +150,5 @@ impl StreamNode for StreamDynamicFilter { }) } } + +impl ExprRewritable for StreamDynamicFilter {} diff --git a/src/frontend/src/optimizer/plan_node/stream_exchange.rs b/src/frontend/src/optimizer/plan_node/stream_exchange.rs index 825a77fa3915..e8f5d36cf53e 100644 --- a/src/frontend/src/optimizer/plan_node/stream_exchange.rs +++ b/src/frontend/src/optimizer/plan_node/stream_exchange.rs @@ -17,7 +17,7 @@ use std::fmt; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{DispatchStrategy, DispatcherType, ExchangeNode}; -use super::{PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; +use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::property::{Distribution, DistributionDisplay}; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -91,3 +91,5 @@ impl StreamNode for StreamExchange { }) } } + +impl ExprRewritable for StreamExchange {} diff --git a/src/frontend/src/optimizer/plan_node/stream_expand.rs b/src/frontend/src/optimizer/plan_node/stream_expand.rs index 5ff700297171..c4193ea15b62 100644 --- a/src/frontend/src/optimizer/plan_node/stream_expand.rs +++ b/src/frontend/src/optimizer/plan_node/stream_expand.rs @@ -19,7 +19,7 @@ use risingwave_pb::stream_plan::expand_node::Subset; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use risingwave_pb::stream_plan::ExpandNode; -use super::{LogicalExpand, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; +use super::{ExprRewritable, LogicalExpand, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::property::Distribution; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -101,3 +101,5 @@ fn subset_to_protobuf(subset: &[usize]) -> Subset { let column_indices = subset.iter().map(|key| *key as u32).collect(); Subset { column_indices } } + +impl ExprRewritable for StreamExpand {} diff --git a/src/frontend/src/optimizer/plan_node/stream_filter.rs b/src/frontend/src/optimizer/plan_node/stream_filter.rs index 7bc9f3f7a56b..6599c6934457 100644 --- a/src/frontend/src/optimizer/plan_node/stream_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_filter.rs @@ -18,8 +18,8 @@ use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use risingwave_pb::stream_plan::FilterNode; use super::generic::GenericPlanRef; -use super::{LogicalFilter, PlanRef, PlanTreeNodeUnary, StreamNode}; -use crate::expr::{Expr, ExprImpl}; +use super::{ExprRewritable, LogicalFilter, PlanRef, PlanTreeNodeUnary, StreamNode}; +use crate::expr::{Expr, ExprImpl, ExprRewriter}; use crate::optimizer::plan_node::PlanBase; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::utils::Condition; @@ -85,3 +85,20 @@ impl StreamNode for StreamFilter { }) } } + +impl ExprRewritable for StreamFilter { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + Self::new( + self.logical + .rewrite_exprs(r) + .as_logical_filter() + .unwrap() + .clone(), + ) + .into() + } +} diff --git a/src/frontend/src/optimizer/plan_node/stream_global_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_global_simple_agg.rs index dbabbf23a800..0928a9da4f7a 100644 --- a/src/frontend/src/optimizer/plan_node/stream_global_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_global_simple_agg.rs @@ -18,7 +18,8 @@ use fixedbitset::FixedBitSet; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use super::generic::PlanAggCall; -use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; +use super::{ExprRewritable, LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; +use crate::expr::ExprRewriter; use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::property::Distribution; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -119,3 +120,20 @@ impl StreamNode for StreamGlobalSimpleAgg { }) } } + +impl ExprRewritable for StreamGlobalSimpleAgg { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + Self::new( + self.logical + .rewrite_exprs(r) + .as_logical_agg() + .unwrap() + .clone(), + ) + .into() + } +} diff --git a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs index 56cee4e84d14..b37c9c3a3787 100644 --- a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs +++ b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs @@ -17,7 +17,7 @@ use std::fmt; use fixedbitset::FixedBitSet; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; -use super::{LogicalTopN, PlanBase, PlanTreeNodeUnary, StreamNode}; +use super::{ExprRewritable, LogicalTopN, PlanBase, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::property::{Order, OrderDisplay}; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::PlanRef; @@ -151,3 +151,5 @@ impl PlanTreeNodeUnary for StreamGroupTopN { Self::new(self.logical.clone_with_input(input), self.vnode_col_idx) } } + +impl ExprRewritable for StreamGroupTopN {} diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs index 57e59c965199..60cb7ac010f9 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs @@ -18,7 +18,8 @@ use fixedbitset::FixedBitSet; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use super::generic::PlanAggCall; -use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; +use super::{ExprRewritable, LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; +use crate::expr::ExprRewriter; use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::property::Distribution; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -128,3 +129,21 @@ impl StreamNode for StreamHashAgg { }) } } + +impl ExprRewritable for StreamHashAgg { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + Self::new( + self.logical + .rewrite_exprs(r) + .as_logical_agg() + .unwrap() + .clone(), + self.vnode_col_idx, + ) + .into() + } +} diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs index 166283092d08..3c6e9b72d80d 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs @@ -21,8 +21,10 @@ use risingwave_pb::plan_common::JoinType; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::HashJoinNode; -use super::{LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, StreamDeltaJoin, StreamNode}; -use crate::expr::Expr; +use super::{ + ExprRewritable, LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, StreamDeltaJoin, StreamNode, +}; +use crate::expr::{Expr, ExprRewriter}; use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::utils::IndicesDisplay; use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay}; @@ -283,3 +285,21 @@ impl StreamNode for StreamHashJoin { }) } } + +impl ExprRewritable for StreamHashJoin { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + Self::new( + self.logical + .rewrite_exprs(r) + .as_logical_join() + .unwrap() + .clone(), + self.eq_join_predicate.rewrite_exprs(r), + ) + .into() + } +} diff --git a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs index 72dbbfbbfdd4..afe318cb66a7 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs @@ -17,7 +17,7 @@ use std::fmt; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use risingwave_pb::stream_plan::HopWindowNode; -use super::{LogicalHopWindow, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; +use super::{ExprRewritable, LogicalHopWindow, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::stream_fragmenter::BuildFragmentGraphState; /// [`StreamHopWindow`] represents a hop window table function. @@ -91,3 +91,5 @@ impl StreamNode for StreamHopWindow { }) } } + +impl ExprRewritable for StreamHopWindow {} diff --git a/src/frontend/src/optimizer/plan_node/stream_index_scan.rs b/src/frontend/src/optimizer/plan_node/stream_index_scan.rs index 3a613a95f137..62486104469f 100644 --- a/src/frontend/src/optimizer/plan_node/stream_index_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_index_scan.rs @@ -19,8 +19,9 @@ use itertools::Itertools; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use risingwave_pb::stream_plan::{ChainType, StreamNode as ProstStreamPlan}; -use super::{LogicalScan, PlanBase, PlanNodeId, StreamNode}; +use super::{ExprRewritable, LogicalScan, PlanBase, PlanNodeId, PlanRef, StreamNode}; use crate::catalog::ColumnId; +use crate::expr::ExprRewriter; use crate::optimizer::plan_node::utils::IndicesDisplay; use crate::optimizer::property::{Distribution, DistributionDisplay}; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -206,3 +207,21 @@ impl StreamIndexScan { } } } + +impl ExprRewritable for StreamIndexScan { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + Self::new( + self.logical + .rewrite_exprs(r) + .as_logical_scan() + .unwrap() + .clone(), + self.chain_type, + ) + .into() + } +} diff --git a/src/frontend/src/optimizer/plan_node/stream_local_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_local_simple_agg.rs index 6f5884e0117b..2c945a92782a 100644 --- a/src/frontend/src/optimizer/plan_node/stream_local_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_local_simple_agg.rs @@ -19,7 +19,8 @@ use itertools::Itertools; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use super::generic::PlanAggCall; -use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; +use super::{ExprRewritable, LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; +use crate::expr::ExprRewriter; use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::property::RequiredDist; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -109,3 +110,20 @@ impl StreamNode for StreamLocalSimpleAgg { }) } } + +impl ExprRewritable for StreamLocalSimpleAgg { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + Self::new( + self.logical + .rewrite_exprs(r) + .as_logical_agg() + .unwrap() + .clone(), + ) + .into() + } +} diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 38ebd07e9956..026629e9151a 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -22,7 +22,7 @@ use risingwave_common::catalog::{ColumnDesc, TableId}; use risingwave_common::error::{ErrorCode, Result}; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; -use super::{PlanRef, PlanTreeNodeUnary, StreamNode, StreamSink}; +use super::{ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamNode, StreamSink}; use crate::catalog::column_catalog::ColumnCatalog; use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion}; use crate::catalog::{FragmentId, USER_COLUMN_ID_OFFSET}; @@ -358,3 +358,5 @@ impl StreamNode for StreamMaterialize { }) } } + +impl ExprRewritable for StreamMaterialize {} diff --git a/src/frontend/src/optimizer/plan_node/stream_now.rs b/src/frontend/src/optimizer/plan_node/stream_now.rs index fb2ae37a1762..9d7ec3999d87 100644 --- a/src/frontend/src/optimizer/plan_node/stream_now.rs +++ b/src/frontend/src/optimizer/plan_node/stream_now.rs @@ -24,7 +24,7 @@ use risingwave_pb::stream_plan::NowNode; use super::generic::GenericPlanRef; use super::stream::StreamPlanRef; use super::utils::{IndicesDisplay, TableCatalogBuilder}; -use super::{LogicalNow, PlanBase, StreamNode}; +use super::{ExprRewritable, LogicalNow, PlanBase, StreamNode}; use crate::optimizer::property::{Distribution, FunctionalDependencySet}; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::OptimizerContextRef; @@ -97,3 +97,5 @@ impl StreamNode for StreamNow { }) } } + +impl ExprRewritable for StreamNow {} diff --git a/src/frontend/src/optimizer/plan_node/stream_project.rs b/src/frontend/src/optimizer/plan_node/stream_project.rs index e67d16777f5e..613e9abd5b27 100644 --- a/src/frontend/src/optimizer/plan_node/stream_project.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project.rs @@ -20,8 +20,8 @@ use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use risingwave_pb::stream_plan::ProjectNode; use super::generic::GenericPlanRef; -use super::{LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; -use crate::expr::{try_derive_watermark, Expr, ExprDisplay, ExprImpl}; +use super::{ExprRewritable, LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; +use crate::expr::{try_derive_watermark, Expr, ExprDisplay, ExprImpl, ExprRewriter}; use crate::stream_fragmenter::BuildFragmentGraphState; /// `StreamProject` implements [`super::LogicalProject`] to evaluate specified expressions on input @@ -150,3 +150,20 @@ impl StreamNode for StreamProject { }) } } + +impl ExprRewritable for StreamProject { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + Self::new( + self.logical + .rewrite_exprs(r) + .as_logical_project() + .unwrap() + .clone(), + ) + .into() + } +} diff --git a/src/frontend/src/optimizer/plan_node/stream_project_set.rs b/src/frontend/src/optimizer/plan_node/stream_project_set.rs index 7c832a23dcaf..e23eb1012866 100644 --- a/src/frontend/src/optimizer/plan_node/stream_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project_set.rs @@ -19,8 +19,8 @@ use itertools::Itertools; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use risingwave_pb::stream_plan::ProjectSetNode; -use super::{LogicalProjectSet, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; -use crate::expr::try_derive_watermark; +use super::{ExprRewritable, LogicalProjectSet, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; +use crate::expr::{try_derive_watermark, ExprRewriter}; use crate::stream_fragmenter::BuildFragmentGraphState; #[derive(Debug, Clone)] @@ -94,3 +94,20 @@ impl StreamNode for StreamProjectSet { }) } } + +impl ExprRewritable for StreamProjectSet { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + Self::new( + self.logical + .rewrite_exprs(r) + .as_logical_project_set() + .unwrap() + .clone(), + ) + .into() + } +} diff --git a/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs b/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs index 10dca6d19d5b..689007ae92e1 100644 --- a/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs +++ b/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs @@ -16,7 +16,7 @@ use std::fmt; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; -use super::{PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; +use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::stream_fragmenter::BuildFragmentGraphState; #[derive(Clone, Debug)] @@ -76,3 +76,5 @@ impl StreamNode for StreamRowIdGen { }) } } + +impl ExprRewritable for StreamRowIdGen {} diff --git a/src/frontend/src/optimizer/plan_node/stream_share.rs b/src/frontend/src/optimizer/plan_node/stream_share.rs index a215037da667..66361b3b8917 100644 --- a/src/frontend/src/optimizer/plan_node/stream_share.rs +++ b/src/frontend/src/optimizer/plan_node/stream_share.rs @@ -20,7 +20,7 @@ use risingwave_pb::stream_plan::{ DispatchStrategy, DispatcherType, ExchangeNode, StreamNode as ProstStreamPlan, }; -use super::{PlanRef, PlanTreeNodeUnary, StreamNode}; +use super::{ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::plan_node::{LogicalShare, PlanBase, PlanTreeNode}; use crate::optimizer::property::Distribution; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -143,3 +143,5 @@ impl StreamShare { } } } + +impl ExprRewritable for StreamShare {} diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 9ef1a843d07a..02b77dea42ef 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -17,7 +17,7 @@ use std::fmt; use risingwave_common::catalog::Field; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; -use super::{PlanBase, PlanRef, StreamNode}; +use super::{ExprRewritable, PlanBase, PlanRef, StreamNode}; use crate::optimizer::plan_node::PlanTreeNodeUnary; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::TableCatalog; @@ -93,3 +93,5 @@ impl StreamNode for StreamSink { }) } } + +impl ExprRewritable for StreamSink {} diff --git a/src/frontend/src/optimizer/plan_node/stream_source.rs b/src/frontend/src/optimizer/plan_node/stream_source.rs index 27747c590005..236698ebf445 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source.rs @@ -20,7 +20,7 @@ use risingwave_pb::catalog::ColumnIndex; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use risingwave_pb::stream_plan::{SourceNode, StreamSource as ProstStreamSource}; -use super::{LogicalSource, PlanBase, StreamNode}; +use super::{ExprRewritable, LogicalSource, PlanBase, StreamNode}; use crate::optimizer::property::Distribution; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -103,3 +103,5 @@ impl StreamNode for StreamSource { ProstStreamNode::Source(SourceNode { source_inner }) } } + +impl ExprRewritable for StreamSource {} diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs index 3f8a68bd4c35..d1f13a9f838b 100644 --- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -22,8 +22,11 @@ use risingwave_common::catalog::{Field, TableDesc}; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use risingwave_pb::stream_plan::{ChainType, StreamNode as ProstStreamPlan}; -use super::{LogicalScan, PlanBase, PlanNodeId, StreamIndexScan, StreamNode}; +use super::{ + ExprRewritable, LogicalScan, PlanBase, PlanNodeId, PlanRef, StreamIndexScan, StreamNode, +}; use crate::catalog::ColumnId; +use crate::expr::ExprRewriter; use crate::optimizer::plan_node::utils::IndicesDisplay; use crate::optimizer::property::{Distribution, DistributionDisplay}; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -233,3 +236,20 @@ impl StreamTableScan { } } } + +impl ExprRewritable for StreamTableScan { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + Self::new( + self.logical + .rewrite_exprs(r) + .as_logical_scan() + .unwrap() + .clone(), + ) + .into() + } +} diff --git a/src/frontend/src/optimizer/plan_node/stream_topn.rs b/src/frontend/src/optimizer/plan_node/stream_topn.rs index c73d1dd6ab5b..a69be61f66a2 100644 --- a/src/frontend/src/optimizer/plan_node/stream_topn.rs +++ b/src/frontend/src/optimizer/plan_node/stream_topn.rs @@ -17,7 +17,7 @@ use std::fmt; use fixedbitset::FixedBitSet; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; -use super::{LogicalTopN, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; +use super::{ExprRewritable, LogicalTopN, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::property::{Distribution, Order}; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -114,3 +114,4 @@ impl StreamNode for StreamTopN { } } } +impl ExprRewritable for StreamTopN {} diff --git a/src/frontend/src/optimizer/plan_node/stream_union.rs b/src/frontend/src/optimizer/plan_node/stream_union.rs index f000cf23877e..4eccf732f190 100644 --- a/src/frontend/src/optimizer/plan_node/stream_union.rs +++ b/src/frontend/src/optimizer/plan_node/stream_union.rs @@ -19,7 +19,7 @@ use fixedbitset::FixedBitSet; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use risingwave_pb::stream_plan::UnionNode; -use super::PlanRef; +use super::{ExprRewritable, PlanRef}; use crate::optimizer::plan_node::stream::StreamPlanRef; use crate::optimizer::plan_node::{LogicalUnion, PlanBase, PlanTreeNode, StreamNode}; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -88,3 +88,5 @@ impl StreamNode for StreamUnion { ProstStreamNode::Union(UnionNode {}) } } + +impl ExprRewritable for StreamUnion {}