From a7e7bbcb6eec4ede8a4ebf2500ca99cc1718146f Mon Sep 17 00:00:00 2001 From: ice1000 Date: Wed, 10 May 2023 13:41:51 +0000 Subject: [PATCH 1/5] Logical updates --- .../src/optimizer/plan_node/generic/mod.rs | 2 + .../src/optimizer/plan_node/generic/update.rs | 56 +++++++++ .../src/optimizer/plan_node/logical_update.rs | 106 ++++-------------- src/frontend/src/planner/update.rs | 6 +- 4 files changed, 85 insertions(+), 85 deletions(-) create mode 100644 src/frontend/src/optimizer/plan_node/generic/update.rs diff --git a/src/frontend/src/optimizer/plan_node/generic/mod.rs b/src/frontend/src/optimizer/plan_node/generic/mod.rs index 2c951c1fad4da..43df44ba9647e 100644 --- a/src/frontend/src/optimizer/plan_node/generic/mod.rs +++ b/src/frontend/src/optimizer/plan_node/generic/mod.rs @@ -52,6 +52,8 @@ mod over_window; pub use over_window::*; mod except; pub use except::*; +mod update; +pub use update::*; pub trait GenericPlanRef { fn schema(&self) -> &Schema; diff --git a/src/frontend/src/optimizer/plan_node/generic/update.rs b/src/frontend/src/optimizer/plan_node/generic/update.rs new file mode 100644 index 0000000000000..fe4cfc3c385a8 --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/generic/update.rs @@ -0,0 +1,56 @@ +use std::fmt; +use std::hash::Hash; + +use educe::Educe; +use risingwave_common::catalog::TableVersionId; + +use crate::catalog::TableId; +use crate::expr::ExprImpl; + +#[derive(Debug, Clone, Educe)] +#[educe(PartialEq, Eq, Hash)] +pub struct Update { + #[educe(PartialEq(ignore))] + #[educe(Hash(ignore))] + pub table_name: String, // explain-only + pub table_id: TableId, + pub table_version_id: TableVersionId, + pub input: PlanRef, + pub exprs: Vec, + pub returning: bool, +} + +impl Update { + pub fn new( + input: PlanRef, + table_name: String, + table_id: TableId, + table_version_id: TableVersionId, + exprs: Vec, + returning: bool, + ) -> Self { + Self { + table_name, + table_id, + table_version_id, + input, + exprs, + returning, + } + } + + pub(crate) fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result { + write!( + f, + "{} {{ table: {}, exprs: {:?}{} }}", + name, + self.table_name, + self.exprs, + if self.returning { + ", returning: true" + } else { + "" + } + ) + } +} diff --git a/src/frontend/src/optimizer/plan_node/logical_update.rs b/src/frontend/src/optimizer/plan_node/logical_update.rs index 8d60a9de2fe56..a758cb39c2f18 100644 --- a/src/frontend/src/optimizer/plan_node/logical_update.rs +++ b/src/frontend/src/optimizer/plan_node/logical_update.rs @@ -19,7 +19,7 @@ use risingwave_common::error::Result; use risingwave_common::types::DataType; use super::{ - gen_filter_and_pushdown, BatchUpdate, ColPrunable, ExprRewritable, PlanBase, PlanRef, + gen_filter_and_pushdown, generic, BatchUpdate, ColPrunable, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, }; use crate::catalog::TableId; @@ -37,109 +37,51 @@ use crate::utils::{ColIndexMapping, Condition}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct LogicalUpdate { pub base: PlanBase, - table_name: String, // explain-only - table_id: TableId, - table_version_id: TableVersionId, - input: PlanRef, - exprs: Vec, - returning: bool, + core: generic::Update, } -impl LogicalUpdate { - /// Create a [`LogicalUpdate`] node. Used internally by optimizer. - pub fn new( - input: PlanRef, - table_name: String, - table_id: TableId, - table_version_id: TableVersionId, - exprs: Vec, - returning: bool, - ) -> Self { - let ctx = input.ctx(); - let schema = if returning { - input.schema().clone() +impl From> for LogicalUpdate { + fn from(core: generic::Update) -> Self { + let ctx = core.input.ctx(); + let schema = if core.returning { + core.input.schema().clone() } else { Schema::new(vec![Field::unnamed(DataType::Int64)]) }; let fd_set = FunctionalDependencySet::new(schema.len()); let base = PlanBase::new_logical(ctx, schema, vec![], fd_set); - Self { - base, - table_name, - table_id, - table_version_id, - input, - exprs, - returning, - } - } - - /// Create a [`LogicalUpdate`] node. Used by planner. - pub fn create( - input: PlanRef, - table_name: String, - table_id: TableId, - table_version_id: TableVersionId, - exprs: Vec, - returning: bool, - ) -> Result { - Ok(Self::new( - input, - table_name, - table_id, - table_version_id, - exprs, - returning, - )) - } - - pub(super) fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result { - write!( - f, - "{} {{ table: {}, exprs: {:?}{} }}", - name, - self.table_name, - self.exprs, - if self.returning { - ", returning: true" - } else { - "" - } - ) + Self { base, core } } +} +impl LogicalUpdate { #[must_use] pub fn table_id(&self) -> TableId { - self.table_id + self.core.table_id } pub fn exprs(&self) -> &[ExprImpl] { - self.exprs.as_ref() + self.core.exprs.as_ref() } pub fn has_returning(&self) -> bool { - self.returning + self.core.returning } pub fn table_version_id(&self) -> TableVersionId { - self.table_version_id + self.core.table_version_id } } impl PlanTreeNodeUnary for LogicalUpdate { fn input(&self) -> PlanRef { - self.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - Self::new( - input, - self.table_name.clone(), - self.table_id, - self.table_version_id, - self.exprs.clone(), - self.returning, - ) + let mut core = self.core.clone(); + core.input = input; + core.into() } } @@ -147,7 +89,7 @@ impl_plan_tree_node_for_unary! { LogicalUpdate } impl fmt::Display for LogicalUpdate { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.fmt_with_name(f, "LogicalUpdate") + self.core.fmt_with_name(f, "LogicalUpdate") } } @@ -157,17 +99,17 @@ impl ExprRewritable for LogicalUpdate { } fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - let mut new = self.clone(); + let mut new = self.core.clone(); new.exprs = new.exprs.into_iter().map(|e| r.rewrite_expr(e)).collect(); - new.base = new.base.clone_with_new_plan_id(); - new.into() + Self::from(new).into() } } impl ColPrunable for LogicalUpdate { fn prune_col(&self, _required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef { - let required_cols: Vec<_> = (0..self.input.schema().len()).collect(); - self.clone_with_input(self.input.prune_col(&required_cols, ctx)) + let input = self.input(); + let required_cols: Vec<_> = (0..input.schema().len()).collect(); + self.clone_with_input(input.prune_col(&required_cols, ctx)) .into() } } diff --git a/src/frontend/src/planner/update.rs b/src/frontend/src/planner/update.rs index 8b79d00d4a834..d77c8868640be 100644 --- a/src/frontend/src/planner/update.rs +++ b/src/frontend/src/planner/update.rs @@ -18,7 +18,7 @@ use risingwave_common::error::Result; use super::select::LogicalFilter; use super::Planner; use crate::binder::BoundUpdate; -use crate::optimizer::plan_node::{LogicalProject, LogicalUpdate}; +use crate::optimizer::plan_node::{generic, LogicalProject, LogicalUpdate}; use crate::optimizer::property::{Order, RequiredDist}; use crate::optimizer::{PlanRef, PlanRoot}; @@ -31,14 +31,14 @@ impl Planner { scan }; let returning = !update.returning_list.is_empty(); - let mut plan: PlanRef = LogicalUpdate::create( + let mut plan: PlanRef = LogicalUpdate::from(generic::Update::new( input, update.table_name.clone(), update.table_id, update.table_version_id, update.exprs, returning, - )? + )) .into(); if returning { From ac7b571590c9d8f97fa8db18a4e59db771f1676d Mon Sep 17 00:00:00 2001 From: ice1000 Date: Wed, 10 May 2023 14:27:10 +0000 Subject: [PATCH 2/5] Batch --- .../src/optimizer/plan_node/batch_update.rs | 42 ++++++++----------- .../src/optimizer/plan_node/generic/update.rs | 11 ++++- .../src/optimizer/plan_node/logical_update.rs | 5 ++- 3 files changed, 30 insertions(+), 28 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/batch_update.rs b/src/frontend/src/optimizer/plan_node/batch_update.rs index 3414876dc3928..5927785144bd6 100644 --- a/src/frontend/src/optimizer/plan_node/batch_update.rs +++ b/src/frontend/src/optimizer/plan_node/batch_update.rs @@ -14,13 +14,13 @@ use std::fmt; +use risingwave_common::catalog::Schema; use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::UpdateNode; use super::{ - ExprRewritable, LogicalUpdate, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, - ToDistributedBatch, + generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, }; use crate::expr::{Expr, ExprRewriter}; use crate::optimizer::plan_node::ToLocalBatch; @@ -30,18 +30,13 @@ use crate::optimizer::property::{Distribution, Order, RequiredDist}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchUpdate { pub base: PlanBase, - pub logical: LogicalUpdate, + pub logical: generic::Update, } impl BatchUpdate { - pub fn new(logical: LogicalUpdate) -> Self { - let ctx = logical.base.ctx.clone(); - let base = PlanBase::new_batch( - ctx, - logical.schema().clone(), - Distribution::Single, - Order::any(), - ); + pub fn new(logical: generic::Update, schema: Schema) -> Self { + let ctx = logical.input.ctx(); + let base = PlanBase::new_batch(ctx, schema, Distribution::Single, Order::any()); Self { base, logical } } } @@ -54,11 +49,13 @@ impl fmt::Display for BatchUpdate { impl PlanTreeNodeUnary for BatchUpdate { fn input(&self) -> PlanRef { - self.logical.input() + self.logical.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - Self::new(self.logical.clone_with_input(input)) + let mut logical = self.logical.clone(); + logical.input = input; + Self::new(logical, self.schema().clone()) } } @@ -76,16 +73,16 @@ impl ToBatchPb for BatchUpdate { fn to_batch_prost_body(&self) -> NodeBody { let exprs = self .logical - .exprs() + .exprs .iter() .map(|x| x.to_expr_proto()) .collect(); NodeBody::Update(UpdateNode { exprs, - table_id: self.logical.table_id().table_id(), - table_version_id: self.logical.table_version_id(), - returning: self.logical.has_returning(), + table_id: self.logical.table_id.table_id(), + table_version_id: self.logical.table_version_id, + returning: self.logical.returning, }) } } @@ -104,13 +101,8 @@ impl ExprRewritable for BatchUpdate { } fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - Self::new( - self.logical - .rewrite_exprs(r) - .as_logical_update() - .unwrap() - .clone(), - ) - .into() + let mut logical = self.logical.clone(); + logical.rewrite_exprs(r); + Self::new(logical, self.schema().clone()).into() } } diff --git a/src/frontend/src/optimizer/plan_node/generic/update.rs b/src/frontend/src/optimizer/plan_node/generic/update.rs index fe4cfc3c385a8..159bed80d11c4 100644 --- a/src/frontend/src/optimizer/plan_node/generic/update.rs +++ b/src/frontend/src/optimizer/plan_node/generic/update.rs @@ -5,7 +5,7 @@ use educe::Educe; use risingwave_common::catalog::TableVersionId; use crate::catalog::TableId; -use crate::expr::ExprImpl; +use crate::expr::{ExprImpl, ExprRewriter}; #[derive(Debug, Clone, Educe)] #[educe(PartialEq, Eq, Hash)] @@ -53,4 +53,13 @@ impl Update { } ) } + + pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) { + self.exprs = self + .exprs + .iter() + .map(|e| r.rewrite_expr(e.clone())) + .collect(); + } } + diff --git a/src/frontend/src/optimizer/plan_node/logical_update.rs b/src/frontend/src/optimizer/plan_node/logical_update.rs index a758cb39c2f18..556874f2d962d 100644 --- a/src/frontend/src/optimizer/plan_node/logical_update.rs +++ b/src/frontend/src/optimizer/plan_node/logical_update.rs @@ -127,8 +127,9 @@ impl PredicatePushdown for LogicalUpdate { impl ToBatch for LogicalUpdate { fn to_batch(&self) -> Result { let new_input = self.input().to_batch()?; - let new_logical = self.clone_with_input(new_input); - Ok(BatchUpdate::new(new_logical).into()) + let mut new_logical = self.core.clone(); + new_logical.input = new_input; + Ok(BatchUpdate::new(new_logical, self.schema().clone()).into()) } } From 600bb315a3e5ff045b2e6ad87fa90b694c1df46d Mon Sep 17 00:00:00 2001 From: ice1000 Date: Wed, 10 May 2023 14:27:53 +0000 Subject: [PATCH 3/5] Fmt --- src/frontend/src/optimizer/plan_node/generic/update.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/frontend/src/optimizer/plan_node/generic/update.rs b/src/frontend/src/optimizer/plan_node/generic/update.rs index 159bed80d11c4..9146faa184958 100644 --- a/src/frontend/src/optimizer/plan_node/generic/update.rs +++ b/src/frontend/src/optimizer/plan_node/generic/update.rs @@ -62,4 +62,3 @@ impl Update { .collect(); } } - From af304199e92e47130b05b6668b0e19de53144e2b Mon Sep 17 00:00:00 2001 From: ice1000 Date: Wed, 10 May 2023 14:29:45 +0000 Subject: [PATCH 4/5] Fix --- src/frontend/src/scheduler/plan_fragmenter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 49212a368f59b..6204b498e32a5 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -948,7 +948,7 @@ impl BatchPlanFragmenter { if let Some(insert) = node.as_batch_insert() { Some(insert.logical.table_id()) } else if let Some(update) = node.as_batch_update() { - Some(update.logical.table_id()) + Some(update.logical.table_id) } else if let Some(delete) = node.as_batch_delete() { Some(delete.logical.table_id()) } else { From b3429612b24641706e585cf5e01748f9311e4a1d Mon Sep 17 00:00:00 2001 From: ice1000 Date: Wed, 10 May 2023 15:48:38 +0000 Subject: [PATCH 5/5] add copyright --- .../src/optimizer/plan_node/generic/update.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/frontend/src/optimizer/plan_node/generic/update.rs b/src/frontend/src/optimizer/plan_node/generic/update.rs index 9146faa184958..f14c821f9657f 100644 --- a/src/frontend/src/optimizer/plan_node/generic/update.rs +++ b/src/frontend/src/optimizer/plan_node/generic/update.rs @@ -1,3 +1,17 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + use std::fmt; use std::hash::Hash;