Skip to content

Commit

Permalink
[BugFix] fix explain csv/json/avro exec can not see metrics bug (#5018)
Browse files Browse the repository at this point in the history
When we executing explain analyze select * from tablexxx, csv/json/avro exev
metrics is empty.
This bug is introduced by not implementing metrics traits in csv/json/avro.

Signed-off-by: xyz <a997647204@gmail.com>

Signed-off-by: xyz <a997647204@gmail.com>
  • Loading branch information
xiaoyong-z authored Jan 24, 2023
1 parent 0820eb9 commit 13dfdd6
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 18 deletions.
8 changes: 6 additions & 2 deletions datafusion/core/src/physical_plan/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::physical_plan::{
use arrow::datatypes::SchemaRef;

use crate::execution::context::TaskContext;
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use std::any::Any;
use std::sync::Arc;

Expand Down Expand Up @@ -122,7 +122,7 @@ impl ExecutionPlan for AvroExec {
let opener = private::AvroOpener { config };

let stream =
FileStream::new(&self.base_config, partition, opener, self.metrics.clone())?;
FileStream::new(&self.base_config, partition, opener, &self.metrics)?;
Ok(Box::pin(stream))
}

Expand All @@ -146,6 +146,10 @@ impl ExecutionPlan for AvroExec {
fn statistics(&self) -> Statistics {
self.projected_statistics.clone()
}

fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
}

#[cfg(feature = "avro")]
Expand Down
26 changes: 24 additions & 2 deletions datafusion/core/src/physical_plan/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::physical_plan::file_format::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
use crate::physical_plan::file_format::FileMeta;
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
Expand Down Expand Up @@ -153,7 +153,7 @@ impl ExecutionPlan for CsvExec {
file_compression_type: self.file_compression_type.to_owned(),
};
let stream =
FileStream::new(&self.base_config, partition, opener, self.metrics.clone())?;
FileStream::new(&self.base_config, partition, opener, &self.metrics)?;
Ok(Box::pin(stream) as SendableRecordBatchStream)
}

Expand All @@ -179,6 +179,10 @@ impl ExecutionPlan for CsvExec {
fn statistics(&self) -> Statistics {
self.projected_statistics.clone()
}

fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -515,6 +519,13 @@ mod tests {
"+----+------------+",
];
crate::assert_batches_eq!(expected, &[batch.slice(0, 5)]);

let metrics = csv.metrics().expect("doesn't found metrics");
let time_elapsed_processing = get_value(&metrics, "time_elapsed_processing");
assert!(
time_elapsed_processing > 0,
"Expected time_elapsed_processing greater than 0",
);
Ok(())
}

Expand Down Expand Up @@ -676,4 +687,15 @@ mod tests {

Ok(())
}

fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
match metrics.sum_by_name(metric_name) {
Some(v) => v.as_usize(),
_ => {
panic!(
"Expected metric not found. Looking for '{metric_name}' in\n\n{metrics:#?}"
);
}
}
}
}
11 changes: 5 additions & 6 deletions datafusion/core/src/physical_plan/file_format/file_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl<F: FileOpener> FileStream<F> {
config: &FileScanConfig,
partition: usize,
file_reader: F,
metrics: ExecutionPlanMetricsSet,
metrics: &ExecutionPlanMetricsSet,
) -> Result<Self> {
let (projected_schema, _) = config.project();
let pc_projector = PartitionColumnProjector::new(
Expand All @@ -187,8 +187,8 @@ impl<F: FileOpener> FileStream<F> {
file_reader,
pc_projector,
state: FileStreamState::Idle,
file_stream_metrics: FileStreamMetrics::new(&metrics, partition),
baseline_metrics: BaselineMetrics::new(&metrics, partition),
file_stream_metrics: FileStreamMetrics::new(metrics, partition),
baseline_metrics: BaselineMetrics::new(metrics, partition),
})
}

Expand Down Expand Up @@ -353,9 +353,8 @@ mod tests {
output_ordering: None,
infinite_source: false,
};

let file_stream =
FileStream::new(&config, 0, reader, ExecutionPlanMetricsSet::new()).unwrap();
let metrics_set = ExecutionPlanMetricsSet::new();
let file_stream = FileStream::new(&config, 0, reader, &metrics_set).unwrap();

file_stream
.map(|b| b.expect("No error expected in stream"))
Expand Down
8 changes: 6 additions & 2 deletions datafusion/core/src/physical_plan/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::physical_plan::file_format::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
use crate::physical_plan::file_format::FileMeta;
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
Expand Down Expand Up @@ -134,7 +134,7 @@ impl ExecutionPlan for NdJsonExec {
};

let stream =
FileStream::new(&self.base_config, partition, opener, self.metrics.clone())?;
FileStream::new(&self.base_config, partition, opener, &self.metrics)?;

Ok(Box::pin(stream) as SendableRecordBatchStream)
}
Expand All @@ -159,6 +159,10 @@ impl ExecutionPlan for NdJsonExec {
fn statistics(&self) -> Statistics {
self.projected_statistics.clone()
}

fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
}

struct JsonOpener {
Expand Down
8 changes: 2 additions & 6 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,12 +306,8 @@ impl ExecutionPlan for ParquetExec {
enable_page_index: self.enable_page_index(config_options),
};

let stream = FileStream::new(
&self.base_config,
partition_index,
opener,
self.metrics.clone(),
)?;
let stream =
FileStream::new(&self.base_config, partition_index, opener, &self.metrics)?;

Ok(Box::pin(stream))
}
Expand Down

0 comments on commit 13dfdd6

Please sign in to comment.