Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(plan_node_fmt): slightly generalize the fmt of join-related nodes #10253

Merged
merged 9 commits into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 39 additions & 17 deletions src/frontend/src/optimizer/plan_node/batch_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@

use std::fmt;

use pretty_xmlish::Pretty;
use risingwave_common::catalog::Schema;
use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::HashJoinNode;
use risingwave_pb::plan_common::JoinType;

use super::generic::{self, GenericPlanRef};
use super::utils::Distill;
use super::{
EqJoinPredicate, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, ToBatchPb,
ToDistributedBatch,
Expand Down Expand Up @@ -98,6 +100,35 @@ impl BatchHashJoin {
}
}

impl Distill for BatchHashJoin {
fn distill<'a>(&self) -> Pretty<'a> {
let verbose = self.base.ctx.is_explain_verbose();
let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
vec.push(("type", Pretty::debug(&self.logical.join_type)));
let mut concat_schema = self.left().schema().fields.clone();
concat_schema.extend(self.right().schema().fields.clone());
let concat_schema = Schema::new(concat_schema);

vec.push((
"predicate",
Pretty::debug(&EqJoinPredicateDisplay {
eq_join_predicate: self.eq_join_predicate(),
input_schema: &concat_schema,
}),
));
if verbose {
let data = IndicesDisplay::from(
&self.logical.output_indices,
self.logical.internal_column_num(),
&concat_schema,
)
.map_or_else(|| Pretty::from("all"), |id| Pretty::display(&id));
vec.push(("output", data));
}
Pretty::childless_record("BatchHashJoin", vec)
}
}

impl fmt::Display for BatchHashJoin {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let verbose = self.base.ctx.is_explain_verbose();
Expand All @@ -116,23 +147,14 @@ impl fmt::Display for BatchHashJoin {
);

if verbose {
if self
.logical
.output_indices
.iter()
.copied()
.eq(0..self.logical.internal_column_num())
{
builder.field("output", &format_args!("all"));
} else {
builder.field(
"output",
&IndicesDisplay {
indices: &self.logical.output_indices,
input_schema: &concat_schema,
},
);
}
match IndicesDisplay::from(
&self.logical.output_indices,
self.logical.internal_column_num(),
&concat_schema,
) {
None => builder.field("output", &format_args!("all")),
Some(id) => builder.field("output", &id),
};
}

builder.finish()
Expand Down
11 changes: 10 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

use std::fmt;

use pretty_xmlish::Pretty;
use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::InsertNode;
use risingwave_pb::plan_common::{DefaultColumns, IndexAndExpr};

use super::utils::Distill;
use super::{generic, ExprRewritable, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch};
use crate::expr::Expr;
use crate::optimizer::plan_node::{PlanBase, ToLocalBatch};
Expand All @@ -44,12 +46,19 @@ impl BatchInsert {
}
}

impl Distill for BatchInsert {
fn distill<'a>(&self) -> Pretty<'a> {
let vec = self
.logical
.fields_pretty(self.base.ctx.is_explain_verbose());
Pretty::childless_record("BatchInsert", vec)
}
}
impl fmt::Display for BatchInsert {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.logical.fmt_with_name(f, "BatchInsert")
}
}
// impl_distill_by_unit!(BatchInsert, logical, "BatchInsert");

