Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): ExprRewritable for stream #7757

Merged
merged 3 commits into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@
No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml
- sql: |
select current_timestamp;
batch_plan: |-
batch_plan: |
BatchProject { exprs: [Now] }
└─BatchValues { rows: [[]] }
13 changes: 11 additions & 2 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,11 +453,12 @@ impl PlanRoot {
// Convert to physical plan node
plan = plan.to_batch_with_order_required(&self.required_order)?;

// SessionTimezone substitution
// TODO: SessionTimezone substitution
// Const eval of exprs at the last minute
// plan = const_eval_exprs(plan)?;

// if explain_trace {
// let ctx = plan.ctx();
// if ctx.is_explain_trace() {
// ctx.trace("Const eval exprs:");
// ctx.trace(plan.explain_to_string().unwrap());
// }
Expand Down Expand Up @@ -614,6 +615,14 @@ impl PlanRoot {
);
}

// Const eval of exprs at the last minute
// plan = const_eval_exprs(plan)?;

// if ctx.is_explain_trace() {
// ctx.trace("Const eval exprs:");
// ctx.trace(plan.explain_to_string().unwrap());
// }

#[cfg(debug_assertions)]
InputRefValidator.validate(plan.clone());

Expand Down
12 changes: 0 additions & 12 deletions src/frontend/src/optimizer/plan_node/expr_rewritable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,8 @@

use std::ops::Deref;

use paste::paste;

use super::*;
use crate::expr::ExprRewriter;
use crate::for_stream_plan_nodes;

/// Rewrites expressions in a `PlanRef`. Due to `Share` operator,
/// the `ExprRewriter` needs to be idempotent i.e. applying it more than once
Expand Down Expand Up @@ -48,12 +45,3 @@ impl ExprRewritable for PlanRef {
}
}
}

macro_rules! ban_expr_rewritable {
($( { $convention:ident, $name:ident }),*) => {
paste!{
$(impl ExprRewritable for [<$convention $name>] {} )*
}
}
}
for_stream_plan_nodes! {ban_expr_rewritable}
22 changes: 20 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_delta_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{ArrangementInfo, DeltaIndexJoinNode};

use super::generic::GenericPlanRef;
use super::{LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, StreamNode};
use crate::expr::Expr;
use super::{ExprRewritable, LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, StreamNode};
use crate::expr::{Expr, ExprRewriter};
use crate::optimizer::plan_node::stream::StreamPlanRef;
use crate::optimizer::plan_node::utils::IndicesDisplay;
use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay};
Expand Down Expand Up @@ -231,3 +231,21 @@ impl StreamNode for StreamDeltaJoin {
})
}
}

impl ExprRewritable for StreamDeltaJoin {
fn has_rewritable_expr(&self) -> bool {
true
}

fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
Self::new(
self.logical
.rewrite_exprs(r)
.as_logical_join()
.unwrap()
.clone(),
self.eq_join_predicate.rewrite_exprs(r),
)
.into()
}
}
4 changes: 3 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use fixedbitset::FixedBitSet;
use risingwave_common::catalog::ColumnDesc;
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;

use super::{PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::stream_fragmenter::BuildFragmentGraphState;

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -88,3 +88,5 @@ impl StreamNode for StreamDml {
})
}
}

impl ExprRewritable for StreamDml {}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ pub use risingwave_pb::expr::expr_node::Type as ExprType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::DynamicFilterNode;

use super::generic;
use super::utils::IndicesDisplay;
use super::{generic, ExprRewritable};
use crate::expr::Expr;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{PlanBase, PlanTreeNodeBinary, StreamNode};
Expand Down Expand Up @@ -150,3 +150,5 @@ impl StreamNode for StreamDynamicFilter {
})
}
}

impl ExprRewritable for StreamDynamicFilter {}
4 changes: 3 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::fmt;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{DispatchStrategy, DispatcherType, ExchangeNode};

use super::{PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::property::{Distribution, DistributionDisplay};
use crate::stream_fragmenter::BuildFragmentGraphState;

Expand Down Expand Up @@ -91,3 +91,5 @@ impl StreamNode for StreamExchange {
})
}
}

impl ExprRewritable for StreamExchange {}
4 changes: 3 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use risingwave_pb::stream_plan::expand_node::Subset;
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;
use risingwave_pb::stream_plan::ExpandNode;

use super::{LogicalExpand, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use super::{ExprRewritable, LogicalExpand, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::property::Distribution;
use crate::stream_fragmenter::BuildFragmentGraphState;

Expand Down Expand Up @@ -101,3 +101,5 @@ fn subset_to_protobuf(subset: &[usize]) -> Subset {
let column_indices = subset.iter().map(|key| *key as u32).collect();
Subset { column_indices }
}

impl ExprRewritable for StreamExpand {}
21 changes: 19 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;
use risingwave_pb::stream_plan::FilterNode;

use super::generic::GenericPlanRef;
use super::{LogicalFilter, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::{Expr, ExprImpl};
use super::{ExprRewritable, LogicalFilter, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
use crate::optimizer::plan_node::PlanBase;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::Condition;
Expand Down Expand Up @@ -85,3 +85,20 @@ impl StreamNode for StreamFilter {
})
}
}

impl ExprRewritable for StreamFilter {
fn has_rewritable_expr(&self) -> bool {
true
}

fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
Self::new(
self.logical
.rewrite_exprs(r)
.as_logical_filter()
.unwrap()
.clone(),
)
.into()
}
}
20 changes: 19 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_global_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use fixedbitset::FixedBitSet;
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;

