metrics: add worker_park_unpark_count #6696

Merged
merged 5 commits into from
Jul 23, 2024
13 changes: 13 additions & 0 deletions tokio/src/runtime/metrics/batch.rs
@@ -7,6 +7,9 @@ pub(crate) struct MetricsBatch {
    /// Number of times the worker parked.
    park_count: u64,

    /// Number of times the worker parked and unparked.
    park_unpark_count: u64,

    /// Number of times the worker woke w/o doing work.
    noop_count: u64,

@@ -54,6 +57,7 @@ impl MetricsBatch {

        MetricsBatch {
            park_count: 0,
            park_unpark_count: 0,
            noop_count: 0,
            steal_count: 0,
            steal_operations: 0,
@@ -76,6 +80,9 @@ impl MetricsBatch {
    pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) {
        worker.mean_poll_time.store(mean_poll_time, Relaxed);
        worker.park_count.store(self.park_count, Relaxed);
        worker
            .park_unpark_count
            .store(self.park_unpark_count, Relaxed);
        worker.noop_count.store(self.noop_count, Relaxed);
        worker.steal_count.store(self.steal_count, Relaxed);
        worker
@@ -101,6 +108,7 @@ impl MetricsBatch {
    /// The worker is about to park.
    pub(crate) fn about_to_park(&mut self) {
        self.park_count += 1;
        self.park_unpark_count += 1;

        if self.poll_count_on_last_park == self.poll_count {
            self.noop_count += 1;
@@ -109,6 +117,11 @@ impl MetricsBatch {
        }
    }

    /// The worker was unparked.
    pub(crate) fn unparked(&mut self) {
        self.park_unpark_count += 1;
    }

    /// Start processing a batch of tasks
    pub(crate) fn start_processing_scheduled_tasks(&mut self) {
        self.processing_scheduled_tasks_started_at = Instant::now();
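
To make the batching in `batch.rs` concrete, here is a minimal standalone sketch that uses only `std`. `SharedMetrics` and `Batch` are hypothetical stand-ins for `WorkerMetrics` and `MetricsBatch`, not tokio's actual types. The sketch models only one thing: the counter lives in a worker-local batch and becomes visible to observers when `submit` stores it into the shared atomic.

```rust
use std::sync::atomic::{AtomicU64, Ordering::Relaxed};

/// Hypothetical stand-in for the shared per-worker metrics (not tokio's type).
struct SharedMetrics {
    park_unpark_count: AtomicU64,
}

/// Hypothetical stand-in for the worker-local batch (not tokio's type).
struct Batch {
    park_unpark_count: u64,
}

impl Batch {
    fn new() -> Self {
        Batch { park_unpark_count: 0 }
    }

    /// Called right before the worker parks: the count becomes odd.
    fn about_to_park(&mut self) {
        self.park_unpark_count += 1;
    }

    /// Called right after the worker wakes up: the count becomes even.
    fn unparked(&mut self) {
        self.park_unpark_count += 1;
    }

    /// Publishes the local value so that other threads can read it.
    fn submit(&self, shared: &SharedMetrics) {
        shared
            .park_unpark_count
            .store(self.park_unpark_count, Relaxed);
    }
}

fn main() {
    let shared = SharedMetrics {
        park_unpark_count: AtomicU64::new(0),
    };
    let mut batch = Batch::new();

    batch.about_to_park();
    batch.submit(&shared);
    println!("after park: {}", shared.park_unpark_count.load(Relaxed));

    batch.unparked();
    // Until the next submit, observers still see the old value.
    println!("before submit: {}", shared.park_unpark_count.load(Relaxed));
    batch.submit(&shared);
    println!("after submit: {}", shared.park_unpark_count.load(Relaxed));
}
```

In this model an observer can lag behind the worker until the next `submit`, which is one plausible reading of why the scheduler hunks in this PR pair `unparked()` with a metrics submit or flush call.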
1 change: 1 addition & 0 deletions tokio/src/runtime/metrics/mock.rs
@@ -39,6 +39,7 @@ impl MetricsBatch {

    pub(crate) fn submit(&mut self, _to: &WorkerMetrics, _mean_poll_time: u64) {}
    pub(crate) fn about_to_park(&mut self) {}
    pub(crate) fn unparked(&mut self) {}
    pub(crate) fn inc_local_schedule_count(&mut self) {}
    pub(crate) fn start_processing_scheduled_tasks(&mut self) {}
    pub(crate) fn end_processing_scheduled_tasks(&mut self) {}
53 changes: 53 additions & 0 deletions tokio/src/runtime/metrics/runtime.rs
@@ -242,6 +242,59 @@ impl RuntimeMetrics {
            .load(Relaxed)
    }

    /// Returns the total number of times the given worker thread has parked
    /// and unparked.
    ///
    /// The worker park/unpark count starts at zero when the runtime is created
    /// and increases by one each time the worker parks the thread waiting for
    /// new inbound events to process. This usually means the worker has processed
    /// all pending work and is currently idle. When new work becomes available,
    /// the worker is unparked and the park/unpark count is again increased by one.
    ///
    /// The counter is monotonically increasing. It is never decremented or
    /// reset to zero. An odd count means that the worker is currently parked;
    /// an even count means that the worker is active.
    ///
    /// # Arguments
    ///
    /// `worker` is the index of the worker being queried. The given value must
    /// be between 0 and `num_workers()`. The index uniquely identifies a single
    /// worker and will continue to identify the worker throughout the lifetime
    /// of the runtime instance.
    ///
    /// # Panics
    ///
    /// The method panics when `worker` represents an invalid worker, i.e. is
    /// greater than or equal to `num_workers()`.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::runtime::Handle;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let metrics = Handle::current().metrics();
    ///     let n = metrics.worker_park_unpark_count(0);
    ///
    ///     println!("worker 0 parked and unparked {} times", n);
    ///
    ///     if n % 2 == 0 {
    ///         println!("worker 0 is active");
    ///     } else {
    ///         println!("worker 0 is parked");
    ///     }
    /// }
    /// ```
    pub fn worker_park_unpark_count(&self, worker: usize) -> u64 {
        self.handle
            .inner
            .worker_metrics(worker)
            .park_unpark_count
            .load(Relaxed)
    }

    /// Returns the number of times the given worker thread unparked but
    /// performed no work before parking again.
    ///
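
The parity rule in the doc comment above can be captured in a small helper. This is a sketch under stated assumptions: `WorkerState`, `state_from_count`, and `completed_cycles` are hypothetical names that are not part of tokio, and the input is whatever value `worker_park_unpark_count` returned at some earlier moment.

```rust
/// Hypothetical classification of a worker, derived only from the counter.
#[derive(Debug, PartialEq)]
enum WorkerState {
    Active,
    Parked,
}

/// Interprets a park/unpark count: odd means parked, even means active.
fn state_from_count(count: u64) -> WorkerState {
    if count % 2 == 0 {
        WorkerState::Active
    } else {
        WorkerState::Parked
    }
}

/// Number of completed park-then-unpark pairs contained in the count.
fn completed_cycles(count: u64) -> u64 {
    count / 2
}

fn main() {
    for count in [0u64, 1, 2, 3] {
        println!(
            "count {} -> {:?}, {} completed cycle(s)",
            count,
            state_from_count(count),
            completed_cycles(count)
        );
    }
}
```

Any value read from the counter is a snapshot, so the worker may already have changed state by the time the result is inspected.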
3 changes: 3 additions & 0 deletions tokio/src/runtime/metrics/worker.rs
@@ -16,6 +16,9 @@ pub(crate) struct WorkerMetrics {
    /// Number of times the worker parked.
    pub(crate) park_count: MetricAtomicU64,

    /// Number of times the worker parked and unparked.
    pub(crate) park_unpark_count: MetricAtomicU64,

    /// Number of times the worker woke then parked again without doing work.
    pub(crate) noop_count: MetricAtomicU64,

3 changes: 3 additions & 0 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
@@ -368,6 +368,9 @@ impl Context {
            });

            core = c;

            core.metrics.unparked();
            core.submit_metrics(handle);
        }

        if let Some(f) = &handle.shared.config.after_unpark {
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/stats.rs
@@ -74,6 +74,10 @@ impl Stats {
        self.batch.about_to_park();
    }

    pub(crate) fn unparked(&mut self) {
        self.batch.unparked();
    }

    pub(crate) fn inc_local_schedule_count(&mut self) {
        self.batch.inc_local_schedule_count();
    }
5 changes: 5 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -699,8 +699,13 @@ impl Context {
        if core.transition_to_parked(&self.worker) {
            while !core.is_shutdown && !core.is_traced {
                core.stats.about_to_park();
                core.stats
                    .submit(&self.worker.handle.shared.worker_metrics[self.worker.index]);

                core = self.park_timeout(core, None);

                core.stats.unparked();

                // Run regularly scheduled maintenance
                core.maintenance(&self.worker);

4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread_alt/stats.rs
@@ -100,6 +100,10 @@ impl Stats {
        self.batch.about_to_park();
    }

    pub(crate) fn unparked(&mut self) {
        self.batch.unparked();
    }

    pub(crate) fn inc_local_schedule_count(&mut self) {
        self.batch.inc_local_schedule_count();
    }
3 changes: 3 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
@@ -658,6 +658,9 @@ impl Worker {
        let n = cmp::max(core.run_queue.remaining_slots() / 2, 1);
        let maybe_task = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, n);

        core.stats.unparked();
        self.flush_metrics(cx, &mut core);

        Ok((maybe_task, core))
    }

20 changes: 20 additions & 0 deletions tokio/tests/rt_unstable_metrics.rs
@@ -170,6 +170,26 @@ fn worker_park_count() {
    assert!(1 <= metrics.worker_park_count(1));
}

#[test]
fn worker_park_unpark_count() {
    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(async {
        time::sleep(Duration::from_millis(1)).await;
Contributor

Does it work if you use yield_now instead of sleeping here? We generally try to avoid adding sleeps in tests, as they make the test suite take much longer.

Contributor Author

For the current thread runtime it only works with sleep. For the multi-threaded runtime I changed it to yield_now.

Contributor

@Darksonn Darksonn Jul 22, 2024

What!? That is very surprising to me. If yield_now works on the multi-thread runtime, then it probably also works if you get rid of the entire block_on call. In the multi-thread runtime case, the yield_now doesn't interact with the worker threads at all because it's in a block_on and not spawned.

I'm guessing that just spawning the threaded() runtime is enough for both threads to get to a park/unpark count of two. To actually test that our task changes the count, I think we can do something along these lines:

First wait for the count to reach 2 on both workers using a loop like this one. Then, update the code to spawn tasks instead so that the yield_now actually runs on the worker threads:

rt.block_on(rt.spawn(async {}));
drop(rt);
assert!(4 <= metrics.worker_park_unpark_count(0) || 4 <= metrics.worker_park_unpark_count(1));

Here, the count already reached two before spawning, so spawning should result in a worker waking up to process the task, and the worker then goes back to sleep. Hence, one worker should now be at 4.


You probably need to use rt.spawn with the current-thread runtime too, but I don't think you need the loop in that case as there is no threading involved.
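
The loop linked in the comment above is not reproduced on this page. As a rough guess at its general shape, here is a `std`-only sketch of waiting for a counter to reach a threshold before asserting on it. `wait_until_at_least` is a hypothetical helper, and the plain `AtomicU64` stands in for a value that a test would re-read through `metrics.worker_park_unpark_count(i)`.

```rust
use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
use std::sync::Arc;
use std::thread;

/// Hypothetical helper: polls `counter` until it is at least `target`,
/// yielding the thread between polls, and returns the value it observed.
fn wait_until_at_least(counter: &AtomicU64, target: u64) -> u64 {
    loop {
        let seen = counter.load(Relaxed);
        if seen >= target {
            return seen;
        }
        thread::yield_now();
    }
}

fn main() {
    let counter = Arc::new(AtomicU64::new(0));

    // Stand-in for a worker that parks and unparks once.
    let worker = {
        let counter = Arc::clone(&counter);
        thread::spawn(move || {
            counter.fetch_add(1, Relaxed); // "park"
            counter.fetch_add(1, Relaxed); // "unpark"
        })
    };

    let seen = wait_until_at_least(&counter, 2);
    worker.join().unwrap();
    println!("observed count: {}", seen);
}
```

Polling avoids a fixed sleep in the test, at the cost of hanging forever if the threshold is never reached, so a real test might also want a deadline.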

Contributor Author

In the multithreaded case after launch the park/unpark count is 1 for both workers. This makes sense since they get parked after startup, because there is no work to do.

After spawning the task, the park/unpark count can be 1/3 or 3/3. I don't know why the scheduler sometimes unparks both threads.

After shutdown the count is 4/4 or 2/4. This makes sense because the threads are both unparked for shutdown.

Contributor

Thanks. It makes more sense now.
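
The counts reported in this thread (1 after launch, 1 or 3 after the spawned task, 2 or 4 after shutdown) can be reproduced with a small model. This is a sketch, not tokio code: `ModelWorker` and `lifecycle` are hypothetical names, and the model assumes that a worker woken for the task has parked again before the count is read.

```rust
/// Hypothetical model of one worker's park/unpark counter.
struct ModelWorker {
    count: u64,
}

impl ModelWorker {
    fn new() -> Self {
        ModelWorker { count: 0 }
    }

    /// Parking is only valid while active (even count).
    fn park(&mut self) {
        assert!(self.count % 2 == 0, "worker is already parked");
        self.count += 1;
    }

    /// Unparking is only valid while parked (odd count).
    fn unpark(&mut self) {
        assert!(self.count % 2 == 1, "worker is not parked");
        self.count += 1;
    }
}

/// Returns the count after launch, after the spawned task, and after shutdown.
fn lifecycle(woken_for_task: bool) -> (u64, u64, u64) {
    let mut worker = ModelWorker::new();

    // After startup there is no work, so the worker parks.
    worker.park();
    let after_launch = worker.count;

    // A worker woken for the task runs it and then parks again.
    if woken_for_task {
        worker.unpark();
        worker.park();
    }
    let after_task = worker.count;

    // Shutdown unparks every worker.
    worker.unpark();
    let after_shutdown = worker.count;

    (after_launch, after_task, after_shutdown)
}

fn main() {
    println!("woken for task: {:?}", lifecycle(true));
    println!("not woken:      {:?}", lifecycle(false));
}
```

Under this model a worker that handled the task ends at 4 and one that slept through it ends at 2, which is consistent with the test asserting only `2 <=` for each worker.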

    });
    drop(rt);
    assert!(2 <= metrics.worker_park_unpark_count(0));

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async {
        time::sleep(Duration::from_millis(1)).await;
    });
    drop(rt);
    assert!(2 <= metrics.worker_park_unpark_count(0));
    assert!(2 <= metrics.worker_park_unpark_count(1));
}

#[test]
fn worker_noop_count() {
    // There isn't really a great way to generate no-op parks as they happen as