Skip to content

Commit

Permalink
Implement readable explain plans for physical plans (#337)
Browse files Browse the repository at this point in the history
* Implement readable explain plans for physical plans

* Add apache copyright to display.rs

* Set concurrency explicitly in test and make it windows friendly

* fix doc example test

* fmt!
  • Loading branch information
alamb authored May 14, 2021
1 parent b096539 commit 9cf32cf
Show file tree
Hide file tree
Showing 30 changed files with 683 additions and 48 deletions.
13 changes: 3 additions & 10 deletions datafusion/src/logical_plan/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ pub struct IndentVisitor<'a, 'b> {
f: &'a mut fmt::Formatter<'b>,
/// If true, includes summarized schema information
with_schema: bool,
indent: u32,
/// The current indent
indent: usize,
}

impl<'a, 'b> IndentVisitor<'a, 'b> {
Expand All @@ -42,13 +43,6 @@ impl<'a, 'b> IndentVisitor<'a, 'b> {
indent: 0,
}
}

fn write_indent(&mut self) -> fmt::Result {
for _ in 0..self.indent {
write!(self.f, " ")?;
}
Ok(())
}
}

impl<'a, 'b> PlanVisitor for IndentVisitor<'a, 'b> {
Expand All @@ -58,8 +52,7 @@ impl<'a, 'b> PlanVisitor for IndentVisitor<'a, 'b> {
if self.indent > 0 {
writeln!(self.f)?;
}
self.write_indent()?;

write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
write!(self.f, "{}", plan.display())?;
if self.with_schema {
write!(
Expand Down
4 changes: 3 additions & 1 deletion datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,13 +356,15 @@ pub enum Partitioning {
/// after all children have been visited.
///
/// To use, define a struct that implements this trait and then invoke
/// "LogicalPlan::accept".
/// [`LogicalPlan::accept`].
///
/// For example, for a logical plan like:
///
/// ```text
/// Projection: #id
/// Filter: #state Eq Utf8("CO")
/// CsvScan: employee.csv projection=Some([0, 3])
/// ```
///
/// The sequence of visit operations would be:
/// ```text
Expand Down
19 changes: 18 additions & 1 deletion datafusion/src/physical_plan/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use std::task::{Context, Poll};

use crate::error::{DataFusionError, Result};
use crate::physical_plan::{
ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream,
};

use arrow::compute::kernels::concat::concat;
Expand Down Expand Up @@ -114,6 +115,22 @@ impl ExecutionPlan for CoalesceBatchesExec {
is_closed: false,
}))
}

/// Writes the one-line description of this operator used in plan displays.
///
/// With [`DisplayFormatType::Default`] the output has the form
/// `CoalesceBatchesExec: target_batch_size=<n>`.
fn fmt_as(
    &self,
    t: DisplayFormatType,
    f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
    let batch_size = &self.target_batch_size;
    match t {
        DisplayFormatType::Default => write!(
            f,
            "CoalesceBatchesExec: target_batch_size={}",
            batch_size
        ),
    }
}
}

struct CoalesceBatchesStream {
Expand Down
19 changes: 16 additions & 3 deletions datafusion/src/physical_plan/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
use futures::{lock::Mutex, StreamExt};
use std::{any::Any, sync::Arc, task::Poll};

use crate::physical_plan::memory::MemoryStream;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
Expand All @@ -36,8 +35,10 @@ use crate::{
use async_trait::async_trait;
use std::time::Instant;

use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream};
use crate::physical_plan::coalesce_batches::concat_batches;
use super::{
coalesce_batches::concat_batches, memory::MemoryStream, DisplayFormatType,
ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
};
use log::debug;

/// Data of the left side
Expand Down Expand Up @@ -192,6 +193,18 @@ impl ExecutionPlan for CrossJoinExec {
join_time: 0,
}))
}

/// Writes the one-line description of this operator used in plan displays.
///
/// With [`DisplayFormatType::Default`] the output is the fixed text
/// `CrossJoinExec`.
fn fmt_as(
    &self,
    t: DisplayFormatType,
    f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
    match t {
        // No parameters to interpolate, so the literal is written directly.
        DisplayFormatType::Default => f.write_str("CrossJoinExec"),
    }
}
}

/// A stream that issues [RecordBatch]es as they arrive from the right of the join.
Expand Down
32 changes: 30 additions & 2 deletions datafusion/src/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
//! Execution plan for reading CSV files

