Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(plan_node): use generics in update nodes #9740

Merged
merged 5 commits into from
May 10, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 17 additions & 25 deletions src/frontend/src/optimizer/plan_node/batch_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@

use std::fmt;

use risingwave_common::catalog::Schema;
use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::UpdateNode;

use super::{
ExprRewritable, LogicalUpdate, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb,
ToDistributedBatch,
generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch,
};
use crate::expr::{Expr, ExprRewriter};
use crate::optimizer::plan_node::ToLocalBatch;
Expand All @@ -30,18 +30,13 @@ use crate::optimizer::property::{Distribution, Order, RequiredDist};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchUpdate {
pub base: PlanBase,
pub logical: LogicalUpdate,
pub logical: generic::Update<PlanRef>,
}

impl BatchUpdate {
pub fn new(logical: LogicalUpdate) -> Self {
let ctx = logical.base.ctx.clone();
let base = PlanBase::new_batch(
ctx,
logical.schema().clone(),
Distribution::Single,
Order::any(),
);
pub fn new(logical: generic::Update<PlanRef>, schema: Schema) -> Self {
let ctx = logical.input.ctx();
let base = PlanBase::new_batch(ctx, schema, Distribution::Single, Order::any());
Self { base, logical }
}
}
Expand All @@ -54,11 +49,13 @@ impl fmt::Display for BatchUpdate {

impl PlanTreeNodeUnary for BatchUpdate {
fn input(&self) -> PlanRef {
self.logical.input()
self.logical.input.clone()
}

fn clone_with_input(&self, input: PlanRef) -> Self {
Self::new(self.logical.clone_with_input(input))
let mut logical = self.logical.clone();
logical.input = input;
Self::new(logical, self.schema().clone())
}
}

Expand All @@ -76,16 +73,16 @@ impl ToBatchPb for BatchUpdate {
fn to_batch_prost_body(&self) -> NodeBody {
let exprs = self
.logical
.exprs()
.exprs
.iter()
.map(|x| x.to_expr_proto())
.collect();

NodeBody::Update(UpdateNode {
exprs,
table_id: self.logical.table_id().table_id(),
table_version_id: self.logical.table_version_id(),
returning: self.logical.has_returning(),
table_id: self.logical.table_id.table_id(),
table_version_id: self.logical.table_version_id,
returning: self.logical.returning,
})
}
}
Expand All @@ -104,13 +101,8 @@ impl ExprRewritable for BatchUpdate {
}

fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
Self::new(
self.logical
.rewrite_exprs(r)
.as_logical_update()
.unwrap()
.clone(),
)
.into()
let mut logical = self.logical.clone();
logical.rewrite_exprs(r);
Self::new(logical, self.schema().clone()).into()
}
}
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ mod over_window;
pub use over_window::*;
mod except;
pub use except::*;
mod update;
pub use update::*;

pub trait GenericPlanRef {
fn schema(&self) -> &Schema;
Expand Down
64 changes: 64 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/update.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use std::fmt;
ice1000 marked this conversation as resolved.
Show resolved Hide resolved
use std::hash::Hash;

use educe::Educe;
use risingwave_common::catalog::TableVersionId;

use crate::catalog::TableId;
use crate::expr::{ExprImpl, ExprRewriter};

#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct Update<PlanRef: Eq + Hash> {
#[educe(PartialEq(ignore))]
#[educe(Hash(ignore))]
pub table_name: String, // explain-only
pub table_id: TableId,
pub table_version_id: TableVersionId,
pub input: PlanRef,
pub exprs: Vec<ExprImpl>,
pub returning: bool,
}

impl<PlanRef: Eq + Hash> Update<PlanRef> {
pub fn new(
input: PlanRef,
table_name: String,
table_id: TableId,
table_version_id: TableVersionId,
exprs: Vec<ExprImpl>,
returning: bool,
) -> Self {
Self {
table_name,
table_id,
table_version_id,
input,
exprs,
returning,
}
}

pub(crate) fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result {
write!(
f,
"{} {{ table: {}, exprs: {:?}{} }}",
name,
self.table_name,
self.exprs,
if self.returning {
", returning: true"
} else {
""
}
)
}

pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
self.exprs = self
.exprs
.iter()
.map(|e| r.rewrite_expr(e.clone()))
.collect();
}
}
111 changes: 27 additions & 84 deletions src/frontend/src/optimizer/plan_node/logical_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use risingwave_common::error::Result;
use risingwave_common::types::DataType;

use super::{
gen_filter_and_pushdown, BatchUpdate, ColPrunable, ExprRewritable, PlanBase, PlanRef,
gen_filter_and_pushdown, generic, BatchUpdate, ColPrunable, ExprRewritable, PlanBase, PlanRef,
PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream,
};
use crate::catalog::TableId;
Expand All @@ -37,117 +37,59 @@ use crate::utils::{ColIndexMapping, Condition};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalUpdate {
pub base: PlanBase,
table_name: String, // explain-only
table_id: TableId,
table_version_id: TableVersionId,
input: PlanRef,
exprs: Vec<ExprImpl>,
returning: bool,
core: generic::Update<PlanRef>,
}

