Skip to content

Commit

Permalink
refactor(plan_node_fmt): further prepare generic code where they use …
Browse files Browse the repository at this point in the history
…`fmt_with_builder` (#10155)
  • Loading branch information
ice1000 authored Jun 5, 2023
1 parent a504c40 commit 1897aef
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 38 deletions.
44 changes: 34 additions & 10 deletions src/frontend/src/optimizer/plan_node/generic/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::fmt;

use fixedbitset::FixedBitSet;
use itertools::{Either, Itertools};
use pretty_xmlish::{Pretty, StrAssocArr};
use risingwave_common::catalog::{Field, FieldDisplay, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::{ColumnOrder, ColumnOrderDisplay, OrderType};
Expand All @@ -25,7 +26,7 @@ use risingwave_pb::expr::PbAggCall;
use risingwave_pb::stream_plan::{agg_call_state, AggCallState as AggCallStatePb};

use super::super::utils::TableCatalogBuilder;
use super::{stream, GenericPlanNode, GenericPlanRef};
use super::{impl_distill_unit_from_fields, stream, GenericPlanNode, GenericPlanRef};
use crate::expr::{Expr, ExprRewriter, InputRef, InputRefDisplay};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::plan_node::batch::BatchPlanRef;
Expand Down Expand Up @@ -174,7 +175,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Agg<PlanRef> {
}

fn logical_pk(&self) -> Option<Vec<usize>> {
Some((0..self.group_key.count_ones(..)).collect_vec())
Some((0..self.group_key.count_ones(..)).collect())
}

fn ctx(&self) -> OptimizerContextRef {
Expand Down Expand Up @@ -596,24 +597,47 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
builder.field("aggs", &self.agg_calls_display());
}

pub fn fields_pretty<'a>(&self) -> StrAssocArr<'a> {
let last = ("aggs", self.agg_calls_pretty());
if self.group_key.count_ones(..) != 0 {
let first = ("group_key", self.group_key_pretty());
vec![first, last]
} else {
vec![last]
}
}

fn agg_calls_display(&self) -> Vec<PlanAggCallDisplay<'_>> {
self.agg_calls
.iter()
.map(|plan_agg_call| PlanAggCallDisplay {
let f = |plan_agg_call| PlanAggCallDisplay {
plan_agg_call,
input_schema: self.input.schema(),
};
self.agg_calls.iter().map(f).collect()
}

fn agg_calls_pretty<'a>(&self) -> Pretty<'a> {
let f = |plan_agg_call| {
Pretty::debug(&PlanAggCallDisplay {
plan_agg_call,
input_schema: self.input.schema(),
})
.collect_vec()
};
Pretty::Array(self.agg_calls.iter().map(f).collect())
}

fn group_key_display(&self) -> Vec<FieldDisplay<'_>> {
self.group_key
.ones()
.map(|i| FieldDisplay(self.input.schema().fields.get(i).unwrap()))
.collect_vec()
let f = |i| FieldDisplay(self.input.schema().fields.get(i).unwrap());
self.group_key.ones().map(f).collect()
}

fn group_key_pretty<'a>(&self) -> Pretty<'a> {
let f = |i| Pretty::display(&FieldDisplay(self.input.schema().fields.get(i).unwrap()));
Pretty::Array(self.group_key.ones().map(f).collect())
}
}

impl_distill_unit_from_fields!(Agg, stream::StreamPlanRef);

/// Rewritten version of [`AggCall`] which uses `InputRef` instead of `ExprImpl`.
/// Refer to [`LogicalAggBuilder::try_rewrite_agg_call`] for more details.
#[derive(Clone, PartialEq, Eq, Hash)]
Expand Down
10 changes: 7 additions & 3 deletions src/frontend/src/optimizer/plan_node/generic/dynamic_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,22 @@ impl<PlanRef: GenericPlanRef> DynamicFilter<PlanRef> {
watermark_columns
}

pub fn fmt_fields_with_builder(&self, builder: &mut fmt::DebugStruct<'_, '_>) {
fn condition_display(&self) -> (Condition, Schema) {
let mut concat_schema = self.left.schema().fields.clone();
concat_schema.extend(self.right.schema().fields.clone());
let concat_schema = Schema::new(concat_schema);

let predicate = self.predicate();
(predicate, concat_schema)
}

pub fn fmt_fields_with_builder(&self, builder: &mut fmt::DebugStruct<'_, '_>) {
let (condition, input_schema) = &self.condition_display();
builder.field(
"predicate",
&ConditionDisplay {
condition: &predicate,
input_schema: &concat_schema,
condition,
input_schema,
},
);
}
Expand Down
14 changes: 13 additions & 1 deletion src/frontend/src/optimizer/plan_node/generic/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@

use std::fmt;

use pretty_xmlish::Pretty;
use risingwave_common::catalog::Schema;

use super::{GenericPlanNode, GenericPlanRef};
use super::{DistillUnit, GenericPlanNode, GenericPlanRef};
use crate::expr::ExprRewriter;
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;
Expand All @@ -32,6 +33,17 @@ pub struct Filter<PlanRef> {
pub input: PlanRef,
}

impl<PlanRef: GenericPlanRef> DistillUnit for Filter<PlanRef> {
fn distill_with_name<'a>(&self, name: &'a str) -> Pretty<'a> {
let input_schema = self.input.schema();
let predicate = ConditionDisplay {
condition: &self.predicate,
input_schema,
};
Pretty::childless_record(name, vec![("predicate", Pretty::display(&predicate))])
}
}

