Skip to content

Commit

Permalink
feat(optimizer): add columns_monotonicity field for PlanNode (#17600)
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <stdrc@outlook.com>
  • Loading branch information
stdrc authored Jul 17, 2024
1 parent e8273ca commit 6e2c82f
Show file tree
Hide file tree
Showing 40 changed files with 232 additions and 34 deletions.
6 changes: 5 additions & 1 deletion src/frontend/src/optimizer/plan_node/generic/cdc_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::catalog::ColumnId;
use crate::error::Result;
use crate::expr::{ExprRewriter, ExprVisitor};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;
use crate::optimizer::property::{FunctionalDependencySet, MonotonicityMap};
use crate::WithOptions;

/// [`CdcScan`] reads rows of a table from an external upstream database
Expand Down Expand Up @@ -125,6 +125,10 @@ impl CdcScan {
FixedBitSet::with_capacity(self.get_table_columns().len())
}

pub fn columns_monotonicity(&self) -> MonotonicityMap {
MonotonicityMap::new()
}

pub(crate) fn column_names_with_table_prefix(&self) -> Vec<String> {
self.output_col_idx
.iter()
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/logical_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::optimizer::plan_node::{
ToStreamContext,
};
use crate::optimizer::property::Distribution::HashShard;
use crate::optimizer::property::{Distribution, Order, RequiredDist};
use crate::optimizer::property::{Distribution, MonotonicityMap, Order, RequiredDist};
use crate::utils::{ColIndexMapping, Condition, IndexRewriter};

/// `LogicalSource` returns contents of a table or other equivalent object
Expand Down Expand Up @@ -229,6 +229,7 @@ impl LogicalSource {
true, // `list` will keep listing all objects, it must be append-only
false,
FixedBitSet::with_capacity(logical_source.column_catalog.len()),
MonotonicityMap::new(),
),
core: logical_source,
}
Expand Down
6 changes: 5 additions & 1 deletion src/frontend/src/optimizer/plan_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use self::batch::BatchPlanRef;
use self::generic::{GenericPlanRef, PhysicalPlanRef};
use self::stream::StreamPlanRef;
use self::utils::Distill;
use super::property::{Distribution, FunctionalDependencySet, Order};
use super::property::{Distribution, FunctionalDependencySet, MonotonicityMap, Order};
use crate::error::{ErrorCode, Result};
use crate::optimizer::ExpressionSimplifyRewriter;
use crate::session::current::notice_to_user;
Expand Down Expand Up @@ -609,6 +609,10 @@ impl StreamPlanRef for PlanRef {
fn watermark_columns(&self) -> &FixedBitSet {
self.plan_base().watermark_columns()
}

fn columns_monotonicity(&self) -> &MonotonicityMap {
self.plan_base().columns_monotonicity()
}
}

/// Allow access to all fields defined in [`BatchPlanRef`] for the type-erased plan node.
Expand Down
18 changes: 18 additions & 0 deletions src/frontend/src/optimizer/plan_node/plan_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ pub struct StreamExtra {
/// The watermark column indices of the `PlanNode`'s output. There could be watermark output from
/// this stream operator.
watermark_columns: FixedBitSet,
/// The monotonicity of columns in the output.
columns_monotonicity: MonotonicityMap,
}

impl GetPhysicalCommon for StreamExtra {
Expand Down Expand Up @@ -168,6 +170,10 @@ impl stream::StreamPlanRef for PlanBase<Stream> {
fn watermark_columns(&self) -> &FixedBitSet {
&self.extra.watermark_columns
}

fn columns_monotonicity(&self) -> &MonotonicityMap {
&self.extra.columns_monotonicity
}
}

impl batch::BatchPlanRef for PlanBase<Batch> {
Expand Down Expand Up @@ -222,6 +228,7 @@ impl PlanBase<Stream> {
append_only: bool,
emit_on_window_close: bool,
watermark_columns: FixedBitSet,
columns_monotonicity: MonotonicityMap,
) -> Self {
let id = ctx.next_plan_node_id();
assert_eq!(watermark_columns.len(), schema.len());
Expand All @@ -236,6 +243,7 @@ impl PlanBase<Stream> {
append_only,
emit_on_window_close,
watermark_columns,
columns_monotonicity,
},
}
}
Expand All @@ -246,6 +254,7 @@ impl PlanBase<Stream> {
append_only: bool,
emit_on_window_close: bool,
watermark_columns: FixedBitSet,
columns_monotonicity: MonotonicityMap,
) -> Self {
Self::new_stream(
core.ctx(),
Expand All @@ -256,6 +265,7 @@ impl PlanBase<Stream> {
append_only,
emit_on_window_close,
watermark_columns,
columns_monotonicity,
)
}
}
Expand Down Expand Up @@ -383,6 +393,10 @@ impl<'a> PlanBaseRef<'a> {
dispatch_plan_base!(self, [Stream], StreamPlanRef::watermark_columns)
}

pub(super) fn columns_monotonicity(self) -> &'a MonotonicityMap {
dispatch_plan_base!(self, [Stream], StreamPlanRef::columns_monotonicity)
}

pub(super) fn order(self) -> &'a Order {
dispatch_plan_base!(self, [Batch], BatchPlanRef::order)
}
Expand Down Expand Up @@ -428,6 +442,10 @@ impl StreamPlanRef for PlanBaseRef<'_> {
fn watermark_columns(&self) -> &FixedBitSet {
(*self).watermark_columns()
}

