Skip to content

Commit

Permalink
feat(frontend): ExprRewritable for stream (#7757)
Browse files Browse the repository at this point in the history
`ExprRewritable` for stream

Approved-By: chenzl25
  • Loading branch information
jon-chuang authored Feb 8, 2023
1 parent c8535a6 commit 1dcf7d1
Show file tree
Hide file tree
Showing 27 changed files with 251 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@
No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml
- sql: |
select current_timestamp;
batch_plan: |-
batch_plan: |
BatchProject { exprs: [Now] }
└─BatchValues { rows: [[]] }
13 changes: 11 additions & 2 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,11 +453,12 @@ impl PlanRoot {
// Convert to physical plan node
plan = plan.to_batch_with_order_required(&self.required_order)?;

// SessionTimezone substitution
// TODO: SessionTimezone substitution
// Const eval of exprs at the last minute
// plan = const_eval_exprs(plan)?;

// if explain_trace {
// let ctx = plan.ctx();
// if ctx.is_explain_trace() {
// ctx.trace("Const eval exprs:");
// ctx.trace(plan.explain_to_string().unwrap());
// }
Expand Down Expand Up @@ -614,6 +615,14 @@ impl PlanRoot {
);
}

// Const eval of exprs at the last minute
// plan = const_eval_exprs(plan)?;

// if ctx.is_explain_trace() {
// ctx.trace("Const eval exprs:");
// ctx.trace(plan.explain_to_string().unwrap());
// }

#[cfg(debug_assertions)]
InputRefValidator.validate(plan.clone());

Expand Down
12 changes: 0 additions & 12 deletions src/frontend/src/optimizer/plan_node/expr_rewritable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,8 @@

use std::ops::Deref;

use paste::paste;

use super::*;
use crate::expr::ExprRewriter;
use crate::for_stream_plan_nodes;

/// Rewrites expressions in a `PlanRef`. Due to `Share` operator,
/// the `ExprRewriter` needs to be idempotent i.e. applying it more than once
Expand Down Expand Up @@ -48,12 +45,3 @@ impl ExprRewritable for PlanRef {
}
}
}

macro_rules! ban_expr_rewritable {
($( { $convention:ident, $name:ident }),*) => {
paste!{
$(impl ExprRewritable for [<$convention $name>] {} )*
}
}
}
for_stream_plan_nodes! {ban_expr_rewritable}
22 changes: 20 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_delta_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{ArrangementInfo, DeltaIndexJoinNode};

use super::generic::GenericPlanRef;
use super::{LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, StreamNode};
use crate::expr::Expr;
use super::{ExprRewritable, LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, StreamNode};
use crate::expr::{Expr, ExprRewriter};
use crate::optimizer::plan_node::stream::StreamPlanRef;
use crate::optimizer::plan_node::utils::IndicesDisplay;
use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay};
Expand Down Expand Up @@ -231,3 +231,21 @@ impl StreamNode for StreamDeltaJoin {
})
}
}

impl ExprRewritable for StreamDeltaJoin {
fn has_rewritable_expr(&self) -> bool {
true
}

fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
Self::new(
self.logical
.rewrite_exprs(r)
.as_logical_join()
.unwrap()
.clone(),
self.eq_join_predicate.rewrite_exprs(r),
)
.into()
}
}
4 changes: 3 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use fixedbitset::FixedBitSet;
use risingwave_common::catalog::ColumnDesc;
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;

use super::{PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::stream_fragmenter::BuildFragmentGraphState;

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -88,3 +88,5 @@ impl StreamNode for StreamDml {
})
}
}

impl ExprRewritable for StreamDml {}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ pub use risingwave_pb::expr::expr_node::Type as ExprType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::DynamicFilterNode;

use super::generic;
use super::utils::IndicesDisplay;
use super::{generic, ExprRewritable};
use crate::expr::Expr;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{PlanBase, PlanTreeNodeBinary, StreamNode};
Expand Down Expand Up @@ -150,3 +150,5 @@ impl StreamNode for StreamDynamicFilter {
})
}
}

impl ExprRewritable for StreamDynamicFilter {}
4 changes: 3 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::fmt;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{DispatchStrategy, DispatcherType, ExchangeNode};

use super::{PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::property::{Distribution, DistributionDisplay};
use crate::stream_fragmenter::BuildFragmentGraphState;

Expand Down Expand Up @@ -91,3 +91,5 @@ impl StreamNode for StreamExchange {
})
}
}

impl ExprRewritable for StreamExchange {}
4 changes: 3 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use risingwave_pb::stream_plan::expand_node::Subset;
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;
use risingwave_pb::stream_plan::ExpandNode;

use super::{LogicalExpand, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use super::{ExprRewritable, LogicalExpand, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::property::Distribution;
use crate::stream_fragmenter::BuildFragmentGraphState;

Expand Down Expand Up @@ -101,3 +101,5 @@ fn subset_to_protobuf(subset: &[usize]) -> Subset {
let column_indices = subset.iter().map(|key| *key as u32).collect();
Subset { column_indices }
}

impl ExprRewritable for StreamExpand {}
21 changes: 19 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;
use risingwave_pb::stream_plan::FilterNode;

use super::generic::GenericPlanRef;
use super::{LogicalFilter, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::{Expr, ExprImpl};
use super::{ExprRewritable, LogicalFilter, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
use crate::optimizer::plan_node::PlanBase;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::Condition;
Expand Down Expand Up @@ -85,3 +85,20 @@ impl StreamNode for StreamFilter {
})
}
}

impl ExprRewritable for StreamFilter {
fn has_rewritable_expr(&self) -> bool {
true
}

fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
Self::new(
self.logical
.rewrite_exprs(r)
.as_logical_filter()
.unwrap()
.clone(),
)
.into()
}
}
20 changes: 19 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_global_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use fixedbitset::FixedBitSet;
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;

