From 87fdc9dc31b6d3956593fa0aea2065f75f485ec9 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 29 Mar 2022 15:52:53 +0800 Subject: [PATCH 01/12] move append_only fn to impl dyn PlanNode --- rust/frontend/src/optimizer/plan_node/mod.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/rust/frontend/src/optimizer/plan_node/mod.rs b/rust/frontend/src/optimizer/plan_node/mod.rs index f6487ce0bd407..cde44398d0955 100644 --- a/rust/frontend/src/optimizer/plan_node/mod.rs +++ b/rust/frontend/src/optimizer/plan_node/mod.rs @@ -64,9 +64,6 @@ pub trait PlanNode: { fn node_type(&self) -> PlanNodeType; fn plan_base(&self) -> &PlanBase; - fn append_only(&self) -> bool { - self.plan_base().append_only - } } impl_downcast!(PlanNode); @@ -97,6 +94,10 @@ impl dyn PlanNode { &self.plan_base().pk_indices } + pub fn append_only(&self) -> bool { + self.plan_base().append_only + } + /// Serialize the plan node and its children to a batch plan proto. pub fn to_batch_prost(&self) -> BatchPlanProst { self.to_batch_prost_identity(true) From 831a3e438a8e17e7ac4a148c9796f21115862058 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 29 Mar 2022 16:32:46 +0800 Subject: [PATCH 02/12] use WithSchema with macro --- rust/frontend/src/optimizer/plan_node/batch_delete.rs | 6 ------ rust/frontend/src/optimizer/plan_node/batch_exchange.rs | 6 ------ rust/frontend/src/optimizer/plan_node/batch_filter.rs | 6 ------ rust/frontend/src/optimizer/plan_node/batch_hash_agg.rs | 7 ------- rust/frontend/src/optimizer/plan_node/batch_hash_join.rs | 6 ------ rust/frontend/src/optimizer/plan_node/batch_insert.rs | 6 ------ rust/frontend/src/optimizer/plan_node/batch_limit.rs | 7 ------- rust/frontend/src/optimizer/plan_node/batch_project.rs | 6 ------ rust/frontend/src/optimizer/plan_node/batch_seq_scan.rs | 6 ------ rust/frontend/src/optimizer/plan_node/batch_simple_agg.rs | 6 ------ rust/frontend/src/optimizer/plan_node/batch_sort.rs | 6 ------ rust/frontend/src/optimizer/plan_node/batch_values.rs | 6 ------ rust/frontend/src/optimizer/plan_node/stream_exchange.rs | 6 ------ rust/frontend/src/optimizer/plan_node/stream_filter.rs | 6 ------ rust/frontend/src/optimizer/plan_node/stream_hash_agg.rs | 6 ------ rust/frontend/src/optimizer/plan_node/stream_hash_join.rs | 6 ------ .../frontend/src/optimizer/plan_node/stream_materialize.rs | 6 ------ rust/frontend/src/optimizer/plan_node/stream_project.rs | 5 ----- rust/frontend/src/optimizer/plan_node/stream_simple_agg.rs | 6 ------ rust/frontend/src/optimizer/plan_node/stream_source.rs | 6 ------ rust/frontend/src/optimizer/plan_node/stream_table_scan.rs | 7 ------- rust/frontend/src/optimizer/property/schema.rs | 6 +++--- 22 files changed, 3 insertions(+), 131 deletions(-) diff --git a/rust/frontend/src/optimizer/plan_node/batch_delete.rs b/rust/frontend/src/optimizer/plan_node/batch_delete.rs index 74a0b81701a26..59e43a630a99b 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_delete.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_delete.rs @@ -61,12 +61,6 @@ impl PlanTreeNodeUnary for BatchDelete { impl_plan_tree_node_for_unary! { BatchDelete } -impl WithSchema for BatchDelete { - fn schema(&self) -> &Schema { - self.logical.schema() - } -} - impl ToDistributedBatch for BatchDelete { fn to_distributed(&self) -> PlanRef { let new_input = self diff --git a/rust/frontend/src/optimizer/plan_node/batch_exchange.rs b/rust/frontend/src/optimizer/plan_node/batch_exchange.rs index 4b42b550decc1..d44cfb873b5c4 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_exchange.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_exchange.rs @@ -59,12 +59,6 @@ impl PlanTreeNodeUnary for BatchExchange { } impl_plan_tree_node_for_unary! {BatchExchange} -impl WithSchema for BatchExchange { - fn schema(&self) -> &Schema { - &self.base.schema - } -} - impl ToDistributedBatch for BatchExchange { fn to_distributed(&self) -> PlanRef { unreachable!() diff --git a/rust/frontend/src/optimizer/plan_node/batch_filter.rs b/rust/frontend/src/optimizer/plan_node/batch_filter.rs index 96e790bc0e499..252480e58b2d8 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_filter.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_filter.rs @@ -67,12 +67,6 @@ impl PlanTreeNodeUnary for BatchFilter { impl_plan_tree_node_for_unary! { BatchFilter } -impl WithSchema for BatchFilter { - fn schema(&self) -> &Schema { - self.logical.schema() - } -} - impl ToDistributedBatch for BatchFilter { fn to_distributed(&self) -> PlanRef { let new_input = self diff --git a/rust/frontend/src/optimizer/plan_node/batch_hash_agg.rs b/rust/frontend/src/optimizer/plan_node/batch_hash_agg.rs index 9d8c736825d9c..2010e76093dab 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_hash_agg.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_hash_agg.rs @@ -82,13 +82,6 @@ impl PlanTreeNodeUnary for BatchHashAgg { } } impl_plan_tree_node_for_unary! { BatchHashAgg } - -impl WithSchema for BatchHashAgg { - fn schema(&self) -> &Schema { - self.logical.schema() - } -} - impl ToDistributedBatch for BatchHashAgg { fn to_distributed(&self) -> PlanRef { let new_input = self.input().to_distributed_with_required( diff --git a/rust/frontend/src/optimizer/plan_node/batch_hash_join.rs b/rust/frontend/src/optimizer/plan_node/batch_hash_join.rs index b12c1bb7de13c..8d0c3155bdf4a 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_hash_join.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_hash_join.rs @@ -114,12 +114,6 @@ impl PlanTreeNodeBinary for BatchHashJoin { impl_plan_tree_node_for_binary! { BatchHashJoin } -impl WithSchema for BatchHashJoin { - fn schema(&self) -> &Schema { - self.logical.schema() - } -} - impl ToDistributedBatch for BatchHashJoin { fn to_distributed(&self) -> PlanRef { let left = self.left().to_distributed_with_required( diff --git a/rust/frontend/src/optimizer/plan_node/batch_insert.rs b/rust/frontend/src/optimizer/plan_node/batch_insert.rs index d696c52d64087..013a493d10a9a 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_insert.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_insert.rs @@ -61,12 +61,6 @@ impl PlanTreeNodeUnary for BatchInsert { impl_plan_tree_node_for_unary! { BatchInsert } -impl WithSchema for BatchInsert { - fn schema(&self) -> &Schema { - self.logical.schema() - } -} - impl ToDistributedBatch for BatchInsert { fn to_distributed(&self) -> PlanRef { let new_input = self diff --git a/rust/frontend/src/optimizer/plan_node/batch_limit.rs b/rust/frontend/src/optimizer/plan_node/batch_limit.rs index 245751a92f376..b6adeaf78ae76 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_limit.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_limit.rs @@ -56,13 +56,6 @@ impl PlanTreeNodeUnary for BatchLimit { } } impl_plan_tree_node_for_unary! {BatchLimit} - -impl WithSchema for BatchLimit { - fn schema(&self) -> &Schema { - self.logical.schema() - } -} - impl ToDistributedBatch for BatchLimit { fn to_distributed(&self) -> PlanRef { let new_input = self diff --git a/rust/frontend/src/optimizer/plan_node/batch_project.rs b/rust/frontend/src/optimizer/plan_node/batch_project.rs index 7dd4fcc34647e..11960a4787c93 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_project.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_project.rs @@ -67,12 +67,6 @@ impl PlanTreeNodeUnary for BatchProject { impl_plan_tree_node_for_unary! { BatchProject } -impl WithSchema for BatchProject { - fn schema(&self) -> &Schema { - self.logical.schema() - } -} - impl ToDistributedBatch for BatchProject { fn to_distributed(&self) -> PlanRef { let new_input = self diff --git a/rust/frontend/src/optimizer/plan_node/batch_seq_scan.rs b/rust/frontend/src/optimizer/plan_node/batch_seq_scan.rs index 6e1f692602304..ec8620dd25e17 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_seq_scan.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_seq_scan.rs @@ -29,12 +29,6 @@ pub struct BatchSeqScan { logical: LogicalScan, } -impl WithSchema for BatchSeqScan { - fn schema(&self) -> &Schema { - self.logical.schema() - } -} - impl BatchSeqScan { pub fn new_inner(logical: LogicalScan, dist: Distribution) -> Self { let ctx = logical.base.ctx.clone(); diff --git a/rust/frontend/src/optimizer/plan_node/batch_simple_agg.rs b/rust/frontend/src/optimizer/plan_node/batch_simple_agg.rs index 7fc80eb94b867..99c142fc3db6d 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_simple_agg.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_simple_agg.rs @@ -65,12 +65,6 @@ impl PlanTreeNodeUnary for BatchSimpleAgg { } impl_plan_tree_node_for_unary! { BatchSimpleAgg } -impl WithSchema for BatchSimpleAgg { - fn schema(&self) -> &Schema { - self.logical.schema() - } -} - impl ToDistributedBatch for BatchSimpleAgg { fn to_distributed(&self) -> PlanRef { let new_input = self diff --git a/rust/frontend/src/optimizer/plan_node/batch_sort.rs b/rust/frontend/src/optimizer/plan_node/batch_sort.rs index faf1c0ee27136..689027c13cb63 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_sort.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_sort.rs @@ -56,12 +56,6 @@ impl PlanTreeNodeUnary for BatchSort { } impl_plan_tree_node_for_unary! {BatchSort} -impl WithSchema for BatchSort { - fn schema(&self) -> &Schema { - &self.base.schema - } -} - impl ToDistributedBatch for BatchSort { fn to_distributed(&self) -> PlanRef { let new_input = self diff --git a/rust/frontend/src/optimizer/plan_node/batch_values.rs b/rust/frontend/src/optimizer/plan_node/batch_values.rs index 5c2c185bcf574..cf46faebb63f2 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_values.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_values.rs @@ -58,12 +58,6 @@ impl fmt::Display for BatchValues { } } -impl WithSchema for BatchValues { - fn schema(&self) -> &Schema { - self.logical.schema() - } -} - impl ToDistributedBatch for BatchValues { fn to_distributed(&self) -> PlanRef { Self::with_dist(self.logical().clone(), Distribution::Single).into() diff --git a/rust/frontend/src/optimizer/plan_node/stream_exchange.rs b/rust/frontend/src/optimizer/plan_node/stream_exchange.rs index d62b9c4e2b395..6f863f74bbad8 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_exchange.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_exchange.rs @@ -61,12 +61,6 @@ impl PlanTreeNodeUnary for StreamExchange { } impl_plan_tree_node_for_unary! {StreamExchange} -impl WithSchema for StreamExchange { - fn schema(&self) -> &Schema { - &self.base.schema - } -} - impl ToStreamProst for StreamExchange { fn to_stream_prost_body(&self) -> Node { Node::ExchangeNode(ExchangeNode { diff --git a/rust/frontend/src/optimizer/plan_node/stream_filter.rs b/rust/frontend/src/optimizer/plan_node/stream_filter.rs index ae7f17fd274a9..e6ef73d6012a8 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_filter.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_filter.rs @@ -71,12 +71,6 @@ impl PlanTreeNodeUnary for StreamFilter { impl_plan_tree_node_for_unary! { StreamFilter } -impl WithSchema for StreamFilter { - fn schema(&self) -> &Schema { - self.logical.schema() - } -} - impl ToStreamProst for StreamFilter { fn to_stream_prost_body(&self) -> ProstStreamNode { ProstStreamNode::FilterNode(FilterNode { diff --git a/rust/frontend/src/optimizer/plan_node/stream_hash_agg.rs b/rust/frontend/src/optimizer/plan_node/stream_hash_agg.rs index 63990cf88250a..f1c2138477b8e 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_hash_agg.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_hash_agg.rs @@ -89,12 +89,6 @@ impl PlanTreeNodeUnary for StreamHashAgg { } impl_plan_tree_node_for_unary! { StreamHashAgg } -impl WithSchema for StreamHashAgg { - fn schema(&self) -> &Schema { - self.logical.schema() - } -} - impl ToStreamProst for StreamHashAgg { fn to_stream_prost_body(&self) -> ProstStreamNode { use risingwave_pb::stream_plan::*; diff --git a/rust/frontend/src/optimizer/plan_node/stream_hash_join.rs b/rust/frontend/src/optimizer/plan_node/stream_hash_join.rs index dd85effff4976..56793ed4df3d3 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_hash_join.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_hash_join.rs @@ -126,12 +126,6 @@ impl PlanTreeNodeBinary for StreamHashJoin { impl_plan_tree_node_for_binary! { StreamHashJoin } -impl WithSchema for StreamHashJoin { - fn schema(&self) -> &Schema { - self.logical.schema() - } -} - impl ToStreamProst for StreamHashJoin { fn to_stream_prost_body(&self) -> Node { Node::HashJoinNode(HashJoinNode { diff --git a/rust/frontend/src/optimizer/plan_node/stream_materialize.rs b/rust/frontend/src/optimizer/plan_node/stream_materialize.rs index 5a211f3e06350..a79fcc453d76c 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -211,12 +211,6 @@ impl PlanTreeNodeUnary for StreamMaterialize { impl_plan_tree_node_for_unary! { StreamMaterialize } -impl WithSchema for StreamMaterialize { - fn schema(&self) -> &Schema { - &self.base.schema - } -} - impl ToStreamProst for StreamMaterialize { fn to_stream_prost_body(&self) -> ProstStreamNode { use risingwave_pb::stream_plan::*; diff --git a/rust/frontend/src/optimizer/plan_node/stream_project.rs b/rust/frontend/src/optimizer/plan_node/stream_project.rs index 435cd2803c249..0dd44b94fefeb 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_project.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_project.rs @@ -66,11 +66,6 @@ impl PlanTreeNodeUnary for StreamProject { } } impl_plan_tree_node_for_unary! {StreamProject} -impl WithSchema for StreamProject { - fn schema(&self) -> &Schema { - self.logical.schema() - } -} impl ToStreamProst for StreamProject { fn to_stream_prost_body(&self) -> ProstStreamNode { diff --git a/rust/frontend/src/optimizer/plan_node/stream_simple_agg.rs b/rust/frontend/src/optimizer/plan_node/stream_simple_agg.rs index 28dc1f3375f6b..1b1eebe4d5acd 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_simple_agg.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_simple_agg.rs @@ -68,12 +68,6 @@ impl PlanTreeNodeUnary for StreamSimpleAgg { } impl_plan_tree_node_for_unary! { StreamSimpleAgg } -impl WithSchema for StreamSimpleAgg { - fn schema(&self) -> &Schema { - self.logical.schema() - } -} - impl ToStreamProst for StreamSimpleAgg { fn to_stream_prost_body(&self) -> ProstStreamNode { use risingwave_pb::stream_plan::*; diff --git a/rust/frontend/src/optimizer/plan_node/stream_source.rs b/rust/frontend/src/optimizer/plan_node/stream_source.rs index 0f991c8c6f267..3cc48a925a7a2 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_source.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_source.rs @@ -77,12 +77,6 @@ impl StreamSource { } } -impl WithSchema for StreamSource { - fn schema(&self) -> &Schema { - &self.base.schema - } -} - impl_plan_tree_node_for_leaf! { StreamSource } impl fmt::Display for StreamSource { diff --git a/rust/frontend/src/optimizer/plan_node/stream_table_scan.rs b/rust/frontend/src/optimizer/plan_node/stream_table_scan.rs index 1ea4f3cd14653..7c5cc04af3421 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -57,13 +57,6 @@ impl StreamTableScan { self.logical.table_name() } } - -impl WithSchema for StreamTableScan { - fn schema(&self) -> &Schema { - self.logical.schema() - } -} - impl_plan_tree_node_for_leaf! { StreamTableScan } impl fmt::Display for StreamTableScan { diff --git a/rust/frontend/src/optimizer/property/schema.rs b/rust/frontend/src/optimizer/property/schema.rs index 07408a2a2635d..763ded82a81a6 100644 --- a/rust/frontend/src/optimizer/property/schema.rs +++ b/rust/frontend/src/optimizer/property/schema.rs @@ -17,7 +17,7 @@ use paste::paste; use risingwave_common::catalog::Schema; use super::super::plan_node::*; -use crate::for_logical_plan_nodes; +use crate::{for_all_plan_nodes, for_logical_plan_nodes}; pub trait WithSchema { fn schema(&self) -> &Schema; @@ -34,7 +34,7 @@ pub trait WithSchema { } /// Define module for each node. -macro_rules! impl_with_schema_for_logical_node { +macro_rules! impl_with_schema { ([], $( { $convention:ident, $name:ident }),*) => { $(paste! { impl WithSchema for [<$convention $name>] { @@ -45,4 +45,4 @@ macro_rules! impl_with_schema_for_logical_node { })* } } -for_logical_plan_nodes! {impl_with_schema_for_logical_node } +for_all_plan_nodes! {impl_with_schema } From e6e57cd898069717e3ac842c94bc1d4b18219ce5 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 29 Mar 2022 16:34:14 +0800 Subject: [PATCH 03/12] clippy fix --- rust/frontend/src/optimizer/plan_node/batch_delete.rs | 1 - rust/frontend/src/optimizer/plan_node/batch_exchange.rs | 3 +-- rust/frontend/src/optimizer/plan_node/batch_filter.rs | 1 - rust/frontend/src/optimizer/plan_node/batch_hash_agg.rs | 1 - rust/frontend/src/optimizer/plan_node/batch_hash_join.rs | 1 - rust/frontend/src/optimizer/plan_node/batch_insert.rs | 1 - rust/frontend/src/optimizer/plan_node/batch_limit.rs | 1 - rust/frontend/src/optimizer/plan_node/batch_project.rs | 1 - rust/frontend/src/optimizer/plan_node/batch_seq_scan.rs | 1 - rust/frontend/src/optimizer/plan_node/batch_simple_agg.rs | 1 - rust/frontend/src/optimizer/plan_node/batch_sort.rs | 1 - rust/frontend/src/optimizer/plan_node/batch_values.rs | 1 - rust/frontend/src/optimizer/plan_node/stream_exchange.rs | 1 - rust/frontend/src/optimizer/plan_node/stream_filter.rs | 1 - rust/frontend/src/optimizer/plan_node/stream_hash_agg.rs | 1 - rust/frontend/src/optimizer/plan_node/stream_hash_join.rs | 1 - rust/frontend/src/optimizer/plan_node/stream_materialize.rs | 2 +- rust/frontend/src/optimizer/plan_node/stream_project.rs | 1 - rust/frontend/src/optimizer/plan_node/stream_simple_agg.rs | 1 - rust/frontend/src/optimizer/plan_node/stream_table_scan.rs | 1 - rust/frontend/src/optimizer/property/schema.rs | 2 +- 21 files changed, 3 insertions(+), 22 deletions(-) diff --git a/rust/frontend/src/optimizer/plan_node/batch_delete.rs b/rust/frontend/src/optimizer/plan_node/batch_delete.rs index 59e43a630a99b..ad4cc554872df 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_delete.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_delete.rs @@ -14,7 +14,6 @@ use std::fmt; -use risingwave_common::catalog::Schema; use risingwave_pb::plan::plan_node::NodeBody; use risingwave_pb::plan::{DeleteNode, TableRefId}; diff --git a/rust/frontend/src/optimizer/plan_node/batch_exchange.rs b/rust/frontend/src/optimizer/plan_node/batch_exchange.rs index d44cfb873b5c4..85959fedd8fde 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_exchange.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_exchange.rs @@ -14,12 +14,11 @@ use std::fmt; -use risingwave_common::catalog::Schema; use risingwave_pb::plan::plan_node::NodeBody; use risingwave_pb::plan::ExchangeNode; use super::{PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch}; -use crate::optimizer::property::{Distribution, Order, WithDistribution, WithOrder, WithSchema}; +use crate::optimizer::property::{Distribution, Order, WithDistribution, WithOrder}; /// `BatchExchange` imposes a particular distribution on its input /// without changing its content. diff --git a/rust/frontend/src/optimizer/plan_node/batch_filter.rs b/rust/frontend/src/optimizer/plan_node/batch_filter.rs index 252480e58b2d8..c563a90bec613 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_filter.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_filter.rs @@ -14,7 +14,6 @@ use std::fmt; -use risingwave_common::catalog::Schema; use risingwave_pb::plan::plan_node::NodeBody; use risingwave_pb::plan::FilterNode; diff --git a/rust/frontend/src/optimizer/plan_node/batch_hash_agg.rs b/rust/frontend/src/optimizer/plan_node/batch_hash_agg.rs index 2010e76093dab..f0821282e071d 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_hash_agg.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_hash_agg.rs @@ -15,7 +15,6 @@ use std::fmt; use itertools::Itertools; -use risingwave_common::catalog::Schema; use risingwave_pb::plan::plan_node::NodeBody; use risingwave_pb::plan::HashAggNode; diff --git a/rust/frontend/src/optimizer/plan_node/batch_hash_join.rs b/rust/frontend/src/optimizer/plan_node/batch_hash_join.rs index 8d0c3155bdf4a..9a123880cb87f 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_hash_join.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_hash_join.rs @@ -14,7 +14,6 @@ use std::fmt; -use risingwave_common::catalog::Schema; use risingwave_pb::plan::plan_node::NodeBody; use risingwave_pb::plan::HashJoinNode; diff --git a/rust/frontend/src/optimizer/plan_node/batch_insert.rs b/rust/frontend/src/optimizer/plan_node/batch_insert.rs index 013a493d10a9a..0096a70bf2d2c 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_insert.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_insert.rs @@ -14,7 +14,6 @@ use std::fmt; -use risingwave_common::catalog::Schema; use risingwave_pb::plan::plan_node::NodeBody; use risingwave_pb::plan::{InsertNode, TableRefId}; diff --git a/rust/frontend/src/optimizer/plan_node/batch_limit.rs b/rust/frontend/src/optimizer/plan_node/batch_limit.rs index b6adeaf78ae76..c761ae46bb87a 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_limit.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_limit.rs @@ -14,7 +14,6 @@ use std::fmt; -use risingwave_common::catalog::Schema; use risingwave_pb::plan::plan_node::NodeBody; use risingwave_pb::plan::LimitNode; diff --git a/rust/frontend/src/optimizer/plan_node/batch_project.rs b/rust/frontend/src/optimizer/plan_node/batch_project.rs index 11960a4787c93..a2bcf62fde29e 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_project.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_project.rs @@ -14,7 +14,6 @@ use std::fmt; -use risingwave_common::catalog::Schema; use risingwave_pb::expr::ExprNode; use risingwave_pb::plan::plan_node::NodeBody; use risingwave_pb::plan::ProjectNode; diff --git a/rust/frontend/src/optimizer/plan_node/batch_seq_scan.rs b/rust/frontend/src/optimizer/plan_node/batch_seq_scan.rs index ec8620dd25e17..2ad96a90b1be1 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_seq_scan.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_seq_scan.rs @@ -14,7 +14,6 @@ use std::fmt; -use risingwave_common::catalog::Schema; use risingwave_pb::plan::plan_node::NodeBody; use risingwave_pb::plan::{CellBasedTableDesc, ColumnDesc as ProstColumnDesc, RowSeqScanNode}; diff --git a/rust/frontend/src/optimizer/plan_node/batch_simple_agg.rs b/rust/frontend/src/optimizer/plan_node/batch_simple_agg.rs index 99c142fc3db6d..6532a512848cc 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_simple_agg.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_simple_agg.rs @@ -14,7 +14,6 @@ use std::fmt; -use risingwave_common::catalog::Schema; use risingwave_pb::plan::plan_node::NodeBody; use risingwave_pb::plan::SortAggNode; diff --git a/rust/frontend/src/optimizer/plan_node/batch_sort.rs b/rust/frontend/src/optimizer/plan_node/batch_sort.rs index 689027c13cb63..d7d532646eb24 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_sort.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_sort.rs @@ -15,7 +15,6 @@ use std::fmt; use itertools::Itertools; -use risingwave_common::catalog::Schema; use risingwave_pb::plan::plan_node::NodeBody; use risingwave_pb::plan::{ColumnOrder, OrderByNode}; diff --git a/rust/frontend/src/optimizer/plan_node/batch_values.rs b/rust/frontend/src/optimizer/plan_node/batch_values.rs index cf46faebb63f2..8e89d6da089b4 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_values.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_values.rs @@ -14,7 +14,6 @@ use std::fmt; -use risingwave_common::catalog::Schema; use risingwave_pb::plan::plan_node::NodeBody; use risingwave_pb::plan::values_node::ExprTuple; use risingwave_pb::plan::ValuesNode; diff --git a/rust/frontend/src/optimizer/plan_node/stream_exchange.rs b/rust/frontend/src/optimizer/plan_node/stream_exchange.rs index 6f863f74bbad8..28078da56e0f7 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_exchange.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_exchange.rs @@ -14,7 +14,6 @@ use std::fmt; -use risingwave_common::catalog::Schema; use risingwave_pb::stream_plan::stream_node::Node; use risingwave_pb::stream_plan::{DispatchStrategy, DispatcherType, ExchangeNode}; diff --git a/rust/frontend/src/optimizer/plan_node/stream_filter.rs b/rust/frontend/src/optimizer/plan_node/stream_filter.rs index e6ef73d6012a8..12ea33ba6cee5 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_filter.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_filter.rs @@ -14,7 +14,6 @@ use std::fmt; -use risingwave_common::catalog::Schema; use risingwave_pb::stream_plan::stream_node::Node as ProstStreamNode; use risingwave_pb::stream_plan::FilterNode; diff --git a/rust/frontend/src/optimizer/plan_node/stream_hash_agg.rs b/rust/frontend/src/optimizer/plan_node/stream_hash_agg.rs index f1c2138477b8e..283159fbcc473 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_hash_agg.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_hash_agg.rs @@ -15,7 +15,6 @@ use std::fmt; use itertools::Itertools; -use risingwave_common::catalog::Schema; use risingwave_pb::stream_plan::stream_node::Node as ProstStreamNode; use super::logical_agg::PlanAggCall; diff --git a/rust/frontend/src/optimizer/plan_node/stream_hash_join.rs b/rust/frontend/src/optimizer/plan_node/stream_hash_join.rs index 56793ed4df3d3..d8784b5e931a6 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_hash_join.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_hash_join.rs @@ -15,7 +15,6 @@ use std::fmt; use itertools::Itertools; -use risingwave_common::catalog::Schema; use risingwave_pb::plan::JoinType; use risingwave_pb::stream_plan::stream_node::Node; use risingwave_pb::stream_plan::HashJoinNode; diff --git a/rust/frontend/src/optimizer/plan_node/stream_materialize.rs b/rust/frontend/src/optimizer/plan_node/stream_materialize.rs index a79fcc453d76c..36072737df264 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -30,7 +30,7 @@ use crate::catalog::column_catalog::ColumnCatalog; use crate::catalog::table_catalog::TableCatalog; use crate::catalog::{gen_row_id_column_name, is_row_id_column_name, ColumnId}; use crate::optimizer::plan_node::{PlanBase, PlanNode}; -use crate::optimizer::property::{Order, WithSchema}; +use crate::optimizer::property::Order; /// Materializes a stream. #[derive(Debug, Clone)] diff --git a/rust/frontend/src/optimizer/plan_node/stream_project.rs b/rust/frontend/src/optimizer/plan_node/stream_project.rs index 0dd44b94fefeb..f575fb5744098 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_project.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_project.rs @@ -14,7 +14,6 @@ use std::fmt; -use risingwave_common::catalog::Schema; use risingwave_pb::stream_plan::stream_node::Node as ProstStreamNode; use risingwave_pb::stream_plan::ProjectNode; diff --git a/rust/frontend/src/optimizer/plan_node/stream_simple_agg.rs b/rust/frontend/src/optimizer/plan_node/stream_simple_agg.rs index 1b1eebe4d5acd..e864f9ba97643 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_simple_agg.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_simple_agg.rs @@ -15,7 +15,6 @@ use std::fmt; use itertools::Itertools; -use risingwave_common::catalog::Schema; use risingwave_pb::stream_plan::stream_node::Node as ProstStreamNode; use super::logical_agg::PlanAggCall; diff --git a/rust/frontend/src/optimizer/plan_node/stream_table_scan.rs b/rust/frontend/src/optimizer/plan_node/stream_table_scan.rs index 7c5cc04af3421..f2817f710c149 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -15,7 +15,6 @@ use std::fmt; use itertools::Itertools; -use risingwave_common::catalog::Schema; use risingwave_pb::stream_plan::stream_node::Node as ProstStreamNode; use risingwave_pb::stream_plan::StreamNode as ProstStreamPlan; diff --git a/rust/frontend/src/optimizer/property/schema.rs b/rust/frontend/src/optimizer/property/schema.rs index 763ded82a81a6..17c8cc481654a 100644 --- a/rust/frontend/src/optimizer/property/schema.rs +++ b/rust/frontend/src/optimizer/property/schema.rs @@ -17,7 +17,7 @@ use paste::paste; use risingwave_common::catalog::Schema; use super::super::plan_node::*; -use crate::{for_all_plan_nodes, for_logical_plan_nodes}; +use crate::for_all_plan_nodes; pub trait WithSchema { fn schema(&self) -> &Schema; From 29df11ba514fbe75b037cd20a99459e23a3ac66c Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 29 Mar 2022 16:41:16 +0800 Subject: [PATCH 04/12] remove WithOrder --- .../src/optimizer/plan_node/batch_exchange.rs | 2 +- .../src/optimizer/plan_node/batch_sort.rs | 2 +- rust/frontend/src/optimizer/plan_node/mod.rs | 7 +++-- rust/frontend/src/optimizer/property/order.rs | 31 +++++-------------- 4 files changed, 14 insertions(+), 28 deletions(-) diff --git a/rust/frontend/src/optimizer/plan_node/batch_exchange.rs b/rust/frontend/src/optimizer/plan_node/batch_exchange.rs index 85959fedd8fde..cc34c813f9fde 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_exchange.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_exchange.rs @@ -18,7 +18,7 @@ use risingwave_pb::plan::plan_node::NodeBody; use risingwave_pb::plan::ExchangeNode; use super::{PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch}; -use crate::optimizer::property::{Distribution, Order, WithDistribution, WithOrder}; +use crate::optimizer::property::{Distribution, Order, WithDistribution}; /// `BatchExchange` imposes a particular distribution on its input /// without changing its content. diff --git a/rust/frontend/src/optimizer/plan_node/batch_sort.rs b/rust/frontend/src/optimizer/plan_node/batch_sort.rs index d7d532646eb24..deab6f7e70421 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_sort.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_sort.rs @@ -19,7 +19,7 @@ use risingwave_pb::plan::plan_node::NodeBody; use risingwave_pb::plan::{ColumnOrder, OrderByNode}; use super::{PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch}; -use crate::optimizer::property::{Distribution, Order, WithOrder, WithSchema}; +use crate::optimizer::property::{Distribution, Order, WithSchema}; /// `BatchSort` buffers all data from input and sort these rows by specified order, providing the /// collation required by user or parent plan node. diff --git a/rust/frontend/src/optimizer/plan_node/mod.rs b/rust/frontend/src/optimizer/plan_node/mod.rs index cde44398d0955..1faff27ca068c 100644 --- a/rust/frontend/src/optimizer/plan_node/mod.rs +++ b/rust/frontend/src/optimizer/plan_node/mod.rs @@ -38,7 +38,7 @@ use risingwave_common::error::{ErrorCode, Result}; use risingwave_pb::plan::PlanNode as BatchPlanProst; use risingwave_pb::stream_plan::StreamNode as StreamPlanProst; -use super::property::{WithConvention, WithDistribution, WithOrder, WithSchema}; +use super::property::{Order, WithConvention, WithDistribution, WithSchema}; /// The common trait over all plan nodes. Used by optimizer framework which will treate all node as /// `dyn PlanNode` @@ -51,7 +51,6 @@ pub trait PlanNode: + Display + Downcast + WithConvention - + WithOrder + WithDistribution + WithSchema + WithContext @@ -90,6 +89,10 @@ impl dyn PlanNode { Ok(output) } + pub fn order(&self) -> &Order { + &self.plan_base().order + } + pub fn pk_indices(&self) -> &[usize] { &self.plan_base().pk_indices } diff --git a/rust/frontend/src/optimizer/property/order.rs b/rust/frontend/src/optimizer/property/order.rs index 1f823e54abb4e..027302076e4de 100644 --- a/rust/frontend/src/optimizer/property/order.rs +++ b/rust/frontend/src/optimizer/property/order.rs @@ -23,7 +23,9 @@ use risingwave_pb::plan::OrderType as ProstOrderType; use super::super::plan_node::*; use super::Convention; use crate::optimizer::PlanRef; -use crate::{for_batch_plan_nodes, for_logical_plan_nodes, for_stream_plan_nodes}; +use crate::{ + for_all_plan_nodes, for_batch_plan_nodes, for_logical_plan_nodes, for_stream_plan_nodes, +}; #[derive(Debug, Clone, Default)] pub struct Order { @@ -195,37 +197,18 @@ impl Order { } } -pub trait WithOrder { - /// the order property of the [`PlanNode`]'s output - fn order(&self) -> &Order; -} - -macro_rules! impl_with_order_base { +macro_rules! impl_order_with_base { ([], $( { $convention:ident, $name:ident }),*) => { $(paste! { - impl WithOrder for [<$convention $name>] { - fn order(&self) -> &Order { + impl [<$convention $name>] { + pub fn order(&self) -> &Order { &self.base.order } } })* } } -for_batch_plan_nodes! { impl_with_order_base } - -macro_rules! impl_with_order_any { - ([], $( { $convention:ident, $name:ident }),*) => { - $(paste! { - impl WithOrder for [<$convention $name>] { - fn order(&self) -> &Order { - Order::any() - } - } - })* - } -} -for_logical_plan_nodes! { impl_with_order_any } -for_stream_plan_nodes! { impl_with_order_any } +for_all_plan_nodes! { impl_order_with_base } #[cfg(test)] mod tests { From dcfcc12a387bac811f315f12d1ba56c01f2be6fd Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 29 Mar 2022 16:41:54 +0800 Subject: [PATCH 05/12] clippy fix --- rust/frontend/src/optimizer/property/order.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/rust/frontend/src/optimizer/property/order.rs b/rust/frontend/src/optimizer/property/order.rs index 027302076e4de..37afc51c7c02f 100644 --- a/rust/frontend/src/optimizer/property/order.rs +++ b/rust/frontend/src/optimizer/property/order.rs @@ -22,10 +22,8 @@ use risingwave_pb::plan::OrderType as ProstOrderType; use super::super::plan_node::*; use super::Convention; +use crate::for_all_plan_nodes; use crate::optimizer::PlanRef; -use crate::{ - for_all_plan_nodes, for_batch_plan_nodes, for_logical_plan_nodes, for_stream_plan_nodes, -}; #[derive(Debug, Clone, Default)] pub struct Order { From e586466a3f7017df7a47700858cedbfd2162297d Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 29 Mar 2022 16:45:06 +0800 Subject: [PATCH 06/12] remove WithDistribution --- .../src/optimizer/plan_node/batch_exchange.rs | 2 +- rust/frontend/src/optimizer/plan_node/mod.rs | 6 ++-- .../optimizer/plan_node/stream_exchange.rs | 2 +- .../src/optimizer/property/distribution.rs | 30 +++++-------------- 4 files changed, 13 insertions(+), 27 deletions(-) diff --git a/rust/frontend/src/optimizer/plan_node/batch_exchange.rs b/rust/frontend/src/optimizer/plan_node/batch_exchange.rs index cc34c813f9fde..41b06a0eb11ce 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_exchange.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_exchange.rs @@ -18,7 +18,7 @@ use risingwave_pb::plan::plan_node::NodeBody; use risingwave_pb::plan::ExchangeNode; use super::{PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch}; -use crate::optimizer::property::{Distribution, Order, WithDistribution}; +use crate::optimizer::property::{Distribution, Order}; /// `BatchExchange` imposes a particular distribution on its input /// without changing its content. diff --git a/rust/frontend/src/optimizer/plan_node/mod.rs b/rust/frontend/src/optimizer/plan_node/mod.rs index 1faff27ca068c..66f72ed49d602 100644 --- a/rust/frontend/src/optimizer/plan_node/mod.rs +++ b/rust/frontend/src/optimizer/plan_node/mod.rs @@ -38,7 +38,7 @@ use risingwave_common::error::{ErrorCode, Result}; use risingwave_pb::plan::PlanNode as BatchPlanProst; use risingwave_pb::stream_plan::StreamNode as StreamPlanProst; -use super::property::{Order, WithConvention, WithDistribution, WithSchema}; +use super::property::{Distribution, Order, WithConvention, WithSchema}; /// The common trait over all plan nodes. Used by optimizer framework which will treate all node as /// `dyn PlanNode` @@ -51,7 +51,6 @@ pub trait PlanNode: + Display + Downcast + WithConvention - + WithDistribution + WithSchema + WithContext + WithId @@ -92,6 +91,9 @@ impl dyn PlanNode { pub fn order(&self) -> &Order { &self.plan_base().order } + pub fn distribution(&self) -> &Distribution { + &self.plan_base().dist + } pub fn pk_indices(&self) -> &[usize] { &self.plan_base().pk_indices diff --git a/rust/frontend/src/optimizer/plan_node/stream_exchange.rs b/rust/frontend/src/optimizer/plan_node/stream_exchange.rs index 28078da56e0f7..bf5acc2975e30 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_exchange.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_exchange.rs @@ -18,7 +18,7 @@ use risingwave_pb::stream_plan::stream_node::Node; use risingwave_pb::stream_plan::{DispatchStrategy, DispatcherType, ExchangeNode}; use super::{PlanBase, PlanRef, PlanTreeNodeUnary, ToStreamProst}; -use crate::optimizer::property::{Distribution, WithDistribution, WithSchema}; +use crate::optimizer::property::{Distribution, WithSchema}; /// `StreamExchange` imposes a particular distribution on its input /// without changing its content. diff --git a/rust/frontend/src/optimizer/property/distribution.rs b/rust/frontend/src/optimizer/property/distribution.rs index 79ce456d9fc5e..334d040676d3e 100644 --- a/rust/frontend/src/optimizer/property/distribution.rs +++ b/rust/frontend/src/optimizer/property/distribution.rs @@ -21,7 +21,9 @@ use risingwave_pb::plan::ExchangeInfo; use super::super::plan_node::*; use crate::optimizer::property::{Convention, Order}; use crate::optimizer::PlanRef; -use crate::{for_batch_plan_nodes, for_logical_plan_nodes, for_stream_plan_nodes}; +use crate::{ + for_all_plan_nodes, for_batch_plan_nodes, for_logical_plan_nodes, for_stream_plan_nodes, +}; #[derive(Debug, Clone, PartialEq)] pub enum Distribution { @@ -125,36 +127,18 @@ impl Distribution { } } -pub trait WithDistribution { - /// the distribution property of the [`PlanNode`]'s output - fn distribution(&self) -> &Distribution; -} - -macro_rules! impl_with_dist_base { +macro_rules! impl_dist_with_base { ([], $( { $convention:ident, $name:ident }),*) => { $(paste! { - impl WithDistribution for [<$convention $name>] { - fn distribution(&self) -> &Distribution { + impl [<$convention $name>] { + pub fn distribution(&self) -> &Distribution { &self.base.dist } } })* } } -for_batch_plan_nodes! { impl_with_dist_base } -for_stream_plan_nodes! { impl_with_dist_base } -macro_rules! impl_with_dist_any { - ([], $( { $convention:ident, $name:ident }),*) => { - $(paste! { - impl WithDistribution for [<$convention $name>] { - fn distribution(&self) -> &Distribution { - Distribution::any() - } - } - })* - } -} -for_logical_plan_nodes! { impl_with_dist_any } +for_all_plan_nodes! { impl_dist_with_base } #[cfg(test)] mod tests { From 0085c2660d36395b9458afa7ab7aa4615e4adfe8 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 29 Mar 2022 16:45:57 +0800 Subject: [PATCH 07/12] clippy fix --- rust/frontend/src/optimizer/property/distribution.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/rust/frontend/src/optimizer/property/distribution.rs b/rust/frontend/src/optimizer/property/distribution.rs index 334d040676d3e..4f0afa8194abb 100644 --- a/rust/frontend/src/optimizer/property/distribution.rs +++ b/rust/frontend/src/optimizer/property/distribution.rs @@ -19,11 +19,9 @@ use risingwave_pb::plan::exchange_info::{ use risingwave_pb::plan::ExchangeInfo; use super::super::plan_node::*; +use crate::for_all_plan_nodes; use crate::optimizer::property::{Convention, Order}; use crate::optimizer::PlanRef; -use crate::{ - for_all_plan_nodes, for_batch_plan_nodes, for_logical_plan_nodes, for_stream_plan_nodes, -}; #[derive(Debug, Clone, PartialEq)] pub enum Distribution { From 125d4788b29de9ece85aa72478932e0c5fa7e71c Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 29 Mar 2022 17:27:14 +0800 Subject: [PATCH 08/12] add base delegate --- .../src/optimizer/plan_node/plan_base.rs | 20 ++++++++++++++++++- .../src/optimizer/property/distribution.rs | 15 -------------- rust/frontend/src/optimizer/property/order.rs | 15 -------------- 3 files changed, 19 insertions(+), 31 deletions(-) diff --git a/rust/frontend/src/optimizer/plan_node/plan_base.rs b/rust/frontend/src/optimizer/plan_node/plan_base.rs index 956f83bc2e01b..ea16f27d3ffae 100644 --- a/rust/frontend/src/optimizer/plan_node/plan_base.rs +++ b/rust/frontend/src/optimizer/plan_node/plan_base.rs @@ -12,9 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use paste::paste; use risingwave_common::catalog::Schema; -use super::PlanNodeId; +use super::*; +use crate::for_all_plan_nodes; use crate::optimizer::property::{Distribution, Order}; use crate::session::OptimizerContextRef; @@ -89,3 +91,19 @@ impl PlanBase { } } } +macro_rules! impl_base_delegate { + ([], $( { $convention:ident, $name:ident }),*) => { + $(paste! { + impl [<$convention $name>] { + pub fn order(&self) -> &Order { + &self.base.order + } + pub fn distribution(&self) -> &Distribution { + &self.base.dist + } + + } + })* + } +} +for_all_plan_nodes! { impl_base_delegate } diff --git a/rust/frontend/src/optimizer/property/distribution.rs b/rust/frontend/src/optimizer/property/distribution.rs index 4f0afa8194abb..46449d72c939e 100644 --- a/rust/frontend/src/optimizer/property/distribution.rs +++ b/rust/frontend/src/optimizer/property/distribution.rs @@ -12,14 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use paste::paste; use risingwave_pb::plan::exchange_info::{ BroadcastInfo, Distribution as DistributionProst, DistributionMode, HashInfo, }; use risingwave_pb::plan::ExchangeInfo; use super::super::plan_node::*; -use crate::for_all_plan_nodes; use crate::optimizer::property::{Convention, Order}; use crate::optimizer::PlanRef; @@ -125,19 +123,6 @@ impl Distribution { } } -macro_rules! impl_dist_with_base { - ([], $( { $convention:ident, $name:ident }),*) => { - $(paste! { - impl [<$convention $name>] { - pub fn distribution(&self) -> &Distribution { - &self.base.dist - } - } - })* - } -} -for_all_plan_nodes! { impl_dist_with_base } - #[cfg(test)] mod tests { use super::Distribution; diff --git a/rust/frontend/src/optimizer/property/order.rs b/rust/frontend/src/optimizer/property/order.rs index 37afc51c7c02f..d9a8c97d8c036 100644 --- a/rust/frontend/src/optimizer/property/order.rs +++ b/rust/frontend/src/optimizer/property/order.rs @@ -15,14 +15,12 @@ use std::fmt; use itertools::Itertools; -use paste::paste; use risingwave_common::util::sort_util::OrderType; use risingwave_pb::expr::InputRefExpr; use risingwave_pb::plan::OrderType as ProstOrderType; use super::super::plan_node::*; use super::Convention; -use crate::for_all_plan_nodes; use crate::optimizer::PlanRef; #[derive(Debug, Clone, Default)] @@ -195,19 +193,6 @@ impl Order { } } -macro_rules! impl_order_with_base { - ([], $( { $convention:ident, $name:ident }),*) => { - $(paste! { - impl [<$convention $name>] { - pub fn order(&self) -> &Order { - &self.base.order - } - } - })* - } -} -for_all_plan_nodes! { impl_order_with_base } - #[cfg(test)] mod tests { use super::{Direction, FieldOrder, Order}; From 36e090a001fb7fd8f9673b22ea255fcb0db3a13c Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 29 Mar 2022 19:02:00 +0800 Subject: [PATCH 09/12] remove WithId WithCtx --- .../src/optimizer/plan_node/logical_agg.rs | 1 - .../src/optimizer/plan_node/logical_filter.rs | 1 - rust/frontend/src/optimizer/plan_node/mod.rs | 18 +++--- .../src/optimizer/plan_node/plan_base.rs | 17 +++++- rust/frontend/src/optimizer/property/ctx.rs | 58 ------------------- rust/frontend/src/optimizer/property/mod.rs | 2 - 6 files changed, 24 insertions(+), 73 deletions(-) delete mode 100644 rust/frontend/src/optimizer/property/ctx.rs diff --git a/rust/frontend/src/optimizer/plan_node/logical_agg.rs b/rust/frontend/src/optimizer/plan_node/logical_agg.rs index 4c72979a45308..b49fbb40a92e3 100644 --- a/rust/frontend/src/optimizer/plan_node/logical_agg.rs +++ b/rust/frontend/src/optimizer/plan_node/logical_agg.rs @@ -544,7 +544,6 @@ mod tests { assert_eq_input_ref, input_ref_to_column_indices, AggCall, ExprType, FunctionCall, }; use crate::optimizer::plan_node::LogicalValues; - use crate::optimizer::property::ctx::WithId; use crate::session::OptimizerContext; #[tokio::test] diff --git a/rust/frontend/src/optimizer/plan_node/logical_filter.rs b/rust/frontend/src/optimizer/plan_node/logical_filter.rs index baa3c060e40eb..6d1dfd729997c 100644 --- a/rust/frontend/src/optimizer/plan_node/logical_filter.rs +++ b/rust/frontend/src/optimizer/plan_node/logical_filter.rs @@ -148,7 +148,6 @@ mod tests { use super::*; use crate::expr::{assert_eq_input_ref, FunctionCall, InputRef, Literal}; use crate::optimizer::plan_node::LogicalValues; - use crate::optimizer::property::ctx::WithId; use crate::session::OptimizerContext; #[tokio::test] diff --git a/rust/frontend/src/optimizer/plan_node/mod.rs b/rust/frontend/src/optimizer/plan_node/mod.rs index 66f72ed49d602..dd1c0196a3742 100644 --- a/rust/frontend/src/optimizer/plan_node/mod.rs +++ b/rust/frontend/src/optimizer/plan_node/mod.rs @@ -52,8 +52,6 @@ pub trait PlanNode: + Downcast + WithConvention + WithSchema - + WithContext - + WithId + ColPrunable + ToBatch + ToStream @@ -88,17 +86,21 @@ impl dyn PlanNode { Ok(output) } + pub fn id(&self) -> PlanNodeId { + self.plan_base().id + } + pub fn ctx(&self) -> OptimizerContextRef { + self.plan_base().ctx.clone() + } + pub fn pk_indices(&self) -> &[usize] { + &self.plan_base().pk_indices + } pub fn order(&self) -> &Order { &self.plan_base().order } pub fn distribution(&self) -> &Distribution { &self.plan_base().dist } - - pub fn pk_indices(&self) -> &[usize] { - &self.plan_base().pk_indices - } - pub fn append_only(&self) -> bool { self.plan_base().append_only } @@ -244,7 +246,7 @@ pub use stream_simple_agg::StreamSimpleAgg; pub use stream_source::StreamSource; pub use stream_table_scan::StreamTableScan; -use crate::optimizer::property::{WithContext, WithId}; +use crate::session::OptimizerContextRef; /// [`for_all_plan_nodes`] includes all plan nodes. If you added a new plan node /// inside the project, be sure to add here and in its conventions like [`for_logical_plan_nodes`] diff --git a/rust/frontend/src/optimizer/plan_node/plan_base.rs b/rust/frontend/src/optimizer/plan_node/plan_base.rs index ea16f27d3ffae..204b7e792d104 100644 --- a/rust/frontend/src/optimizer/plan_node/plan_base.rs +++ b/rust/frontend/src/optimizer/plan_node/plan_base.rs @@ -95,13 +95,24 @@ macro_rules! impl_base_delegate { ([], $( { $convention:ident, $name:ident }),*) => { $(paste! { impl [<$convention $name>] { + pub fn id(&self) -> PlanNodeId { + self.plan_base().id + } + pub fn ctx(&self) -> OptimizerContextRef { + self.plan_base().ctx.clone() + } + pub fn pk_indices(&self) -> &[usize] { + &self.plan_base().pk_indices + } pub fn order(&self) -> &Order { - &self.base.order + &self.plan_base().order } pub fn distribution(&self) -> &Distribution { - &self.base.dist + &self.plan_base().dist + } + pub fn append_only(&self) -> bool { + self.plan_base().append_only } - } })* } diff --git a/rust/frontend/src/optimizer/property/ctx.rs b/rust/frontend/src/optimizer/property/ctx.rs deleted file mode 100644 index a470fbd1c90ae..0000000000000 --- a/rust/frontend/src/optimizer/property/ctx.rs +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright 2022 Singularity Data -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use paste::paste; - -use super::super::plan_node::*; -use crate::optimizer::plan_node::PlanNodeId; -use crate::session::OptimizerContextRef; -use crate::{for_batch_plan_nodes, for_logical_plan_nodes, for_stream_plan_nodes}; - -pub trait WithContext { - fn ctx(&self) -> OptimizerContextRef; -} - -macro_rules! impl_with_ctx { - ([], $( { $convention:ident, $name:ident }),*) => { - $(paste! { - impl WithContext for [<$convention $name>] { - fn ctx(&self) -> OptimizerContextRef { - self.base.ctx.clone() - } - } - })* - } -} -for_batch_plan_nodes! { impl_with_ctx } -for_logical_plan_nodes! { impl_with_ctx } -for_stream_plan_nodes! { impl_with_ctx } - -pub trait WithId { - fn id(&self) -> PlanNodeId; -} - -macro_rules! impl_with_id { - ([], $( { $convention:ident, $name:ident }),*) => { - $(paste! { - impl WithId for [<$convention $name>] { - fn id(&self) -> PlanNodeId { - self.base.id - } - } - })* - } -} -for_batch_plan_nodes! { impl_with_id } -for_logical_plan_nodes! { impl_with_id } -for_stream_plan_nodes! { impl_with_id } diff --git a/rust/frontend/src/optimizer/property/mod.rs b/rust/frontend/src/optimizer/property/mod.rs index 3c2575d7ae39c..032274b154f7c 100644 --- a/rust/frontend/src/optimizer/property/mod.rs +++ b/rust/frontend/src/optimizer/property/mod.rs @@ -31,5 +31,3 @@ mod distribution; pub use distribution::*; mod schema; pub use schema::*; -pub mod ctx; -pub use ctx::*; From 186daceb2a1d686be20afe487393438aceec6f21 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 29 Mar 2022 19:23:36 +0800 Subject: [PATCH 10/12] remove convention --- rust/frontend/src/optimizer/mod.rs | 3 +- rust/frontend/src/optimizer/plan_node/mod.rs | 14 +++++- rust/frontend/src/optimizer/plan_rewriter.rs | 1 - rust/frontend/src/optimizer/plan_visitor.rs | 1 - .../src/optimizer/property/convention.rs | 43 ------------------- .../src/optimizer/property/distribution.rs | 2 +- rust/frontend/src/optimizer/property/mod.rs | 2 - rust/frontend/src/optimizer/property/order.rs | 1 - 8 files changed, 14 insertions(+), 53 deletions(-) delete mode 100644 rust/frontend/src/optimizer/property/convention.rs diff --git a/rust/frontend/src/optimizer/mod.rs b/rust/frontend/src/optimizer/mod.rs index ac278f697ea4b..e41f3866ee53b 100644 --- a/rust/frontend/src/optimizer/mod.rs +++ b/rust/frontend/src/optimizer/mod.rs @@ -28,8 +28,7 @@ use risingwave_common::catalog::Schema; use risingwave_common::error::Result; use self::heuristic::{ApplyOrder, HeuristicOptimizer}; -use self::plan_node::{LogicalProject, StreamMaterialize}; -use self::property::Convention; +use self::plan_node::{Convention, LogicalProject, StreamMaterialize}; use self::rule::*; use crate::expr::InputRef; diff --git a/rust/frontend/src/optimizer/plan_node/mod.rs b/rust/frontend/src/optimizer/plan_node/mod.rs index dd1c0196a3742..53ddf1feacd2b 100644 --- a/rust/frontend/src/optimizer/plan_node/mod.rs +++ b/rust/frontend/src/optimizer/plan_node/mod.rs @@ -38,7 +38,7 @@ use risingwave_common::error::{ErrorCode, Result}; use risingwave_pb::plan::PlanNode as BatchPlanProst; use risingwave_pb::stream_plan::StreamNode as StreamPlanProst; -use super::property::{Distribution, Order, WithConvention, WithSchema}; +use super::property::{Distribution, Order, WithSchema}; /// The common trait over all plan nodes. Used by optimizer framework which will treate all node as /// `dyn PlanNode` @@ -50,7 +50,6 @@ pub trait PlanNode: + Debug + Display + Downcast - + WithConvention + WithSchema + ColPrunable + ToBatch @@ -60,6 +59,7 @@ pub trait PlanNode: { fn node_type(&self) -> PlanNodeType; fn plan_base(&self) -> &PlanBase; + fn convention(&self) -> Convention; } impl_downcast!(PlanNode); @@ -68,6 +68,13 @@ pub type PlanRef = Rc; #[derive(Clone, Debug, Copy)] pub struct PlanNodeId(pub i32); +#[derive(Debug, PartialEq)] +pub enum Convention { + Logical, + Batch, + Stream, +} + impl dyn PlanNode { /// Write explain the whole plan tree. pub fn explain(&self, level: usize, f: &mut impl std::fmt::Write) -> std::fmt::Result { @@ -382,6 +389,9 @@ macro_rules! enum_plan_node_type { fn plan_base(&self) -> &PlanBase { &self.base } + fn convention(&self) -> Convention { + Convention::$convention + } })* } } diff --git a/rust/frontend/src/optimizer/plan_rewriter.rs b/rust/frontend/src/optimizer/plan_rewriter.rs index 7196e400cfaa0..d37e9084ec6e3 100644 --- a/rust/frontend/src/optimizer/plan_rewriter.rs +++ b/rust/frontend/src/optimizer/plan_rewriter.rs @@ -17,7 +17,6 @@ use paste::paste; use crate::for_all_plan_nodes; use crate::optimizer::plan_node::*; -use crate::optimizer::property::Convention; /// Define `PlanRewriter` trait. macro_rules! def_rewriter { diff --git a/rust/frontend/src/optimizer/plan_visitor.rs b/rust/frontend/src/optimizer/plan_visitor.rs index 79cf20977f298..4f164c54574c8 100644 --- a/rust/frontend/src/optimizer/plan_visitor.rs +++ b/rust/frontend/src/optimizer/plan_visitor.rs @@ -16,7 +16,6 @@ use paste::paste; use crate::for_all_plan_nodes; use crate::optimizer::plan_node::*; -use crate::optimizer::property::Convention; /// Define `PlanVisitor` trait. macro_rules! def_visitor { diff --git a/rust/frontend/src/optimizer/property/convention.rs b/rust/frontend/src/optimizer/property/convention.rs deleted file mode 100644 index 5003ff48e9a2a..0000000000000 --- a/rust/frontend/src/optimizer/property/convention.rs +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright 2022 Singularity Data -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use paste::paste; - -use super::super::plan_node::*; -use crate::for_all_plan_nodes; - -#[derive(Debug, PartialEq)] -pub enum Convention { - Logical, - Batch, - Stream, -} - -pub trait WithConvention { - fn convention(&self) -> Convention; -} - -/// Define module for each node. -macro_rules! impl_convention_for_plan_node { - ([], $({ $convention:ident, $name:ident }),*) => { - $(paste! { - impl WithConvention for [<$convention $name>] { - fn convention(&self) -> Convention { - Convention::$convention - } - } - })* - } -} -for_all_plan_nodes! { impl_convention_for_plan_node } diff --git a/rust/frontend/src/optimizer/property/distribution.rs b/rust/frontend/src/optimizer/property/distribution.rs index 46449d72c939e..cb5dc852848bb 100644 --- a/rust/frontend/src/optimizer/property/distribution.rs +++ b/rust/frontend/src/optimizer/property/distribution.rs @@ -18,7 +18,7 @@ use risingwave_pb::plan::exchange_info::{ use risingwave_pb::plan::ExchangeInfo; use super::super::plan_node::*; -use crate::optimizer::property::{Convention, Order}; +use crate::optimizer::property::Order; use crate::optimizer::PlanRef; #[derive(Debug, Clone, PartialEq)] diff --git a/rust/frontend/src/optimizer/property/mod.rs b/rust/frontend/src/optimizer/property/mod.rs index 032274b154f7c..b26a43157a738 100644 --- a/rust/frontend/src/optimizer/property/mod.rs +++ b/rust/frontend/src/optimizer/property/mod.rs @@ -23,8 +23,6 @@ //! (such as an optimizer based on the Volcano/Cascades model). //! //! [PlanNode]: super::plan_node::PlanNode -mod convention; -pub use convention::*; pub(crate) mod order; pub use order::*; mod distribution; diff --git a/rust/frontend/src/optimizer/property/order.rs b/rust/frontend/src/optimizer/property/order.rs index d9a8c97d8c036..75ee49f9cc80f 100644 --- a/rust/frontend/src/optimizer/property/order.rs +++ b/rust/frontend/src/optimizer/property/order.rs @@ -20,7 +20,6 @@ use risingwave_pb::expr::InputRefExpr; use risingwave_pb::plan::OrderType as ProstOrderType; use super::super::plan_node::*; -use super::Convention; use crate::optimizer::PlanRef; #[derive(Debug, Clone, Default)] From 01896fc8824559e6f80fde9e8171084fe099e4bb Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 29 Mar 2022 19:30:02 +0800 Subject: [PATCH 11/12] remove WithSchema --- .../src/optimizer/plan_node/batch_delete.rs | 2 +- .../src/optimizer/plan_node/batch_filter.rs | 2 +- .../src/optimizer/plan_node/batch_hash_agg.rs | 2 +- .../optimizer/plan_node/batch_hash_join.rs | 2 +- .../src/optimizer/plan_node/batch_insert.rs | 2 +- .../src/optimizer/plan_node/batch_limit.rs | 2 +- .../src/optimizer/plan_node/batch_project.rs | 2 +- .../src/optimizer/plan_node/batch_seq_scan.rs | 2 +- .../optimizer/plan_node/batch_simple_agg.rs | 2 +- .../src/optimizer/plan_node/batch_sort.rs | 2 +- .../src/optimizer/plan_node/batch_values.rs | 2 +- .../src/optimizer/plan_node/logical_agg.rs | 6 +-- .../src/optimizer/plan_node/logical_filter.rs | 5 +- .../src/optimizer/plan_node/logical_join.rs | 7 ++- .../src/optimizer/plan_node/logical_limit.rs | 5 +- .../optimizer/plan_node/logical_project.rs | 6 +-- .../src/optimizer/plan_node/logical_scan.rs | 3 +- .../src/optimizer/plan_node/logical_topn.rs | 4 +- .../src/optimizer/plan_node/logical_values.rs | 3 +- rust/frontend/src/optimizer/plan_node/mod.rs | 19 +++++++- .../src/optimizer/plan_node/plan_base.rs | 3 ++ .../optimizer/plan_node/stream_exchange.rs | 2 +- .../src/optimizer/plan_node/stream_filter.rs | 1 - .../optimizer/plan_node/stream_hash_agg.rs | 2 +- .../optimizer/plan_node/stream_hash_join.rs | 2 +- .../src/optimizer/plan_node/stream_project.rs | 1 - .../optimizer/plan_node/stream_simple_agg.rs | 2 +- .../src/optimizer/plan_node/stream_source.rs | 2 +- .../optimizer/plan_node/stream_table_scan.rs | 2 +- rust/frontend/src/optimizer/property/mod.rs | 2 - .../frontend/src/optimizer/property/schema.rs | 48 ------------------- .../frontend/src/optimizer/rule/filter_agg.rs | 1 - 32 files changed, 55 insertions(+), 93 deletions(-) delete mode 100644 rust/frontend/src/optimizer/property/schema.rs diff --git a/rust/frontend/src/optimizer/plan_node/batch_delete.rs b/rust/frontend/src/optimizer/plan_node/batch_delete.rs index ad4cc554872df..aaca5d31cac3a 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_delete.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_delete.rs @@ -20,7 +20,7 @@ use risingwave_pb::plan::{DeleteNode, TableRefId}; use super::{ LogicalDelete, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch, }; -use crate::optimizer::property::{Distribution, Order, WithSchema}; +use crate::optimizer::property::{Distribution, Order}; /// `BatchDelete` implements [`LogicalDelete`] #[derive(Debug, Clone)] diff --git a/rust/frontend/src/optimizer/plan_node/batch_filter.rs b/rust/frontend/src/optimizer/plan_node/batch_filter.rs index c563a90bec613..184884f6f1bc7 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_filter.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_filter.rs @@ -20,7 +20,7 @@ use risingwave_pb::plan::FilterNode; use super::{LogicalFilter, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch}; use crate::expr::Expr; use crate::optimizer::plan_node::PlanBase; -use crate::optimizer::property::{Distribution, WithSchema}; +use crate::optimizer::property::Distribution; use crate::utils::Condition; /// `BatchFilter` implements [`super::LogicalFilter`] diff --git a/rust/frontend/src/optimizer/plan_node/batch_hash_agg.rs b/rust/frontend/src/optimizer/plan_node/batch_hash_agg.rs index f0821282e071d..a89573bb5345c 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_hash_agg.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_hash_agg.rs @@ -21,7 +21,7 @@ use risingwave_pb::plan::HashAggNode; use super::logical_agg::PlanAggCall; use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch}; use crate::expr::InputRefDisplay; -use crate::optimizer::property::{Distribution, Order, WithSchema}; +use crate::optimizer::property::{Distribution, Order}; #[derive(Debug, Clone)] pub struct BatchHashAgg { diff --git a/rust/frontend/src/optimizer/plan_node/batch_hash_join.rs b/rust/frontend/src/optimizer/plan_node/batch_hash_join.rs index 9a123880cb87f..ee9ef32ae8412 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_hash_join.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_hash_join.rs @@ -21,7 +21,7 @@ use super::{ EqJoinPredicate, LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, ToBatchProst, ToDistributedBatch, }; -use crate::optimizer::property::{Distribution, Order, WithSchema}; +use crate::optimizer::property::{Distribution, Order}; use crate::utils::ColIndexMapping; /// `BatchHashJoin` implements [`super::LogicalJoin`] with hash table. It builds a hash table diff --git a/rust/frontend/src/optimizer/plan_node/batch_insert.rs b/rust/frontend/src/optimizer/plan_node/batch_insert.rs index 0096a70bf2d2c..86b12f07c06fb 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_insert.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_insert.rs @@ -19,7 +19,7 @@ use risingwave_pb::plan::{InsertNode, TableRefId}; use super::{LogicalInsert, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch}; use crate::optimizer::plan_node::PlanBase; -use crate::optimizer::property::{Distribution, Order, WithSchema}; +use crate::optimizer::property::{Distribution, Order}; /// `BatchInsert` implements [`LogicalInsert`] #[derive(Debug, Clone)] diff --git a/rust/frontend/src/optimizer/plan_node/batch_limit.rs b/rust/frontend/src/optimizer/plan_node/batch_limit.rs index c761ae46bb87a..b8f50dd4e58c6 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_limit.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_limit.rs @@ -18,7 +18,7 @@ use risingwave_pb::plan::plan_node::NodeBody; use risingwave_pb::plan::LimitNode; use super::{LogicalLimit, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch}; -use crate::optimizer::property::{Distribution, WithSchema}; +use crate::optimizer::property::Distribution; /// `BatchLimit` implements [`super::LogicalLimit`] to fetch specified rows from input #[derive(Debug, Clone)] diff --git a/rust/frontend/src/optimizer/plan_node/batch_project.rs b/rust/frontend/src/optimizer/plan_node/batch_project.rs index a2bcf62fde29e..b7f1b7c15f9b7 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_project.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_project.rs @@ -22,7 +22,7 @@ use super::{ LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch, }; use crate::expr::Expr; -use crate::optimizer::property::{Distribution, Order, WithSchema}; +use crate::optimizer::property::{Distribution, Order}; /// `BatchProject` implements [`super::LogicalProject`] to evaluate specified expressions on input /// rows diff --git a/rust/frontend/src/optimizer/plan_node/batch_seq_scan.rs b/rust/frontend/src/optimizer/plan_node/batch_seq_scan.rs index 2ad96a90b1be1..164caa29934d9 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_seq_scan.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_seq_scan.rs @@ -19,7 +19,7 @@ use risingwave_pb::plan::{CellBasedTableDesc, ColumnDesc as ProstColumnDesc, Row use super::{PlanBase, PlanRef, ToBatchProst, ToDistributedBatch}; use crate::optimizer::plan_node::LogicalScan; -use crate::optimizer::property::{Distribution, Order, WithSchema}; +use crate::optimizer::property::{Distribution, Order}; /// `BatchSeqScan` implements [`super::LogicalScan`] to scan from a row-oriented table #[derive(Debug, Clone)] diff --git a/rust/frontend/src/optimizer/plan_node/batch_simple_agg.rs b/rust/frontend/src/optimizer/plan_node/batch_simple_agg.rs index 6532a512848cc..31f2bb2fe79dc 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_simple_agg.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_simple_agg.rs @@ -19,7 +19,7 @@ use risingwave_pb::plan::SortAggNode; use super::logical_agg::PlanAggCall; use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch}; -use crate::optimizer::property::{Distribution, Order, WithSchema}; +use crate::optimizer::property::{Distribution, Order}; #[derive(Debug, Clone)] pub struct BatchSimpleAgg { diff --git a/rust/frontend/src/optimizer/plan_node/batch_sort.rs b/rust/frontend/src/optimizer/plan_node/batch_sort.rs index deab6f7e70421..7cd412a568517 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_sort.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_sort.rs @@ -19,7 +19,7 @@ use risingwave_pb::plan::plan_node::NodeBody; use risingwave_pb::plan::{ColumnOrder, OrderByNode}; use super::{PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch}; -use crate::optimizer::property::{Distribution, Order, WithSchema}; +use crate::optimizer::property::{Distribution, Order}; /// `BatchSort` buffers all data from input and sort these rows by specified order, providing the /// collation required by user or parent plan node. diff --git a/rust/frontend/src/optimizer/plan_node/batch_values.rs b/rust/frontend/src/optimizer/plan_node/batch_values.rs index 8e89d6da089b4..987e0eb1c4828 100644 --- a/rust/frontend/src/optimizer/plan_node/batch_values.rs +++ b/rust/frontend/src/optimizer/plan_node/batch_values.rs @@ -20,7 +20,7 @@ use risingwave_pb::plan::ValuesNode; use super::{LogicalValues, PlanBase, PlanRef, PlanTreeNodeLeaf, ToBatchProst, ToDistributedBatch}; use crate::expr::{Expr, ExprImpl}; -use crate::optimizer::property::{Distribution, Order, WithSchema}; +use crate::optimizer::property::{Distribution, Order}; #[derive(Debug, Clone)] pub struct BatchValues { diff --git a/rust/frontend/src/optimizer/plan_node/logical_agg.rs b/rust/frontend/src/optimizer/plan_node/logical_agg.rs index b49fbb40a92e3..31e5b424836fa 100644 --- a/rust/frontend/src/optimizer/plan_node/logical_agg.rs +++ b/rust/frontend/src/optimizer/plan_node/logical_agg.rs @@ -24,12 +24,12 @@ use risingwave_common::types::DataType; use risingwave_pb::expr::AggCall as ProstAggCall; use super::{ - BatchHashAgg, BatchSimpleAgg, ColPrunable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamHashAgg, - StreamSimpleAgg, ToBatch, ToStream, + BatchHashAgg, BatchSimpleAgg, ColPrunable, PlanBase, PlanNode, PlanRef, PlanTreeNodeUnary, + StreamHashAgg, StreamSimpleAgg, ToBatch, ToStream, }; use crate::expr::{AggCall, Expr, ExprImpl, ExprRewriter, ExprType, FunctionCall, InputRef}; use crate::optimizer::plan_node::LogicalProject; -use crate::optimizer::property::{Distribution, WithSchema}; +use crate::optimizer::property::Distribution; use crate::utils::ColIndexMapping; /// Aggregation Call diff --git a/rust/frontend/src/optimizer/plan_node/logical_filter.rs b/rust/frontend/src/optimizer/plan_node/logical_filter.rs index 6d1dfd729997c..1c429db902daf 100644 --- a/rust/frontend/src/optimizer/plan_node/logical_filter.rs +++ b/rust/frontend/src/optimizer/plan_node/logical_filter.rs @@ -18,12 +18,11 @@ use fixedbitset::FixedBitSet; use risingwave_common::error::Result; use super::{ - ColPrunable, CollectInputRef, LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatch, - ToStream, + ColPrunable, CollectInputRef, LogicalProject, PlanBase, PlanNode, PlanRef, PlanTreeNodeUnary, + ToBatch, ToStream, }; use crate::expr::{assert_input_ref, ExprImpl}; use crate::optimizer::plan_node::{BatchFilter, StreamFilter}; -use crate::optimizer::property::WithSchema; use crate::utils::{ColIndexMapping, Condition}; /// `LogicalFilter` iterates over its input and returns elements for which `predicate` evaluates to diff --git a/rust/frontend/src/optimizer/plan_node/logical_join.rs b/rust/frontend/src/optimizer/plan_node/logical_join.rs index cd906b91ffbc0..b59208967b224 100644 --- a/rust/frontend/src/optimizer/plan_node/logical_join.rs +++ b/rust/frontend/src/optimizer/plan_node/logical_join.rs @@ -19,14 +19,14 @@ use risingwave_common::catalog::Schema; use risingwave_pb::plan::JoinType; use super::{ - ColPrunable, LogicalProject, PlanBase, PlanRef, PlanTreeNodeBinary, StreamHashJoin, ToBatch, - ToStream, + ColPrunable, LogicalProject, PlanBase, PlanNode, PlanRef, PlanTreeNodeBinary, StreamHashJoin, + ToBatch, ToStream, }; use crate::expr::ExprImpl; use crate::optimizer::plan_node::{ BatchFilter, BatchHashJoin, CollectInputRef, EqJoinPredicate, LogicalFilter, StreamFilter, }; -use crate::optimizer::property::{Distribution, WithSchema}; +use crate::optimizer::property::Distribution; use crate::utils::{ColIndexMapping, Condition}; /// `LogicalJoin` combines two relations according to some condition. @@ -408,7 +408,6 @@ mod tests { use super::*; use crate::expr::{assert_eq_input_ref, FunctionCall, InputRef, Literal}; use crate::optimizer::plan_node::{LogicalValues, PlanTreeNodeUnary}; - use crate::optimizer::property::WithSchema; use crate::session::OptimizerContext; /// Pruning diff --git a/rust/frontend/src/optimizer/plan_node/logical_limit.rs b/rust/frontend/src/optimizer/plan_node/logical_limit.rs index 68af099aa1d4f..b03fabb7c90d6 100644 --- a/rust/frontend/src/optimizer/plan_node/logical_limit.rs +++ b/rust/frontend/src/optimizer/plan_node/logical_limit.rs @@ -16,8 +16,9 @@ use std::fmt; use fixedbitset::FixedBitSet; -use super::{BatchLimit, ColPrunable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatch, ToStream}; -use crate::optimizer::property::WithSchema; +use super::{ + BatchLimit, ColPrunable, PlanBase, PlanNode, PlanRef, PlanTreeNodeUnary, ToBatch, ToStream, +}; use crate::utils::ColIndexMapping; /// `LogicalLimit` fetches up to `limit` rows from `offset` diff --git a/rust/frontend/src/optimizer/plan_node/logical_project.rs b/rust/frontend/src/optimizer/plan_node/logical_project.rs index f7147e881168e..be2c579005920 100644 --- a/rust/frontend/src/optimizer/plan_node/logical_project.rs +++ b/rust/frontend/src/optimizer/plan_node/logical_project.rs @@ -19,14 +19,14 @@ use itertools::Itertools; use risingwave_common::catalog::{Field, Schema}; use super::{ - BatchProject, ColPrunable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamProject, ToBatch, - ToStream, + BatchProject, ColPrunable, PlanBase, PlanNode, PlanRef, PlanTreeNodeUnary, StreamProject, + ToBatch, ToStream, }; use crate::expr::{ as_alias_display, assert_input_ref, Expr, ExprImpl, ExprRewriter, ExprVisitor, InputRef, }; use crate::optimizer::plan_node::CollectInputRef; -use crate::optimizer::property::{Distribution, Order, WithSchema}; +use crate::optimizer::property::{Distribution, Order}; use crate::utils::ColIndexMapping; /// `LogicalProject` computes a set of expressions from its input relation. diff --git a/rust/frontend/src/optimizer/plan_node/logical_scan.rs b/rust/frontend/src/optimizer/plan_node/logical_scan.rs index fc6e4d13a72ff..9e93c653ad4e6 100644 --- a/rust/frontend/src/optimizer/plan_node/logical_scan.rs +++ b/rust/frontend/src/optimizer/plan_node/logical_scan.rs @@ -21,9 +21,8 @@ use itertools::Itertools; use risingwave_common::catalog::{ColumnDesc, Schema, TableDesc}; use risingwave_common::error::Result; -use super::{ColPrunable, PlanBase, PlanRef, StreamTableScan, ToBatch, ToStream}; +use super::{ColPrunable, PlanBase, PlanNode, PlanRef, StreamTableScan, ToBatch, ToStream}; use crate::optimizer::plan_node::BatchSeqScan; -use crate::optimizer::property::WithSchema; use crate::session::OptimizerContextRef; use crate::utils::ColIndexMapping; diff --git a/rust/frontend/src/optimizer/plan_node/logical_topn.rs b/rust/frontend/src/optimizer/plan_node/logical_topn.rs index 263394c00a1bf..83aea52887005 100644 --- a/rust/frontend/src/optimizer/plan_node/logical_topn.rs +++ b/rust/frontend/src/optimizer/plan_node/logical_topn.rs @@ -16,9 +16,9 @@ use std::fmt; use fixedbitset::FixedBitSet; -use super::{ColPrunable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatch, ToStream}; +use super::{ColPrunable, PlanBase, PlanNode, PlanRef, PlanTreeNodeUnary, ToBatch, ToStream}; use crate::optimizer::plan_node::LogicalProject; -use crate::optimizer::property::{FieldOrder, Order, WithSchema}; +use crate::optimizer::property::{FieldOrder, Order}; use crate::utils::ColIndexMapping; /// `LogicalTopN` sorts the input data and fetches up to `limit` rows from `offset` diff --git a/rust/frontend/src/optimizer/plan_node/logical_values.rs b/rust/frontend/src/optimizer/plan_node/logical_values.rs index 1235944f50267..44d3d60b8cb6d 100644 --- a/rust/frontend/src/optimizer/plan_node/logical_values.rs +++ b/rust/frontend/src/optimizer/plan_node/logical_values.rs @@ -18,9 +18,8 @@ use std::{fmt, vec}; use fixedbitset::FixedBitSet; use risingwave_common::catalog::Schema; -use super::{BatchValues, ColPrunable, PlanBase, PlanRef, ToBatch, ToStream}; +use super::{BatchValues, ColPrunable, PlanBase, PlanNode, PlanRef, ToBatch, ToStream}; use crate::expr::{Expr, ExprImpl}; -use crate::optimizer::property::WithSchema; use crate::session::OptimizerContextRef; /// `LogicalValues` builds rows according to a list of expressions diff --git a/rust/frontend/src/optimizer/plan_node/mod.rs b/rust/frontend/src/optimizer/plan_node/mod.rs index 53ddf1feacd2b..32fae8278de82 100644 --- a/rust/frontend/src/optimizer/plan_node/mod.rs +++ b/rust/frontend/src/optimizer/plan_node/mod.rs @@ -33,12 +33,14 @@ use std::rc::Rc; use downcast_rs::{impl_downcast, Downcast}; use dyn_clone::{self, DynClone}; +use fixedbitset::FixedBitSet; use paste::paste; +use risingwave_common::catalog::Schema; use risingwave_common::error::{ErrorCode, Result}; use risingwave_pb::plan::PlanNode as BatchPlanProst; use risingwave_pb::stream_plan::StreamNode as StreamPlanProst; -use super::property::{Distribution, Order, WithSchema}; +use super::property::{Distribution, Order}; /// The common trait over all plan nodes. Used by optimizer framework which will treate all node as /// `dyn PlanNode` @@ -50,7 +52,6 @@ pub trait PlanNode: + Debug + Display + Downcast - + WithSchema + ColPrunable + ToBatch + ToStream @@ -60,6 +61,17 @@ pub trait PlanNode: fn node_type(&self) -> PlanNodeType; fn plan_base(&self) -> &PlanBase; fn convention(&self) -> Convention; + + // TODO: find a better place declear this func + fn must_contain_columns(&self, required_cols: &FixedBitSet) { + // Having equal length also implies: + // required_cols.is_subset(&FixedBitSet::from_iter(0..self.schema().fields().len())) + assert_eq!( + required_cols.len(), + self.plan_base().schema.fields().len(), + "required cols capacity != columns available", + ); + } } impl_downcast!(PlanNode); @@ -99,6 +111,9 @@ impl dyn PlanNode { pub fn ctx(&self) -> OptimizerContextRef { self.plan_base().ctx.clone() } + pub fn schema(&self) -> &Schema { + &self.plan_base().schema + } pub fn pk_indices(&self) -> &[usize] { &self.plan_base().pk_indices } diff --git a/rust/frontend/src/optimizer/plan_node/plan_base.rs b/rust/frontend/src/optimizer/plan_node/plan_base.rs index 204b7e792d104..58a50a5a1cccf 100644 --- a/rust/frontend/src/optimizer/plan_node/plan_base.rs +++ b/rust/frontend/src/optimizer/plan_node/plan_base.rs @@ -101,6 +101,9 @@ macro_rules! impl_base_delegate { pub fn ctx(&self) -> OptimizerContextRef { self.plan_base().ctx.clone() } + pub fn schema(&self) -> &Schema { + &self.plan_base().schema + } pub fn pk_indices(&self) -> &[usize] { &self.plan_base().pk_indices } diff --git a/rust/frontend/src/optimizer/plan_node/stream_exchange.rs b/rust/frontend/src/optimizer/plan_node/stream_exchange.rs index bf5acc2975e30..81a12b746cb46 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_exchange.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_exchange.rs @@ -18,7 +18,7 @@ use risingwave_pb::stream_plan::stream_node::Node; use risingwave_pb::stream_plan::{DispatchStrategy, DispatcherType, ExchangeNode}; use super::{PlanBase, PlanRef, PlanTreeNodeUnary, ToStreamProst}; -use crate::optimizer::property::{Distribution, WithSchema}; +use crate::optimizer::property::Distribution; /// `StreamExchange` imposes a particular distribution on its input /// without changing its content. diff --git a/rust/frontend/src/optimizer/plan_node/stream_filter.rs b/rust/frontend/src/optimizer/plan_node/stream_filter.rs index 12ea33ba6cee5..5a5248c5e9ee1 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_filter.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_filter.rs @@ -20,7 +20,6 @@ use risingwave_pb::stream_plan::FilterNode; use super::{LogicalFilter, PlanRef, PlanTreeNodeUnary, ToStreamProst}; use crate::expr::Expr; use crate::optimizer::plan_node::PlanBase; -use crate::optimizer::property::WithSchema; use crate::utils::Condition; /// `StreamFilter` implements [`super::LogicalFilter`] diff --git a/rust/frontend/src/optimizer/plan_node/stream_hash_agg.rs b/rust/frontend/src/optimizer/plan_node/stream_hash_agg.rs index 283159fbcc473..739a7c8ef3129 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_hash_agg.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_hash_agg.rs @@ -20,7 +20,7 @@ use risingwave_pb::stream_plan::stream_node::Node as ProstStreamNode; use super::logical_agg::PlanAggCall; use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, ToStreamProst}; use crate::expr::InputRefDisplay; -use crate::optimizer::property::{Distribution, WithSchema}; +use crate::optimizer::property::Distribution; #[derive(Debug, Clone)] pub struct StreamHashAgg { diff --git a/rust/frontend/src/optimizer/plan_node/stream_hash_join.rs b/rust/frontend/src/optimizer/plan_node/stream_hash_join.rs index d8784b5e931a6..17bf01b0ab8fe 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_hash_join.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_hash_join.rs @@ -22,7 +22,7 @@ use risingwave_pb::stream_plan::HashJoinNode; use super::{LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, ToStreamProst}; use crate::expr::Expr; use crate::optimizer::plan_node::EqJoinPredicate; -use crate::optimizer::property::{Distribution, WithSchema}; +use crate::optimizer::property::Distribution; use crate::utils::ColIndexMapping; /// `BatchHashJoin` implements [`super::LogicalJoin`] with hash table. It builds a hash table diff --git a/rust/frontend/src/optimizer/plan_node/stream_project.rs b/rust/frontend/src/optimizer/plan_node/stream_project.rs index f575fb5744098..801241d242fc6 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_project.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_project.rs @@ -19,7 +19,6 @@ use risingwave_pb::stream_plan::ProjectNode; use super::{LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, ToStreamProst}; use crate::expr::Expr; -use crate::optimizer::property::WithSchema; /// `StreamProject` implements [`super::LogicalProject`] to evaluate specified expressions on input /// rows. diff --git a/rust/frontend/src/optimizer/plan_node/stream_simple_agg.rs b/rust/frontend/src/optimizer/plan_node/stream_simple_agg.rs index e864f9ba97643..8b15c88f9466b 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_simple_agg.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_simple_agg.rs @@ -19,7 +19,7 @@ use risingwave_pb::stream_plan::stream_node::Node as ProstStreamNode; use super::logical_agg::PlanAggCall; use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, ToStreamProst}; -use crate::optimizer::property::{Distribution, WithSchema}; +use crate::optimizer::property::Distribution; #[derive(Debug, Clone)] pub struct StreamSimpleAgg { diff --git a/rust/frontend/src/optimizer/plan_node/stream_source.rs b/rust/frontend/src/optimizer/plan_node/stream_source.rs index 3cc48a925a7a2..49b3dd7da0c43 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_source.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_source.rs @@ -24,7 +24,7 @@ use risingwave_pb::stream_plan::stream_node::Node as ProstStreamNode; use risingwave_pb::stream_plan::SourceNode; use super::{PlanBase, ToStreamProst}; -use crate::optimizer::property::{Distribution, WithSchema}; +use crate::optimizer::property::Distribution; use crate::session::OptimizerContextRef; /// [`StreamSource`] represents a table/connector source at the very beginning of the graph. diff --git a/rust/frontend/src/optimizer/plan_node/stream_table_scan.rs b/rust/frontend/src/optimizer/plan_node/stream_table_scan.rs index f2817f710c149..f6d5ac8f49c1d 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -19,7 +19,7 @@ use risingwave_pb::stream_plan::stream_node::Node as ProstStreamNode; use risingwave_pb::stream_plan::StreamNode as ProstStreamPlan; use super::{LogicalScan, PlanBase, PlanNodeId, ToStreamProst}; -use crate::optimizer::property::{Distribution, WithSchema}; +use crate::optimizer::property::Distribution; /// `StreamTableScan` is a virtual plan node to represent a stream table scan. It will be converted /// to chain + merge node (for upstream materialize) + batch table scan when converting to `MView` diff --git a/rust/frontend/src/optimizer/property/mod.rs b/rust/frontend/src/optimizer/property/mod.rs index b26a43157a738..f91d753a2a904 100644 --- a/rust/frontend/src/optimizer/property/mod.rs +++ b/rust/frontend/src/optimizer/property/mod.rs @@ -27,5 +27,3 @@ pub(crate) mod order; pub use order::*; mod distribution; pub use distribution::*; -mod schema; -pub use schema::*; diff --git a/rust/frontend/src/optimizer/property/schema.rs b/rust/frontend/src/optimizer/property/schema.rs deleted file mode 100644 index 17c8cc481654a..0000000000000 --- a/rust/frontend/src/optimizer/property/schema.rs +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright 2022 Singularity Data -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use fixedbitset::FixedBitSet; -use paste::paste; -use risingwave_common::catalog::Schema; - -use super::super::plan_node::*; -use crate::for_all_plan_nodes; - -pub trait WithSchema { - fn schema(&self) -> &Schema; - - fn must_contain_columns(&self, required_cols: &FixedBitSet) { - // Having equal length also implies: - // required_cols.is_subset(&FixedBitSet::from_iter(0..self.schema().fields().len())) - assert_eq!( - required_cols.len(), - self.schema().fields().len(), - "required cols capacity != columns available", - ); - } -} - -/// Define module for each node. -macro_rules! impl_with_schema { - ([], $( { $convention:ident, $name:ident }),*) => { - $(paste! { - impl WithSchema for [<$convention $name>] { - fn schema(&self) -> &Schema { - &self.base.schema - } - } - })* - } -} -for_all_plan_nodes! {impl_with_schema } diff --git a/rust/frontend/src/optimizer/rule/filter_agg.rs b/rust/frontend/src/optimizer/rule/filter_agg.rs index 7a4103f9893ca..86af3c6974329 100644 --- a/rust/frontend/src/optimizer/rule/filter_agg.rs +++ b/rust/frontend/src/optimizer/rule/filter_agg.rs @@ -17,7 +17,6 @@ use fixedbitset::FixedBitSet; use super::super::plan_node::*; use super::{BoxedRule, Rule}; use crate::expr::InputRef; -use crate::optimizer::property::WithSchema; use crate::utils::Substitute; /// Pushes a [`LogicalFilter`] past a [`LogicalAgg`]. From 9cca7cbdaea69f7767796cc8d85ed5560191bc7e Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 29 Mar 2022 19:41:30 +0800 Subject: [PATCH 12/12] remove useless todo --- rust/frontend/src/optimizer/plan_node/convert.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/frontend/src/optimizer/plan_node/convert.rs b/rust/frontend/src/optimizer/plan_node/convert.rs index dccd450ff3f14..a29920103230e 100644 --- a/rust/frontend/src/optimizer/plan_node/convert.rs +++ b/rust/frontend/src/optimizer/plan_node/convert.rs @@ -36,7 +36,7 @@ pub trait ToStream { /// /// Now it is used to: /// 1. ensure every plan node's output having pk column - /// 2. (todo) add `row_count`() in every Agg + /// 2. add `row_count`() in every Agg fn logical_rewrite_for_stream(&self) -> (PlanRef, ColIndexMapping); /// `to_stream` is equivalent to `to_stream_with_dist_required(Distribution::any())` fn to_stream(&self) -> PlanRef;