Skip to content

Commit

Permalink
feat(pretty): introduce the proposed pretty printer in stream_graph_formatter (#8576)
Browse files Browse the repository at this point in the history
  • Loading branch information
ice1000 authored Mar 18, 2023
1 parent 99f4604 commit 6940ae4
Show file tree
Hide file tree
Showing 14 changed files with 5,564 additions and 3,247 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ petgraph = "0.6"
pgwire = { path = "../utils/pgwire" }
pin-project-lite = "0.2"
postgres-types = { version = "0.2.4" }
pretty-xmlish = "0.1.10"
prometheus = { version = "0.13", features = ["process"] }
rand = "0.8"
risingwave_batch = { path = "../batch" }
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/planner_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ impl TestCase {
// Only generate stream_dist_plan if it is specified in test case
if self.stream_dist_plan.is_some() {
let graph = build_graph(stream_plan);
ret.stream_dist_plan = Some(explain_stream_graph(&graph, false).unwrap());
ret.stream_dist_plan = Some(explain_stream_graph(&graph, false));
}
}
}
Expand Down
975 changes: 666 additions & 309 deletions src/frontend/planner_test/tests/testdata/distribution_derive.yaml

Large diffs are not rendered by default.

108 changes: 55 additions & 53 deletions src/frontend/planner_test/tests/testdata/join.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -638,66 +638,68 @@
| └─StreamTableScan { table: t, columns: [t.src, t.dst, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamExchange { dist: HashShard(t.dst) }
└─StreamTableScan { table: t, columns: [t.dst, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
stream_dist_plan: |
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [p1, p2, p3, t._row_id(hidden), t._row_id#1(hidden), t.src(hidden), t._row_id#2(hidden)], pk_columns: [t._row_id, t._row_id#1, p2, t._row_id#2, t.src, p1], pk_conflict: "no check" }
materialized table: 4294967294
StreamHashJoin { type: Inner, predicate: t.src = t.dst AND t.src = t.dst, output: [t.src, t.dst, t.dst, t._row_id, t._row_id, t.src, t._row_id] }
left table: 0, right table 2, left degree table: 1, right degree table: 3,
StreamExchange Hash([0]) from 1
StreamExchange Hash([0]) from 4
StreamMaterialize { columns: [p1, p2, p3, t._row_id(hidden), t._row_id#1(hidden), t.src(hidden), t._row_id#2(hidden)], pk_columns: [t._row_id, t._row_id#1, p2, t._row_id#2, t.src, p1], pk_conflict: "no check" }
├── materialized table: 4294967294
└── StreamHashJoin { type: Inner, predicate: t.src = t.dst AND t.src = t.dst, output: [t.src, t.dst, t.dst, t._row_id, t._row_id, t.src, t._row_id] }
├── left table: 0
├── right table: 2
├── left degree table: 1
├── right degree table: 3
├── StreamExchange Hash([0]) from 1
└── StreamExchange Hash([0]) from 4
Fragment 1
StreamHashJoin { type: Inner, predicate: t.dst = t.src, output: [t.src, t.dst, t.src, t.dst, t._row_id, t._row_id] }
left table: 4, right table 6, left degree table: 5, right degree table: 7,
StreamExchange Hash([1]) from 2
StreamExchange Hash([0]) from 3
StreamHashJoin { type: Inner, predicate: t.dst = t.src, output: [t.src, t.dst, t.src, t.dst, t._row_id, t._row_id] } { left table: 4, right table: 6, left degree table: 5, right degree table: 7 }
├── StreamExchange Hash([1]) from 2
└── StreamExchange Hash([0]) from 3
Fragment 2
Chain { table: t, columns: [t.src, t.dst, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
Upstream
BatchPlanNode
Chain { table: t, columns: [t.src, t.dst, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
├── Upstream
└── BatchPlanNode
Fragment 3
Chain { table: t, columns: [t.src, t.dst, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
Upstream
BatchPlanNode
Chain { table: t, columns: [t.src, t.dst, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
├── Upstream
└── BatchPlanNode
Fragment 4
Chain { table: t, columns: [t.dst, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
Upstream
BatchPlanNode
Chain { table: t, columns: [t.dst, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
├── Upstream
└── BatchPlanNode
Table 0
├── columns: [ t_src, t_dst, t_src_0, t_dst_0, t__row_id, t__row_id_0 ]
├── primary key: [ $2 ASC, $0 ASC, $4 ASC, $5 ASC, $1 ASC ]
├── value indices: [ 0, 1, 2, 3, 4, 5 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 2
Table 1
├── columns: [ t_src, t_src_0, t__row_id, t__row_id_0, t_dst, _degree ]
├── primary key: [ $0 ASC, $1 ASC, $2 ASC, $3 ASC, $4 ASC ]
├── value indices: [ 5 ]
├── distribution key: [ 1 ]
└── read pk prefix len hint: 2
Table 2 { columns: [ t_dst, t__row_id ], primary key: [ $0 ASC, $0 ASC, $1 ASC ], value indices: [ 0, 1 ], distribution key: [ 0 ], read pk prefix len hint: 2 }
Table 3 { columns: [ t_dst, t_dst_0, t__row_id, _degree ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 3 ], distribution key: [ 1 ], read pk prefix len hint: 2 }
Table 4 { columns: [ t_src, t_dst, t__row_id ], primary key: [ $1 ASC, $2 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 1 ], read pk prefix len hint: 1 }
Table 5 { columns: [ t_dst, t__row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 6 { columns: [ t_src, t_dst, t__row_id ], primary key: [ $0 ASC, $2 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 7 { columns: [ t_src, t__row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 4294967294
├── columns: [ p1, p2, p3, t._row_id, t._row_id#1, t.src, t._row_id#2 ]
├── primary key: [ $3 ASC, $4 ASC, $1 ASC, $6 ASC, $5 ASC, $0 ASC ]
├── value indices: [ 0, 1, 2, 3, 4, 5, 6 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 6
Table 0 { columns: [t_src, t_dst, t_src_0, t_dst_0, t__row_id, t__row_id_0], primary key: [$2 ASC, $0 ASC, $4 ASC, $5 ASC, $1 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [0], read pk prefix len hint: 2 }
Table 1 { columns: [t_src, t_src_0, t__row_id, t__row_id_0, t_dst, _degree], primary key: [$0 ASC, $1 ASC, $2 ASC, $3 ASC, $4 ASC], value indices: [5], distribution key: [1], read pk prefix len hint: 2 }
Table 2 { columns: [t_dst, t__row_id], primary key: [$0 ASC, $0 ASC, $1 ASC], value indices: [0, 1], distribution key: [0], read pk prefix len hint: 2 }
Table 3 { columns: [t_dst, t_dst_0, t__row_id, _degree], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [3], distribution key: [1], read pk prefix len hint: 2 }
Table 4 { columns: [t_src, t_dst, t__row_id], primary key: [$1 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [1], read pk prefix len hint: 1 }
Table 5 { columns: [t_dst, t__row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 }
Table 6 { columns: [t_src, t_dst, t__row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0], read pk prefix len hint: 1 }
Table 7 { columns: [t_src, t__row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 }
Table 4294967294 { columns: [p1, p2, p3, t._row_id, t._row_id#1, t.src, t._row_id#2], primary key: [$3 ASC, $4 ASC, $1 ASC, $6 ASC, $5 ASC, $0 ASC], value indices: [0, 1, 2, 3, 4, 5, 6], distribution key: [0], read pk prefix len hint: 6 }
- name: Fix hash join distribution key (https://github.com/risingwavelabs/risingwave/issues/8537)
sql: |
CREATE TABLE part (
p INTEGER,
c VARCHAR,
PRIMARY KEY (p)
);
CREATE TABLE B (
b INTEGER,
d VARCHAR,
PRIMARY KEY (b)
);
select B.* from part join B on part.c = B.d join part p1 on p1.p = part.p and p1.p = B.b;
stream_plan: |
StreamMaterialize { columns: [b, d, part.p(hidden), part.c(hidden), part.p#1(hidden)], pk_columns: [part.p, b, part.c, part.p#1], pk_conflict: "no check" }
└─StreamHashJoin { type: Inner, predicate: part.p = part.p AND b.b = part.p, output: [b.b, b.d, part.p, part.c, part.p] }
├─StreamExchange { dist: HashShard(part.p, b.b) }
| └─StreamHashJoin { type: Inner, predicate: part.c = b.d, output: [part.p, b.b, b.d, part.c] }
| ├─StreamExchange { dist: HashShard(part.c) }
| | └─StreamTableScan { table: part, columns: [part.p, part.c], pk: [part.p], dist: UpstreamHashShard(part.p) }
| └─StreamExchange { dist: HashShard(b.d) }
| └─StreamTableScan { table: b, columns: [b.b, b.d], pk: [b.b], dist: UpstreamHashShard(b.b) }
└─StreamExchange { dist: HashShard(part.p, part.p) }
└─StreamTableScan { table: part, columns: [part.p], pk: [part.p], dist: UpstreamHashShard(part.p) }
Loading

0 comments on commit 6940ae4

Please sign in to comment.