Add metrics for UnnestExec #8482

Merged: 1 commit, Dec 14, 2023
24 changes: 23 additions & 1 deletion datafusion/core/tests/dataframe/mod.rs
@@ -39,7 +39,7 @@ use datafusion::prelude::JoinType;
use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
use datafusion::test_util::parquet_test_data;
use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
use datafusion_common::{DataFusionError, ScalarValue, UnnestOptions};
use datafusion_common::{assert_contains, DataFusionError, ScalarValue, UnnestOptions};
use datafusion_execution::config::SessionConfig;
use datafusion_expr::expr::{GroupingSet, Sort};
use datafusion_expr::{
@@ -1408,6 +1408,28 @@ async fn unnest_with_redundant_columns() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn unnest_analyze_metrics() -> Result<()> {
const NUM_ROWS: usize = 5;

let df = table_with_nested_types(NUM_ROWS).await?;
let results = df
.unnest_column("tags")?
.explain(false, true)?
.collect()
.await?;
let formatted = arrow::util::pretty::pretty_format_batches(&results)
.unwrap()
.to_string();
assert_contains!(&formatted, "elapsed_compute=");
Contributor:
👍

assert_contains!(&formatted, "input_batches=1");
assert_contains!(&formatted, "input_rows=5");
assert_contains!(&formatted, "output_rows=10");
assert_contains!(&formatted, "output_batches=1");

Ok(())
}
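
Not part of this PR: for reference, once this change lands the same counters are also reachable programmatically through the new `metrics()` method on the plan node, not only via `explain(false, true)` as in the test above. A rough sketch under stated assumptions: the table `t` and its list column `tags` are illustrative, and `children()` returning owned `Arc`s matches the DataFusion release this PR targets (later releases may need small adjustments to the traversal).

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::unnest::UnnestExec;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::SessionContext;

// Illustrative helper: run a DataFrame's physical plan, then walk the plan
// tree and print the aggregated metrics of any UnnestExec node it contains.
async fn print_unnest_metrics(ctx: &SessionContext) -> Result<()> {
    // Assumes a table `t` with a list column `tags` is registered in `ctx`.
    let df = ctx.table("t").await?.unnest_column("tags")?;
    let plan: Arc<dyn ExecutionPlan> = df.create_physical_plan().await?;

    // Execute the plan so the counters are actually populated.
    let _batches = collect(plan.clone(), ctx.task_ctx()).await?;

    // Depth-first walk over the plan tree.
    let mut stack = vec![plan];
    while let Some(node) = stack.pop() {
        if node.as_any().downcast_ref::<UnnestExec>().is_some() {
            if let Some(metrics) = node.metrics() {
                println!(
                    "UnnestExec: output_rows={:?} elapsed_compute_ns={:?}",
                    metrics.output_rows(),
                    metrics.elapsed_compute()
                );
            }
        }
        stack.extend(node.children());
    }
    Ok(())
}
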

async fn create_test_table(name: &str) -> Result<DataFrame> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
92 changes: 63 additions & 29 deletions datafusion/physical-plan/src/unnest.rs
@@ -17,8 +17,6 @@

//! Defines the unnest column plan for unnesting values in a column that contains a list
//! type, conceptually is like joining each row with all the values in the list column.

use std::time::Instant;
use std::{any::Any, sync::Arc};

use super::DisplayAs;
@@ -44,6 +42,8 @@ use async_trait::async_trait;
use futures::{Stream, StreamExt};
use log::trace;

use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};

/// Unnest the given column by joining the row with each value in the
/// nested type.
///
@@ -58,6 +58,8 @@ pub struct UnnestExec {
column: Column,
/// Options
options: UnnestOptions,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}

impl UnnestExec {
@@ -73,6 +75,7 @@ impl UnnestExec {
schema,
column,
options,
metrics: Default::default(),
}
}
}
@@ -141,19 +144,58 @@ impl ExecutionPlan for UnnestExec {
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let input = self.input.execute(partition, context)?;
let metrics = UnnestMetrics::new(partition, &self.metrics);

Ok(Box::pin(UnnestStream {
input,
schema: self.schema.clone(),
column: self.column.clone(),
options: self.options.clone(),
num_input_batches: 0,
num_input_rows: 0,
num_output_batches: 0,
num_output_rows: 0,
unnest_time: 0,
metrics,
}))
}

fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
}

#[derive(Clone, Debug)]
struct UnnestMetrics {
Contributor:
This has some overlap with BaselineMetrics; it might be possible to combine them if you wanted to reduce the code size somewhat.
https://docs.rs/datafusion/latest/datafusion/physical_plan/metrics/struct.BaselineMetrics.html

/// total time for column unnesting
elapsed_compute: metrics::Time,
/// Number of batches consumed
input_batches: metrics::Count,
/// Number of rows consumed
input_rows: metrics::Count,
/// Number of batches produced
output_batches: metrics::Count,
/// Number of rows produced by this operator
output_rows: metrics::Count,
}

impl UnnestMetrics {
fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
let elapsed_compute = MetricBuilder::new(metrics).elapsed_compute(partition);

let input_batches =
MetricBuilder::new(metrics).counter("input_batches", partition);

let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);

let output_batches =
MetricBuilder::new(metrics).counter("output_batches", partition);

let output_rows = MetricBuilder::new(metrics).output_rows(partition);

Self {
input_batches,
input_rows,
output_batches,
output_rows,
elapsed_compute,
}
}
}
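
Regarding the BaselineMetrics suggestion in the review comment above: a rough, untested sketch (not part of this PR) of how the struct could lean on that helper for elapsed_compute and output_rows while keeping the unnest-specific input counters. The struct name, field layout, and import path here are assumptions for illustration only.

use datafusion::physical_plan::metrics::{
    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder,
};

#[derive(Clone, Debug)]
struct UnnestMetricsAlt {
    /// Shared helper providing elapsed_compute and output_rows
    baseline: BaselineMetrics,
    /// Number of batches consumed
    input_batches: Count,
    /// Number of rows consumed
    input_rows: Count,
    /// Number of batches produced
    output_batches: Count,
}

impl UnnestMetricsAlt {
    fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
        Self {
            baseline: BaselineMetrics::new(metrics, partition),
            input_batches: MetricBuilder::new(metrics)
                .counter("input_batches", partition),
            input_rows: MetricBuilder::new(metrics).counter("input_rows", partition),
            output_batches: MetricBuilder::new(metrics)
                .counter("output_batches", partition),
        }
    }
}

// In poll_next, timing and output-row accounting would then go through the
// helper, e.g.:
//   let _timer = metrics.baseline.elapsed_compute().timer();
//   metrics.baseline.record_output(batch.num_rows());

BaselineMetrics also exposes record_poll(), which could fold the output-row accounting directly into the value returned from Stream::poll_next if the stream were restructured around it.
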

/// A stream that issues [RecordBatch]es with unnested column data.
@@ -166,16 +208,8 @@ struct UnnestStream {
column: Column,
/// Options
options: UnnestOptions,
/// number of input batches
num_input_batches: usize,
/// number of input rows
num_input_rows: usize,
/// number of batches produced
num_output_batches: usize,
/// number of rows produced
num_output_rows: usize,
/// total time for column unnesting, in ms
unnest_time: usize,
/// Metrics
metrics: UnnestMetrics,
}

impl RecordBatchStream for UnnestStream {
@@ -207,28 +241,28 @@ impl UnnestStream {
.poll_next_unpin(cx)
.map(|maybe_batch| match maybe_batch {
Some(Ok(batch)) => {
let start = Instant::now();
let timer = self.metrics.elapsed_compute.timer();
let result =
build_batch(&batch, &self.schema, &self.column, &self.options);
self.num_input_batches += 1;
self.num_input_rows += batch.num_rows();
self.metrics.input_batches.add(1);
self.metrics.input_rows.add(batch.num_rows());
if let Ok(ref batch) = result {
self.unnest_time += start.elapsed().as_millis() as usize;
self.num_output_batches += 1;
self.num_output_rows += batch.num_rows();
timer.done();
self.metrics.output_batches.add(1);
self.metrics.output_rows.add(batch.num_rows());
}

Some(result)
}
other => {
trace!(
"Processed {} probe-side input batches containing {} rows and \
produced {} output batches containing {} rows in {} ms",
self.num_input_batches,
self.num_input_rows,
self.num_output_batches,
self.num_output_rows,
self.unnest_time,
produced {} output batches containing {} rows in {}",
Contributor:
👍
self.metrics.input_batches,
self.metrics.input_rows,
self.metrics.output_batches,
self.metrics.output_rows,
self.metrics.elapsed_compute,
);
other
}