Skip to content

Commit

Permalink
Show physical plan with metrics in benchmark (#662)
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove authored Jul 4, 2021
1 parent 9314dbb commit a5b3a81
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 10 deletions.
15 changes: 11 additions & 4 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use datafusion::prelude::*;

use datafusion::parquet::basic::Compression;
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use structopt::StructOpt;

#[cfg(feature = "snmalloc")]
Expand Down Expand Up @@ -343,21 +344,27 @@ async fn execute_query(
debug: bool,
) -> Result<Vec<RecordBatch>> {
if debug {
println!("Logical plan:\n{:?}", plan);
println!("=== Logical plan ===\n{:?}\n", plan);
}
let plan = ctx.optimize(plan)?;
if debug {
println!("Optimized logical plan:\n{:?}", plan);
println!("=== Optimized logical plan ===\n{:?}\n", plan);
}
let physical_plan = ctx.create_physical_plan(&plan)?;
if debug {
println!(
"Physical plan:\n{}",
"=== Physical plan ===\n{}\n",
displayable(physical_plan.as_ref()).indent().to_string()
);
}
let result = collect(physical_plan).await?;
let result = collect(physical_plan.clone()).await?;
if debug {
println!(
"=== Physical plan with metrics ===\n{}\n",
DisplayableExecutionPlan::with_metrics(physical_plan.as_ref())
.indent()
.to_string()
);
pretty::print_batches(&result)?;
}
Ok(result)
Expand Down
50 changes: 44 additions & 6 deletions datafusion/src/physical_plan/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,27 @@ pub enum DisplayFormatType {
/// Wraps an `ExecutionPlan` with various ways to display this plan
pub struct DisplayableExecutionPlan<'a> {
inner: &'a dyn ExecutionPlan,
/// whether to show metrics or not
with_metrics: bool,
}

impl<'a> DisplayableExecutionPlan<'a> {
/// Create a wrapper around an [`'ExecutionPlan'] which can be
/// pretty printed in a variety of ways
pub fn new(inner: &'a dyn ExecutionPlan) -> Self {
Self { inner }
Self {
inner,
with_metrics: false,
}
}

/// Create a wrapper around an [`'ExecutionPlan'] which can be
/// pretty printed in a variety of ways
pub fn with_metrics(inner: &'a dyn ExecutionPlan) -> Self {
Self {
inner,
with_metrics: true,
}
}

/// Return a `format`able structure that produces a single line
Expand All @@ -53,15 +67,26 @@ impl<'a> DisplayableExecutionPlan<'a> {
/// CsvExec: source=...",
/// ```
pub fn indent(&self) -> impl fmt::Display + 'a {
struct Wrapper<'a>(&'a dyn ExecutionPlan);
struct Wrapper<'a> {
plan: &'a dyn ExecutionPlan,
with_metrics: bool,
}
impl<'a> fmt::Display for Wrapper<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let t = DisplayFormatType::Default;
let mut visitor = IndentVisitor { t, f, indent: 0 };
accept(self.0, &mut visitor)
let mut visitor = IndentVisitor {
t,
f,
indent: 0,
with_metrics: self.with_metrics,
};
accept(self.plan, &mut visitor)
}
}
Wrapper(self.inner)
Wrapper {
plan: self.inner,
with_metrics: self.with_metrics,
}
}
}

Expand All @@ -71,8 +96,10 @@ struct IndentVisitor<'a, 'b> {
t: DisplayFormatType,
/// Write to this formatter
f: &'a mut fmt::Formatter<'b>,
///with_schema: bool,
/// Indent size
indent: usize,
/// whether to show metrics or not
with_metrics: bool,
}

impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
Expand All @@ -83,6 +110,17 @@ impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
) -> std::result::Result<bool, Self::Error> {
write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
plan.fmt_as(self.t, self.f)?;
if self.with_metrics {
write!(
self.f,
", metrics=[{}]",
plan.metrics()
.iter()
.map(|(k, v)| format!("{}={:?}", k, v.value))
.collect::<Vec<_>>()
.join(", ")
)?;
}
writeln!(self.f)?;
self.indent += 1;
Ok(true)
Expand Down

0 comments on commit a5b3a81

Please sign in to comment.