diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index e60250adf2f3..06fa6d8915bf 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -61,6 +61,7 @@ struct AggregateStreamMetrics { processing_time: Time, idle_time: Time, last_batch_at: Instant, + waiting_time: Time, } impl AggregateStreamMetrics { @@ -70,6 +71,8 @@ impl AggregateStreamMetrics { .subset_time("processing_time", partition), idle_time: MetricBuilder::new(metrics).subset_time("idle_time", partition), last_batch_at: Instant::now(), + waiting_time: MetricBuilder::new(metrics) + .subset_time("waiting_time", partition), } } @@ -453,7 +456,12 @@ impl Stream for GroupedHashAggregateStream { loop { match &self.exec_state { ExecutionState::ReadingInput => 'reading_input: { - match ready!(self.input.poll_next_unpin(cx)) { + let wait_started = Instant::now(); + let result = ready!(self.input.poll_next_unpin(cx)); + self.aggregate_stream_metrics + .waiting_time + .add_duration(wait_started.elapsed()); + match result { // new batch to aggregate Some(Ok(batch)) => { let timer = elapsed_compute.timer();