impl<PlanRef: GenericPlanRef> Filter<PlanRef> {
pub(crate) fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result {
let input_schema = self.input.schema();
Expand Down
48 changes: 47 additions & 1 deletion src/frontend/src/optimizer/plan_node/generic/hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ use std::fmt;
use std::num::NonZeroUsize;

use itertools::Itertools;
use pretty_xmlish::{Pretty, StrAssocArr};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::error::Result;
use risingwave_common::types::{DataType, Interval, IntervalDisplay};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_expr::ExprError;

use super::super::utils::IndicesDisplay;
use super::{GenericPlanNode, GenericPlanRef};
use super::{impl_distill_unit_from_fields, GenericPlanNode, GenericPlanRef};
use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef, InputRefDisplay, Literal};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::plan_node::batch::BatchPlanRef;
Expand Down Expand Up @@ -338,9 +339,54 @@ impl<PlanRef: GenericPlanRef> HopWindow<PlanRef> {
}
}

pub fn fields_pretty<'a>(&self) -> StrAssocArr<'a> {
let mut out = Vec::with_capacity(5);
let output_type = DataType::window_of(&self.time_col.data_type).unwrap();
out.push((
"time_col",
Pretty::display(&InputRefDisplay {
input_ref: &self.time_col,
input_schema: self.input.schema(),
}),
));
out.push(("slide", Pretty::debug(&self.window_slide)));
out.push(("size", Pretty::debug(&self.window_size)));
if self
.output_indices
.iter()
.copied()
// Behavior is the same as `LogicalHopWindow::internal_column_num`
.eq(0..(self.input.schema().len() + 2))
{
out.push(("output", Pretty::from("all")));
} else {
let original_schema: Schema = self
.input
.schema()
.clone()
.into_fields()
.into_iter()
.chain([
Field::with_name(output_type.clone(), "window_start"),
Field::with_name(output_type, "window_end"),
])
.collect();
out.push((
"output",
Pretty::display(&IndicesDisplay {
indices: &self.output_indices,
input_schema: &original_schema,
}),
));
}
out
}

pub fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result {
let mut builder = f.debug_struct(name);
self.fmt_fields_with_builder(&mut builder);
builder.finish()
}
}

impl_distill_unit_from_fields!(HopWindow, GenericPlanRef);
8 changes: 7 additions & 1 deletion src/frontend/src/optimizer/plan_node/generic/intersect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@

use std::fmt;

use pretty_xmlish::{Pretty, StrAssocArr};
use risingwave_common::catalog::Schema;

use super::{GenericPlanNode, GenericPlanRef};
use super::{impl_distill_unit_from_fields, GenericPlanNode, GenericPlanRef};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;