use super::generic::PlanAggCall;
use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use super::{ExprRewritable, LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::ExprRewriter;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::property::Distribution;
use crate::stream_fragmenter::BuildFragmentGraphState;
Expand Down Expand Up @@ -119,3 +120,20 @@ impl StreamNode for StreamGlobalSimpleAgg {
})
}
}

impl ExprRewritable for StreamGlobalSimpleAgg {
fn has_rewritable_expr(&self) -> bool {
true
}

fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
Self::new(
self.logical
.rewrite_exprs(r)
.as_logical_agg()
.unwrap()
.clone(),
)
.into()
}
}
4 changes: 3 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_group_topn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::fmt;
use fixedbitset::FixedBitSet;
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;

use super::{LogicalTopN, PlanBase, PlanTreeNodeUnary, StreamNode};
use super::{ExprRewritable, LogicalTopN, PlanBase, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::property::{Order, OrderDisplay};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::PlanRef;
Expand Down Expand Up @@ -151,3 +151,5 @@ impl PlanTreeNodeUnary for StreamGroupTopN {
Self::new(self.logical.clone_with_input(input), self.vnode_col_idx)
}
}

impl ExprRewritable for StreamGroupTopN {}
21 changes: 20 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use fixedbitset::FixedBitSet;
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;

use super::generic::PlanAggCall;
use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use super::{ExprRewritable, LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::ExprRewriter;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::property::Distribution;
use crate::stream_fragmenter::BuildFragmentGraphState;
Expand Down Expand Up @@ -128,3 +129,21 @@ impl StreamNode for StreamHashAgg {
})
}
}

impl ExprRewritable for StreamHashAgg {
fn has_rewritable_expr(&self) -> bool {
true
}

fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
Self::new(
self.logical
.rewrite_exprs(r)
.as_logical_agg()
.unwrap()
.clone(),
self.vnode_col_idx,
)
.into()
}
}
24 changes: 22 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::HashJoinNode;

use super::{LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, StreamDeltaJoin, StreamNode};
use crate::expr::Expr;
use super::{
ExprRewritable, LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, StreamDeltaJoin, StreamNode,
};
use crate::expr::{Expr, ExprRewriter};
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::utils::IndicesDisplay;
use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay};
Expand Down Expand Up @@ -283,3 +285,21 @@ impl StreamNode for StreamHashJoin {
})
}
}

impl ExprRewritable for StreamHashJoin {
fn has_rewritable_expr(&self) -> bool {
true
}

fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
Self::new(
self.logical
.rewrite_exprs(r)
.as_logical_join()
.unwrap()
.clone(),
self.eq_join_predicate.rewrite_exprs(r),
)
.into()
}
}
4 changes: 3 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::fmt;
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;
use risingwave_pb::stream_plan::HopWindowNode;

use super::{LogicalHopWindow, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use super::{ExprRewritable, LogicalHopWindow, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::stream_fragmenter::BuildFragmentGraphState;

/// [`StreamHopWindow`] represents a hop window table function.
Expand Down Expand Up @@ -91,3 +91,5 @@ impl StreamNode for StreamHopWindow {
})
}
}

impl ExprRewritable for StreamHopWindow {}
21 changes: 20 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_index_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ use itertools::Itertools;
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;
use risingwave_pb::stream_plan::{ChainType, StreamNode as ProstStreamPlan};

use super::{LogicalScan, PlanBase, PlanNodeId, StreamNode};
use super::{ExprRewritable, LogicalScan, PlanBase, PlanNodeId, PlanRef, StreamNode};
use crate::catalog::ColumnId;
use crate::expr::ExprRewriter;
use crate::optimizer::plan_node::utils::IndicesDisplay;
use crate::optimizer::property::{Distribution, DistributionDisplay};
use crate::stream_fragmenter::BuildFragmentGraphState;
Expand Down Expand Up @@ -206,3 +207,21 @@ impl StreamIndexScan {
}
}
}

impl ExprRewritable for StreamIndexScan {
fn has_rewritable_expr(&self) -> bool {
true
}

fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
Self::new(
self.logical
.rewrite_exprs(r)
.as_logical_scan()
.unwrap()
.clone(),
self.chain_type,
)
.into()
}
}
20 changes: 19 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_local_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use itertools::Itertools;
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;

use super::generic::PlanAggCall;
use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use super::{ExprRewritable, LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::ExprRewriter;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::property::RequiredDist;
use crate::stream_fragmenter::BuildFragmentGraphState;
Expand Down Expand Up @@ -109,3 +110,20 @@ impl StreamNode for StreamLocalSimpleAgg {
})
}
}

impl ExprRewritable for StreamLocalSimpleAgg {
fn has_rewritable_expr(&self) -> bool {
true
}

fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
Self::new(
self.logical
.rewrite_exprs(r)
.as_logical_agg()
.unwrap()
.clone(),
)
.into()
}
}
4 changes: 3 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_common::catalog::{ColumnDesc, TableId};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;

use super::{PlanRef, PlanTreeNodeUnary, StreamNode, StreamSink};
use super::{ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamNode, StreamSink};
use crate::catalog::column_catalog::ColumnCatalog;
use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion};
use crate::catalog::{FragmentId, USER_COLUMN_ID_OFFSET};
Expand Down Expand Up @@ -358,3 +358,5 @@ impl StreamNode for StreamMaterialize {
})
}
}

impl ExprRewritable for StreamMaterialize {}
Loading