use super::generic::PlanAggCall;
use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use super::{ExprRewritable, LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::ExprRewriter;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::property::Distribution;
use crate::stream_fragmenter::BuildFragmentGraphState;
Expand Down Expand Up @@ -119,3 +120,20 @@ impl StreamNode for StreamGlobalSimpleAgg {
})
}
}

impl ExprRewritable for StreamGlobalSimpleAgg {
fn has_rewritable_expr(&self) -> bool {
true
}

fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
Self::new(
self.logical
.rewrite_exprs(r)
.as_logical_agg()
.unwrap()
.clone(),
)
.into()
}
}
4 changes: 3 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_group_topn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::fmt;
use fixedbitset::FixedBitSet;
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;

use super::{LogicalTopN, PlanBase, PlanTreeNodeUnary, StreamNode};
use super::{ExprRewritable, LogicalTopN, PlanBase, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::property::{Order, OrderDisplay};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::PlanRef;
Expand Down Expand Up @@ -151,3 +151,5 @@ impl PlanTreeNodeUnary for StreamGroupTopN {
Self::new(self.logical.clone_with_input(input), self.vnode_col_idx)
}
}

impl ExprRewritable for StreamGroupTopN {}
21 changes: 20 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use fixedbitset::FixedBitSet;
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;

use super::generic::PlanAggCall;
use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use super::{ExprRewritable, LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::ExprRewriter;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::property::Distribution;
use crate::stream_fragmenter::BuildFragmentGraphState;
Expand Down Expand Up @@ -128,3 +129,21 @@ impl StreamNode for StreamHashAgg {
})
}
}

impl ExprRewritable for StreamHashAgg {
fn has_rewritable_expr(&self) -> bool {
true
}

fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
Self::new(
self.logical
.rewrite_exprs(r)
.as_logical_agg()
.unwrap()
.clone(),
self.vnode_col_idx,
)
.into()
}
}
24 changes: 22 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::HashJoinNode;

use super::{LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, StreamDeltaJoin, StreamNode};
use crate::expr::Expr;
use super::{
ExprRewritable, LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, StreamDeltaJoin, StreamNode,
};
use crate::expr::{Expr, ExprRewriter};
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::utils::IndicesDisplay;
use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay};
Expand Down Expand Up @@ -283,3 +285,21 @@ impl StreamNode for StreamHashJoin {
})
}
}

impl ExprRewritable for StreamHashJoin {
fn has_rewritable_expr(&self) -> bool {
true
}

fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
Self::new(
self.logical
.rewrite_exprs(r)
.as_logical_join()
.unwrap()
.clone(),
self.eq_join_predicate.rewrite_exprs(r),
)
.into()
}
}
4 changes: 3 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::fmt;
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;
use risingwave_pb::stream_plan::HopWindowNode;

use super::{LogicalHopWindow, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use super::{ExprRewritable, LogicalHopWindow, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::stream_fragmenter::BuildFragmentGraphState;

/// [`StreamHopWindow`] represents a hop window table function.
Expand Down Expand Up @@ -91,3 +91,5 @@ impl StreamNode for StreamHopWindow {
})
}
}

impl ExprRewritable for StreamHopWindow {}
21 changes: 20 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_index_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ use itertools::Itertools;
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;
use risingwave_pb::stream_plan::{ChainType, StreamNode as ProstStreamPlan};

use super::{LogicalScan, PlanBase, PlanNodeId, StreamNode};
use super::{ExprRewritable, LogicalScan, PlanBase, PlanNodeId, PlanRef, StreamNode};
use crate::catalog::ColumnId;
use crate::expr::ExprRewriter;
use crate::optimizer::plan_node::utils::IndicesDisplay;
use crate::optimizer::property::{Distribution, DistributionDisplay};
use crate::stream_fragmenter::BuildFragmentGraphState;
Expand Down Expand Up @@ -206,3 +207,21 @@ impl StreamIndexScan {
}
}
}

impl ExprRewritable for StreamIndexScan {
fn has_rewritable_expr(&self) -> bool {
true
}

fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
Self::new(
self.logical
.rewrite_exprs(r)
.as_logical_scan()
.unwrap()
.clone(),
self.chain_type,
)
.into()
}
}
20 changes: 19 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_local_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use itertools::Itertools;
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;

use super::generic::PlanAggCall;
use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use super::{ExprRewritable, LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::ExprRewriter;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::property::RequiredDist;
use crate::stream_fragmenter::BuildFragmentGraphState;
Expand Down Expand Up @@ -109,3 +110,20 @@ impl StreamNode for StreamLocalSimpleAgg {
})
}
}

impl ExprRewritable for StreamLocalSimpleAgg {
fn has_rewritable_expr(&self) -> bool {
true
}

fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
Self::new(
self.logical
.rewrite_exprs(r)
.as_logical_agg()
.unwrap()
.clone(),
)
.into()
}
}
4 changes: 3 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_common::catalog::{ColumnDesc, TableId};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;

use super::{PlanRef, PlanTreeNodeUnary, StreamNode, StreamSink};
use super::{ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamNode, StreamSink};
use crate::catalog::column_catalog::ColumnCatalog;
use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion};
use crate::catalog::{FragmentId, USER_COLUMN_ID_OFFSET};
Expand Down Expand Up @@ -358,3 +358,5 @@ impl StreamNode for StreamMaterialize {
})
}
}

impl ExprRewritable for StreamMaterialize {}
Loading

0 comments on commit 1dcf7d1

Please sign in to comment.