Skip to content

Commit

Permalink
refactor(plan_node_fmt): slightly generalize the fmt of join-relate…
Browse files Browse the repository at this point in the history
…d nodes (#10253)
  • Loading branch information
ice1000 authored and Little-Wallace committed Jun 12, 2023
1 parent 4691d75 commit 2712751
Show file tree
Hide file tree
Showing 10 changed files with 149 additions and 120 deletions.
56 changes: 39 additions & 17 deletions src/frontend/src/optimizer/plan_node/batch_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@

use std::fmt;

use pretty_xmlish::Pretty;
use risingwave_common::catalog::Schema;
use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::HashJoinNode;
use risingwave_pb::plan_common::JoinType;

use super::generic::{self, GenericPlanRef};
use super::utils::Distill;
use super::{
EqJoinPredicate, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, ToBatchPb,
ToDistributedBatch,
Expand Down Expand Up @@ -98,6 +100,35 @@ impl BatchHashJoin {
}
}

impl Distill for BatchHashJoin {
fn distill<'a>(&self) -> Pretty<'a> {
let verbose = self.base.ctx.is_explain_verbose();
let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
vec.push(("type", Pretty::debug(&self.logical.join_type)));
let mut concat_schema = self.left().schema().fields.clone();
concat_schema.extend(self.right().schema().fields.clone());
let concat_schema = Schema::new(concat_schema);

vec.push((
"predicate",
Pretty::debug(&EqJoinPredicateDisplay {
eq_join_predicate: self.eq_join_predicate(),
input_schema: &concat_schema,
}),
));
if verbose {
let data = IndicesDisplay::from(
&self.logical.output_indices,
self.logical.internal_column_num(),
&concat_schema,
)
.map_or_else(|| Pretty::from("all"), |id| Pretty::display(&id));
vec.push(("output", data));
}
Pretty::childless_record("BatchHashJoin", vec)
}
}

impl fmt::Display for BatchHashJoin {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let verbose = self.base.ctx.is_explain_verbose();
Expand All @@ -116,23 +147,14 @@ impl fmt::Display for BatchHashJoin {
);

if verbose {
if self
.logical
.output_indices
.iter()
.copied()
.eq(0..self.logical.internal_column_num())
{
builder.field("output", &format_args!("all"));
} else {
builder.field(
"output",
&IndicesDisplay {
indices: &self.logical.output_indices,
input_schema: &concat_schema,
},
);
}
match IndicesDisplay::from(
&self.logical.output_indices,
self.logical.internal_column_num(),
&concat_schema,
) {
None => builder.field("output", &format_args!("all")),
Some(id) => builder.field("output", &id),
};
}

builder.finish()
Expand Down
11 changes: 10 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

use std::fmt;

use pretty_xmlish::Pretty;
use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::InsertNode;
use risingwave_pb::plan_common::{DefaultColumns, IndexAndExpr};

use super::utils::Distill;
use super::{generic, ExprRewritable, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch};
use crate::expr::Expr;
use crate::optimizer::plan_node::{PlanBase, ToLocalBatch};
Expand All @@ -44,12 +46,19 @@ impl BatchInsert {
}
}

impl Distill for BatchInsert {
fn distill<'a>(&self) -> Pretty<'a> {
let vec = self
.logical
.fields_pretty(self.base.ctx.is_explain_verbose());
Pretty::childless_record("BatchInsert", vec)
}
}
impl fmt::Display for BatchInsert {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.logical.fmt_with_name(f, "BatchInsert")
}
}
// impl_distill_by_unit!(BatchInsert, logical, "BatchInsert");

