Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(optimizer): remove WithXXX on PlanNode #1388

Merged
merged 13 commits into from
Mar 29, 2022
7 changes: 0 additions & 7 deletions rust/frontend/src/optimizer/plan_node/batch_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::{DeleteNode, TableRefId};

Expand Down Expand Up @@ -61,12 +60,6 @@ impl PlanTreeNodeUnary for BatchDelete {

impl_plan_tree_node_for_unary! { BatchDelete }

impl WithSchema for BatchDelete {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl ToDistributedBatch for BatchDelete {
fn to_distributed(&self) -> PlanRef {
let new_input = self
Expand Down
9 changes: 1 addition & 8 deletions rust/frontend/src/optimizer/plan_node/batch_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::ExchangeNode;

use super::{PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::optimizer::property::{Distribution, Order, WithDistribution, WithOrder, WithSchema};
use crate::optimizer::property::{Distribution, Order};

/// `BatchExchange` imposes a particular distribution on its input
/// without changing its content.
Expand Down Expand Up @@ -59,12 +58,6 @@ impl PlanTreeNodeUnary for BatchExchange {
}
impl_plan_tree_node_for_unary! {BatchExchange}

impl WithSchema for BatchExchange {
fn schema(&self) -> &Schema {
&self.base.schema
}
}

impl ToDistributedBatch for BatchExchange {
fn to_distributed(&self) -> PlanRef {
unreachable!()
Expand Down
7 changes: 0 additions & 7 deletions rust/frontend/src/optimizer/plan_node/batch_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::FilterNode;

Expand Down Expand Up @@ -67,12 +66,6 @@ impl PlanTreeNodeUnary for BatchFilter {

impl_plan_tree_node_for_unary! { BatchFilter }

impl WithSchema for BatchFilter {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl ToDistributedBatch for BatchFilter {
fn to_distributed(&self) -> PlanRef {
let new_input = self
Expand Down
8 changes: 0 additions & 8 deletions rust/frontend/src/optimizer/plan_node/batch_hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::fmt;

use itertools::Itertools;
use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::HashAggNode;

Expand Down Expand Up @@ -82,13 +81,6 @@ impl PlanTreeNodeUnary for BatchHashAgg {
}
}
impl_plan_tree_node_for_unary! { BatchHashAgg }

impl WithSchema for BatchHashAgg {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl ToDistributedBatch for BatchHashAgg {
fn to_distributed(&self) -> PlanRef {
let new_input = self.input().to_distributed_with_required(
Expand Down
7 changes: 0 additions & 7 deletions rust/frontend/src/optimizer/plan_node/batch_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::HashJoinNode;

Expand Down Expand Up @@ -114,12 +113,6 @@ impl PlanTreeNodeBinary for BatchHashJoin {

impl_plan_tree_node_for_binary! { BatchHashJoin }

impl WithSchema for BatchHashJoin {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl ToDistributedBatch for BatchHashJoin {
fn to_distributed(&self) -> PlanRef {
let left = self.left().to_distributed_with_required(
Expand Down
7 changes: 0 additions & 7 deletions rust/frontend/src/optimizer/plan_node/batch_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::{InsertNode, TableRefId};

Expand Down Expand Up @@ -61,12 +60,6 @@ impl PlanTreeNodeUnary for BatchInsert {

impl_plan_tree_node_for_unary! { BatchInsert }

impl WithSchema for BatchInsert {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl ToDistributedBatch for BatchInsert {
fn to_distributed(&self) -> PlanRef {
let new_input = self
Expand Down
8 changes: 0 additions & 8 deletions rust/frontend/src/optimizer/plan_node/batch_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::LimitNode;

Expand Down Expand Up @@ -56,13 +55,6 @@ impl PlanTreeNodeUnary for BatchLimit {
}
}
impl_plan_tree_node_for_unary! {BatchLimit}

impl WithSchema for BatchLimit {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl ToDistributedBatch for BatchLimit {
fn to_distributed(&self) -> PlanRef {
let new_input = self
Expand Down
7 changes: 0 additions & 7 deletions rust/frontend/src/optimizer/plan_node/batch_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::expr::ExprNode;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::ProjectNode;
Expand Down Expand Up @@ -67,12 +66,6 @@ impl PlanTreeNodeUnary for BatchProject {

impl_plan_tree_node_for_unary! { BatchProject }

impl WithSchema for BatchProject {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl ToDistributedBatch for BatchProject {
fn to_distributed(&self) -> PlanRef {
let new_input = self
Expand Down
7 changes: 0 additions & 7 deletions rust/frontend/src/optimizer/plan_node/batch_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::{CellBasedTableDesc, ColumnDesc as ProstColumnDesc, RowSeqScanNode};

Expand All @@ -29,12 +28,6 @@ pub struct BatchSeqScan {
logical: LogicalScan,
}

impl WithSchema for BatchSeqScan {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl BatchSeqScan {
pub fn new_inner(logical: LogicalScan, dist: Distribution) -> Self {
let ctx = logical.base.ctx.clone();
Expand Down
7 changes: 0 additions & 7 deletions rust/frontend/src/optimizer/plan_node/batch_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::SortAggNode;

Expand Down Expand Up @@ -65,12 +64,6 @@ impl PlanTreeNodeUnary for BatchSimpleAgg {
}
impl_plan_tree_node_for_unary! { BatchSimpleAgg }

impl WithSchema for BatchSimpleAgg {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl ToDistributedBatch for BatchSimpleAgg {
fn to_distributed(&self) -> PlanRef {
let new_input = self
Expand Down
9 changes: 1 addition & 8 deletions rust/frontend/src/optimizer/plan_node/batch_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
use std::fmt;

use itertools::Itertools;
use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::{ColumnOrder, OrderByNode};

use super::{PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::optimizer::property::{Distribution, Order, WithOrder, WithSchema};
use crate::optimizer::property::{Distribution, Order, WithSchema};

/// `BatchSort` buffers all data from input and sort these rows by specified order, providing the
/// collation required by user or parent plan node.
Expand Down Expand Up @@ -56,12 +55,6 @@ impl PlanTreeNodeUnary for BatchSort {
}
impl_plan_tree_node_for_unary! {BatchSort}

impl WithSchema for BatchSort {
fn schema(&self) -> &Schema {
&self.base.schema
}
}

impl ToDistributedBatch for BatchSort {
fn to_distributed(&self) -> PlanRef {
let new_input = self
Expand Down
7 changes: 0 additions & 7 deletions rust/frontend/src/optimizer/plan_node/batch_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::values_node::ExprTuple;
use risingwave_pb::plan::ValuesNode;
Expand Down Expand Up @@ -58,12 +57,6 @@ impl fmt::Display for BatchValues {
}
}

impl WithSchema for BatchValues {
fn schema(&self) -> &Schema {
self.logical.schema()
}
}

impl ToDistributedBatch for BatchValues {
fn to_distributed(&self) -> PlanRef {
Self::with_dist(self.logical().clone(), Distribution::Single).into()
Expand Down
1 change: 0 additions & 1 deletion rust/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,6 @@ mod tests {
assert_eq_input_ref, input_ref_to_column_indices, AggCall, ExprType, FunctionCall,
};
use crate::optimizer::plan_node::LogicalValues;
use crate::optimizer::property::ctx::WithId;
use crate::session::OptimizerContext;

#[tokio::test]
Expand Down
1 change: 0 additions & 1 deletion rust/frontend/src/optimizer/plan_node/logical_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ mod tests {
use super::*;
use crate::expr::{assert_eq_input_ref, FunctionCall, InputRef, Literal};
use crate::optimizer::plan_node::LogicalValues;
use crate::optimizer::property::ctx::WithId;
use crate::session::OptimizerContext;

#[tokio::test]
Expand Down
26 changes: 17 additions & 9 deletions rust/frontend/src/optimizer/plan_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use risingwave_common::error::{ErrorCode, Result};
use risingwave_pb::plan::PlanNode as BatchPlanProst;
use risingwave_pb::stream_plan::StreamNode as StreamPlanProst;

use super::property::{WithConvention, WithDistribution, WithOrder, WithSchema};
use super::property::{Distribution, Order, WithConvention, WithSchema};

/// The common trait over all plan nodes. Used by optimizer framework which will treate all node as
/// `dyn PlanNode`
Expand All @@ -51,11 +51,7 @@ pub trait PlanNode:
+ Display
+ Downcast
+ WithConvention
+ WithOrder
+ WithDistribution
+ WithSchema
+ WithContext
+ WithId
+ ColPrunable
+ ToBatch
+ ToStream
Expand All @@ -64,9 +60,6 @@ pub trait PlanNode:
{
fn node_type(&self) -> PlanNodeType;
fn plan_base(&self) -> &PlanBase;
fn append_only(&self) -> bool {
self.plan_base().append_only
}
}

impl_downcast!(PlanNode);
Expand All @@ -93,9 +86,24 @@ impl dyn PlanNode {
Ok(output)
}

pub fn id(&self) -> PlanNodeId {
self.plan_base().id
}
pub fn ctx(&self) -> OptimizerContextRef {
self.plan_base().ctx.clone()
}
pub fn pk_indices(&self) -> &[usize] {
&self.plan_base().pk_indices
}
pub fn order(&self) -> &Order {
&self.plan_base().order
}
pub fn distribution(&self) -> &Distribution {
&self.plan_base().dist
}
pub fn append_only(&self) -> bool {
self.plan_base().append_only
}

/// Serialize the plan node and its children to a batch plan proto.
pub fn to_batch_prost(&self) -> BatchPlanProst {
Expand Down Expand Up @@ -238,7 +246,7 @@ pub use stream_simple_agg::StreamSimpleAgg;
pub use stream_source::StreamSource;
pub use stream_table_scan::StreamTableScan;

use crate::optimizer::property::{WithContext, WithId};
use crate::session::OptimizerContextRef;

/// [`for_all_plan_nodes`] includes all plan nodes. If you added a new plan node
/// inside the project, be sure to add here and in its conventions like [`for_logical_plan_nodes`]
Expand Down
31 changes: 30 additions & 1 deletion rust/frontend/src/optimizer/plan_node/plan_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use paste::paste;
use risingwave_common::catalog::Schema;

use super::PlanNodeId;
use super::*;
use crate::for_all_plan_nodes;
use crate::optimizer::property::{Distribution, Order};
use crate::session::OptimizerContextRef;

Expand Down Expand Up @@ -89,3 +91,30 @@ impl PlanBase {
}
}
}
macro_rules! impl_base_delegate {
([], $( { $convention:ident, $name:ident }),*) => {
$(paste! {
impl [<$convention $name>] {
pub fn id(&self) -> PlanNodeId {
self.plan_base().id
}
pub fn ctx(&self) -> OptimizerContextRef {
self.plan_base().ctx.clone()
}
pub fn pk_indices(&self) -> &[usize] {
&self.plan_base().pk_indices
}
pub fn order(&self) -> &Order {
&self.plan_base().order
}
pub fn distribution(&self) -> &Distribution {
&self.plan_base().dist
}
pub fn append_only(&self) -> bool {
self.plan_base().append_only
}
}
})*
}
}
for_all_plan_nodes! { impl_base_delegate }
Loading