Implement readable explain plans for physical plans #337

Merged
merged 5 commits into from
May 14, 2021

Conversation

Contributor

@alamb alamb commented May 13, 2021

Which issue does this PR close?

#333

Rationale for this change

EXPLAIN output for physical plans is currently close to useless (in my opinion).

What changes are included in this PR?

  • Visitor pattern to traverse ExecutionPlans
  • New displayable function for displaying ExecutionPlans reasonably
  • Documentation and test

Note: I hope to use the same basic infrastructure to implement graphviz plans (#219).

Example new format

> explain verbose select * from foo where a < 4;
+-----------------------------------------+------------------------------------------------------------------------+
| plan_type                               | plan                                                                   |
+-----------------------------------------+------------------------------------------------------------------------+
| logical_plan                            | Projection: #a, #b, #c                                                 |
|                                         |   Filter: #a Lt Int64(4)                                               |
|                                         |     TableScan: foo projection=None                                     |
| logical_plan after projection_push_down | Projection: #a, #b, #c                                                 |
|                                         |   Filter: #a Lt Int64(4)                                               |
|                                         |     TableScan: foo projection=Some([0, 1, 2])                          |
| logical_plan after projection_push_down | Projection: #a, #b, #c                                                 |
|                                         |   Filter: #a Lt Int64(4)                                               |
|                                         |     TableScan: foo projection=Some([0, 1, 2])                          |
| physical_plan                           | ProjectionExec: expr=[a, b, c]                                         |
|                                         |  FilterExec: CAST(a AS Int64) < 4                                      |
|                                         |   CsvExec: source=Path(/tmp/foo.csv: [/tmp/foo.csv]), has_header=false |
+-----------------------------------------+------------------------------------------------------------------------+

Are there any user-facing changes?

Yes: output format for EXPLAIN VERBOSE has changed

New Output:

API changes

None: All changes are backwards compatible

Example old format

> explain verbose select * from foo where a < 4;

+-----------------------------------------+-------------------------------------------------+
| plan_type                               | plan                                            |
+-----------------------------------------+-------------------------------------------------+
| logical_plan                            | Projection: #a, #b, #c                          |
|                                         |   Filter: #a Lt Int64(4)                        |
|                                         |     TableScan: foo projection=None              |
| logical_plan after projection_push_down | Projection: #a, #b, #c                          |
|                                         |   Filter: #a Lt Int64(4)                        |
|                                         |     TableScan: foo projection=Some([0, 1, 2])   |
| logical_plan after projection_push_down | Projection: #a, #b, #c                          |
|                                         |   Filter: #a Lt Int64(4)                        |
|                                         |     TableScan: foo projection=Some([0, 1, 2])   |
| physical_plan                           | ProjectionExec {                                |
|                                         |     expr: [                                     |
|                                         |         (                                       |
|                                         |             Column {                            |
|                                         |                 name: "a",                      |
|                                         |             },                                  |
|                                         |             "a",                                |
|                                         |         ),                                      |
|                                         |         (                                       |
|                                         |             Column {                            |
|                                         |                 name: "b",                      |
|                                         |             },                                  |
|                                         |             "b",                                |
|                                         |         ),                                      |
|                                         |         (                                       |
|                                         |             Column {                            |
|                                         |                 name: "c",                      |
|                                         |             },                                  |
|                                         |             "c",                                |
|                                         |         ),                                      |
|                                         |     ],                                          |
|                                         |     schema: Schema {                            |
|                                         |         fields: [                               |
|                                         |             Field {                             |
|                                         |                 name: "a",                      |
|                                         |                 data_type: Int32,               |
|                                         |                 nullable: false,                |
|                                         |                 dict_id: 0,                     |
|                                         |                 dict_is_ordered: false,         |
|                                         |                 metadata: None,                 |
|                                         |             },                                  |
|                                         |             Field {                             |
|                                         |                 name: "b",                      |
|                                         |                 data_type: Int32,               |
|                                         |                 nullable: false,                |
|                                         |                 dict_id: 0,                     |
|                                         |                 dict_is_ordered: false,         |
|                                         |                 metadata: None,                 |
|                                         |             },                                  |
|                                         |             Field {                             |
|                                         |                 name: "c",                      |
|                                         |                 data_type: Int32,               |
|                                         |                 nullable: false,                |
|                                         |                 dict_id: 0,                     |
|                                         |                 dict_is_ordered: false,         |
|                                         |                 metadata: None,                 |
|                                         |             },                                  |
|                                         |         ],                                      |
|                                         |         metadata: {},                           |
|                                         |     },                                          |
|                                         |     input: FilterExec {                         |
|                                         |         predicate: BinaryExpr {                 |
|                                         |             left: TryCastExpr {                 |
|                                         |                 expr: Column {                  |
|                                         |                     name: "a",                  |
|                                         |                 },                              |
|                                         |                 cast_type: Int64,               |
|                                         |             },                                  |
|                                         |             op: Lt,                             |
|                                         |             right: Literal {                    |
|                                         |                 value: Int64(4),                |
|                                         |             },                                  |
|                                         |         },                                      |
|                                         |         input: CsvExec {                        |
|                                         |             source: PartitionedFiles {          |
|                                         |                 path: "/tmp/foo.csv",           |
|                                         |                 filenames: [                    |
|                                         |                     "/tmp/foo.csv",             |
|                                         |                 ],                              |
|                                         |             },                                  |
|                                         |             schema: Schema {                    |
|                                         |                 fields: [                       |
|                                         |                     Field {                     |
|                                         |                         name: "a",              |
|                                         |                         data_type: Int32,       |
|                                         |                         nullable: false,        |
|                                         |                         dict_id: 0,             |
|                                         |                         dict_is_ordered: false, |
|                                         |                         metadata: None,         |
|                                         |                     },                          |
|                                         |                     Field {                     |
|                                         |                         name: "b",              |
|                                         |                         data_type: Int32,       |
|                                         |                         nullable: false,        |
|                                         |                         dict_id: 0,             |
|                                         |                         dict_is_ordered: false, |
|                                         |                         metadata: None,         |
|                                         |                     },                          |
|                                         |                     Field {                     |
|                                         |                         name: "c",              |
|                                         |                         data_type: Int32,       |
|                                         |                         nullable: false,        |
|                                         |                         dict_id: 0,             |
|                                         |                         dict_is_ordered: false, |
|                                         |                         metadata: None,         |
|                                         |                     },                          |
|                                         |                 ],                              |
|                                         |                 metadata: {},                   |
|                                         |             },                                  |
|                                         |             has_header: false,                  |
|                                         |             delimiter: Some(                    |
|                                         |                 44,                             |
|                                         |             ),                                  |
|                                         |             file_extension: ".csv",             |
|                                         |             projection: Some(                   |
|                                         |                 [                               |
|                                         |                     0,                          |
|                                         |                     1,                          |
|                                         |                     2,                          |
|                                         |                 ],                              |
|                                         |             ),                                  |
|                                         |             projected_schema: Schema {          |
|                                         |                 fields: [                       |
|                                         |                     Field {                     |
|                                         |                         name: "a",              |
|                                         |                         data_type: Int32,       |
|                                         |                         nullable: false,        |
|                                         |                         dict_id: 0,             |
|                                         |                         dict_is_ordered: false, |
|                                         |                         metadata: None,         |
|                                         |                     },                          |
|                                         |                     Field {                     |
|                                         |                         name: "b",              |
|                                         |                         data_type: Int32,       |
|                                         |                         nullable: false,        |
|                                         |                         dict_id: 0,             |
|                                         |                         dict_is_ordered: false, |
|                                         |                         metadata: None,         |
|                                         |                     },                          |
|                                         |                     Field {                     |
|                                         |                         name: "c",              |
|                                         |                         data_type: Int32,       |
|                                         |                         nullable: false,        |
|                                         |                         dict_id: 0,             |
|                                         |                         dict_is_ordered: false, |
|                                         |                         metadata: None,         |
|                                         |                     },                          |
|                                         |                 ],                              |
|                                         |                 metadata: {},                   |
|                                         |             },                                  |
|                                         |             batch_size: 8192,                   |
|                                         |             limit: None,                        |
|                                         |         },                                      |
|                                         |     },                                          |
|                                         | }                                               |
+-----------------------------------------+-------------------------------------------------+

@@ -58,8 +52,7 @@ impl<'a, 'b> PlanVisitor for IndentVisitor<'a, 'b> {
if self.indent > 0 {
writeln!(self.f)?;
}
self.write_indent()?;

write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
Contributor Author

this is a better way to make indents that I found while googling around
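
For readers unfamiliar with the trick in the added line: `{:indent$}` is standard `std::fmt` syntax that takes the field width from a named argument, so padding an empty string yields exactly that many spaces. The sketch below is a standalone illustration, not the PR's code; the helper names `write_indented` and `render_example` are made up for this example.

```rust
use std::fmt::{self, Write};

/// Write `label` on its own line, indented two spaces per nesting level.
/// (Hypothetical helper, not part of the PR.)
fn write_indented(out: &mut String, depth: usize, label: &str) -> fmt::Result {
    // `{:indent$}` pads the empty string to `indent` columns, which
    // produces `indent` spaces without an explicit loop.
    write!(out, "{:indent$}", "", indent = depth * 2)?;
    writeln!(out, "{}", label)
}

/// Render a three-level chain of operator names, one level deeper each line.
fn render_example() -> String {
    let mut out = String::new();
    let labels = ["ProjectionExec", "FilterExec", "CsvExec"];
    for (depth, label) in labels.iter().enumerate() {
        write_indented(&mut out, depth, label).expect("writing to a String cannot fail");
    }
    out
}

fn main() {
    print!("{}", render_example());
}
```

A width of zero writes nothing, so the root line needs no special case.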

#[derive(Debug, Clone, Copy)]
pub enum DisplayFormatType {
/// Default, compact format. Example: `FilterExec: c12 < 10.0`
Default,
Contributor Author

I envision adding more types (e.g. Graphviz) as needs evolve
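
As a rough illustration of how a format enum like this leaves room for new variants, here is a minimal sketch. Only the `Default` variant and its example output come from the diff above; the `Graphviz` variant, the `FormatType` name, and the `fmt_filter` function are assumptions invented for this example and are not part of the PR.

```rust
/// Simplified stand-in for the PR's `DisplayFormatType`.
#[derive(Debug, Clone, Copy)]
enum FormatType {
    /// Compact one-line format, e.g. `FilterExec: c12 < 10.0`.
    Default,
    /// Hypothetical future variant: a Graphviz node attribute list.
    Graphviz,
}

/// Format a filter operator according to the requested format type.
/// Each operator only needs one `match` to support every variant.
fn fmt_filter(t: FormatType, predicate: &str) -> String {
    match t {
        FormatType::Default => format!("FilterExec: {}", predicate),
        FormatType::Graphviz => format!("[label=\"FilterExec: {}\"]", predicate),
    }
}

fn main() {
    println!("{}", fmt_filter(FormatType::Default, "c12 < 10.0"));
    println!("{}", fmt_filter(FormatType::Graphviz, "c12 < 10.0"));
}
```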

}
}

/// Return a [wrapper](DisplayableExecutionPlan) around an
Contributor Author

This is the main proposed interface: displayable, which returns a struct that offers several ways to display the plan. Ideally I would add this to ExecutionPlan itself, but since it is a trait, this was the best I could come up with (along with a bunch of documentation).
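
The wrapper approach described in this comment can be sketched roughly as follows. This is a simplified model under stated assumptions, not the PR's implementation: the `Plan` trait, `Node`, `Displayable`, `Indented`, and `sample_plan` are stand-ins invented here, and the real `ExecutionPlan` trait is considerably larger.

```rust
use std::fmt;

/// Simplified stand-in for an execution plan trait.
trait Plan {
    fn name(&self) -> &str;
    fn children(&self) -> Vec<&dyn Plan>;
}

/// Wrapper returned by `displayable`; it borrows the plan and hands out
/// display adaptors, so the trait itself needs no formatting methods.
struct Displayable<'a> {
    plan: &'a dyn Plan,
}

fn displayable(plan: &dyn Plan) -> Displayable<'_> {
    Displayable { plan }
}

impl<'a> Displayable<'a> {
    /// One way to display the plan: an indented tree, one node per line.
    fn indent(&self) -> Indented<'a> {
        Indented { plan: self.plan }
    }
}

struct Indented<'a> {
    plan: &'a dyn Plan,
}

impl fmt::Display for Indented<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write_node(self.plan, 0, f)
    }
}

/// Write `plan` and then its children, indenting two spaces per level.
fn write_node(plan: &dyn Plan, depth: usize, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    writeln!(f, "{:indent$}{}", "", plan.name(), indent = depth * 2)?;
    for child in plan.children() {
        write_node(child, depth + 1, f)?;
    }
    Ok(())
}

/// Toy plan node used only for this example.
struct Node {
    name: String,
    children: Vec<Node>,
}

impl Plan for Node {
    fn name(&self) -> &str {
        &self.name
    }
    fn children(&self) -> Vec<&dyn Plan> {
        self.children.iter().map(|c| c as &dyn Plan).collect()
    }
}

fn sample_plan() -> Node {
    let csv = Node { name: "CsvExec".into(), children: vec![] };
    let filter = Node { name: "FilterExec".into(), children: vec![csv] };
    Node { name: "ProjectionExec".into(), children: vec![filter] }
}

fn main() {
    let plan = sample_plan();
    print!("{}", displayable(&plan).indent());
}
```

Because `Indented` implements `Display`, callers can use it with `format!`, `println!`, or `to_string()` without allocating until they choose to.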

#[tokio::test]
async fn test_physical_plan_display_indent() {
let mut ctx = ExecutionContext::new();
register_aggregate_csv(&mut ctx).unwrap();
Contributor Author

I thought one end to end test would be reasonable to make sure the output was ok and that it didn't regress, but also wouldn't take too much effort to maintain

Contributor

@NGA-TRAN NGA-TRAN left a comment

I did not look into the details, but in general the changes are mostly visit + pre_visit + post_visit and formatting, which make sense. The test is very clear, too.

@@ -356,13 +356,15 @@ pub enum Partitioning {
/// after all children have been visited.
////
/// To use, define a struct that implements this trait and then invoke
/// "LogicalPlan::accept".
/// [`LogicalPlan::accept`].
Contributor

Question: what does this change do? Does it look better in the docs?

Member

It makes it a hyperlink in the API docs :)

/// visitor.post_visit(CsvExec)
/// visitor.post_visit(FilterExec)
/// visitor.post_visit(ProjectionExec)
/// ```
Contributor

Nice
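
The call order documented in the snippet above (children visited between a node's pre_visit and post_visit) can be reproduced with a small standalone sketch. The `Node`, `Visitor`, `accept`, and `Recorder` definitions below are simplified assumptions for illustration; the PR's actual visitor trait and signatures may differ.

```rust
/// Toy plan node used only for this example.
struct Node {
    name: &'static str,
    children: Vec<Node>,
}

/// Simplified visitor: each hook returns `false` to stop the traversal early.
trait Visitor {
    /// Called before any of the node's children are visited.
    fn pre_visit(&mut self, node: &Node) -> bool;
    /// Called after all of the node's children have been visited.
    fn post_visit(&mut self, _node: &Node) -> bool {
        true
    }
}

/// Depth-first traversal: pre_visit, then children, then post_visit.
fn accept<V: Visitor>(node: &Node, visitor: &mut V) -> bool {
    if !visitor.pre_visit(node) {
        return false;
    }
    for child in &node.children {
        if !accept(child, visitor) {
            return false;
        }
    }
    visitor.post_visit(node)
}

/// Visitor that records every hook invocation in order.
struct Recorder {
    calls: Vec<String>,
}

impl Visitor for Recorder {
    fn pre_visit(&mut self, node: &Node) -> bool {
        self.calls.push(format!("pre_visit({})", node.name));
        true
    }
    fn post_visit(&mut self, node: &Node) -> bool {
        self.calls.push(format!("post_visit({})", node.name));
        true
    }
}

/// Walk ProjectionExec -> FilterExec -> CsvExec and return the recorded calls.
fn record_calls() -> Vec<String> {
    let csv = Node { name: "CsvExec", children: vec![] };
    let filter = Node { name: "FilterExec", children: vec![csv] };
    let projection = Node { name: "ProjectionExec", children: vec![filter] };
    let mut recorder = Recorder { calls: Vec::new() };
    accept(&projection, &mut recorder);
    recorder.calls
}

fn main() {
    for call in record_calls() {
        println!("visitor.{}", call);
    }
}
```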

" ProjectionExec: expr=[c1, MAX(c12), MIN(c12) as the_min]",
" HashAggregateExec: mode=Final, gby=[c1], aggr=[MAX(c12), MIN(c12)]",
" MergeExec",
" HashAggregateExec: mode=Partial, gby=[c1], aggr=[MAX(c12), MIN(c12)]",
Contributor

Great to see the partial aggregate displayed here

" HashAggregateExec: mode=Partial, gby=[c1], aggr=[MAX(c12), MIN(c12)]",
" CoalesceBatchesExec: target_batch_size=4096",
" FilterExec: c12 < CAST(10 AS Float64)",
" RepartitionExec: partitioning=RoundRobinBatch(16)",
Contributor

What does RepartitionExec do? Does it split the data into smaller batches to send to multiple threads?

Contributor

@Dandandan Dandandan May 13, 2021

Round-robin repartitioning moves the batches, as they are, one by one to different partitions; in this case "round robin" means partition 1, 2, 3, etc., each of which is executed in a different thread.
There is also hash repartitioning, which sends values to different threads based on hashed keys.
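
To make the two strategies concrete, here is a minimal standalone sketch of the assignment logic only. It is not DataFusion's RepartitionExec: the function names `round_robin` and `hash_partition` are invented for this example, the "batches" are plain values, and no threads are involved.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Round robin: batch i goes to partition i % n, unchanged and in order.
fn round_robin<T>(batches: Vec<T>, n: usize) -> Vec<Vec<T>> {
    assert!(n > 0, "need at least one partition");
    let mut parts: Vec<Vec<T>> = (0..n).map(|_| Vec::new()).collect();
    for (i, batch) in batches.into_iter().enumerate() {
        parts[i % n].push(batch);
    }
    parts
}

/// Hash partitioning: each key goes to partition hash(key) % n, so equal
/// keys always land in the same partition.
fn hash_partition<K: Hash>(keys: Vec<K>, n: usize) -> Vec<Vec<K>> {
    assert!(n > 0, "need at least one partition");
    let mut parts: Vec<Vec<K>> = (0..n).map(|_| Vec::new()).collect();
    for key in keys {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        let idx = (hasher.finish() % n as u64) as usize;
        parts[idx].push(key);
    }
    parts
}

fn main() {
    println!("{:?}", round_robin(vec!["b0", "b1", "b2", "b3", "b4"], 3));
    println!("{:?}", hash_partition(vec!["a", "b", "a", "c", "b", "a"], 4));
}
```

Round robin balances load without looking at the data, while hash partitioning keeps rows with the same key together, which is what operations such as a grouped aggregate or a hash join would need.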

Contributor

Makes sense. Thanks @Dandandan

" HashAggregateExec: mode=Partial, gby=[c1], aggr=[MAX(c12), MIN(c12)]",
" CoalesceBatchesExec: target_batch_size=4096",
" FilterExec: c12 < CAST(10 AS Float64)",
" RepartitionExec: partitioning=RoundRobinBatch(16)",
Contributor

@Dandandan Dandandan May 13, 2021

We should statically set the concurrency level in the execution config if we want to check the plan like this.

Contributor Author

good call -- I will do so.

@Dandandan
Contributor

Looking much better @alamb !!!

Contributor

@Dandandan Dandandan left a comment

Looks good with the test fixed

Member

@andygrove andygrove left a comment

❤️

@alamb alamb force-pushed the alamb/physical_display_not_suck branch from b9ca684 to 03b776f Compare May 14, 2021 17:30
Contributor Author

alamb commented May 14, 2021

Test fixed in 03b776f (I hope 🤞 )

@alamb alamb added the datafusion (Changes in the datafusion crate) and enhancement (New feature or request) labels May 14, 2021
@codecov-commenter

Codecov Report

Merging #337 (03b776f) into master (b44238d) will decrease coverage by 0.47%.
The diff coverage is 49.19%.

@@            Coverage Diff             @@
##           master     #337      +/-   ##
==========================================
- Coverage   76.07%   75.59%   -0.48%     
==========================================
  Files         142      143       +1     
  Lines       23788    23695      -93     
==========================================
- Hits        18097    17913     -184     
- Misses       5691     5782      +91     
Impacted Files Coverage Δ
datafusion/src/logical_plan/plan.rs 81.19% <ø> (ø)
datafusion/src/physical_plan/cross_join.rs 73.88% <0.00%> (-2.28%) ⬇️
...tafusion/src/physical_plan/distinct_expressions.rs 89.65% <0.00%> (-0.70%) ⬇️
datafusion/src/physical_plan/empty.rs 83.82% <0.00%> (-5.73%) ⬇️
datafusion/src/physical_plan/explain.rs 51.35% <0.00%> (-6.23%) ⬇️
...atafusion/src/physical_plan/expressions/average.rs 81.73% <0.00%> (-1.45%) ⬇️
datafusion/src/physical_plan/expressions/count.rs 84.04% <0.00%> (-1.83%) ⬇️
datafusion/src/physical_plan/expressions/sum.rs 76.92% <0.00%> (-1.00%) ⬇️
datafusion/src/physical_plan/hash_join.rs 86.40% <0.00%> (-1.09%) ⬇️
datafusion/src/physical_plan/memory.rs 67.27% <0.00%> (-13.17%) ⬇️
... and 53 more

Δ = absolute <relative> (impact), ø = not affected, ? = missing data

Member

@jorgecarleitao jorgecarleitao left a comment

This looks amazing! Thanks a lot, @alamb !

@alamb alamb force-pushed the alamb/physical_display_not_suck branch from 03b776f to 3610906 Compare May 14, 2021 18:10