impl PlanTreeNodeUnary for BatchInsert {
fn input(&self) -> PlanRef {
Expand Down
58 changes: 41 additions & 17 deletions src/frontend/src/optimizer/plan_node/batch_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@

use std::fmt;

use pretty_xmlish::Pretty;
use risingwave_common::catalog::{ColumnId, Schema, TableDesc};
use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{DistributedLookupJoinNode, LocalLookupJoinNode};

use super::generic::{self, GenericPlanRef};
use super::utils::Distill;
use super::ExprRewritable;
use crate::expr::{Expr, ExprRewriter};
use crate::optimizer::plan_node::utils::IndicesDisplay;
Expand Down Expand Up @@ -112,6 +114,37 @@ impl BatchLookupJoin {
}
}

impl Distill for BatchLookupJoin {
fn distill<'a>(&self) -> Pretty<'a> {
let verbose = self.base.ctx.is_explain_verbose();
let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
vec.push(("type", Pretty::debug(&self.logical.join_type)));

let mut concat_schema = self.logical.left.schema().fields.clone();
concat_schema.extend(self.logical.right.schema().fields.clone());
let concat_schema = Schema::new(concat_schema);
vec.push((
"predicate",
Pretty::debug(&EqJoinPredicateDisplay {
eq_join_predicate: self.eq_join_predicate(),
input_schema: &concat_schema,
}),
));

if verbose {
let data = IndicesDisplay::from(
&self.logical.output_indices,
self.logical.internal_column_num(),
&concat_schema,
)
.map_or_else(|| Pretty::from("all"), |id| Pretty::display(&id));
vec.push(("output", data));
}

Pretty::childless_record("BatchLookupJoin", vec)
}
}

impl fmt::Display for BatchLookupJoin {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let verbose = self.base.ctx.is_explain_verbose();
Expand All @@ -130,23 +163,14 @@ impl fmt::Display for BatchLookupJoin {
);

if verbose {
if self
.logical
.output_indices
.iter()
.copied()
.eq(0..self.logical.internal_column_num())
{
builder.field("output", &format_args!("all"));
} else {
builder.field(
"output",
&IndicesDisplay {
indices: &self.logical.output_indices,
input_schema: &concat_schema,
},
);
}
match IndicesDisplay::from(
&self.logical.output_indices,
self.logical.internal_column_num(),
&concat_schema,
) {
None => builder.field("output", &format_args!("all")),
Some(id) => builder.field("output", &id),
};
}

builder.finish()
Expand Down
25 changes: 8 additions & 17 deletions src/frontend/src/optimizer/plan_node/batch_nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,23 +68,14 @@ impl fmt::Display for BatchNestedLoopJoin {
);

if verbose {
if self
.logical
.output_indices
.iter()
.copied()
.eq(0..self.logical.internal_column_num())
{
builder.field("output", &format_args!("all"));
} else {
builder.field(
"output",
&IndicesDisplay {
indices: &self.logical.output_indices,
input_schema: &concat_schema,
},
);
}
match IndicesDisplay::from(
&self.logical.output_indices,
self.logical.internal_column_num(),
&concat_schema,
) {
None => builder.field("output", &format_args!("all")),
Some(id) => builder.field("output", &id),
};
}

builder.finish()
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub struct Insert<PlanRef: Eq + Hash> {
pub returning: bool,
}

impl<PlanRef: GenericPlanRef + Eq + Hash> Insert<PlanRef> {
impl<PlanRef: GenericPlanRef> Insert<PlanRef> {
pub fn ctx(&self) -> OptimizerContextRef {
self.input.ctx()
}
Expand Down
24 changes: 8 additions & 16 deletions src/frontend/src/optimizer/plan_node/logical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,14 @@ impl fmt::Display for LogicalJoin {
);

if verbose {
if self
.output_indices()
.iter()
.copied()
.eq(0..self.internal_column_num())
{
builder.field("output", &format_args!("all"));
} else {
builder.field(
"output",
&IndicesDisplay {
indices: self.output_indices(),
input_schema: &concat_schema,
},
);
}
match IndicesDisplay::from(
self.output_indices(),
self.internal_column_num(),
&concat_schema,
) {
None => builder.field("output", &format_args!("all")),
Some(id) => builder.field("output", &id),
};
}

builder.finish()
Expand Down
25 changes: 8 additions & 17 deletions src/frontend/src/optimizer/plan_node/stream_delta_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,23 +107,14 @@ impl fmt::Display for StreamDeltaJoin {
);

if verbose {
if self
.logical
.output_indices
.iter()
.copied()
.eq(0..self.logical.internal_column_num())
{
builder.field("output", &format_args!("all"));
} else {
builder.field(
"output",
&IndicesDisplay {
indices: &self.logical.output_indices,
input_schema: &concat_schema,
},
);
}
match IndicesDisplay::from(
&self.logical.output_indices,
self.logical.internal_column_num(),
&concat_schema,
) {
None => builder.field("output", &format_args!("all")),
Some(id) => builder.field("output", &id),
};
}

builder.finish()
Expand Down
25 changes: 8 additions & 17 deletions src/frontend/src/optimizer/plan_node/stream_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,23 +351,14 @@ impl fmt::Display for StreamHashJoin {
};

if verbose {
if self
.logical
.output_indices
.iter()
.copied()
.eq(0..self.logical.internal_column_num())
{
builder.field("output", &format_args!("all"));
} else {
builder.field(
"output",
&IndicesDisplay {
indices: &self.logical.output_indices,
input_schema: &concat_schema,
},
);
}
match IndicesDisplay::from(
&self.logical.output_indices,
self.logical.internal_column_num(),
&concat_schema,
) {
None => builder.field("output", &format_args!("all")),
Some(id) => builder.field("output", &id),
};
}

builder.finish()
Expand Down
25 changes: 8 additions & 17 deletions src/frontend/src/optimizer/plan_node/stream_temporal_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,23 +124,14 @@ impl fmt::Display for StreamTemporalJoin {
};

if verbose {
if self
.logical
.output_indices
.iter()
.copied()
.eq(0..self.logical.internal_column_num())
{
builder.field("output", &format_args!("all"));
} else {
builder.field(
"output",
&IndicesDisplay {
indices: &self.logical.output_indices,
input_schema: &concat_schema,
},
);
}
match IndicesDisplay::from(
&self.logical.output_indices,
self.logical.internal_column_num(),
&concat_schema,
) {
None => builder.field("output", &format_args!("all")),
Some(id) => builder.field("output", &id),
};
}

builder.finish()
Expand Down
18 changes: 18 additions & 0 deletions src/frontend/src/optimizer/plan_node/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,24 @@ pub struct IndicesDisplay<'a> {
pub input_schema: &'a Schema,
}

impl<'a> IndicesDisplay<'a> {
/// Returns `None` means all
pub fn from(
indices: &'a [usize],
internal_column_num: usize,
input_schema: &'a Schema,
) -> Option<Self> {
if indices.iter().copied().eq(0..internal_column_num) {
None
} else {
Some(Self {
indices,
input_schema,
})
}
}
}

impl fmt::Display for IndicesDisplay<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{self:?}")
Expand Down