Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(optimizer): remove WithXXX on PlanNode #1388

Merged
merged 13 commits into from
Mar 29, 2022
3 changes: 1 addition & 2 deletions rust/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ use risingwave_common::catalog::Schema;
use risingwave_common::error::Result;

use self::heuristic::{ApplyOrder, HeuristicOptimizer};
use self::plan_node::{LogicalProject, StreamMaterialize};
use self::property::Convention;
use self::plan_node::{Convention, LogicalProject, StreamMaterialize};
use self::rule::*;
use crate::expr::InputRef;

Expand Down
9 changes: 1 addition & 8 deletions rust/frontend/src/optimizer/plan_node/batch_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::{DeleteNode, TableRefId};

use super::{
LogicalDelete, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch,
};
use crate::optimizer::property::{Distribution, Order, WithSchema};
use crate::optimizer::property::{Distribution, Order};

/// `BatchDelete` implements [`LogicalDelete`]
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -61,12 +60,6 @@ impl PlanTreeNodeUnary for BatchDelete {

impl_plan_tree_node_for_unary! { BatchDelete }

impl WithSchema for BatchDelete {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl ToDistributedBatch for BatchDelete {
fn to_distributed(&self) -> PlanRef {
let new_input = self
Expand Down
9 changes: 1 addition & 8 deletions rust/frontend/src/optimizer/plan_node/batch_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::ExchangeNode;

use super::{PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::optimizer::property::{Distribution, Order, WithDistribution, WithOrder, WithSchema};
use crate::optimizer::property::{Distribution, Order};

/// `BatchExchange` imposes a particular distribution on its input
/// without changing its content.
Expand Down Expand Up @@ -59,12 +58,6 @@ impl PlanTreeNodeUnary for BatchExchange {
}
impl_plan_tree_node_for_unary! {BatchExchange}

impl WithSchema for BatchExchange {
fn schema(&self) -> &Schema {
&self.base.schema
}
}

impl ToDistributedBatch for BatchExchange {
fn to_distributed(&self) -> PlanRef {
unreachable!()
Expand Down
9 changes: 1 addition & 8 deletions rust/frontend/src/optimizer/plan_node/batch_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::FilterNode;

use super::{LogicalFilter, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::expr::Expr;
use crate::optimizer::plan_node::PlanBase;
use crate::optimizer::property::{Distribution, WithSchema};
use crate::optimizer::property::Distribution;
use crate::utils::Condition;

/// `BatchFilter` implements [`super::LogicalFilter`]
Expand Down Expand Up @@ -67,12 +66,6 @@ impl PlanTreeNodeUnary for BatchFilter {

impl_plan_tree_node_for_unary! { BatchFilter }

impl WithSchema for BatchFilter {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl ToDistributedBatch for BatchFilter {
fn to_distributed(&self) -> PlanRef {
let new_input = self
Expand Down
10 changes: 1 addition & 9 deletions rust/frontend/src/optimizer/plan_node/batch_hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@
use std::fmt;

use itertools::Itertools;
use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::HashAggNode;

use super::logical_agg::PlanAggCall;
use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::expr::InputRefDisplay;
use crate::optimizer::property::{Distribution, Order, WithSchema};
use crate::optimizer::property::{Distribution, Order};

#[derive(Debug, Clone)]
pub struct BatchHashAgg {
Expand Down Expand Up @@ -82,13 +81,6 @@ impl PlanTreeNodeUnary for BatchHashAgg {
}
}
impl_plan_tree_node_for_unary! { BatchHashAgg }

impl WithSchema for BatchHashAgg {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl ToDistributedBatch for BatchHashAgg {
fn to_distributed(&self) -> PlanRef {
let new_input = self.input().to_distributed_with_required(
Expand Down
9 changes: 1 addition & 8 deletions rust/frontend/src/optimizer/plan_node/batch_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::HashJoinNode;

use super::{
EqJoinPredicate, LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, ToBatchProst,
ToDistributedBatch,
};
use crate::optimizer::property::{Distribution, Order, WithSchema};
use crate::optimizer::property::{Distribution, Order};
use crate::utils::ColIndexMapping;

/// `BatchHashJoin` implements [`super::LogicalJoin`] with hash table. It builds a hash table
Expand Down Expand Up @@ -114,12 +113,6 @@ impl PlanTreeNodeBinary for BatchHashJoin {

impl_plan_tree_node_for_binary! { BatchHashJoin }

impl WithSchema for BatchHashJoin {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl ToDistributedBatch for BatchHashJoin {
fn to_distributed(&self) -> PlanRef {
let left = self.left().to_distributed_with_required(
Expand Down
9 changes: 1 addition & 8 deletions rust/frontend/src/optimizer/plan_node/batch_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::{InsertNode, TableRefId};

use super::{LogicalInsert, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::optimizer::plan_node::PlanBase;
use crate::optimizer::property::{Distribution, Order, WithSchema};
use crate::optimizer::property::{Distribution, Order};

/// `BatchInsert` implements [`LogicalInsert`]
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -61,12 +60,6 @@ impl PlanTreeNodeUnary for BatchInsert {

impl_plan_tree_node_for_unary! { BatchInsert }

impl WithSchema for BatchInsert {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl ToDistributedBatch for BatchInsert {
fn to_distributed(&self) -> PlanRef {
let new_input = self
Expand Down
10 changes: 1 addition & 9 deletions rust/frontend/src/optimizer/plan_node/batch_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::LimitNode;

use super::{LogicalLimit, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::optimizer::property::{Distribution, WithSchema};
use crate::optimizer::property::Distribution;

/// `BatchLimit` implements [`super::LogicalLimit`] to fetch specified rows from input
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -56,13 +55,6 @@ impl PlanTreeNodeUnary for BatchLimit {
}
}
impl_plan_tree_node_for_unary! {BatchLimit}

impl WithSchema for BatchLimit {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl ToDistributedBatch for BatchLimit {
fn to_distributed(&self) -> PlanRef {
let new_input = self
Expand Down
9 changes: 1 addition & 8 deletions rust/frontend/src/optimizer/plan_node/batch_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::expr::ExprNode;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::ProjectNode;
Expand All @@ -23,7 +22,7 @@ use super::{
LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch,
};
use crate::expr::Expr;
use crate::optimizer::property::{Distribution, Order, WithSchema};
use crate::optimizer::property::{Distribution, Order};

/// `BatchProject` implements [`super::LogicalProject`] to evaluate specified expressions on input
/// rows
Expand Down Expand Up @@ -67,12 +66,6 @@ impl PlanTreeNodeUnary for BatchProject {

impl_plan_tree_node_for_unary! { BatchProject }

impl WithSchema for BatchProject {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl ToDistributedBatch for BatchProject {
fn to_distributed(&self) -> PlanRef {
let new_input = self
Expand Down
9 changes: 1 addition & 8 deletions rust/frontend/src/optimizer/plan_node/batch_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::{CellBasedTableDesc, ColumnDesc as ProstColumnDesc, RowSeqScanNode};

use super::{PlanBase, PlanRef, ToBatchProst, ToDistributedBatch};
use crate::optimizer::plan_node::LogicalScan;
use crate::optimizer::property::{Distribution, Order, WithSchema};
use crate::optimizer::property::{Distribution, Order};

/// `BatchSeqScan` implements [`super::LogicalScan`] to scan from a row-oriented table
#[derive(Debug, Clone)]
Expand All @@ -29,12 +28,6 @@ pub struct BatchSeqScan {
logical: LogicalScan,
}

impl WithSchema for BatchSeqScan {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl BatchSeqScan {
pub fn new_inner(logical: LogicalScan, dist: Distribution) -> Self {
let ctx = logical.base.ctx.clone();
Expand Down
9 changes: 1 addition & 8 deletions rust/frontend/src/optimizer/plan_node/batch_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::SortAggNode;

use super::logical_agg::PlanAggCall;
use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::optimizer::property::{Distribution, Order, WithSchema};
use crate::optimizer::property::{Distribution, Order};

#[derive(Debug, Clone)]
pub struct BatchSimpleAgg {
Expand Down Expand Up @@ -65,12 +64,6 @@ impl PlanTreeNodeUnary for BatchSimpleAgg {
}
impl_plan_tree_node_for_unary! { BatchSimpleAgg }

impl WithSchema for BatchSimpleAgg {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl ToDistributedBatch for BatchSimpleAgg {
fn to_distributed(&self) -> PlanRef {
let new_input = self
Expand Down
9 changes: 1 addition & 8 deletions rust/frontend/src/optimizer/plan_node/batch_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
use std::fmt;

use itertools::Itertools;
use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::{ColumnOrder, OrderByNode};

use super::{PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::optimizer::property::{Distribution, Order, WithOrder, WithSchema};
use crate::optimizer::property::{Distribution, Order};

/// `BatchSort` buffers all data from input and sort these rows by specified order, providing the
/// collation required by user or parent plan node.
Expand Down Expand Up @@ -56,12 +55,6 @@ impl PlanTreeNodeUnary for BatchSort {
}
impl_plan_tree_node_for_unary! {BatchSort}

impl WithSchema for BatchSort {
fn schema(&self) -> &Schema {
&self.base.schema
}
}

impl ToDistributedBatch for BatchSort {
fn to_distributed(&self) -> PlanRef {
let new_input = self
Expand Down
9 changes: 1 addition & 8 deletions rust/frontend/src/optimizer/plan_node/batch_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::values_node::ExprTuple;
use risingwave_pb::plan::ValuesNode;

use super::{LogicalValues, PlanBase, PlanRef, PlanTreeNodeLeaf, ToBatchProst, ToDistributedBatch};
use crate::expr::{Expr, ExprImpl};
use crate::optimizer::property::{Distribution, Order, WithSchema};
use crate::optimizer::property::{Distribution, Order};

#[derive(Debug, Clone)]
pub struct BatchValues {
Expand Down Expand Up @@ -58,12 +57,6 @@ impl fmt::Display for BatchValues {
}
}

impl WithSchema for BatchValues {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl ToDistributedBatch for BatchValues {
fn to_distributed(&self) -> PlanRef {
Self::with_dist(self.logical().clone(), Distribution::Single).into()
Expand Down
2 changes: 1 addition & 1 deletion rust/frontend/src/optimizer/plan_node/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub trait ToStream {
///
/// Now it is used to:
/// 1. ensure every plan node's output having pk column
/// 2. (todo) add `row_count`() in every Agg
/// 2. add `row_count`() in every Agg
fn logical_rewrite_for_stream(&self) -> (PlanRef, ColIndexMapping);
/// `to_stream` is equivalent to `to_stream_with_dist_required(Distribution::any())`
fn to_stream(&self) -> PlanRef;
Expand Down
7 changes: 3 additions & 4 deletions rust/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ use risingwave_common::types::DataType;
use risingwave_pb::expr::AggCall as ProstAggCall;

use super::{
BatchHashAgg, BatchSimpleAgg, ColPrunable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamHashAgg,
StreamSimpleAgg, ToBatch, ToStream,
BatchHashAgg, BatchSimpleAgg, ColPrunable, PlanBase, PlanNode, PlanRef, PlanTreeNodeUnary,
StreamHashAgg, StreamSimpleAgg, ToBatch, ToStream,
};
use crate::expr::{AggCall, Expr, ExprImpl, ExprRewriter, ExprType, FunctionCall, InputRef};
use crate::optimizer::plan_node::LogicalProject;
use crate::optimizer::property::{Distribution, WithSchema};
use crate::optimizer::property::Distribution;
use crate::utils::ColIndexMapping;

/// Aggregation Call
Expand Down Expand Up @@ -544,7 +544,6 @@ mod tests {
assert_eq_input_ref, input_ref_to_column_indices, AggCall, ExprType, FunctionCall,
};
use crate::optimizer::plan_node::LogicalValues;
use crate::optimizer::property::ctx::WithId;
use crate::session::OptimizerContext;

#[tokio::test]
Expand Down
Loading