Skip to content

Commit

Permalink
add wait time
Browse files Browse the repository at this point in the history
  • Loading branch information
fsdvh committed Nov 4, 2024
1 parent beb9799 commit 6d023f8
Showing 1 changed file with 9 additions and 1 deletion.
10 changes: 9 additions & 1 deletion datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ struct AggregateStreamMetrics {
processing_time: Time,
idle_time: Time,
last_batch_at: Instant,
waiting_time: Time,
}

impl AggregateStreamMetrics {
Expand All @@ -70,6 +71,8 @@ impl AggregateStreamMetrics {
.subset_time("processing_time", partition),
idle_time: MetricBuilder::new(metrics).subset_time("idle_time", partition),
last_batch_at: Instant::now(),
waiting_time: MetricBuilder::new(metrics)
.subset_time("waiting_time", partition),
}
}

Expand Down Expand Up @@ -453,7 +456,12 @@ impl Stream for GroupedHashAggregateStream {
loop {
match &self.exec_state {
ExecutionState::ReadingInput => 'reading_input: {
match ready!(self.input.poll_next_unpin(cx)) {
let wait_started = Instant::now();
let result = ready!(self.input.poll_next_unpin(cx));
self.aggregate_stream_metrics
.waiting_time
.add_duration(wait_started.elapsed());
match result {
// new batch to aggregate
Some(Ok(batch)) => {
let timer = elapsed_compute.timer();
Expand Down

0 comments on commit 6d023f8

Please sign in to comment.