Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics: add worker_park_unpark_count #6696

Merged
merged 5 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions tokio/src/runtime/metrics/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ pub(crate) struct MetricsBatch {
/// Number of times the worker parked.
park_count: u64,

/// Number of times the worker parked and unparked.
park_unpark_count: u64,

/// Number of times the worker woke w/o doing work.
noop_count: u64,

Expand Down Expand Up @@ -54,6 +57,7 @@ impl MetricsBatch {

MetricsBatch {
park_count: 0,
park_unpark_count: 0,
noop_count: 0,
steal_count: 0,
steal_operations: 0,
Expand All @@ -76,6 +80,9 @@ impl MetricsBatch {
pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) {
worker.mean_poll_time.store(mean_poll_time, Relaxed);
worker.park_count.store(self.park_count, Relaxed);
worker
.park_unpark_count
.store(self.park_unpark_count, Relaxed);
worker.noop_count.store(self.noop_count, Relaxed);
worker.steal_count.store(self.steal_count, Relaxed);
worker
Expand All @@ -101,6 +108,7 @@ impl MetricsBatch {
/// The worker is about to park.
pub(crate) fn about_to_park(&mut self) {
self.park_count += 1;
self.park_unpark_count += 1;

if self.poll_count_on_last_park == self.poll_count {
self.noop_count += 1;
Expand All @@ -109,6 +117,11 @@ impl MetricsBatch {
}
}

/// The worker was unparked.
pub(crate) fn unparked(&mut self) {
self.park_unpark_count += 1;
}

/// Start processing a batch of tasks
pub(crate) fn start_processing_scheduled_tasks(&mut self) {
self.processing_scheduled_tasks_started_at = Instant::now();
Expand Down
1 change: 1 addition & 0 deletions tokio/src/runtime/metrics/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ impl MetricsBatch {

pub(crate) fn submit(&mut self, _to: &WorkerMetrics, _mean_poll_time: u64) {}
pub(crate) fn about_to_park(&mut self) {}
pub(crate) fn unparked(&mut self) {}
pub(crate) fn inc_local_schedule_count(&mut self) {}
pub(crate) fn start_processing_scheduled_tasks(&mut self) {}
pub(crate) fn end_processing_scheduled_tasks(&mut self) {}
Expand Down
55 changes: 55 additions & 0 deletions tokio/src/runtime/metrics/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,61 @@ impl RuntimeMetrics {
.load(Relaxed)
}

/// Returns the total number of times the given worker thread has parked
/// and unparked.
///
/// The worker park/unpark count starts at zero when the runtime is created
/// and increases by one each time the worker parks the thread waiting for
/// new inbound events to process. This usually means the worker has processed
/// all pending work and is currently idle. When new work becomes available,
/// the worker is unparked and the park/unpark count is again increased by one.
///
/// An odd count means that the worker is currently parked.
/// An even count means that the worker is currently active.
///
/// The counter is monotonically increasing. It is never decremented or
/// reset to zero.
///
/// # Arguments
///
/// `worker` is the index of the worker being queried. The given value must
/// be between 0 and `num_workers()`. The index uniquely identifies a single
/// worker and will continue to identify the worker throughout the lifetime
/// of the runtime instance.
///
/// # Panics
///
/// The method panics when `worker` represents an invalid worker, i.e. is
/// greater than or equal to `num_workers()`.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
/// let n = metrics.worker_park_unpark_count(0);
///
/// println!("worker 0 parked and unparked {} times", n);
///
/// if n % 2 == 0 {
/// println!("worker 0 is active");
/// } else {
/// println!("worker 0 is parked");
/// }
/// }
/// ```
pub fn worker_park_unpark_count(&self, worker: usize) -> u64 {
self.handle
.inner
.worker_metrics(worker)
.park_unpark_count
.load(Relaxed)
}


/// Returns the number of times the given worker thread unparked but
/// performed no work before parking again.
///
Expand Down
3 changes: 3 additions & 0 deletions tokio/src/runtime/metrics/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub(crate) struct WorkerMetrics {
/// Number of times the worker parked.
pub(crate) park_count: MetricAtomicU64,

/// Number of times the worker parked and unparked.
pub(crate) park_unpark_count: MetricAtomicU64,

/// Number of times the worker woke then parked again without doing work.
pub(crate) noop_count: MetricAtomicU64,

Expand Down
3 changes: 3 additions & 0 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,9 @@ impl Context {
});

core = c;

core.metrics.unparked();
core.submit_metrics(handle);
}

if let Some(f) = &handle.shared.config.after_unpark {
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ impl Stats {
self.batch.about_to_park();
}

pub(crate) fn unparked(&mut self) {
self.batch.unparked();
}

pub(crate) fn inc_local_schedule_count(&mut self) {
self.batch.inc_local_schedule_count();
}
Expand Down
5 changes: 5 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,8 +699,13 @@ impl Context {
if core.transition_to_parked(&self.worker) {
while !core.is_shutdown && !core.is_traced {
core.stats.about_to_park();
core.stats
.submit(&self.worker.handle.shared.worker_metrics[self.worker.index]);

core = self.park_timeout(core, None);

core.stats.unparked();

// Run regularly scheduled maintenance
core.maintenance(&self.worker);

Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread_alt/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ impl Stats {
self.batch.about_to_park();
}

pub(crate) fn unparked(&mut self) {
self.batch.unparked();
}

pub(crate) fn inc_local_schedule_count(&mut self) {
self.batch.inc_local_schedule_count();
}
Expand Down
3 changes: 3 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,9 @@ impl Worker {
let n = cmp::max(core.run_queue.remaining_slots() / 2, 1);
let maybe_task = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, n);

core.stats.unparked();
self.flush_metrics(cx, &mut core);

Ok((maybe_task, core))
}

Expand Down
38 changes: 38 additions & 0 deletions tokio/tests/rt_unstable_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,44 @@ fn worker_park_count() {
assert!(1 <= metrics.worker_park_count(1));
}

#[test]
fn worker_park_unpark_count() {
let rt = current_thread();
let metrics = rt.metrics();
rt.block_on(rt.spawn(async {})).unwrap();
drop(rt);
assert!(2 <= metrics.worker_park_unpark_count(0));

let rt = threaded();
let metrics = rt.metrics();

// Wait for workers to be parked after runtime startup.
for _ in 0..100 {
if 1 <= metrics.worker_park_unpark_count(0) && 1 <= metrics.worker_park_unpark_count(1) {
break;
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
assert_eq!(1, metrics.worker_park_unpark_count(0));
assert_eq!(1, metrics.worker_park_unpark_count(1));

// Spawn a task to unpark and then park a worker.
rt.block_on(rt.spawn(async {})).unwrap();
for _ in 0..100 {
if 3 <= metrics.worker_park_unpark_count(0) || 3 <= metrics.worker_park_unpark_count(1) {
break;
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
assert!(3 <= metrics.worker_park_unpark_count(0) || 3 <= metrics.worker_park_unpark_count(1));

// Both threads unpark for runtime shutdown.
drop(rt);
assert_eq!(0, metrics.worker_park_unpark_count(0) % 2);
assert_eq!(0, metrics.worker_park_unpark_count(1) % 2);
assert!(4 <= metrics.worker_park_unpark_count(0) || 4 <= metrics.worker_park_unpark_count(1));
}

#[test]
fn worker_noop_count() {
// There isn't really a great way to generate no-op parks as they happen as
Expand Down
Loading