impl PlanTreeNodeUnary for BatchInsert {
fn input(&self) -> PlanRef {
Expand Down
58 changes: 41 additions & 17 deletions src/frontend/src/optimizer/plan_node/batch_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@

use std::fmt;

use pretty_xmlish::Pretty;
use risingwave_common::catalog::{ColumnId, Schema, TableDesc};
use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{DistributedLookupJoinNode, LocalLookupJoinNode};

use super::generic::{self, GenericPlanRef};
use super::utils::Distill;
use super::ExprRewritable;
use crate::expr::{Expr, ExprRewriter};
use crate::optimizer::plan_node::utils::IndicesDisplay;
Expand Down Expand Up @@ -112,6 +114,37 @@ impl BatchLookupJoin {
}
}

impl Distill for BatchLookupJoin {
fn distill<'a>(&self) -> Pretty<'a> {
let verbose = self.base.ctx.is_explain_verbose();
let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
vec.push(("type", Pretty::debug(&self.logical.join_type)));

let mut concat_schema = self.logical.left.schema().fields.clone();
concat_schema.extend(self.logical.right.schema().fields.clone());
let concat_schema = Schema::new(concat_schema);
vec.push((
"predicate",
Pretty::debug(&EqJoinPredicateDisplay {
eq_join_predicate: self.eq_join_predicate(),
input_schema: &concat_schema,
}),
));

if verbose {
let data = IndicesDisplay::from(
&self.logical.output_indices,
self.logical.internal_column_num(),
&concat_schema,
)
.map_or_else(|| Pretty::from("all"), |id| Pretty::display(&id));
vec.push(("output", data));
}

Pretty::childless_record("BatchLookupJoin", vec)
}
}

impl fmt::Display for BatchLookupJoin {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let verbose = self.base.ctx.is_explain_verbose();
Expand All @@ -130,23 +163,14 @@ impl fmt::Display for BatchLookupJoin {
);

if verbose {
if self
.logical
.output_indices
.iter()
.copied()
.eq(0..self.logical.internal_column_num())
{
builder.field("output", &format_args!("all"));
} else {
builder.field(
"output",
&IndicesDisplay {
indices: &self.logical.output_indices,
input_schema: &concat_schema,
},
);
}
match IndicesDisplay::from(
&self.logical.output_indices,
self.logical.internal_column_num(),
&concat_schema,
) {
None => builder.field("output", &format_args!("all")),
Some(id) => builder.field("output", &id),
};
}

builder.finish()
Expand Down
25 changes: 8 additions & 17 deletions src/frontend/src/optimizer/plan_node/batch_nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,23 +68,14 @@ impl fmt::Display for BatchNestedLoopJoin {
);

if verbose {
if self
.logical
.output_indices
.iter()
.copied()
.eq(0..self.logical.internal_column_num())
{
builder.field("output", &format_args!("all"));
} else {
builder.field(
"output",
&IndicesDisplay {
indices: &self.logical.output_indices,
input_schema: &concat_schema,
},
);
}
match IndicesDisplay::from(
&self.logical.output_indices,
self.logical.internal_column_num(),
&concat_schema,
) {
None => builder.field("output", &format_args!("all")),
Some(id) => builder.field("output", &id),
};
}

builder.finish()
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub struct Insert<PlanRef: Eq + Hash> {
pub returning: bool,
}

impl<PlanRef: GenericPlanRef + Eq + Hash> Insert<PlanRef> {
impl<PlanRef: GenericPlanRef> Insert<PlanRef> {
pub fn ctx(&self) -> OptimizerContextRef {
self.input.ctx()
}
Expand Down
24 changes: 8 additions & 16 deletions src/frontend/src/optimizer/plan_node/logical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,14 @@ impl fmt::Display for LogicalJoin {
);

if verbose {
if self
.output_indices()
.iter()
.copied()
.eq(0..self.internal_column_num())
{
builder.field("output", &format_args!("all"));
} else {
builder.field(
"output",
&IndicesDisplay {
indices: self.output_indices(),
input_schema: &concat_schema,
},
);
}
match IndicesDisplay::from(
self.output_indices(),
self.internal_column_num(),
&concat_schema,
) {
None => builder.field("output", &format_args!("all")),
Some(id) => builder.field("output", &id),
};
}

builder.finish()
Expand Down
25 changes: 8 additions & 17 deletions src/frontend/src/optimizer/plan_node/stream_delta_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,23 +107,14 @@ impl fmt::Display for StreamDeltaJoin {
);

if verbose {
if self
.logical
.output_indices
.iter()
.copied()
.eq(0..self.logical.internal_column_num())
{
builder.field("output", &format_args!("all"));
} else {
builder.field(
"output",
&IndicesDisplay {
indices: &self.logical.output_indices,
input_schema: &concat_schema,
},
);
}
match IndicesDisplay::from(
&self.logical.output_indices,
self.logical.internal_column_num(),
&concat_schema,
) {
None => builder.field("output", &format_args!("all")),
Some(id) => builder.field("output", &id),
};
}

builder.finish()
Expand Down
25 changes: 8 additions & 17 deletions src/frontend/src/optimizer/plan_node/stream_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,23 +351,14 @@ impl fmt::Display for StreamHashJoin {
};

if verbose {
if self
.logical
.output_indices
.iter()
.copied()
.eq(0..self.logical.internal_column_num())
{
builder.field("output", &format_args!("all"));
} else {
builder.field(
"output",
&IndicesDisplay {
indices: &self.logical.output_indices,
input_schema: &concat_schema,
},
);
}
match IndicesDisplay::from(
&self.logical.output_indices,
self.logical.internal_column_num(),
&concat_schema,
) {
None => builder.field("output", &format_args!("all")),
Some(id) => builder.field("output", &id),
};
}

builder.finish()
Expand Down
25 changes: 8 additions & 17 deletions src/frontend/src/optimizer/plan_node/stream_temporal_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,23 +124,14 @@ impl fmt::Display for StreamTemporalJoin {
};

if verbose {
if self
.logical
.output_indices
.iter()
.copied()
.eq(0..self.logical.internal_column_num())
{
builder.field("output", &format_args!("all"));
} else {
builder.field(
"output",
&IndicesDisplay {
indices: &self.logical.output_indices,
input_schema: &concat_schema,
},
);
}
match IndicesDisplay::from(
&self.logical.output_indices,
self.logical.internal_column_num(),
&concat_schema,
) {
None => builder.field("output", &format_args!("all")),
Some(id) => builder.field("output", &id),
};
}

builder.finish()
Expand Down
18 changes: 18 additions & 0 deletions src/frontend/src/optimizer/plan_node/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,24 @@ pub struct IndicesDisplay<'a> {
pub input_schema: &'a Schema,
}

impl<'a> IndicesDisplay<'a> {
/// Returns `None` means all
pub fn from(
indices: &'a [usize],
internal_column_num: usize,
input_schema: &'a Schema,
) -> Option<Self> {
if indices.iter().copied().eq(0..internal_column_num) {
None
} else {
Some(Self {
indices,
input_schema,
})
}
}
}

impl fmt::Display for IndicesDisplay<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{self:?}")
Expand Down

0 comments on commit 2712751

Please sign in to comment.