use crate::error::{DataFusionError, Result};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::{common, Partitioning};
use crate::physical_plan::{common, DisplayFormatType, ExecutionPlan, Partitioning};
use arrow::csv;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
Expand Down Expand Up @@ -135,6 +134,19 @@ impl std::fmt::Debug for Source {
}
}

/// Human-readable rendering of a CSV [`Source`].
///
/// * `PartitionedFiles` is shown as `Path(<path>: [<file>,<file>,...])`
/// * `Reader` is shown as the fixed text `Reader(...)`
impl std::fmt::Display for Source {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Source::Reader(_) => f.write_str("Reader(...)"),
            Source::PartitionedFiles { path, filenames } => {
                // File names are comma separated with no surrounding spaces.
                let joined = filenames.join(",");
                write!(f, "Path({}: [{}])", path, joined)
            }
        }
    }
}

impl Clone for Source {
fn clone(&self) -> Self {
match self {
Expand Down Expand Up @@ -405,6 +417,22 @@ impl ExecutionPlan for CsvExec {
}
}
}

/// Writes the one-line description of this operator used in plan displays.
///
/// With [`DisplayFormatType::Default`] the output has the form
/// `CsvExec: source=<source>, has_header=<bool>`.
fn fmt_as(
    &self,
    t: DisplayFormatType,
    f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
    let source = &self.source;
    let has_header = &self.has_header;
    match t {
        DisplayFormatType::Default => write!(
            f,
            "CsvExec: source={}, has_header={}",
            source, has_header
        ),
    }
}
}

/// Iterator over batches
Expand Down
90 changes: 90 additions & 0 deletions datafusion/src/physical_plan/display.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Implementation of physical plan display. See
//! [`crate::physical_plan::displayable`] for examples of how to
//! format

use std::fmt;

use super::{accept, ExecutionPlan, ExecutionPlanVisitor};

/// Options for controlling how each [`ExecutionPlan`] should format itself.
///
/// A value of this type is handed to `ExecutionPlan::fmt_as` to select the
/// rendering of a single plan node.
#[derive(Debug, Clone, Copy)]
pub enum DisplayFormatType {
/// Default, compact format. Example: `FilterExec: c12 < 10.0`
Default,
}

/// Wraps an `ExecutionPlan` with various ways to display this plan
pub struct DisplayableExecutionPlan<'a> {
/// The plan to display, borrowed for as long as this wrapper lives
inner: &'a dyn ExecutionPlan,
}

impl<'a> DisplayableExecutionPlan<'a> {
/// Create a wrapper around an [`'ExecutionPlan'] which can be
/// pretty printed in a variety of ways
pub fn new(inner: &'a dyn ExecutionPlan) -> Self {
Self { inner }
}

/// Return a `format`able structure that produces a single line
/// per node.
///
/// ```text
/// ProjectionExec: expr=[a]
/// CoalesceBatchesExec: target_batch_size=4096
/// FilterExec: a < 5
/// RepartitionExec: partitioning=RoundRobinBatch(16)
/// CsvExec: source=...",
/// ```
pub fn indent(&self) -> impl fmt::Display + 'a {
struct Wrapper<'a>(&'a dyn ExecutionPlan);
impl<'a> fmt::Display for Wrapper<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let t = DisplayFormatType::Default;
let mut visitor = IndentVisitor { t, f, indent: 0 };
accept(self.0, &mut visitor)
}
}
Wrapper(self.inner)
}
}

/// Formats plans with a single line per node.
struct IndentVisitor<'a, 'b> {
/// How to format each node
t: DisplayFormatType,
/// Write to this formatter
f: &'a mut fmt::Formatter<'b>,
/// Current nesting depth; each level is rendered as two spaces of indentation
indent: usize,
}

/// Renders one line per plan node, indenting each node two spaces further
/// than its parent.
impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
    type Error = fmt::Error;

    /// Writes the line for `plan` at the current depth, then descends one
    /// level so that the node's children are indented beneath it.
    ///
    /// Always returns `Ok(true)` unless the underlying formatter fails.
    fn pre_visit(
        &mut self,
        plan: &dyn ExecutionPlan,
    ) -> std::result::Result<bool, Self::Error> {
        write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
        plan.fmt_as(self.t, self.f)?;
        writeln!(self.f)?;
        self.indent += 1;
        Ok(true)
    }

    /// Restores the depth that was in effect before `pre_visit` ran for this
    /// node. Without this, every node visited after a subtree (for example
    /// the second input of a join) would be indented as if it were nested
    /// inside the preceding sibling.
    fn post_visit(
        &mut self,
        _plan: &dyn ExecutionPlan,
    ) -> std::result::Result<bool, Self::Error> {
        // Saturating so an unbalanced call cannot panic inside `Display`.
        self.indent = self.indent.saturating_sub(1);
        Ok(true)
    }
}
4 changes: 4 additions & 0 deletions datafusion/src/physical_plan/distinct_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ impl AggregateExpr for DistinctCount {
count_data_type: self.data_type.clone(),
}))
}

