Skip to content

Commit

Permalink
refactor(optimizer): remove WithXXX on PlanNode (#1388)
Browse files Browse the repository at this point in the history
* move append_only fn to impl dyn PlanNode

* use WithSchema with macro

* clippy fix

* remove WithOrder

* clippy fix

* remove WithDistribution

* clippy fix

* add base delegate

* remove WithId WithCtx

* remove convention

* remove WithSchema

* remove useless todo
  • Loading branch information
st1page authored Mar 29, 2022
1 parent 2d18568 commit 930c42f
Show file tree
Hide file tree
Showing 42 changed files with 116 additions and 432 deletions.
3 changes: 1 addition & 2 deletions rust/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ use risingwave_common::catalog::Schema;
use risingwave_common::error::Result;

use self::heuristic::{ApplyOrder, HeuristicOptimizer};
use self::plan_node::{LogicalProject, StreamMaterialize};
use self::property::Convention;
use self::plan_node::{Convention, LogicalProject, StreamMaterialize};
use self::rule::*;
use crate::expr::InputRef;

Expand Down
9 changes: 1 addition & 8 deletions rust/frontend/src/optimizer/plan_node/batch_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::{DeleteNode, TableRefId};

use super::{
LogicalDelete, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch,
};
use crate::optimizer::property::{Distribution, Order, WithSchema};
use crate::optimizer::property::{Distribution, Order};

/// `BatchDelete` implements [`LogicalDelete`]
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -61,12 +60,6 @@ impl PlanTreeNodeUnary for BatchDelete {

impl_plan_tree_node_for_unary! { BatchDelete }

impl WithSchema for BatchDelete {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl ToDistributedBatch for BatchDelete {
fn to_distributed(&self) -> PlanRef {
let new_input = self
Expand Down
9 changes: 1 addition & 8 deletions rust/frontend/src/optimizer/plan_node/batch_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::ExchangeNode;

use super::{PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::optimizer::property::{Distribution, Order, WithDistribution, WithOrder, WithSchema};
use crate::optimizer::property::{Distribution, Order};

/// `BatchExchange` imposes a particular distribution on its input
/// without changing its content.
Expand Down Expand Up @@ -59,12 +58,6 @@ impl PlanTreeNodeUnary for BatchExchange {
}
impl_plan_tree_node_for_unary! {BatchExchange}

impl WithSchema for BatchExchange {
fn schema(&self) -> &Schema {
&self.base.schema
}
}

impl ToDistributedBatch for BatchExchange {
fn to_distributed(&self) -> PlanRef {
unreachable!()
Expand Down
9 changes: 1 addition & 8 deletions rust/frontend/src/optimizer/plan_node/batch_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::FilterNode;

use super::{LogicalFilter, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::expr::Expr;
use crate::optimizer::plan_node::PlanBase;
use crate::optimizer::property::{Distribution, WithSchema};
use crate::optimizer::property::Distribution;
use crate::utils::Condition;

/// `BatchFilter` implements [`super::LogicalFilter`]
Expand Down Expand Up @@ -67,12 +66,6 @@ impl PlanTreeNodeUnary for BatchFilter {

impl_plan_tree_node_for_unary! { BatchFilter }

impl WithSchema for BatchFilter {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl ToDistributedBatch for BatchFilter {
fn to_distributed(&self) -> PlanRef {
let new_input = self
Expand Down
10 changes: 1 addition & 9 deletions rust/frontend/src/optimizer/plan_node/batch_hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@
use std::fmt;

use itertools::Itertools;
use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::HashAggNode;

use super::logical_agg::PlanAggCall;
use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::expr::InputRefDisplay;
use crate::optimizer::property::{Distribution, Order, WithSchema};
use crate::optimizer::property::{Distribution, Order};

#[derive(Debug, Clone)]
pub struct BatchHashAgg {
Expand Down Expand Up @@ -82,13 +81,6 @@ impl PlanTreeNodeUnary for BatchHashAgg {
}
}
impl_plan_tree_node_for_unary! { BatchHashAgg }

impl WithSchema for BatchHashAgg {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl ToDistributedBatch for BatchHashAgg {
fn to_distributed(&self) -> PlanRef {
let new_input = self.input().to_distributed_with_required(
Expand Down
9 changes: 1 addition & 8 deletions rust/frontend/src/optimizer/plan_node/batch_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::HashJoinNode;

use super::{
EqJoinPredicate, LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, ToBatchProst,
ToDistributedBatch,
};
use crate::optimizer::property::{Distribution, Order, WithSchema};
use crate::optimizer::property::{Distribution, Order};
use crate::utils::ColIndexMapping;

/// `BatchHashJoin` implements [`super::LogicalJoin`] with hash table. It builds a hash table
Expand Down Expand Up @@ -114,12 +113,6 @@ impl PlanTreeNodeBinary for BatchHashJoin {

impl_plan_tree_node_for_binary! { BatchHashJoin }

impl WithSchema for BatchHashJoin {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl ToDistributedBatch for BatchHashJoin {
fn to_distributed(&self) -> PlanRef {
let left = self.left().to_distributed_with_required(
Expand Down
9 changes: 1 addition & 8 deletions rust/frontend/src/optimizer/plan_node/batch_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::{InsertNode, TableRefId};

use super::{LogicalInsert, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::optimizer::plan_node::PlanBase;
use crate::optimizer::property::{Distribution, Order, WithSchema};
use crate::optimizer::property::{Distribution, Order};

/// `BatchInsert` implements [`LogicalInsert`]
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -61,12 +60,6 @@ impl PlanTreeNodeUnary for BatchInsert {

impl_plan_tree_node_for_unary! { BatchInsert }

impl WithSchema for BatchInsert {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl ToDistributedBatch for BatchInsert {
fn to_distributed(&self) -> PlanRef {
let new_input = self
Expand Down
10 changes: 1 addition & 9 deletions rust/frontend/src/optimizer/plan_node/batch_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::LimitNode;

use super::{LogicalLimit, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::optimizer::property::{Distribution, WithSchema};
use crate::optimizer::property::Distribution;

/// `BatchLimit` implements [`super::LogicalLimit`] to fetch specified rows from input
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -56,13 +55,6 @@ impl PlanTreeNodeUnary for BatchLimit {
}
}
impl_plan_tree_node_for_unary! {BatchLimit}

impl WithSchema for BatchLimit {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl ToDistributedBatch for BatchLimit {
fn to_distributed(&self) -> PlanRef {
let new_input = self
Expand Down
9 changes: 1 addition & 8 deletions rust/frontend/src/optimizer/plan_node/batch_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::expr::ExprNode;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::ProjectNode;
Expand All @@ -23,7 +22,7 @@ use super::{
LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch,
};
use crate::expr::Expr;
use crate::optimizer::property::{Distribution, Order, WithSchema};
use crate::optimizer::property::{Distribution, Order};

/// `BatchProject` implements [`super::LogicalProject`] to evaluate specified expressions on input
/// rows
Expand Down Expand Up @@ -67,12 +66,6 @@ impl PlanTreeNodeUnary for BatchProject {

impl_plan_tree_node_for_unary! { BatchProject }

impl WithSchema for BatchProject {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl ToDistributedBatch for BatchProject {
fn to_distributed(&self) -> PlanRef {
let new_input = self
Expand Down
9 changes: 1 addition & 8 deletions rust/frontend/src/optimizer/plan_node/batch_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::{CellBasedTableDesc, ColumnDesc as ProstColumnDesc, RowSeqScanNode};

use super::{PlanBase, PlanRef, ToBatchProst, ToDistributedBatch};
use crate::optimizer::plan_node::LogicalScan;
use crate::optimizer::property::{Distribution, Order, WithSchema};
use crate::optimizer::property::{Distribution, Order};

/// `BatchSeqScan` implements [`super::LogicalScan`] to scan from a row-oriented table
#[derive(Debug, Clone)]
Expand All @@ -29,12 +28,6 @@ pub struct BatchSeqScan {
logical: LogicalScan,
}

impl WithSchema for BatchSeqScan {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl BatchSeqScan {
pub fn new_inner(logical: LogicalScan, dist: Distribution) -> Self {
let ctx = logical.base.ctx.clone();
Expand Down
9 changes: 1 addition & 8 deletions rust/frontend/src/optimizer/plan_node/batch_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::SortAggNode;

use super::logical_agg::PlanAggCall;
use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::optimizer::property::{Distribution, Order, WithSchema};
use crate::optimizer::property::{Distribution, Order};

#[derive(Debug, Clone)]
pub struct BatchSimpleAgg {
Expand Down Expand Up @@ -65,12 +64,6 @@ impl PlanTreeNodeUnary for BatchSimpleAgg {
}
impl_plan_tree_node_for_unary! { BatchSimpleAgg }

impl WithSchema for BatchSimpleAgg {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl ToDistributedBatch for BatchSimpleAgg {
fn to_distributed(&self) -> PlanRef {
let new_input = self
Expand Down
9 changes: 1 addition & 8 deletions rust/frontend/src/optimizer/plan_node/batch_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
use std::fmt;

use itertools::Itertools;
use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::{ColumnOrder, OrderByNode};

use super::{PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::optimizer::property::{Distribution, Order, WithOrder, WithSchema};
use crate::optimizer::property::{Distribution, Order};

/// `BatchSort` buffers all data from input and sort these rows by specified order, providing the
/// collation required by user or parent plan node.
Expand Down Expand Up @@ -56,12 +55,6 @@ impl PlanTreeNodeUnary for BatchSort {
}
impl_plan_tree_node_for_unary! {BatchSort}

impl WithSchema for BatchSort {
fn schema(&self) -> &Schema {
&self.base.schema
}
}

impl ToDistributedBatch for BatchSort {
fn to_distributed(&self) -> PlanRef {
let new_input = self
Expand Down
9 changes: 1 addition & 8 deletions rust/frontend/src/optimizer/plan_node/batch_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::values_node::ExprTuple;
use risingwave_pb::plan::ValuesNode;

use super::{LogicalValues, PlanBase, PlanRef, PlanTreeNodeLeaf, ToBatchProst, ToDistributedBatch};
use crate::expr::{Expr, ExprImpl};
use crate::optimizer::property::{Distribution, Order, WithSchema};
use crate::optimizer::property::{Distribution, Order};

#[derive(Debug, Clone)]
pub struct BatchValues {
Expand Down Expand Up @@ -58,12 +57,6 @@ impl fmt::Display for BatchValues {
}
}

impl WithSchema for BatchValues {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl ToDistributedBatch for BatchValues {
fn to_distributed(&self) -> PlanRef {
Self::with_dist(self.logical().clone(), Distribution::Single).into()
Expand Down
2 changes: 1 addition & 1 deletion rust/frontend/src/optimizer/plan_node/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub trait ToStream {
///
/// Now it is used to:
/// 1. ensure every plan node's output having pk column
/// 2. (todo) add `row_count`() in every Agg
/// 2. add `row_count`() in every Agg
fn logical_rewrite_for_stream(&self) -> (PlanRef, ColIndexMapping);
/// `to_stream` is equivalent to `to_stream_with_dist_required(Distribution::any())`
fn to_stream(&self) -> PlanRef;
Expand Down
7 changes: 3 additions & 4 deletions rust/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ use risingwave_common::types::DataType;
use risingwave_pb::expr::AggCall as ProstAggCall;

use super::{
BatchHashAgg, BatchSimpleAgg, ColPrunable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamHashAgg,
StreamSimpleAgg, ToBatch, ToStream,
BatchHashAgg, BatchSimpleAgg, ColPrunable, PlanBase, PlanNode, PlanRef, PlanTreeNodeUnary,
StreamHashAgg, StreamSimpleAgg, ToBatch, ToStream,
};
use crate::expr::{AggCall, Expr, ExprImpl, ExprRewriter, ExprType, FunctionCall, InputRef};
use crate::optimizer::plan_node::LogicalProject;
use crate::optimizer::property::{Distribution, WithSchema};
use crate::optimizer::property::Distribution;
use crate::utils::ColIndexMapping;

/// Aggregation Call
Expand Down Expand Up @@ -544,7 +544,6 @@ mod tests {
assert_eq_input_ref, input_ref_to_column_indices, AggCall, ExprType, FunctionCall,
};
use crate::optimizer::plan_node::LogicalValues;
use crate::optimizer::property::ctx::WithId;
use crate::session::OptimizerContext;

#[tokio::test]
Expand Down
Loading

0 comments on commit 930c42f

Please sign in to comment.