Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(explain): add missed fields in explain streaming dist plan #8544

Merged
merged 5 commits into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 74 additions & 74 deletions src/frontend/planner_test/tests/testdata/distribution_derive.yaml

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions src/frontend/planner_test/tests/testdata/join.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -668,12 +668,12 @@
Upstream
BatchPlanNode

Table 0 { columns: [t_src, t_dst, t_src_0, t_dst_0, t__row_id, t__row_id_0], primary key: [$2 ASC, $0 ASC, $4 ASC, $5 ASC, $1 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [0] }
Table 1 { columns: [t_src, t_src_0, t__row_id, t__row_id_0, t_dst, _degree], primary key: [$0 ASC, $1 ASC, $2 ASC, $3 ASC, $4 ASC], value indices: [5], distribution key: [1] }
Table 2 { columns: [t_dst, t__row_id], primary key: [$0 ASC, $0 ASC, $1 ASC], value indices: [0, 1], distribution key: [0] }
Table 3 { columns: [t_dst, t_dst_0, t__row_id, _degree], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [3], distribution key: [1] }
Table 4 { columns: [t_src, t_dst, t__row_id], primary key: [$1 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [1] }
Table 5 { columns: [t_dst, t__row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
Table 6 { columns: [t_src, t_dst, t__row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0] }
Table 7 { columns: [t_src, t__row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
Table 4294967294 { columns: [p1, p2, p3, t._row_id, t._row_id#1, t.src, t._row_id#2], primary key: [$3 ASC, $4 ASC, $1 ASC, $6 ASC, $5 ASC, $0 ASC], value indices: [0, 1, 2, 3, 4, 5, 6], distribution key: [0] }
Table 0 { columns: [t_src, t_dst, t_src_0, t_dst_0, t__row_id, t__row_id_0], primary key: [$2 ASC, $0 ASC, $4 ASC, $5 ASC, $1 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [0], read_pk_prefix_len_hint: 2 }
Table 1 { columns: [t_src, t_src_0, t__row_id, t__row_id_0, t_dst, _degree], primary key: [$0 ASC, $1 ASC, $2 ASC, $3 ASC, $4 ASC], value indices: [5], distribution key: [1], read_pk_prefix_len_hint: 2 }
Table 2 { columns: [t_dst, t__row_id], primary key: [$0 ASC, $0 ASC, $1 ASC], value indices: [0, 1], distribution key: [0], read_pk_prefix_len_hint: 2 }
Table 3 { columns: [t_dst, t_dst_0, t__row_id, _degree], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [3], distribution key: [1], read_pk_prefix_len_hint: 2 }
Table 4 { columns: [t_src, t_dst, t__row_id], primary key: [$1 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [1], read_pk_prefix_len_hint: 1 }
Table 5 { columns: [t_dst, t__row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read_pk_prefix_len_hint: 1 }
Table 6 { columns: [t_src, t_dst, t__row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0], read_pk_prefix_len_hint: 1 }
Table 7 { columns: [t_src, t__row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read_pk_prefix_len_hint: 1 }
Table 4294967294 { columns: [p1, p2, p3, t._row_id, t._row_id#1, t.src, t._row_id#2], primary key: [$3 ASC, $4 ASC, $1 ASC, $6 ASC, $5 ASC, $0 ASC], value indices: [0, 1, 2, 3, 4, 5, 6], distribution key: [0], read_pk_prefix_len_hint: 6 }
256 changes: 130 additions & 126 deletions src/frontend/planner_test/tests/testdata/nexmark.yaml

Large diffs are not rendered by default.

320 changes: 162 additions & 158 deletions src/frontend/planner_test/tests/testdata/nexmark_source.yaml

Large diffs are not rendered by default.

244 changes: 122 additions & 122 deletions src/frontend/planner_test/tests/testdata/stream_dist_agg.yaml

Large diffs are not rendered by default.

697 changes: 349 additions & 348 deletions src/frontend/planner_test/tests/testdata/tpch.yaml

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions src/frontend/planner_test/tests/testdata/union.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
Upstream
BatchPlanNode

Table 4294967294 { columns: [a, b, c, t1._row_id, null:Int64, 0:Int32], primary key: [$3 ASC, $4 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [3, 4, 5] }
Table 4294967294 { columns: [a, b, c, t1._row_id, null:Int64, 0:Int32], primary key: [$3 ASC, $4 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [3, 4, 5], read_pk_prefix_len_hint: 3 }
- sql: |
create table t1 (a int, b numeric, c bigint);
create table t2 (a int, b numeric, c bigint);
Expand Down Expand Up @@ -75,7 +75,7 @@
materialized table: 4294967294
StreamProject { exprs: [t1.a, t1.b, t1.c] }
StreamHashAgg { group_key: [t1.a, t1.b, t1.c], aggs: [count] }
result table: 0, state tables: []
result table: 0, state tables: [], distinct_tables: []
StreamExchange Hash([0, 1, 2]) from 1

Fragment 1
Expand All @@ -95,8 +95,8 @@
Upstream
BatchPlanNode

Table 0 { columns: [t1_a, t1_b, t1_c, count], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [3], distribution key: [0, 1, 2] }
Table 4294967294 { columns: [a, b, c], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0, 1, 2] }
Table 0 { columns: [t1_a, t1_b, t1_c, count], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [3], distribution key: [0, 1, 2], read_pk_prefix_len_hint: 3 }
Table 4294967294 { columns: [a, b, c], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0, 1, 2], read_pk_prefix_len_hint: 3 }
- sql: |
create table t1 (a int, b numeric, c bigint, primary key(a));
create table t2 (a int, b numeric, c bigint, primary key(a));
Expand Down Expand Up @@ -133,7 +133,7 @@
materialized table: 4294967294
StreamProject { exprs: [t1.a, t1.b, t1.c] }
StreamHashAgg { group_key: [t1.a, t1.b, t1.c], aggs: [count] }
result table: 0, state tables: []
result table: 0, state tables: [], distinct_tables: []
StreamExchange Hash([0, 1, 2]) from 1

Fragment 1
Expand All @@ -153,8 +153,8 @@
Upstream
BatchPlanNode

Table 0 { columns: [t1_a, t1_b, t1_c, count], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [3], distribution key: [0, 1, 2] }
Table 4294967294 { columns: [a, b, c], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0, 1, 2] }
Table 0 { columns: [t1_a, t1_b, t1_c, count], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [3], distribution key: [0, 1, 2], read_pk_prefix_len_hint: 3 }
Table 4294967294 { columns: [a, b, c], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0, 1, 2], read_pk_prefix_len_hint: 3 }
- sql: |
create table t1 (a int, b numeric, c bigint);
create table t2 (a int, b numeric, c bigint);
Expand Down
165 changes: 96 additions & 69 deletions src/frontend/src/utils/stream_graph_formatter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl StreamGraphFormatter {
let tb = TableCatalog::from(tb.clone());
writeln!(
f,
" Table {} {{ columns: [{}], primary key: {:?}, value indices: {:?}, distribution key: {:?}{} }}",
" Table {} {{ columns: [{}], primary key: {:?}, value indices: {:?}, distribution key: {:?}, read_pk_prefix_len_hint: {:?}{} }}",
st1page marked this conversation as resolved.
Show resolved Hide resolved
tb.id,
tb.columns
.iter()
Expand All @@ -96,6 +96,7 @@ impl StreamGraphFormatter {
tb.pk,
tb.value_indices,
tb.distribution_key,
tb.read_prefix_len_hint,
if let Some(vnode_col_idx) = tb.vnode_col_index {
format!(", vnode column idx: {}", vnode_col_idx)
} else {
Expand Down Expand Up @@ -139,23 +140,26 @@ impl StreamGraphFormatter {
}
_ => node.identity.clone(),
};

writeln!(f, "{}{}", " ".repeat(level * 2), one_line_explain)?;
let explain_table_oneline =
match node.get_node_body().unwrap() {
stream_node::NodeBody::Source(node) => node.source_inner.as_ref().map(|source| {
format!(
"source state table: {}",
self.add_table(source.get_state_table().unwrap())
)
}),
stream_node::NodeBody::Materialize(node) => Some(format!(
"materialized table: {}",
self.add_table(node.get_table().unwrap())
)),
stream_node::NodeBody::GlobalSimpleAgg(node) => Some(format!(
"result table: {}, state tables: [{}]",
self.add_table(node.get_result_table().unwrap()),
node.agg_call_states
let explain_table_oneline = match node.get_node_body().unwrap() {
stream_node::NodeBody::Source(node) => node.source_inner.as_ref().map(|source| {
format!(
"source state table: {}",
self.add_table(source.get_state_table().unwrap())
)
}),
stream_node::NodeBody::Materialize(node) => Some(format!(
"materialized table: {}",
self.add_table(node.get_table().unwrap())
)),
stream_node::NodeBody::GlobalSimpleAgg(inner) => {
let in_fields = &node.get_input()[0].fields;
Some(format!(
"result table: {}, state tables: [{}], distinct_tables: [{}]",
st1page marked this conversation as resolved.
Show resolved Hide resolved
self.add_table(inner.get_result_table().unwrap()),
inner
.agg_call_states
.iter()
.filter_map(|state| match state.get_inner().unwrap() {
agg_call_state::Inner::ResultValueState(_) => None,
Expand All @@ -164,12 +168,25 @@ impl StreamGraphFormatter {
MaterializedInputState { table, .. },
) => Some(self.add_table(table.as_ref().unwrap())),
})
.join(", ")
)),
stream_node::NodeBody::HashAgg(node) => Some(format!(
"result table: {}, state tables: [{}]",
self.add_table(node.get_result_table().unwrap()),
node.agg_call_states
.join(", "),
inner
.get_distinct_dedup_tables()
.iter()
.map(|(i, table)| format!(
"(distinct key: {}, table_id: {})",
st1page marked this conversation as resolved.
Show resolved Hide resolved
in_fields[*i as usize].name,
self.add_table(table)
))
.join(", "),
))
}
stream_node::NodeBody::HashAgg(inner) => {
let in_fields = &node.get_input()[0].fields;
Some(format!(
"result table: {}, state tables: [{}], distinct_tables: [{}]",
st1page marked this conversation as resolved.
Show resolved Hide resolved
self.add_table(inner.get_result_table().unwrap()),
inner
.agg_call_states
.iter()
.filter_map(|state| match state.get_inner().unwrap() {
agg_call_state::Inner::ResultValueState(_) => None,
Expand All @@ -178,52 +195,62 @@ impl StreamGraphFormatter {
MaterializedInputState { table, .. },
) => Some(self.add_table(table.as_ref().unwrap())),
})
.join(", ")
)),
stream_node::NodeBody::HashJoin(node) => Some(format!(
"left table: {}, right table {},{}{}",
self.add_table(node.get_left_table().unwrap()),
self.add_table(node.get_right_table().unwrap()),
match &node.left_degree_table {
Some(tb) => format!(" left degree table: {},", self.add_table(tb)),
None => "".to_string(),
},
match &node.right_degree_table {
Some(tb) => format!(" right degree table: {},", self.add_table(tb)),
None => "".to_string(),
},
)),
stream_node::NodeBody::TopN(node) => Some(format!(
"state table: {}",
self.add_table(node.get_table().unwrap())
)),
stream_node::NodeBody::AppendOnlyTopN(node) => Some(format!(
"state table: {}",
self.add_table(node.get_table().unwrap())
)),
stream_node::NodeBody::Arrange(node) => Some(format!(
"arrange table: {}",
self.add_table(node.get_table().unwrap())
)),
stream_node::NodeBody::DynamicFilter(node) => Some(format!(
"left table: {}, right table {}",
self.add_table(node.get_left_table().unwrap()),
self.add_table(node.get_right_table().unwrap()),
)),
stream_node::NodeBody::GroupTopN(node) => Some(format!(
"state table: {}",
self.add_table(node.get_table().unwrap())
)),
stream_node::NodeBody::AppendOnlyGroupTopN(node) => Some(format!(
"state table: {}",
self.add_table(node.get_table().unwrap())
)),
stream_node::NodeBody::Now(node) => Some(format!(
"state table: {}",
self.add_table(node.get_state_table().unwrap())
)),
_ => None,
};
.join(", "),
inner
.get_distinct_dedup_tables()
.iter()
.map(|(i, table)| format!(
"(distinct key: {}, table_id: {})",
st1page marked this conversation as resolved.
Show resolved Hide resolved
in_fields[*i as usize].name,
self.add_table(table)
))
.join(", "),
))
}
stream_node::NodeBody::HashJoin(node) => Some(format!(
"left table: {}, right table {},{}{}",
self.add_table(node.get_left_table().unwrap()),
self.add_table(node.get_right_table().unwrap()),
match &node.left_degree_table {
Some(tb) => format!(" left degree table: {},", self.add_table(tb)),
None => "".to_string(),
},
match &node.right_degree_table {
Some(tb) => format!(" right degree table: {},", self.add_table(tb)),
None => "".to_string(),
},
)),
stream_node::NodeBody::TopN(node) => Some(format!(
"state table: {}",
self.add_table(node.get_table().unwrap())
)),
stream_node::NodeBody::AppendOnlyTopN(node) => Some(format!(
"state table: {}",
self.add_table(node.get_table().unwrap())
)),
stream_node::NodeBody::Arrange(node) => Some(format!(
"arrange table: {}",
self.add_table(node.get_table().unwrap())
)),
stream_node::NodeBody::DynamicFilter(node) => Some(format!(
"left table: {}, right table {}",
self.add_table(node.get_left_table().unwrap()),
self.add_table(node.get_right_table().unwrap()),
)),
stream_node::NodeBody::GroupTopN(node) => Some(format!(
"state table: {}",
self.add_table(node.get_table().unwrap())
)),
stream_node::NodeBody::AppendOnlyGroupTopN(node) => Some(format!(
"state table: {}",
self.add_table(node.get_table().unwrap())
)),
stream_node::NodeBody::Now(node) => Some(format!(
"state table: {}",
self.add_table(node.get_state_table().unwrap())
)),
_ => None,
};
if let Some(explain_table_oneline) = explain_table_oneline {
writeln!(f, "{}{}", " ".repeat(level * 2 + 4), explain_table_oneline)?;
}
Expand Down