/// Returns this aggregate expression's name, borrowed from its `name` field.
fn name(&self) -> &str {
&self.name
}
}

#[derive(Debug)]
Expand Down
17 changes: 15 additions & 2 deletions datafusion/src/physical_plan/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ use std::any::Any;
use std::sync::Arc;

use crate::error::{DataFusionError, Result};
use crate::physical_plan::memory::MemoryStream;
use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning};
use crate::physical_plan::{
memory::MemoryStream, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
};
use arrow::array::NullArray;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
Expand Down Expand Up @@ -120,6 +121,18 @@ impl ExecutionPlan for EmptyExec {
None,
)?))
}

/// Writes the one-line description of this operator used in plan displays.
///
/// With [`DisplayFormatType::Default`] the output has the form
/// `EmptyExec: produce_one_row=<bool>`.
fn fmt_as(
    &self,
    t: DisplayFormatType,
    f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
    let produce_one_row = &self.produce_one_row;
    match t {
        DisplayFormatType::Default => {
            write!(f, "EmptyExec: produce_one_row={}", produce_one_row)
        }
    }
}
}

#[cfg(test)]
Expand Down
19 changes: 15 additions & 4 deletions datafusion/src/physical_plan/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@
use std::any::Any;
use std::sync::Arc;

use crate::error::{DataFusionError, Result};
use crate::{
error::{DataFusionError, Result},
logical_plan::StringifiedPlan,
physical_plan::{common::SizedRecordBatchStream, ExecutionPlan},
physical_plan::Partitioning,
physical_plan::{common::SizedRecordBatchStream, DisplayFormatType, ExecutionPlan},
};
use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};

use crate::physical_plan::Partitioning;

use super::SendableRecordBatchStream;
use async_trait::async_trait;

Expand Down Expand Up @@ -122,4 +121,16 @@ impl ExecutionPlan for ExplainExec {
vec![Arc::new(record_batch)],
)))
}

/// Writes the one-line description of this operator used in plan displays.
///
/// With [`DisplayFormatType::Default`] the output is the fixed text
/// `ExplainExec`.
fn fmt_as(
    &self,
    t: DisplayFormatType,
    f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
    match t {
        // No parameters to interpolate, so the literal is written directly.
        DisplayFormatType::Default => f.write_str("ExplainExec"),
    }
}
}
4 changes: 4 additions & 0 deletions datafusion/src/physical_plan/expressions/average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ impl AggregateExpr for Avg {
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.expr.clone()]
}

/// Returns this aggregate expression's name, borrowed from its `name` field.
fn name(&self) -> &str {
&self.name
}
}

/// An accumulator to compute the average
Expand Down
4 changes: 4 additions & 0 deletions datafusion/src/physical_plan/expressions/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ impl AggregateExpr for Count {
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
Ok(Box::new(CountAccumulator::new()))
}

/// Returns this aggregate expression's name, borrowed from its `name` field.
fn name(&self) -> &str {
&self.name
}
}

#[derive(Debug)]
Expand Down
8 changes: 8 additions & 0 deletions datafusion/src/physical_plan/expressions/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ impl AggregateExpr for Max {
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
Ok(Box::new(MaxAccumulator::try_new(&self.data_type)?))
}

/// Returns this aggregate expression's name, borrowed from its `name` field.
fn name(&self) -> &str {
&self.name
}
}

// Statically-typed version of min/max(array) -> ScalarValue for string types.
Expand Down Expand Up @@ -387,6 +391,10 @@ impl AggregateExpr for Min {
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
Ok(Box::new(MinAccumulator::try_new(&self.data_type)?))
}

/// Returns this aggregate expression's name, borrowed from its `name` field.
fn name(&self) -> &str {
&self.name
}
}

#[derive(Debug)]
Expand Down
Loading

0 comments on commit 9cf32cf

Please sign in to comment.