diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs index bd8f6fdbbea0..107cc15bfa05 100644 --- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs +++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs @@ -24,12 +24,13 @@ use crate::serde::scheduler::PartitionLocation; use arrow::datatypes::SchemaRef; use async_trait::async_trait; -use datafusion::physical_plan::{ExecutionPlan, Partitioning}; +use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning}; use datafusion::{ error::{DataFusionError, Result}, physical_plan::RecordBatchStream, }; use log::info; +use std::fmt::Formatter; /// ShuffleReaderExec reads partitions that have already been materialized by an executor. #[derive(Debug, Clone)] @@ -103,4 +104,31 @@ impl ExecutionPlan for ShuffleReaderExec { .await .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e))) } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + let loc_str = self + .partition_location + .iter() + .map(|l| { + format!( + "[executor={} part={}:{}:{} stats={:?}]", + l.executor_meta.id, + l.partition_id.job_id, + l.partition_id.stage_id, + l.partition_id.partition_id, + l.partition_stats + ) + }) + .collect::>() + .join(","); + write!(f, "ShuffleReaderExec: partition_locations={}", loc_str) + } + } + } }