impl LogicalUpdate {
/// Create a [`LogicalUpdate`] node. Used internally by optimizer.
pub fn new(
input: PlanRef,
table_name: String,
table_id: TableId,
table_version_id: TableVersionId,
exprs: Vec<ExprImpl>,
returning: bool,
) -> Self {
let ctx = input.ctx();
let schema = if returning {
input.schema().clone()
impl From<generic::Update<PlanRef>> for LogicalUpdate {
fn from(core: generic::Update<PlanRef>) -> Self {
let ctx = core.input.ctx();
let schema = if core.returning {
core.input.schema().clone()
} else {
Schema::new(vec![Field::unnamed(DataType::Int64)])
};
let fd_set = FunctionalDependencySet::new(schema.len());
let base = PlanBase::new_logical(ctx, schema, vec![], fd_set);
Self {
base,
table_name,
table_id,
table_version_id,
input,
exprs,
returning,
}
}

/// Create a [`LogicalUpdate`] node. Used by planner.
pub fn create(
input: PlanRef,
table_name: String,
table_id: TableId,
table_version_id: TableVersionId,
exprs: Vec<ExprImpl>,
returning: bool,
) -> Result<Self> {
Ok(Self::new(
input,
table_name,
table_id,
table_version_id,
exprs,
returning,
))
}

pub(super) fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result {
write!(
f,
"{} {{ table: {}, exprs: {:?}{} }}",
name,
self.table_name,
self.exprs,
if self.returning {
", returning: true"
} else {
""
}
)
Self { base, core }
}
}

impl LogicalUpdate {
#[must_use]
pub fn table_id(&self) -> TableId {
self.table_id
self.core.table_id
}

pub fn exprs(&self) -> &[ExprImpl] {
self.exprs.as_ref()
self.core.exprs.as_ref()
}

pub fn has_returning(&self) -> bool {
self.returning
self.core.returning
}

pub fn table_version_id(&self) -> TableVersionId {
self.table_version_id
self.core.table_version_id
}
}

impl PlanTreeNodeUnary for LogicalUpdate {
fn input(&self) -> PlanRef {
self.input.clone()
self.core.input.clone()
}

fn clone_with_input(&self, input: PlanRef) -> Self {
Self::new(
input,
self.table_name.clone(),
self.table_id,
self.table_version_id,
self.exprs.clone(),
self.returning,
)
let mut core = self.core.clone();
core.input = input;
core.into()
}
}

impl_plan_tree_node_for_unary! { LogicalUpdate }

impl fmt::Display for LogicalUpdate {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.fmt_with_name(f, "LogicalUpdate")
self.core.fmt_with_name(f, "LogicalUpdate")
}
}

Expand All @@ -157,17 +99,17 @@ impl ExprRewritable for LogicalUpdate {
}

fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
let mut new = self.clone();
let mut new = self.core.clone();
new.exprs = new.exprs.into_iter().map(|e| r.rewrite_expr(e)).collect();
new.base = new.base.clone_with_new_plan_id();
new.into()
Self::from(new).into()
}
}

impl ColPrunable for LogicalUpdate {
fn prune_col(&self, _required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
let required_cols: Vec<_> = (0..self.input.schema().len()).collect();
self.clone_with_input(self.input.prune_col(&required_cols, ctx))
let input = self.input();
let required_cols: Vec<_> = (0..input.schema().len()).collect();
self.clone_with_input(input.prune_col(&required_cols, ctx))
.into()
}
}
Expand All @@ -185,8 +127,9 @@ impl PredicatePushdown for LogicalUpdate {
impl ToBatch for LogicalUpdate {
fn to_batch(&self) -> Result<PlanRef> {
let new_input = self.input().to_batch()?;
let new_logical = self.clone_with_input(new_input);
Ok(BatchUpdate::new(new_logical).into())
let mut new_logical = self.core.clone();
new_logical.input = new_input;
Ok(BatchUpdate::new(new_logical, self.schema().clone()).into())
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/planner/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use risingwave_common::error::Result;
use super::select::LogicalFilter;
use super::Planner;
use crate::binder::BoundUpdate;
use crate::optimizer::plan_node::{LogicalProject, LogicalUpdate};
use crate::optimizer::plan_node::{generic, LogicalProject, LogicalUpdate};
use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{PlanRef, PlanRoot};

Expand All @@ -31,14 +31,14 @@ impl Planner {
scan
};
let returning = !update.returning_list.is_empty();
let mut plan: PlanRef = LogicalUpdate::create(
let mut plan: PlanRef = LogicalUpdate::from(generic::Update::new(
input,
update.table_name.clone(),
update.table_id,
update.table_version_id,
update.exprs,
returning,
)?
))
.into();

if returning {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/scheduler/plan_fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -948,7 +948,7 @@ impl BatchPlanFragmenter {
if let Some(insert) = node.as_batch_insert() {
Some(insert.logical.table_id())
} else if let Some(update) = node.as_batch_update() {
Some(update.logical.table_id())
Some(update.logical.table_id)
} else if let Some(delete) = node.as_batch_delete() {
Some(delete.logical.table_id())
} else {
Expand Down