fn columns_monotonicity(&self) -> &MonotonicityMap {
(*self).columns_monotonicity()
}
}

impl BatchPlanRef for PlanBaseRef<'_> {
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use fixedbitset::FixedBitSet;

use super::generic::PhysicalPlanRef;
use crate::optimizer::property::MonotonicityMap;

/// A subtrait of [`PhysicalPlanRef`] for stream plans.
///
Expand All @@ -29,6 +30,7 @@ pub trait StreamPlanRef: PhysicalPlanRef {
fn append_only(&self) -> bool;
fn emit_on_window_close(&self) -> bool;
fn watermark_columns(&self) -> &FixedBitSet;
fn columns_monotonicity(&self) -> &MonotonicityMap;
}

/// Prelude for stream plan nodes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ impl StreamCdcTableScan {
core.append_only(),
false,
core.watermark_columns(),
core.columns_monotonicity(),
);
Self { base, core }
}
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_changelog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use super::stream::prelude::PhysicalPlanRef;
use super::stream::StreamPlanRef;
use super::utils::impl_distill_by_unit;
use super::{generic, ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode};
use crate::optimizer::property::MonotonicityMap;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::PlanRef;

Expand Down Expand Up @@ -48,6 +49,7 @@ impl StreamChangeLog {
true,
input.emit_on_window_close(),
watermark_columns,
MonotonicityMap::new(), // TODO: derive monotonicity
);
StreamChangeLog { base, core }
}
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ impl StreamDedup {
true,
input.emit_on_window_close(),
input.watermark_columns().clone(),
input.columns_monotonicity().clone(),
);
StreamDedup { base, core }
}
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_delta_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::expr::{Expr, ExprRewriter, ExprVisitor};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::IndicesDisplay;
use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay, TryToStreamPb};
use crate::optimizer::property::Distribution;
use crate::optimizer::property::{Distribution, MonotonicityMap};
use crate::scheduler::SchedulerResult;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::ColIndexMappingRewriteExt;
Expand Down Expand Up @@ -76,6 +76,7 @@ impl StreamDeltaJoin {
append_only,
false, // TODO(rc): derive EOWC property from input
watermark_columns,
MonotonicityMap::new(), // TODO: derive monotonicity
);

