Skip to content

Commit

Permalink
Cleanup hop window, create batch.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
ice1000 committed Apr 7, 2023
1 parent 85980fd commit fea86be
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 79 deletions.
21 changes: 21 additions & 0 deletions src/frontend/src/optimizer/plan_node/batch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::optimizer::property::Order;

use super::generic::GenericPlanRef;

pub trait BatchPlanRef: GenericPlanRef {
fn order(&self) -> &Order;
}
29 changes: 16 additions & 13 deletions src/frontend/src/optimizer/plan_node/batch_hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::HopWindowNode;

use super::{
ExprRewritable, LogicalHopWindow, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb,
generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb,
ToDistributedBatch,
};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
Expand All @@ -32,24 +32,25 @@ use crate::utils::ColIndexMappingRewriteExt;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchHopWindow {
pub base: PlanBase,
logical: LogicalHopWindow,
logical: generic::HopWindow<PlanRef>,
window_start_exprs: Vec<ExprImpl>,
window_end_exprs: Vec<ExprImpl>,
}

impl BatchHopWindow {
pub fn new(
logical: LogicalHopWindow,
logical: generic::HopWindow<PlanRef>,
window_start_exprs: Vec<ExprImpl>,
window_end_exprs: Vec<ExprImpl>,
) -> Self {
let ctx = logical.base.ctx.clone();
let base = PlanBase::new_logical_with_core(&logical);
let ctx = base.ctx;
let distribution = logical
.i2o_col_mapping()
.rewrite_provided_distribution(logical.input().distribution());
.rewrite_provided_distribution(logical.input.distribution());
let base = PlanBase::new_batch(
ctx,
logical.schema().clone(),
base.schema,
distribution,
logical.get_out_column_index_order(),
);
Expand All @@ -70,12 +71,14 @@ impl fmt::Display for BatchHopWindow {

impl PlanTreeNodeUnary for BatchHopWindow {
fn input(&self) -> PlanRef {
self.logical.input()
self.logical.input.clone()
}

fn clone_with_input(&self, input: PlanRef) -> Self {
let mut logical = self.logical.clone();
logical.input = input;
Self::new(
self.logical.clone_with_input(input),
logical,
self.window_start_exprs.clone(),
self.window_end_exprs.clone(),
)
Expand Down Expand Up @@ -105,7 +108,8 @@ impl ToDistributedBatch for BatchHopWindow {
let new_input = self
.input()
.to_distributed_with_required(required_order, &input_required)?;
let new_logical = self.logical.clone_with_input(new_input);
let mut new_logical = self.logical.clone();
new_logical.input = new_input;
let batch_plan = BatchHopWindow::new(
new_logical,
self.window_start_exprs.clone(),
Expand All @@ -119,12 +123,11 @@ impl ToDistributedBatch for BatchHopWindow {
impl ToBatchPb for BatchHopWindow {
fn to_batch_prost_body(&self) -> NodeBody {
NodeBody::HopWindow(HopWindowNode {
time_col: self.logical.core.time_col.index() as _,
window_slide: Some(self.logical.core.window_slide.into()),
window_size: Some(self.logical.core.window_size.into()),
time_col: self.logical.time_col.index() as _,
window_slide: Some(self.logical.window_slide.into()),
window_size: Some(self.logical.window_size.into()),
output_indices: self
.logical
.core
.output_indices
.iter()
.map(|&x| x as u32)
Expand Down
10 changes: 9 additions & 1 deletion src/frontend/src/optimizer/plan_node/generic/hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use super::super::utils::IndicesDisplay;
use super::{GenericPlanNode, GenericPlanRef};
use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef, InputRefDisplay, Literal};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;
use crate::optimizer::plan_node::batch::BatchPlanRef;
use crate::optimizer::property::{FunctionalDependencySet, Order};
use crate::utils::ColIndexMappingRewriteExt;

/// [`HopWindow`] implements Hop Table Function.
Expand Down Expand Up @@ -118,6 +119,13 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for HopWindow<PlanRef> {
}
}

impl<PlanRef: BatchPlanRef> HopWindow<PlanRef> {
pub fn get_out_column_index_order(&self) -> Order {
self.i2o_col_mapping()
.rewrite_provided_order(self.input.order())
}
}

impl<PlanRef: GenericPlanRef> HopWindow<PlanRef> {
pub fn into_parts(self) -> (PlanRef, InputRef, Interval, Interval, Interval, Vec<usize>) {
(
Expand Down
67 changes: 19 additions & 48 deletions src/frontend/src/optimizer/plan_node/logical_hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,13 @@ use crate::expr::{ExprType, FunctionCall, InputRef};
use crate::optimizer::plan_node::{
ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
};
use crate::optimizer::property::Order;
use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition};

/// `LogicalHopWindow` implements Hop Table Function.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalHopWindow {
pub base: PlanBase,
pub(super) core: generic::HopWindow<PlanRef>,
core: generic::HopWindow<PlanRef>,
}

impl LogicalHopWindow {
Expand Down Expand Up @@ -118,36 +117,20 @@ impl LogicalHopWindow {
.into()
}

pub fn internal_window_start_col_idx(&self) -> usize {
self.core.internal_window_start_col_idx()
}

pub fn internal_window_end_col_idx(&self) -> usize {
self.core.internal_window_end_col_idx()
}

pub fn output_window_start_col_idx(&self) -> Option<usize> {
self.internal2output_col_mapping()
.try_map(self.internal_window_start_col_idx())
.try_map(self.core.internal_window_start_col_idx())
}

pub fn output_window_end_col_idx(&self) -> Option<usize> {
self.internal2output_col_mapping()
.try_map(self.internal_window_end_col_idx())
.try_map(self.core.internal_window_end_col_idx())
}

pub fn o2i_col_mapping(&self) -> ColIndexMapping {
self.core.o2i_col_mapping()
}

pub fn i2o_col_mapping(&self) -> ColIndexMapping {
self.core.i2o_col_mapping()
}

pub fn internal_column_num(&self) -> usize {
self.core.internal_column_num()
}

pub fn output2internal_col_mapping(&self) -> ColIndexMapping {
self.core.output2internal_col_mapping()
}
Expand All @@ -167,20 +150,6 @@ impl LogicalHopWindow {
)
}

pub fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result {
self.core.fmt_with_name(f, name)
}

pub fn fmt_fields_with_builder(&self, builder: &mut fmt::DebugStruct<'_, '_>) {
self.core.fmt_fields_with_builder(builder)
}

/// Map the order of the input to use the updated indices
pub fn get_out_column_index_order(&self) -> Order {
self.i2o_col_mapping()
.rewrite_provided_order(self.input().order())
}

/// Get output indices
pub fn output_indices(&self) -> &Vec<usize> {
&self.core.output_indices
Expand Down Expand Up @@ -223,10 +192,10 @@ impl PlanTreeNodeUnary for LogicalHopWindow {
Some(new_idx)
}
None => {
if idx == self.internal_window_start_col_idx() {
if idx == self.core.internal_window_start_col_idx() {
columns_to_be_kept.push(i);
Some(input.schema().len())
} else if idx == self.internal_window_end_col_idx() {
} else if idx == self.core.internal_window_end_col_idx() {
columns_to_be_kept.push(i);
Some(input.schema().len() + 1)
} else {
Expand Down Expand Up @@ -257,7 +226,7 @@ impl_plan_tree_node_for_unary! {LogicalHopWindow}

impl fmt::Display for LogicalHopWindow {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.fmt_with_name(f, "LogicalHopWindow")
self.core.fmt_with_name(f, "LogicalHopWindow")
}
}

Expand Down Expand Up @@ -296,9 +265,9 @@ impl ColPrunable for LogicalHopWindow {
if let Some(idx) = o2i.try_map(idx) {
Some(IndexType::Input(idx))
} else if let Some(idx) = output2internal.try_map(idx) {
if idx == self.internal_window_start_col_idx() {
if idx == self.core.internal_window_start_col_idx() {
Some(IndexType::WindowStart)
} else if idx == self.internal_window_end_col_idx() {
} else if idx == self.core.internal_window_end_col_idx() {
Some(IndexType::WindowEnd)
} else {
None
Expand All @@ -313,8 +282,8 @@ impl ColPrunable for LogicalHopWindow {
.iter()
.filter_map(|&idx| match idx {
IndexType::Input(x) => input_change.try_map(x),
IndexType::WindowStart => Some(new_hop.internal_window_start_col_idx()),
IndexType::WindowEnd => Some(new_hop.internal_window_end_col_idx()),
IndexType::WindowStart => Some(new_hop.core.internal_window_start_col_idx()),
IndexType::WindowEnd => Some(new_hop.core.internal_window_end_col_idx()),
})
.collect_vec()
};
Expand All @@ -334,8 +303,8 @@ impl PredicatePushdown for LogicalHopWindow {
) -> PlanRef {
let mut window_columns = FixedBitSet::with_capacity(self.schema().len());

let window_start_idx = self.internal_window_start_col_idx();
let window_end_idx = self.internal_window_end_col_idx();
let window_start_idx = self.core.internal_window_start_col_idx();
let window_end_idx = self.core.internal_window_end_col_idx();
for (i, v) in self.output_indices().iter().enumerate() {
if *v == window_start_idx || *v == window_end_idx {
window_columns.insert(i);
Expand All @@ -351,19 +320,21 @@ impl PredicatePushdown for LogicalHopWindow {
impl ToBatch for LogicalHopWindow {
fn to_batch(&self) -> Result<PlanRef> {
let new_input = self.input().to_batch()?;
let new_logical = self.clone_with_input(new_input);
let mut new_logical = self.core.clone();
new_logical.input = new_input;
let (window_start_exprs, window_end_exprs) =
new_logical.core.derive_window_start_and_end_exprs()?;
new_logical.derive_window_start_and_end_exprs()?;
Ok(BatchHopWindow::new(new_logical, window_start_exprs, window_end_exprs).into())
}
}

impl ToStream for LogicalHopWindow {
fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<PlanRef> {
let new_input = self.input().to_stream(ctx)?;
let new_logical = self.clone_with_input(new_input);
let mut new_logical = self.core.clone();
new_logical.input = new_input;
let (window_start_exprs, window_end_exprs) =
new_logical.core.derive_window_start_and_end_exprs()?;
new_logical.derive_window_start_and_end_exprs()?;
Ok(StreamHopWindow::new(new_logical, window_start_exprs, window_end_exprs).into())
}

Expand All @@ -382,7 +353,7 @@ impl ToStream for LogicalHopWindow {
{
output_indices.push(input.schema().len());
}
let i2o = self.i2o_col_mapping();
let i2o = self.core.i2o_col_mapping();
output_indices.extend(
input
.logical_pk()
Expand Down
8 changes: 8 additions & 0 deletions src/frontend/src/optimizer/plan_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use risingwave_pb::stream_plan::StreamNode as StreamPlanPb;
use serde::Serialize;
use smallvec::SmallVec;

use self::batch::BatchPlanRef;
use self::generic::GenericPlanRef;
use self::stream::StreamPlanRef;
use super::property::{Distribution, FunctionalDependencySet, Order};
Expand Down Expand Up @@ -385,6 +386,12 @@ impl StreamPlanRef for PlanRef {
}
}

impl BatchPlanRef for PlanRef {
fn order(&self) -> &Order {
&self.plan_base().order
}
}

impl GenericPlanRef for PlanRef {
fn schema(&self) -> &Schema {
&self.plan_base().schema
Expand Down Expand Up @@ -597,6 +604,7 @@ pub use merge_eq_nodes::*;
pub mod generic;
pub mod stream;
pub mod stream_derive;
pub mod batch;

pub use generic::{PlanAggCall, PlanAggCallDisplay};

Expand Down
5 changes: 5 additions & 0 deletions src/frontend/src/optimizer/plan_node/plan_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ impl stream::StreamPlanRef for PlanBase {
self.append_only
}
}
impl batch::BatchPlanRef for PlanBase {
fn order(&self) -> &Order {
&self.order
}
}
impl PlanBase {
pub fn new_logical(
ctx: OptimizerContextRef,
Expand Down
Loading

0 comments on commit fea86be

Please sign in to comment.