From 394dc4e4968b903b58b0e3cda82201215034b8bb Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Mon, 13 May 2024 15:33:06 -0400 Subject: [PATCH] metrics: stabilize `RuntimeMetrics::worker_count` This PR stabilizes a single metric API to start the process of stabilizing metrics. Future work will continue to stabilize more metrics. Refs: #6546 --- tokio/src/lib.rs | 1 + tokio/src/macros/cfg.rs | 24 +- tokio/src/runtime/blocking/pool.rs | 4 +- tokio/src/runtime/builder.rs | 2 +- tokio/src/runtime/coop.rs | 4 +- tokio/src/runtime/io/metrics.rs | 2 +- tokio/src/runtime/metrics/mod.rs | 41 +- tokio/src/runtime/metrics/runtime.rs | 1715 +++++++++-------- tokio/src/runtime/mod.rs | 20 +- tokio/src/runtime/runtime.rs | 18 +- .../runtime/scheduler/current_thread/mod.rs | 2 +- tokio/src/runtime/scheduler/inject.rs | 2 +- tokio/src/runtime/scheduler/mod.rs | 9 +- .../runtime/scheduler/multi_thread/handle.rs | 4 +- .../scheduler/multi_thread/handle/metrics.rs | 52 +- .../runtime/scheduler/multi_thread/queue.rs | 2 +- .../runtime/scheduler/multi_thread/worker.rs | 2 +- tokio/src/runtime/tests/queue.rs | 8 +- 18 files changed, 975 insertions(+), 937 deletions(-) diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index f15f8763e36..bacc8192742 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -471,6 +471,7 @@ compile_error!("The `tokio_taskdump` feature requires `--cfg tokio_unstable`."); #[cfg(all( tokio_taskdump, + not(doc), not(all( target_os = "linux", any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64") diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index c67e0e8379f..7c7f9b473e2 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -215,11 +215,19 @@ macro_rules! cfg_macros { } } +// need to disable metrics on loom macro_rules! cfg_metrics { ($($item:item)*) => { $( - // For now, metrics is only disabled in loom tests. - // When stabilized, it might have a dedicated feature flag. + #[cfg(not(loom))] + $item + )* + } +} + +macro_rules! cfg_unstable_metrics { + ($($item:item)*) => { + $( #[cfg(all(tokio_unstable, not(loom)))] #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] $item @@ -227,10 +235,20 @@ macro_rules! cfg_metrics { } } +macro_rules! cfg_not_unstable_metrics { + ($($item:item)*) => { + $( + #[cfg(any(not(tokio_unstable), loom))] + $item + )* + } + +} + macro_rules! cfg_not_metrics { ($($item:item)*) => { $( - #[cfg(not(all(tokio_unstable, not(loom))))] + #[cfg(loom)] $item )* } diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index c74aea76568..3757079f329 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -40,7 +40,7 @@ impl SpawnerMetrics { self.num_idle_threads.load(Ordering::Relaxed) } - cfg_metrics! { + cfg_unstable_metrics! { fn queue_depth(&self) -> usize { self.queue_depth.load(Ordering::Relaxed) } @@ -474,7 +474,7 @@ impl Spawner { } } -cfg_metrics! { +cfg_unstable_metrics! { impl Spawner { pub(crate) fn num_threads(&self) -> usize { self.inner.metrics.num_threads() diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 3b09c0d4b10..c7724c6f40d 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -956,7 +956,7 @@ impl Builder { } } - cfg_metrics! { + cfg_unstable_metrics! { /// Enables tracking the distribution of task poll times. /// /// Task poll times are not instrumented by default as doing so requires diff --git a/tokio/src/runtime/coop.rs b/tokio/src/runtime/coop.rs index d9f7ff2af29..f2afa75c9c4 100644 --- a/tokio/src/runtime/coop.rs +++ b/tokio/src/runtime/coop.rs @@ -197,7 +197,7 @@ cfg_coop! { } cfg_rt! { - cfg_metrics! { + cfg_unstable_metrics! { #[inline(always)] fn inc_budget_forced_yield_count() { let _ = context::with_current(|handle| { @@ -206,7 +206,7 @@ cfg_coop! { } } - cfg_not_metrics! { + cfg_not_unstable_metrics! { #[inline(always)] fn inc_budget_forced_yield_count() {} } diff --git a/tokio/src/runtime/io/metrics.rs b/tokio/src/runtime/io/metrics.rs index ec341efe680..e7a01bc2f46 100644 --- a/tokio/src/runtime/io/metrics.rs +++ b/tokio/src/runtime/io/metrics.rs @@ -17,7 +17,7 @@ cfg_not_rt_and_metrics_and_net! { cfg_net! { cfg_rt! { - cfg_metrics! { + cfg_unstable_metrics! { pub(crate) use crate::runtime::IoDriverMetrics; } } diff --git a/tokio/src/runtime/metrics/mod.rs b/tokio/src/runtime/metrics/mod.rs index 88be4a5211f..d656cb28b26 100644 --- a/tokio/src/runtime/metrics/mod.rs +++ b/tokio/src/runtime/metrics/mod.rs @@ -9,27 +9,36 @@ #![allow(clippy::module_inception)] cfg_metrics! { - mod batch; - pub(crate) use batch::MetricsBatch; - - mod histogram; - pub(crate) use histogram::{Histogram, HistogramBatch, HistogramBuilder}; - #[allow(unreachable_pub)] // rust-lang/rust#57411 - pub use histogram::HistogramScale; - mod runtime; - #[allow(unreachable_pub)] // rust-lang/rust#57411 pub use runtime::RuntimeMetrics; - mod scheduler; - pub(crate) use scheduler::SchedulerMetrics; - mod worker; - pub(crate) use worker::WorkerMetrics; - cfg_net! { - mod io; - pub(crate) use io::IoDriverMetrics; + cfg_unstable_metrics! { + mod batch; + pub(crate) use batch::MetricsBatch; + + mod histogram; + pub(crate) use histogram::{HistogramBatch, HistogramBuilder}; + + mod scheduler; + pub(crate) use scheduler::SchedulerMetrics; + + mod worker; + pub(crate) use worker::WorkerMetrics; + + cfg_net! { + mod io; + pub(crate) use io::IoDriverMetrics; + } + #[allow(unreachable_pub)] // rust-lang/rust#57411 + pub use histogram::{HistogramScale, Histogram}; + } + + cfg_not_unstable_metrics! { + mod mock; + + pub(crate) use mock::{SchedulerMetrics, WorkerMetrics, MetricsBatch, HistogramBuilder}; } } diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 66a3e51bb97..e08c1c24d21 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -1,9 +1,5 @@ use crate::runtime::Handle; -use std::ops::Range; -use std::sync::atomic::Ordering::Relaxed; -use std::time::Duration; - /// Handle to the runtime's metrics. /// /// This handle is internally reference-counted and can be freely cloned. A @@ -15,6 +11,12 @@ pub struct RuntimeMetrics { handle: Handle, } +cfg_unstable_metrics! { + use std::time::Duration; + use std::sync::atomic::Ordering::Relaxed; + use std::ops::Range; +} + impl RuntimeMetrics { pub(crate) fn new(handle: Handle) -> RuntimeMetrics { RuntimeMetrics { handle } @@ -43,882 +45,883 @@ impl RuntimeMetrics { self.handle.inner.num_workers() } - /// Returns the number of additional threads spawned by the runtime. - /// - /// The number of workers is set by configuring `max_blocking_threads` on - /// `runtime::Builder`. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let _ = tokio::task::spawn_blocking(move || { - /// // Stand-in for compute-heavy work or using synchronous APIs - /// 1 + 1 - /// }).await; - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.num_blocking_threads(); - /// println!("Runtime has created {} threads", n); - /// } - /// ``` - pub fn num_blocking_threads(&self) -> usize { - self.handle.inner.num_blocking_threads() - } + cfg_unstable_metrics! { - /// Returns the number of active tasks in the runtime. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.active_tasks_count(); - /// println!("Runtime has {} active tasks", n); - /// } - /// ``` - pub fn active_tasks_count(&self) -> usize { - self.handle.inner.active_tasks_count() - } + /// Returns the number of additional threads spawned by the runtime. + /// + /// The number of workers is set by configuring `max_blocking_threads` on + /// `runtime::Builder`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let _ = tokio::task::spawn_blocking(move || { + /// // Stand-in for compute-heavy work or using synchronous APIs + /// 1 + 1 + /// }).await; + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.num_blocking_threads(); + /// println!("Runtime has created {} threads", n); + /// } + /// ``` + pub fn num_blocking_threads(&self) -> usize { + self.handle.inner.num_blocking_threads() + } - /// Returns the number of idle threads, which have spawned by the runtime - /// for `spawn_blocking` calls. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let _ = tokio::task::spawn_blocking(move || { - /// // Stand-in for compute-heavy work or using synchronous APIs - /// 1 + 1 - /// }).await; - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.num_idle_blocking_threads(); - /// println!("Runtime has {} idle blocking thread pool threads", n); - /// } - /// ``` - pub fn num_idle_blocking_threads(&self) -> usize { - self.handle.inner.num_idle_blocking_threads() - } + /// Returns the number of active tasks in the runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.active_tasks_count(); + /// println!("Runtime has {} active tasks", n); + /// } + /// ``` + pub fn active_tasks_count(&self) -> usize { + self.handle.inner.active_tasks_count() + } - /// Returns the number of tasks scheduled from **outside** of the runtime. - /// - /// The remote schedule count starts at zero when the runtime is created and - /// increases by one each time a task is woken from **outside** of the - /// runtime. This usually means that a task is spawned or notified from a - /// non-runtime thread and must be queued using the Runtime's injection - /// queue, which tends to be slower. - /// - /// The counter is monotonically increasing. It is never decremented or - /// reset to zero. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.remote_schedule_count(); - /// println!("{} tasks were scheduled from outside the runtime", n); - /// } - /// ``` - pub fn remote_schedule_count(&self) -> u64 { - self.handle - .inner - .scheduler_metrics() - .remote_schedule_count - .load(Relaxed) - } + /// Returns the number of idle threads, which have spawned by the runtime + /// for `spawn_blocking` calls. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let _ = tokio::task::spawn_blocking(move || { + /// // Stand-in for compute-heavy work or using synchronous APIs + /// 1 + 1 + /// }).await; + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.num_idle_blocking_threads(); + /// println!("Runtime has {} idle blocking thread pool threads", n); + /// } + /// ``` + pub fn num_idle_blocking_threads(&self) -> usize { + self.handle.inner.num_idle_blocking_threads() + } - /// Returns the number of times that tasks have been forced to yield back to the scheduler - /// after exhausting their task budgets. - /// - /// This count starts at zero when the runtime is created and increases by one each time a task yields due to exhausting its budget. - /// - /// The counter is monotonically increasing. It is never decremented or - /// reset to zero. - pub fn budget_forced_yield_count(&self) -> u64 { - self.handle - .inner - .scheduler_metrics() - .budget_forced_yield_count - .load(Relaxed) - } + /// Returns the number of tasks scheduled from **outside** of the runtime. + /// + /// The remote schedule count starts at zero when the runtime is created and + /// increases by one each time a task is woken from **outside** of the + /// runtime. This usually means that a task is spawned or notified from a + /// non-runtime thread and must be queued using the Runtime's injection + /// queue, which tends to be slower. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.remote_schedule_count(); + /// println!("{} tasks were scheduled from outside the runtime", n); + /// } + /// ``` + pub fn remote_schedule_count(&self) -> u64 { + self.handle + .inner + .scheduler_metrics() + .remote_schedule_count + .load(Relaxed) + } - /// Returns the total number of times the given worker thread has parked. - /// - /// The worker park count starts at zero when the runtime is created and - /// increases by one each time the worker parks the thread waiting for new - /// inbound events to process. This usually means the worker has processed - /// all pending work and is currently idle. - /// - /// The counter is monotonically increasing. It is never decremented or - /// reset to zero. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. - /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()`. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.worker_park_count(0); - /// println!("worker 0 parked {} times", n); - /// } - /// ``` - pub fn worker_park_count(&self, worker: usize) -> u64 { - self.handle - .inner - .worker_metrics(worker) - .park_count - .load(Relaxed) - } + /// Returns the number of times that tasks have been forced to yield back to the scheduler + /// after exhausting their task budgets. + /// + /// This count starts at zero when the runtime is created and increases by one each time a task yields due to exhausting its budget. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + pub fn budget_forced_yield_count(&self) -> u64 { + self.handle + .inner + .scheduler_metrics() + .budget_forced_yield_count + .load(Relaxed) + } - /// Returns the number of times the given worker thread unparked but - /// performed no work before parking again. - /// - /// The worker no-op count starts at zero when the runtime is created and - /// increases by one each time the worker unparks the thread but finds no - /// new work and goes back to sleep. This indicates a false-positive wake up. - /// - /// The counter is monotonically increasing. It is never decremented or - /// reset to zero. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. - /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()`. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.worker_noop_count(0); - /// println!("worker 0 had {} no-op unparks", n); - /// } - /// ``` - pub fn worker_noop_count(&self, worker: usize) -> u64 { - self.handle - .inner - .worker_metrics(worker) - .noop_count - .load(Relaxed) - } + /// Returns the total number of times the given worker thread has parked. + /// + /// The worker park count starts at zero when the runtime is created and + /// increases by one each time the worker parks the thread waiting for new + /// inbound events to process. This usually means the worker has processed + /// all pending work and is currently idle. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_park_count(0); + /// println!("worker 0 parked {} times", n); + /// } + /// ``` + pub fn worker_park_count(&self, worker: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .park_count + .load(Relaxed) + } - /// Returns the number of tasks the given worker thread stole from - /// another worker thread. - /// - /// This metric only applies to the **multi-threaded** runtime and will - /// always return `0` when using the current thread runtime. - /// - /// The worker steal count starts at zero when the runtime is created and - /// increases by `N` each time the worker has processed its scheduled queue - /// and successfully steals `N` more pending tasks from another worker. - /// - /// The counter is monotonically increasing. It is never decremented or - /// reset to zero. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. - /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()`. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.worker_steal_count(0); - /// println!("worker 0 has stolen {} tasks", n); - /// } - /// ``` - pub fn worker_steal_count(&self, worker: usize) -> u64 { - self.handle - .inner - .worker_metrics(worker) - .steal_count - .load(Relaxed) - } + /// Returns the number of times the given worker thread unparked but + /// performed no work before parking again. + /// + /// The worker no-op count starts at zero when the runtime is created and + /// increases by one each time the worker unparks the thread but finds no + /// new work and goes back to sleep. This indicates a false-positive wake up. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_noop_count(0); + /// println!("worker 0 had {} no-op unparks", n); + /// } + /// ``` + pub fn worker_noop_count(&self, worker: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .noop_count + .load(Relaxed) + } - /// Returns the number of times the given worker thread stole tasks from - /// another worker thread. - /// - /// This metric only applies to the **multi-threaded** runtime and will - /// always return `0` when using the current thread runtime. - /// - /// The worker steal count starts at zero when the runtime is created and - /// increases by one each time the worker has processed its scheduled queue - /// and successfully steals more pending tasks from another worker. - /// - /// The counter is monotonically increasing. It is never decremented or - /// reset to zero. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. - /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()`. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.worker_steal_operations(0); - /// println!("worker 0 has stolen tasks {} times", n); - /// } - /// ``` - pub fn worker_steal_operations(&self, worker: usize) -> u64 { - self.handle - .inner - .worker_metrics(worker) - .steal_operations - .load(Relaxed) - } + /// Returns the number of tasks the given worker thread stole from + /// another worker thread. + /// + /// This metric only applies to the **multi-threaded** runtime and will + /// always return `0` when using the current thread runtime. + /// + /// The worker steal count starts at zero when the runtime is created and + /// increases by `N` each time the worker has processed its scheduled queue + /// and successfully steals `N` more pending tasks from another worker. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_steal_count(0); + /// println!("worker 0 has stolen {} tasks", n); + /// } + /// ``` + pub fn worker_steal_count(&self, worker: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .steal_count + .load(Relaxed) + } - /// Returns the number of tasks the given worker thread has polled. - /// - /// The worker poll count starts at zero when the runtime is created and - /// increases by one each time the worker polls a scheduled task. - /// - /// The counter is monotonically increasing. It is never decremented or - /// reset to zero. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. - /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()`. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.worker_poll_count(0); - /// println!("worker 0 has polled {} tasks", n); - /// } - /// ``` - pub fn worker_poll_count(&self, worker: usize) -> u64 { - self.handle - .inner - .worker_metrics(worker) - .poll_count - .load(Relaxed) - } + /// Returns the number of times the given worker thread stole tasks from + /// another worker thread. + /// + /// This metric only applies to the **multi-threaded** runtime and will + /// always return `0` when using the current thread runtime. + /// + /// The worker steal count starts at zero when the runtime is created and + /// increases by one each time the worker has processed its scheduled queue + /// and successfully steals more pending tasks from another worker. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_steal_operations(0); + /// println!("worker 0 has stolen tasks {} times", n); + /// } + /// ``` + pub fn worker_steal_operations(&self, worker: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .steal_operations + .load(Relaxed) + } - /// Returns the amount of time the given worker thread has been busy. - /// - /// The worker busy duration starts at zero when the runtime is created and - /// increases whenever the worker is spending time processing work. Using - /// this value can indicate the load of the given worker. If a lot of time - /// is spent busy, then the worker is under load and will check for inbound - /// events less often. - /// - /// The timer is monotonically increasing. It is never decremented or reset - /// to zero. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. - /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()`. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.worker_total_busy_duration(0); - /// println!("worker 0 was busy for a total of {:?}", n); - /// } - /// ``` - pub fn worker_total_busy_duration(&self, worker: usize) -> Duration { - let nanos = self - .handle - .inner - .worker_metrics(worker) - .busy_duration_total - .load(Relaxed); - Duration::from_nanos(nanos) - } + /// Returns the number of tasks the given worker thread has polled. + /// + /// The worker poll count starts at zero when the runtime is created and + /// increases by one each time the worker polls a scheduled task. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_poll_count(0); + /// println!("worker 0 has polled {} tasks", n); + /// } + /// ``` + pub fn worker_poll_count(&self, worker: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .poll_count + .load(Relaxed) + } - /// Returns the number of tasks scheduled from **within** the runtime on the - /// given worker's local queue. - /// - /// The local schedule count starts at zero when the runtime is created and - /// increases by one each time a task is woken from **inside** of the - /// runtime on the given worker. This usually means that a task is spawned - /// or notified from within a runtime thread and will be queued on the - /// worker-local queue. - /// - /// The counter is monotonically increasing. It is never decremented or - /// reset to zero. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. - /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()`. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.worker_local_schedule_count(0); - /// println!("{} tasks were scheduled on the worker's local queue", n); - /// } - /// ``` - pub fn worker_local_schedule_count(&self, worker: usize) -> u64 { - self.handle - .inner - .worker_metrics(worker) - .local_schedule_count - .load(Relaxed) - } + /// Returns the amount of time the given worker thread has been busy. + /// + /// The worker busy duration starts at zero when the runtime is created and + /// increases whenever the worker is spending time processing work. Using + /// this value can indicate the load of the given worker. If a lot of time + /// is spent busy, then the worker is under load and will check for inbound + /// events less often. + /// + /// The timer is monotonically increasing. It is never decremented or reset + /// to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_total_busy_duration(0); + /// println!("worker 0 was busy for a total of {:?}", n); + /// } + /// ``` + pub fn worker_total_busy_duration(&self, worker: usize) -> Duration { + let nanos = self + .handle + .inner + .worker_metrics(worker) + .busy_duration_total + .load(Relaxed); + Duration::from_nanos(nanos) + } - /// Returns the number of times the given worker thread saturated its local - /// queue. - /// - /// This metric only applies to the **multi-threaded** scheduler. - /// - /// The worker overflow count starts at zero when the runtime is created and - /// increases by one each time the worker attempts to schedule a task - /// locally, but its local queue is full. When this happens, half of the - /// local queue is moved to the injection queue. - /// - /// The counter is monotonically increasing. It is never decremented or - /// reset to zero. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. - /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()`. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.worker_overflow_count(0); - /// println!("worker 0 has overflowed its queue {} times", n); - /// } - /// ``` - pub fn worker_overflow_count(&self, worker: usize) -> u64 { - self.handle - .inner - .worker_metrics(worker) - .overflow_count - .load(Relaxed) - } + /// Returns the number of tasks scheduled from **within** the runtime on the + /// given worker's local queue. + /// + /// The local schedule count starts at zero when the runtime is created and + /// increases by one each time a task is woken from **inside** of the + /// runtime on the given worker. This usually means that a task is spawned + /// or notified from within a runtime thread and will be queued on the + /// worker-local queue. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_local_schedule_count(0); + /// println!("{} tasks were scheduled on the worker's local queue", n); + /// } + /// ``` + pub fn worker_local_schedule_count(&self, worker: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .local_schedule_count + .load(Relaxed) + } - /// Returns the number of tasks currently scheduled in the runtime's - /// injection queue. - /// - /// Tasks that are spawned or notified from a non-runtime thread are - /// scheduled using the runtime's injection queue. This metric returns the - /// **current** number of tasks pending in the injection queue. As such, the - /// returned value may increase or decrease as new tasks are scheduled and - /// processed. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.injection_queue_depth(); - /// println!("{} tasks currently pending in the runtime's injection queue", n); - /// } - /// ``` - pub fn injection_queue_depth(&self) -> usize { - self.handle.inner.injection_queue_depth() - } + /// Returns the number of times the given worker thread saturated its local + /// queue. + /// + /// This metric only applies to the **multi-threaded** scheduler. + /// + /// The worker overflow count starts at zero when the runtime is created and + /// increases by one each time the worker attempts to schedule a task + /// locally, but its local queue is full. When this happens, half of the + /// local queue is moved to the injection queue. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_overflow_count(0); + /// println!("worker 0 has overflowed its queue {} times", n); + /// } + /// ``` + pub fn worker_overflow_count(&self, worker: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .overflow_count + .load(Relaxed) + } - /// Returns the number of tasks currently scheduled in the given worker's - /// local queue. - /// - /// Tasks that are spawned or notified from within a runtime thread are - /// scheduled using that worker's local queue. This metric returns the - /// **current** number of tasks pending in the worker's local queue. As - /// such, the returned value may increase or decrease as new tasks are - /// scheduled and processed. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. - /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()`. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.worker_local_queue_depth(0); - /// println!("{} tasks currently pending in worker 0's local queue", n); - /// } - /// ``` - pub fn worker_local_queue_depth(&self, worker: usize) -> usize { - self.handle.inner.worker_local_queue_depth(worker) - } + /// Returns the number of tasks currently scheduled in the runtime's + /// injection queue. + /// + /// Tasks that are spawned or notified from a non-runtime thread are + /// scheduled using the runtime's injection queue. This metric returns the + /// **current** number of tasks pending in the injection queue. As such, the + /// returned value may increase or decrease as new tasks are scheduled and + /// processed. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.injection_queue_depth(); + /// println!("{} tasks currently pending in the runtime's injection queue", n); + /// } + /// ``` + pub fn injection_queue_depth(&self) -> usize { + self.handle.inner.injection_queue_depth() + } - /// Returns `true` if the runtime is tracking the distribution of task poll - /// times. - /// - /// Task poll times are not instrumented by default as doing so requires - /// calling [`Instant::now()`] twice per task poll. The feature is enabled - /// by calling [`enable_metrics_poll_count_histogram()`] when building the - /// runtime. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::{self, Handle}; - /// - /// fn main() { - /// runtime::Builder::new_current_thread() - /// .enable_metrics_poll_count_histogram() - /// .build() - /// .unwrap() - /// .block_on(async { - /// let metrics = Handle::current().metrics(); - /// let enabled = metrics.poll_count_histogram_enabled(); - /// - /// println!("Tracking task poll time distribution: {:?}", enabled); - /// }); - /// } - /// ``` - /// - /// [`enable_metrics_poll_count_histogram()`]: crate::runtime::Builder::enable_metrics_poll_count_histogram - /// [`Instant::now()`]: std::time::Instant::now - pub fn poll_count_histogram_enabled(&self) -> bool { - self.handle - .inner - .worker_metrics(0) - .poll_count_histogram - .is_some() - } + /// Returns the number of tasks currently scheduled in the given worker's + /// local queue. + /// + /// Tasks that are spawned or notified from within a runtime thread are + /// scheduled using that worker's local queue. This metric returns the + /// **current** number of tasks pending in the worker's local queue. As + /// such, the returned value may increase or decrease as new tasks are + /// scheduled and processed. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_local_queue_depth(0); + /// println!("{} tasks currently pending in worker 0's local queue", n); + /// } + /// ``` + pub fn worker_local_queue_depth(&self, worker: usize) -> usize { + self.handle.inner.worker_local_queue_depth(worker) + } - /// Returns the number of histogram buckets tracking the distribution of - /// task poll times. - /// - /// This value is configured by calling - /// [`metrics_poll_count_histogram_buckets()`] when building the runtime. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::{self, Handle}; - /// - /// fn main() { - /// runtime::Builder::new_current_thread() - /// .enable_metrics_poll_count_histogram() - /// .build() - /// .unwrap() - /// .block_on(async { - /// let metrics = Handle::current().metrics(); - /// let buckets = metrics.poll_count_histogram_num_buckets(); - /// - /// println!("Histogram buckets: {:?}", buckets); - /// }); - /// } - /// ``` - /// - /// [`metrics_poll_count_histogram_buckets()`]: - /// crate::runtime::Builder::metrics_poll_count_histogram_buckets - pub fn poll_count_histogram_num_buckets(&self) -> usize { - self.handle - .inner - .worker_metrics(0) - .poll_count_histogram - .as_ref() - .map(|histogram| histogram.num_buckets()) - .unwrap_or_default() - } + /// Returns `true` if the runtime is tracking the distribution of task poll + /// times. + /// + /// Task poll times are not instrumented by default as doing so requires + /// calling [`Instant::now()`] twice per task poll. The feature is enabled + /// by calling [`enable_metrics_poll_count_histogram()`] when building the + /// runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::{self, Handle}; + /// + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_poll_count_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let enabled = metrics.poll_count_histogram_enabled(); + /// + /// println!("Tracking task poll time distribution: {:?}", enabled); + /// }); + /// } + /// ``` + /// + /// [`enable_metrics_poll_count_histogram()`]: crate::runtime::Builder::enable_metrics_poll_count_histogram + /// [`Instant::now()`]: std::time::Instant::now + pub fn poll_count_histogram_enabled(&self) -> bool { + self.handle + .inner + .worker_metrics(0) + .poll_count_histogram + .is_some() + } - /// Returns the range of task poll times tracked by the given bucket. - /// - /// This value is configured by calling - /// [`metrics_poll_count_histogram_resolution()`] when building the runtime. - /// - /// # Panics - /// - /// The method panics if `bucket` represents an invalid bucket index, i.e. - /// is greater than or equal to `poll_count_histogram_num_buckets()`. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::{self, Handle}; - /// - /// fn main() { - /// runtime::Builder::new_current_thread() - /// .enable_metrics_poll_count_histogram() - /// .build() - /// .unwrap() - /// .block_on(async { - /// let metrics = Handle::current().metrics(); - /// let buckets = metrics.poll_count_histogram_num_buckets(); - /// - /// for i in 0..buckets { - /// let range = metrics.poll_count_histogram_bucket_range(i); - /// println!("Histogram bucket {} range: {:?}", i, range); - /// } - /// }); - /// } - /// ``` - /// - /// [`metrics_poll_count_histogram_resolution()`]: - /// crate::runtime::Builder::metrics_poll_count_histogram_resolution - #[track_caller] - pub fn poll_count_histogram_bucket_range(&self, bucket: usize) -> Range { - self.handle - .inner - .worker_metrics(0) - .poll_count_histogram - .as_ref() - .map(|histogram| { - let range = histogram.bucket_range(bucket); - std::ops::Range { - start: Duration::from_nanos(range.start), - end: Duration::from_nanos(range.end), - } - }) - .unwrap_or_default() - } + /// Returns the number of histogram buckets tracking the distribution of + /// task poll times. + /// + /// This value is configured by calling + /// [`metrics_poll_count_histogram_buckets()`] when building the runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::{self, Handle}; + /// + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_poll_count_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let buckets = metrics.poll_count_histogram_num_buckets(); + /// + /// println!("Histogram buckets: {:?}", buckets); + /// }); + /// } + /// ``` + /// + /// [`metrics_poll_count_histogram_buckets()`]: + /// crate::runtime::Builder::metrics_poll_count_histogram_buckets + pub fn poll_count_histogram_num_buckets(&self) -> usize { + self.handle + .inner + .worker_metrics(0) + .poll_count_histogram + .as_ref() + .map(|histogram| histogram.num_buckets()) + .unwrap_or_default() + } - /// Returns the number of times the given worker polled tasks with a poll - /// duration within the given bucket's range. - /// - /// Each worker maintains its own histogram and the counts for each bucket - /// starts at zero when the runtime is created. Each time the worker polls a - /// task, it tracks the duration the task poll time took and increments the - /// associated bucket by 1. - /// - /// Each bucket is a monotonically increasing counter. It is never - /// decremented or reset to zero. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. - /// - /// `bucket` is the index of the bucket being queried. The bucket is scoped - /// to the worker. The range represented by the bucket can be queried by - /// calling [`poll_count_histogram_bucket_range()`]. Each worker maintains - /// identical bucket ranges. - /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()` or if `bucket` represents an - /// invalid bucket. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::{self, Handle}; - /// - /// fn main() { - /// runtime::Builder::new_current_thread() - /// .enable_metrics_poll_count_histogram() - /// .build() - /// .unwrap() - /// .block_on(async { - /// let metrics = Handle::current().metrics(); - /// let buckets = metrics.poll_count_histogram_num_buckets(); - /// - /// for worker in 0..metrics.num_workers() { - /// for i in 0..buckets { - /// let count = metrics.poll_count_histogram_bucket_count(worker, i); - /// println!("Poll count {}", count); - /// } - /// } - /// }); - /// } - /// ``` - /// - /// [`poll_count_histogram_bucket_range()`]: crate::runtime::RuntimeMetrics::poll_count_histogram_bucket_range - #[track_caller] - pub fn poll_count_histogram_bucket_count(&self, worker: usize, bucket: usize) -> u64 { - self.handle - .inner - .worker_metrics(worker) - .poll_count_histogram - .as_ref() - .map(|histogram| histogram.get(bucket)) - .unwrap_or_default() - } + /// Returns the range of task poll times tracked by the given bucket. + /// + /// This value is configured by calling + /// [`metrics_poll_count_histogram_resolution()`] when building the runtime. + /// + /// # Panics + /// + /// The method panics if `bucket` represents an invalid bucket index, i.e. + /// is greater than or equal to `poll_count_histogram_num_buckets()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::{self, Handle}; + /// + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_poll_count_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let buckets = metrics.poll_count_histogram_num_buckets(); + /// + /// for i in 0..buckets { + /// let range = metrics.poll_count_histogram_bucket_range(i); + /// println!("Histogram bucket {} range: {:?}", i, range); + /// } + /// }); + /// } + /// ``` + /// + /// [`metrics_poll_count_histogram_resolution()`]: + /// crate::runtime::Builder::metrics_poll_count_histogram_resolution + #[track_caller] + pub fn poll_count_histogram_bucket_range(&self, bucket: usize) -> Range { + self.handle + .inner + .worker_metrics(0) + .poll_count_histogram + .as_ref() + .map(|histogram| { + let range = histogram.bucket_range(bucket); + std::ops::Range { + start: Duration::from_nanos(range.start), + end: Duration::from_nanos(range.end), + } + }) + .unwrap_or_default() + } - /// Returns the mean duration of task polls, in nanoseconds. - /// - /// This is an exponentially weighted moving average. Currently, this metric - /// is only provided by the multi-threaded runtime. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. - /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()`. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.worker_mean_poll_time(0); - /// println!("worker 0 has a mean poll time of {:?}", n); - /// } - /// ``` - #[track_caller] - pub fn worker_mean_poll_time(&self, worker: usize) -> Duration { - let nanos = self - .handle - .inner - .worker_metrics(worker) - .mean_poll_time - .load(Relaxed); - Duration::from_nanos(nanos) - } + /// Returns the number of times the given worker polled tasks with a poll + /// duration within the given bucket's range. + /// + /// Each worker maintains its own histogram and the counts for each bucket + /// starts at zero when the runtime is created. Each time the worker polls a + /// task, it tracks the duration the task poll time took and increments the + /// associated bucket by 1. + /// + /// Each bucket is a monotonically increasing counter. It is never + /// decremented or reset to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// `bucket` is the index of the bucket being queried. The bucket is scoped + /// to the worker. The range represented by the bucket can be queried by + /// calling [`poll_count_histogram_bucket_range()`]. Each worker maintains + /// identical bucket ranges. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()` or if `bucket` represents an + /// invalid bucket. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::{self, Handle}; + /// + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_poll_count_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let buckets = metrics.poll_count_histogram_num_buckets(); + /// + /// for worker in 0..metrics.num_workers() { + /// for i in 0..buckets { + /// let count = metrics.poll_count_histogram_bucket_count(worker, i); + /// println!("Poll count {}", count); + /// } + /// } + /// }); + /// } + /// ``` + /// + /// [`poll_count_histogram_bucket_range()`]: crate::runtime::RuntimeMetrics::poll_count_histogram_bucket_range + #[track_caller] + pub fn poll_count_histogram_bucket_count(&self, worker: usize, bucket: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .poll_count_histogram + .as_ref() + .map(|histogram| histogram.get(bucket)) + .unwrap_or_default() + } - /// Returns the number of tasks currently scheduled in the blocking - /// thread pool, spawned using `spawn_blocking`. - /// - /// This metric returns the **current** number of tasks pending in - /// blocking thread pool. As such, the returned value may increase - /// or decrease as new tasks are scheduled and processed. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.blocking_queue_depth(); - /// println!("{} tasks currently pending in the blocking thread pool", n); - /// } - /// ``` - pub fn blocking_queue_depth(&self) -> usize { - self.handle.inner.blocking_queue_depth() - } -} + /// Returns the mean duration of task polls, in nanoseconds. + /// + /// This is an exponentially weighted moving average. Currently, this metric + /// is only provided by the multi-threaded runtime. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_mean_poll_time(0); + /// println!("worker 0 has a mean poll time of {:?}", n); + /// } + /// ``` + #[track_caller] + pub fn worker_mean_poll_time(&self, worker: usize) -> Duration { + let nanos = self + .handle + .inner + .worker_metrics(worker) + .mean_poll_time + .load(Relaxed); + Duration::from_nanos(nanos) + } -cfg_net! { - impl RuntimeMetrics { - /// Returns the number of file descriptors that have been registered with the - /// runtime's I/O driver. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let registered_fds = metrics.io_driver_fd_registered_count(); - /// println!("{} fds have been registered with the runtime's I/O driver.", registered_fds); - /// - /// let deregistered_fds = metrics.io_driver_fd_deregistered_count(); - /// - /// let current_fd_count = registered_fds - deregistered_fds; - /// println!("{} fds are currently registered by the runtime's I/O driver.", current_fd_count); - /// } - /// ``` - pub fn io_driver_fd_registered_count(&self) -> u64 { - self.with_io_driver_metrics(|m| { - m.fd_registered_count.load(Relaxed) - }) - } + /// Returns the number of tasks currently scheduled in the blocking + /// thread pool, spawned using `spawn_blocking`. + /// + /// This metric returns the **current** number of tasks pending in + /// blocking thread pool. As such, the returned value may increase + /// or decrease as new tasks are scheduled and processed. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.blocking_queue_depth(); + /// println!("{} tasks currently pending in the blocking thread pool", n); + /// } + /// ``` + pub fn blocking_queue_depth(&self) -> usize { + self.handle.inner.blocking_queue_depth() + } - /// Returns the number of file descriptors that have been deregistered by the - /// runtime's I/O driver. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.io_driver_fd_deregistered_count(); - /// println!("{} fds have been deregistered by the runtime's I/O driver.", n); - /// } - /// ``` - pub fn io_driver_fd_deregistered_count(&self) -> u64 { - self.with_io_driver_metrics(|m| { - m.fd_deregistered_count.load(Relaxed) - }) - } + cfg_net! { + /// Returns the number of file descriptors that have been registered with the + /// runtime's I/O driver. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let registered_fds = metrics.io_driver_fd_registered_count(); + /// println!("{} fds have been registered with the runtime's I/O driver.", registered_fds); + /// + /// let deregistered_fds = metrics.io_driver_fd_deregistered_count(); + /// + /// let current_fd_count = registered_fds - deregistered_fds; + /// println!("{} fds are currently registered by the runtime's I/O driver.", current_fd_count); + /// } + /// ``` + pub fn io_driver_fd_registered_count(&self) -> u64 { + self.with_io_driver_metrics(|m| { + m.fd_registered_count.load(Relaxed) + }) + } - /// Returns the number of ready events processed by the runtime's - /// I/O driver. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.io_driver_ready_count(); - /// println!("{} ready events processed by the runtime's I/O driver.", n); - /// } - /// ``` - pub fn io_driver_ready_count(&self) -> u64 { - self.with_io_driver_metrics(|m| m.ready_count.load(Relaxed)) - } + /// Returns the number of file descriptors that have been deregistered by the + /// runtime's I/O driver. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.io_driver_fd_deregistered_count(); + /// println!("{} fds have been deregistered by the runtime's I/O driver.", n); + /// } + /// ``` + pub fn io_driver_fd_deregistered_count(&self) -> u64 { + self.with_io_driver_metrics(|m| { + m.fd_deregistered_count.load(Relaxed) + }) + } + + /// Returns the number of ready events processed by the runtime's + /// I/O driver. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.io_driver_ready_count(); + /// println!("{} ready events processed by the runtime's I/O driver.", n); + /// } + /// ``` + pub fn io_driver_ready_count(&self) -> u64 { + self.with_io_driver_metrics(|m| m.ready_count.load(Relaxed)) + } - fn with_io_driver_metrics(&self, f: F) -> u64 - where - F: Fn(&super::IoDriverMetrics) -> u64, - { - // TODO: Investigate if this should return 0, most of our metrics always increase - // thus this breaks that guarantee. - self.handle - .inner - .driver() - .io - .as_ref() - .map(|h| f(&h.metrics)) - .unwrap_or(0) + fn with_io_driver_metrics(&self, f: F) -> u64 + where + F: Fn(&super::IoDriverMetrics) -> u64, + { + // TODO: Investigate if this should return 0, most of our metrics always increase + // thus this breaks that guarantee. + self.handle + .inner + .driver() + .io + .as_ref() + .map(|h| f(&h.metrics)) + .unwrap_or(0) + } } } } diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 3d333960f3d..89c8e9afe28 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -388,21 +388,23 @@ cfg_rt! { mod thread_id; pub(crate) use thread_id::ThreadId; + pub(crate) mod metrics; cfg_metrics! { - mod metrics; - pub use metrics::{RuntimeMetrics, HistogramScale}; + pub use metrics::RuntimeMetrics; - pub(crate) use metrics::{MetricsBatch, SchedulerMetrics, WorkerMetrics, HistogramBuilder}; - cfg_net! { - pub(crate) use metrics::IoDriverMetrics; + cfg_unstable_metrics! { + pub use metrics::HistogramScale; + + + cfg_net! { + pub(crate) use metrics::IoDriverMetrics; + } } } - cfg_not_metrics! { - pub(crate) mod metrics; - pub(crate) use metrics::{SchedulerMetrics, WorkerMetrics, MetricsBatch, HistogramBuilder}; - } + pub(crate) use metrics::{MetricsBatch, SchedulerMetrics, WorkerMetrics, HistogramBuilder}; + /// After thread starts / before thread stops type Callback = std::sync::Arc; diff --git a/tokio/src/runtime/runtime.rs b/tokio/src/runtime/runtime.rs index 7cf2cebeffc..eb354cce46f 100644 --- a/tokio/src/runtime/runtime.rs +++ b/tokio/src/runtime/runtime.rs @@ -455,6 +455,14 @@ impl Runtime { pub fn shutdown_background(self) { self.shutdown_timeout(Duration::from_nanos(0)); } + + cfg_metrics! { + /// Returns a view that lets you get information about how the runtime + /// is performing. + pub fn metrics(&self) -> crate::runtime::RuntimeMetrics { + self.handle.metrics() + } + } } #[allow(clippy::single_match)] // there are comments in the error branch, so we don't want if-let @@ -487,12 +495,4 @@ impl std::panic::UnwindSafe for Runtime {} impl std::panic::RefUnwindSafe for Runtime {} -cfg_metrics! { - impl Runtime { - /// Returns a view that lets you get information about how the runtime - /// is performing. - pub fn metrics(&self) -> crate::runtime::RuntimeMetrics { - self.handle.metrics() - } - } -} +impl Runtime {} diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index 36bcefc4406..b9c23837a58 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -502,7 +502,7 @@ impl Handle { } } -cfg_metrics! { +cfg_unstable_metrics! { impl Handle { pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { &self.shared.scheduler_metrics diff --git a/tokio/src/runtime/scheduler/inject.rs b/tokio/src/runtime/scheduler/inject.rs index 39976fcd7a2..811b02c136c 100644 --- a/tokio/src/runtime/scheduler/inject.rs +++ b/tokio/src/runtime/scheduler/inject.rs @@ -16,7 +16,7 @@ cfg_rt_multi_thread! { mod rt_multi_thread; } -cfg_metrics! { +cfg_unstable_metrics! { mod metrics; } diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index 04fbff39e47..47860765f48 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -164,8 +164,6 @@ cfg_rt! { } cfg_metrics! { - use crate::runtime::{SchedulerMetrics, WorkerMetrics}; - impl Handle { pub(crate) fn num_workers(&self) -> usize { match self { @@ -176,6 +174,12 @@ cfg_rt! { Handle::MultiThreadAlt(handle) => handle.num_workers(), } } + } + + cfg_unstable_metrics! { + use crate::runtime::{SchedulerMetrics, WorkerMetrics}; + impl Handle { + pub(crate) fn num_blocking_threads(&self) -> usize { match_flavor!(self, Handle(handle) => handle.num_blocking_threads()) @@ -210,6 +214,7 @@ cfg_rt! { } } } +} impl Context { #[track_caller] diff --git a/tokio/src/runtime/scheduler/multi_thread/handle.rs b/tokio/src/runtime/scheduler/multi_thread/handle.rs index 568eb80af8b..72f776e47fa 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle.rs @@ -9,9 +9,7 @@ use crate::util::RngSeedGenerator; use std::fmt; -cfg_metrics! { - mod metrics; -} +mod metrics; cfg_taskdump! { mod taskdump; diff --git a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs index 838694fc89e..61d4d950173 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs @@ -1,41 +1,43 @@ use super::Handle; -use crate::runtime::{SchedulerMetrics, WorkerMetrics}; - impl Handle { pub(crate) fn num_workers(&self) -> usize { self.shared.worker_metrics.len() } +} - pub(crate) fn num_blocking_threads(&self) -> usize { - self.blocking_spawner.num_threads() - } +cfg_unstable_metrics! { + impl Handle { + pub(crate) fn num_blocking_threads(&self) -> usize { + self.blocking_spawner.num_threads() + } - pub(crate) fn num_idle_blocking_threads(&self) -> usize { - self.blocking_spawner.num_idle_threads() - } + pub(crate) fn num_idle_blocking_threads(&self) -> usize { + self.blocking_spawner.num_idle_threads() + } - pub(crate) fn active_tasks_count(&self) -> usize { - self.shared.owned.active_tasks_count() - } + pub(crate) fn active_tasks_count(&self) -> usize { + self.shared.owned.active_tasks_count() + } - pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { - &self.shared.scheduler_metrics - } + pub(crate) fn scheduler_metrics(&self) -> &crate::runtime::SchedulerMetrics { + &self.shared.scheduler_metrics + } - pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { - &self.shared.worker_metrics[worker] - } + pub(crate) fn worker_metrics(&self, worker: usize) -> &crate::runtime::WorkerMetrics { + &self.shared.worker_metrics[worker] + } - pub(crate) fn injection_queue_depth(&self) -> usize { - self.shared.injection_queue_depth() - } + pub(crate) fn injection_queue_depth(&self) -> usize { + self.shared.injection_queue_depth() + } - pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { - self.shared.worker_local_queue_depth(worker) - } + pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { + self.shared.worker_local_queue_depth(worker) + } - pub(crate) fn blocking_queue_depth(&self) -> usize { - self.blocking_spawner.queue_depth() + pub(crate) fn blocking_queue_depth(&self) -> usize { + self.blocking_spawner.queue_depth() + } } } diff --git a/tokio/src/runtime/scheduler/multi_thread/queue.rs b/tokio/src/runtime/scheduler/multi_thread/queue.rs index 35223289870..99ee31ba15b 100644 --- a/tokio/src/runtime/scheduler/multi_thread/queue.rs +++ b/tokio/src/runtime/scheduler/multi_thread/queue.rs @@ -546,7 +546,7 @@ impl Steal { } } -cfg_metrics! { +cfg_unstable_metrics! { impl Steal { pub(crate) fn len(&self) -> usize { self.0.len() as _ diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 83e70795f4f..fdbe6c268ff 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -74,7 +74,7 @@ use std::cell::RefCell; use std::task::Waker; use std::time::Duration; -cfg_metrics! { +cfg_unstable_metrics! { mod metrics; } diff --git a/tokio/src/runtime/tests/queue.rs b/tokio/src/runtime/tests/queue.rs index 5df92b7a291..e957fcb37da 100644 --- a/tokio/src/runtime/tests/queue.rs +++ b/tokio/src/runtime/tests/queue.rs @@ -37,7 +37,7 @@ fn fits_256_one_at_a_time() { local.push_back_or_overflow(task, &inject, &mut stats); } - cfg_metrics! { + cfg_unstable_metrics! { assert_metrics!(stats, overflow_count == 0); } @@ -95,7 +95,7 @@ fn overflow() { local.push_back_or_overflow(task, &inject, &mut stats); } - cfg_metrics! { + cfg_unstable_metrics! { assert_metrics!(stats, overflow_count == 1); } @@ -125,7 +125,7 @@ fn steal_batch() { assert!(steal1.steal_into(&mut local2, &mut stats).is_some()); - cfg_metrics! { + cfg_unstable_metrics! { assert_metrics!(stats, steal_count == 2); } @@ -181,7 +181,7 @@ fn stress1() { thread::yield_now(); } - cfg_metrics! { + cfg_unstable_metrics! { assert_metrics!(stats, steal_count == n as _); }