Self {
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use super::stream::prelude::*;
use super::utils::{childless_record, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::property::MonotonicityMap;
use crate::stream_fragmenter::BuildFragmentGraphState;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand All @@ -41,6 +42,7 @@ impl StreamDml {
append_only,
false, // TODO(rc): decide EOWC property
FixedBitSet::with_capacity(input.schema().len()), // no watermark if dml is allowed
MonotonicityMap::new(), // TODO: derive monotonicity
);

Self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use super::{generic, ExprRewritable, PlanTreeNodeUnary};
use crate::expr::Expr;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::{PlanBase, PlanTreeNodeBinary, StreamNode};
use crate::optimizer::property::Distribution;
use crate::optimizer::property::{Distribution, MonotonicityMap};
use crate::optimizer::PlanRef;
use crate::stream_fragmenter::BuildFragmentGraphState;

Expand Down Expand Up @@ -77,6 +77,7 @@ impl StreamDynamicFilter {
out_append_only,
false, // TODO(rc): decide EOWC property
Self::derive_watermark_columns(&core),
MonotonicityMap::new(), // TODO: derive monotonicity
);
let cleaned_by_watermark = Self::cleaned_by_watermark(&core);
Self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use super::stream::prelude::*;
use super::utils::{impl_distill_by_unit, TableCatalogBuilder};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::property::MonotonicityMap;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::TableCatalog;

Expand Down Expand Up @@ -58,6 +59,8 @@ impl StreamEowcOverWindow {
true,
true,
watermark_columns,
// we cannot derive monotonicity for any column for the same reason as watermark columns
MonotonicityMap::new(),
);
StreamEowcOverWindow { base, core }
}
Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use super::stream::prelude::*;
use super::utils::{childless_record, plan_node_name, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::property::{Distribution, DistributionDisplay};
use crate::optimizer::property::{Distribution, DistributionDisplay, MonotonicityMap};
use crate::stream_fragmenter::BuildFragmentGraphState;

/// `StreamExchange` imposes a particular distribution on its input
Expand All @@ -44,6 +44,7 @@ impl StreamExchange {
input.append_only(),
input.emit_on_window_close(),
input.watermark_columns().clone(),
MonotonicityMap::new(), // we lost monotonicity information when shuffling
);
StreamExchange {
base,
Expand All @@ -64,6 +65,7 @@ impl StreamExchange {
input.append_only(),
input.emit_on_window_close(),
input.watermark_columns().clone(),
input.columns_monotonicity().clone(),
);
StreamExchange {
base,
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use super::stream::prelude::*;
use super::utils::impl_distill_by_unit;
use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::property::Distribution;
use crate::optimizer::property::{Distribution, MonotonicityMap};
use crate::stream_fragmenter::BuildFragmentGraphState;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -52,6 +52,7 @@ impl StreamExpand {
input.append_only(),
input.emit_on_window_close(),
watermark_columns,
MonotonicityMap::new(),
);
StreamExpand { base, core }
}
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ impl StreamFilter {
input.append_only(),
input.emit_on_window_close(),
input.watermark_columns().clone(),
input.columns_monotonicity().clone(),
);
StreamFilter { base, core }
}
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::catalog::source_catalog::SourceCatalog;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::{childless_record, Distill};
use crate::optimizer::plan_node::{generic, ExprRewritable, StreamNode};
use crate::optimizer::property::Distribution;
use crate::optimizer::property::{Distribution, MonotonicityMap};
use crate::stream_fragmenter::BuildFragmentGraphState;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -55,6 +55,7 @@ impl StreamFsFetch {
source.catalog.as_ref().map_or(true, |s| s.append_only),
false,
FixedBitSet::with_capacity(source.column_catalog.len()),
MonotonicityMap::new(), // TODO: derive monotonicity
);

Self {
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_group_topn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use super::utils::{plan_node_name, watermark_pretty, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::generic::GenericPlanNode;
use crate::optimizer::property::Order;
use crate::optimizer::property::{MonotonicityMap, Order};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::PlanRef;

Expand Down Expand Up @@ -79,6 +79,7 @@ impl StreamGroupTopN {
// TODO: https://github.com/risingwavelabs/risingwave/issues/8348
false,
watermark_columns,
MonotonicityMap::new(), // TODO: derive monotonicity
);
StreamGroupTopN {
base,
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::error::{ErrorCode, Result};
use crate::expr::{ExprRewriter, ExprVisitor};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::property::MonotonicityMap;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, IndexSet};

Expand Down Expand Up @@ -93,6 +94,7 @@ impl StreamHashAgg {
emit_on_window_close, // in EOWC mode, we produce append only output
emit_on_window_close,
watermark_columns,
MonotonicityMap::new(), // TODO: derive monotonicity
);
StreamHashAgg {
base,
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::expr::{Expr, ExprDisplay, ExprRewriter, ExprVisitor, InequalityInputP
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::IndicesDisplay;
use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay};
use crate::optimizer::property::Distribution;
use crate::optimizer::property::{Distribution, MonotonicityMap};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::ColIndexMappingRewriteExt;

Expand Down Expand Up @@ -196,6 +196,7 @@ impl StreamHashJoin {
append_only,
false, // TODO(rc): derive EOWC property from input
watermark_columns,
MonotonicityMap::new(), // TODO: derive monotonicity
);

Self {
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use super::utils::{childless_record, watermark_pretty, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::property::MonotonicityMap;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::ColIndexMappingRewriteExt;

Expand Down Expand Up @@ -62,6 +63,7 @@ impl StreamHopWindow {
input.append_only(),
input.emit_on_window_close(),
internal2output.rewrite_bitset(&watermark_columns),
MonotonicityMap::new(), /* hop window start/end jumps, so monotonicity is not propagated */
);
Self {
base,
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ impl StreamMaterialize {
input.append_only(),
input.emit_on_window_close(),
input.watermark_columns().clone(),
input.columns_monotonicity().clone(),
);
Self { base, input, table }
}
Expand Down
Loading

0 comments on commit 6e2c82f

Please sign in to comment.