Expand Down Expand Up @@ -56,4 +57,9 @@ impl<PlanRef: GenericPlanRef> Intersect<PlanRef> {
pub fn fmt_fields_with_builder(&self, builder: &mut fmt::DebugStruct<'_, '_>) {
builder.field("all", &self.all);
}

pub fn fields_pretty<'a>(&self) -> StrAssocArr<'a> {
vec![("all", Pretty::debug(&self.all))]
}
}
impl_distill_unit_from_fields!(Intersect, GenericPlanRef);
12 changes: 12 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,18 @@ pub trait DistillUnit {
fn distill_with_name<'a>(&self, name: &'a str) -> Pretty<'a>;
}

macro_rules! impl_distill_unit_from_fields {
($name:ident, $bound:path) => {
use $crate::optimizer::plan_node::generic::DistillUnit;
impl<PlanRef: $bound> DistillUnit for $name<PlanRef> {
fn distill_with_name<'a>(&self, name: &'a str) -> Pretty<'a> {
Pretty::childless_record(name, self.fields_pretty())
}
}
};
}
pub(super) use impl_distill_unit_from_fields;

pub trait GenericPlanRef {
fn schema(&self) -> &Schema;
fn logical_pk(&self) -> &[usize];
Expand Down
48 changes: 27 additions & 21 deletions src/frontend/src/optimizer/plan_node/generic/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::fmt;
use std::fmt::Formatter;

use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pretty_xmlish::{Pretty, StrAssocArr};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::util::iter_util::ZipEqFast;

Expand Down Expand Up @@ -178,26 +178,32 @@ impl<PlanRef: GenericPlanRef> Project<PlanRef> {
}

pub fn fmt_fields_with_builder(&self, builder: &mut fmt::DebugStruct<'_, '_>, schema: &Schema) {
builder.field(
"exprs",
&self
.exprs
.iter()
.zip_eq_fast(schema.fields().iter())
.map(|(expr, field)| AliasedExpr {
expr: ExprDisplay {
expr,
input_schema: self.input.schema(),
},
alias: {
match expr {
ExprImpl::InputRef(_) | ExprImpl::Literal(_) => None,
_ => Some(field.name.clone()),
}
},
})
.collect_vec(),
);
builder.field("exprs", &self.exprs_for_display(schema));
}

pub fn fields_pretty<'a>(&self, schema: &Schema) -> StrAssocArr<'a> {
let f = |t| Pretty::debug(&t);
let e = Pretty::Array(self.exprs_for_display(schema).iter().map(f).collect());
vec![("exprs", e)]
}

fn exprs_for_display<'a>(&'a self, schema: &Schema) -> Vec<AliasedExpr<'a>> {
self.exprs
.iter()
.zip_eq_fast(schema.fields().iter())
.map(|(expr, field)| AliasedExpr {
expr: ExprDisplay {
expr,
input_schema: self.input.schema(),
},
alias: {
match expr {
ExprImpl::InputRef(_) | ExprImpl::Literal(_) => None,
_ => Some(field.name.clone()),
}
},
})
.collect()
}

pub fn fmt_with_name(
Expand Down
8 changes: 7 additions & 1 deletion src/frontend/src/optimizer/plan_node/generic/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@

use std::fmt;

use pretty_xmlish::{Pretty, StrAssocArr};
use risingwave_common::catalog::Schema;

use super::{GenericPlanNode, GenericPlanRef};
use super::{impl_distill_unit_from_fields, GenericPlanNode, GenericPlanRef};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;

Expand Down Expand Up @@ -72,4 +73,9 @@ impl<PlanRef: GenericPlanRef> Union<PlanRef> {
pub fn fmt_fields_with_builder(&self, builder: &mut fmt::DebugStruct<'_, '_>) {
builder.field("all", &self.all);
}

pub fn fields_pretty<'a>(&self) -> StrAssocArr<'a> {
vec![("all", Pretty::debug(&self.all))]
}
}
impl_distill_unit_from_fields!(Union, GenericPlanRef);

0 comments on commit 1897aef

Please sign in to comment.