diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index f44599ff47a..8a72476f7c4 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -215,7 +215,7 @@ macro_rules! cfg_macros { } } -macro_rules! cfg_metrics { +macro_rules! cfg_unstable_metrics { ($($item:item)*) => { $( #[cfg(tokio_unstable)] @@ -245,7 +245,7 @@ macro_rules! cfg_no_64bit_metrics { } } -macro_rules! cfg_not_metrics { +macro_rules! cfg_not_unstable_metrics { ($($item:item)*) => { $( #[cfg(not(tokio_unstable))] diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index c74aea76568..3757079f329 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -40,7 +40,7 @@ impl SpawnerMetrics { self.num_idle_threads.load(Ordering::Relaxed) } - cfg_metrics! { + cfg_unstable_metrics! { fn queue_depth(&self) -> usize { self.queue_depth.load(Ordering::Relaxed) } @@ -474,7 +474,7 @@ impl Spawner { } } -cfg_metrics! { +cfg_unstable_metrics! { impl Spawner { pub(crate) fn num_threads(&self) -> usize { self.inner.metrics.num_threads() diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 05f736d3e50..519c7d01413 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -957,7 +957,7 @@ impl Builder { } } - cfg_metrics! { + cfg_unstable_metrics! { /// Enables tracking the distribution of task poll times. /// /// Task poll times are not instrumented by default as doing so requires diff --git a/tokio/src/runtime/coop.rs b/tokio/src/runtime/coop.rs index d9f7ff2af29..f2afa75c9c4 100644 --- a/tokio/src/runtime/coop.rs +++ b/tokio/src/runtime/coop.rs @@ -197,7 +197,7 @@ cfg_coop! { } cfg_rt! { - cfg_metrics! { + cfg_unstable_metrics! { #[inline(always)] fn inc_budget_forced_yield_count() { let _ = context::with_current(|handle| { @@ -206,7 +206,7 @@ cfg_coop! { } } - cfg_not_metrics! { + cfg_not_unstable_metrics! { #[inline(always)] fn inc_budget_forced_yield_count() {} } diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 01d210cd36f..5691a6e3bd2 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -1,6 +1,6 @@ #[cfg(tokio_unstable)] use crate::runtime; -use crate::runtime::{context, scheduler, RuntimeFlavor}; +use crate::runtime::{context, scheduler, RuntimeFlavor, RuntimeMetrics}; /// Handle to the runtime. /// @@ -393,17 +393,11 @@ impl Handle { owned_id.into() } } -} - -cfg_metrics! { - use crate::runtime::RuntimeMetrics; - impl Handle { - /// Returns a view that lets you get information about how the runtime - /// is performing. - pub fn metrics(&self) -> RuntimeMetrics { - RuntimeMetrics::new(self.clone()) - } + /// Returns a view that lets you get information about how the runtime + /// is performing. + pub fn metrics(&self) -> RuntimeMetrics { + RuntimeMetrics::new(self.clone()) } } diff --git a/tokio/src/runtime/io/metrics.rs b/tokio/src/runtime/io/metrics.rs index ec341efe680..e7a01bc2f46 100644 --- a/tokio/src/runtime/io/metrics.rs +++ b/tokio/src/runtime/io/metrics.rs @@ -17,7 +17,7 @@ cfg_not_rt_and_metrics_and_net! { cfg_net! { cfg_rt! { - cfg_metrics! { + cfg_unstable_metrics! { pub(crate) use crate::runtime::IoDriverMetrics; } } diff --git a/tokio/src/runtime/metrics/histogram.rs b/tokio/src/runtime/metrics/histogram.rs index f75ffa3b495..4cfd769a94e 100644 --- a/tokio/src/runtime/metrics/histogram.rs +++ b/tokio/src/runtime/metrics/histogram.rs @@ -1,5 +1,5 @@ -use crate::loom::sync::atomic::Ordering::Relaxed; use crate::util::metric_atomics::MetricAtomicU64; +use std::sync::atomic::Ordering::Relaxed; use std::cmp; use std::ops::Range; diff --git a/tokio/src/runtime/metrics/io.rs b/tokio/src/runtime/metrics/io.rs index 674fca5faec..9fdf3c96694 100644 --- a/tokio/src/runtime/metrics/io.rs +++ b/tokio/src/runtime/metrics/io.rs @@ -1,6 +1,7 @@ #![cfg_attr(not(feature = "net"), allow(dead_code))] -use crate::{loom::sync::atomic::Ordering::Relaxed, util::metric_atomics::MetricAtomicU64}; +use crate::util::metric_atomics::MetricAtomicU64; +use std::sync::atomic::Ordering::Relaxed; #[derive(Default)] pub(crate) struct IoDriverMetrics { diff --git a/tokio/src/runtime/metrics/mod.rs b/tokio/src/runtime/metrics/mod.rs index 88be4a5211f..295c97cce88 100644 --- a/tokio/src/runtime/metrics/mod.rs +++ b/tokio/src/runtime/metrics/mod.rs @@ -8,7 +8,10 @@ //! [unstable]: crate#unstable-features #![allow(clippy::module_inception)] -cfg_metrics! { +mod runtime; +pub use runtime::RuntimeMetrics; + +cfg_unstable_metrics! { mod batch; pub(crate) use batch::MetricsBatch; @@ -17,9 +20,6 @@ cfg_metrics! { #[allow(unreachable_pub)] // rust-lang/rust#57411 pub use histogram::HistogramScale; - mod runtime; - #[allow(unreachable_pub)] // rust-lang/rust#57411 - pub use runtime::RuntimeMetrics; mod scheduler; pub(crate) use scheduler::SchedulerMetrics; @@ -33,7 +33,7 @@ cfg_metrics! { } } -cfg_not_metrics! { +cfg_not_unstable_metrics! { mod mock; pub(crate) use mock::{SchedulerMetrics, WorkerMetrics, MetricsBatch, HistogramBuilder}; diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 865a6406a6a..8d30f66f6ff 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -1,10 +1,12 @@ use crate::runtime::Handle; -use std::ops::Range; -cfg_64bit_metrics! { - use std::sync::atomic::Ordering::Relaxed; +cfg_unstable_metrics! { + use std::ops::Range; + cfg_64bit_metrics! { + use std::sync::atomic::Ordering::Relaxed; + } + use std::time::Duration; } -use std::time::Duration; /// Handle to the runtime's metrics. /// @@ -45,221 +47,354 @@ impl RuntimeMetrics { self.handle.inner.num_workers() } - /// Returns the number of additional threads spawned by the runtime. - /// - /// The number of workers is set by configuring `max_blocking_threads` on - /// `runtime::Builder`. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let _ = tokio::task::spawn_blocking(move || { - /// // Stand-in for compute-heavy work or using synchronous APIs - /// 1 + 1 - /// }).await; - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.num_blocking_threads(); - /// println!("Runtime has created {} threads", n); - /// } - /// ``` - pub fn num_blocking_threads(&self) -> usize { - self.handle.inner.num_blocking_threads() - } + cfg_unstable_metrics! { - /// Returns the number of active tasks in the runtime. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.active_tasks_count(); - /// println!("Runtime has {} active tasks", n); - /// } - /// ``` - pub fn active_tasks_count(&self) -> usize { - self.handle.inner.active_tasks_count() - } + /// Returns the number of additional threads spawned by the runtime. + /// + /// The number of workers is set by configuring `max_blocking_threads` on + /// `runtime::Builder`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let _ = tokio::task::spawn_blocking(move || { + /// // Stand-in for compute-heavy work or using synchronous APIs + /// 1 + 1 + /// }).await; + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.num_blocking_threads(); + /// println!("Runtime has created {} threads", n); + /// } + /// ``` + pub fn num_blocking_threads(&self) -> usize { + self.handle.inner.num_blocking_threads() + } - /// Returns the number of idle threads, which have spawned by the runtime - /// for `spawn_blocking` calls. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let _ = tokio::task::spawn_blocking(move || { - /// // Stand-in for compute-heavy work or using synchronous APIs - /// 1 + 1 - /// }).await; - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.num_idle_blocking_threads(); - /// println!("Runtime has {} idle blocking thread pool threads", n); - /// } - /// ``` - pub fn num_idle_blocking_threads(&self) -> usize { - self.handle.inner.num_idle_blocking_threads() - } + /// Returns the number of active tasks in the runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.active_tasks_count(); + /// println!("Runtime has {} active tasks", n); + /// } + /// ``` + pub fn active_tasks_count(&self) -> usize { + self.handle.inner.active_tasks_count() + } - cfg_64bit_metrics! { - /// Returns the number of tasks scheduled from **outside** of the runtime. - /// - /// The remote schedule count starts at zero when the runtime is created and - /// increases by one each time a task is woken from **outside** of the - /// runtime. This usually means that a task is spawned or notified from a - /// non-runtime thread and must be queued using the Runtime's injection - /// queue, which tends to be slower. - /// - /// The counter is monotonically increasing. It is never decremented or - /// reset to zero. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.remote_schedule_count(); - /// println!("{} tasks were scheduled from outside the runtime", n); - /// } - /// ``` - pub fn remote_schedule_count(&self) -> u64 { - self.handle - .inner - .scheduler_metrics() - .remote_schedule_count - .load(Relaxed) - } + /// Returns the number of idle threads, which have spawned by the runtime + /// for `spawn_blocking` calls. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let _ = tokio::task::spawn_blocking(move || { + /// // Stand-in for compute-heavy work or using synchronous APIs + /// 1 + 1 + /// }).await; + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.num_idle_blocking_threads(); + /// println!("Runtime has {} idle blocking thread pool threads", n); + /// } + /// ``` + pub fn num_idle_blocking_threads(&self) -> usize { + self.handle.inner.num_idle_blocking_threads() + } - /// Returns the number of times that tasks have been forced to yield back to the scheduler - /// after exhausting their task budgets. - /// - /// This count starts at zero when the runtime is created and increases by one each time a task yields due to exhausting its budget. - /// - /// The counter is monotonically increasing. It is never decremented or - /// reset to zero. - pub fn budget_forced_yield_count(&self) -> u64 { - self.handle - .inner - .scheduler_metrics() - .budget_forced_yield_count - .load(Relaxed) - } + cfg_64bit_metrics! { + /// Returns the number of tasks scheduled from **outside** of the runtime. + /// + /// The remote schedule count starts at zero when the runtime is created and + /// increases by one each time a task is woken from **outside** of the + /// runtime. This usually means that a task is spawned or notified from a + /// non-runtime thread and must be queued using the Runtime's injection + /// queue, which tends to be slower. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.remote_schedule_count(); + /// println!("{} tasks were scheduled from outside the runtime", n); + /// } + /// ``` + pub fn remote_schedule_count(&self) -> u64 { + self.handle + .inner + .scheduler_metrics() + .remote_schedule_count + .load(Relaxed) + } - /// Returns the total number of times the given worker thread has parked. - /// - /// The worker park count starts at zero when the runtime is created and - /// increases by one each time the worker parks the thread waiting for new - /// inbound events to process. This usually means the worker has processed - /// all pending work and is currently idle. - /// - /// The counter is monotonically increasing. It is never decremented or - /// reset to zero. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. - /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()`. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.worker_park_count(0); - /// println!("worker 0 parked {} times", n); - /// } - /// ``` - pub fn worker_park_count(&self, worker: usize) -> u64 { - self.handle - .inner - .worker_metrics(worker) - .park_count - .load(Relaxed) - } + /// Returns the number of times that tasks have been forced to yield back to the scheduler + /// after exhausting their task budgets. + /// + /// This count starts at zero when the runtime is created and increases by one each time a task yields due to exhausting its budget. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + pub fn budget_forced_yield_count(&self) -> u64 { + self.handle + .inner + .scheduler_metrics() + .budget_forced_yield_count + .load(Relaxed) + } - /// Returns the number of times the given worker thread unparked but - /// performed no work before parking again. - /// - /// The worker no-op count starts at zero when the runtime is created and - /// increases by one each time the worker unparks the thread but finds no - /// new work and goes back to sleep. This indicates a false-positive wake up. - /// - /// The counter is monotonically increasing. It is never decremented or - /// reset to zero. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. - /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()`. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.worker_noop_count(0); - /// println!("worker 0 had {} no-op unparks", n); - /// } - /// ``` - pub fn worker_noop_count(&self, worker: usize) -> u64 { - self.handle - .inner - .worker_metrics(worker) - .noop_count - .load(Relaxed) - } + /// Returns the total number of times the given worker thread has parked. + /// + /// The worker park count starts at zero when the runtime is created and + /// increases by one each time the worker parks the thread waiting for new + /// inbound events to process. This usually means the worker has processed + /// all pending work and is currently idle. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_park_count(0); + /// println!("worker 0 parked {} times", n); + /// } + /// ``` + pub fn worker_park_count(&self, worker: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .park_count + .load(Relaxed) + } - /// Returns the number of tasks the given worker thread stole from - /// another worker thread. - /// - /// This metric only applies to the **multi-threaded** runtime and will - /// always return `0` when using the current thread runtime. + /// Returns the number of times the given worker thread unparked but + /// performed no work before parking again. + /// + /// The worker no-op count starts at zero when the runtime is created and + /// increases by one each time the worker unparks the thread but finds no + /// new work and goes back to sleep. This indicates a false-positive wake up. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_noop_count(0); + /// println!("worker 0 had {} no-op unparks", n); + /// } + /// ``` + pub fn worker_noop_count(&self, worker: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .noop_count + .load(Relaxed) + } + + /// Returns the number of tasks the given worker thread stole from + /// another worker thread. + /// + /// This metric only applies to the **multi-threaded** runtime and will + /// always return `0` when using the current thread runtime. + /// + /// The worker steal count starts at zero when the runtime is created and + /// increases by `N` each time the worker has processed its scheduled queue + /// and successfully steals `N` more pending tasks from another worker. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_steal_count(0); + /// println!("worker 0 has stolen {} tasks", n); + /// } + /// ``` + pub fn worker_steal_count(&self, worker: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .steal_count + .load(Relaxed) + } + + /// Returns the number of times the given worker thread stole tasks from + /// another worker thread. + /// + /// This metric only applies to the **multi-threaded** runtime and will + /// always return `0` when using the current thread runtime. + /// + /// The worker steal count starts at zero when the runtime is created and + /// increases by one each time the worker has processed its scheduled queue + /// and successfully steals more pending tasks from another worker. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_steal_operations(0); + /// println!("worker 0 has stolen tasks {} times", n); + /// } + /// ``` + pub fn worker_steal_operations(&self, worker: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .steal_operations + .load(Relaxed) + } + + /// Returns the number of tasks the given worker thread has polled. + /// + /// The worker poll count starts at zero when the runtime is created and + /// increases by one each time the worker polls a scheduled task. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_poll_count(0); + /// println!("worker 0 has polled {} tasks", n); + /// } + /// ``` + pub fn worker_poll_count(&self, worker: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .poll_count + .load(Relaxed) + } + + /// Returns the amount of time the given worker thread has been busy. /// - /// The worker steal count starts at zero when the runtime is created and - /// increases by `N` each time the worker has processed its scheduled queue - /// and successfully steals `N` more pending tasks from another worker. + /// The worker busy duration starts at zero when the runtime is created and + /// increases whenever the worker is spending time processing work. Using + /// this value can indicate the load of the given worker. If a lot of time + /// is spent busy, then the worker is under load and will check for inbound + /// events less often. /// - /// The counter is monotonically increasing. It is never decremented or - /// reset to zero. + /// The timer is monotonically increasing. It is never decremented or reset + /// to zero. /// /// # Arguments /// @@ -282,27 +417,28 @@ impl RuntimeMetrics { /// async fn main() { /// let metrics = Handle::current().metrics(); /// - /// let n = metrics.worker_steal_count(0); - /// println!("worker 0 has stolen {} tasks", n); + /// let n = metrics.worker_total_busy_duration(0); + /// println!("worker 0 was busy for a total of {:?}", n); /// } /// ``` - pub fn worker_steal_count(&self, worker: usize) -> u64 { - self.handle + pub fn worker_total_busy_duration(&self, worker: usize) -> Duration { + let nanos = self + .handle .inner .worker_metrics(worker) - .steal_count - .load(Relaxed) + .busy_duration_total + .load(Relaxed); + Duration::from_nanos(nanos) } - /// Returns the number of times the given worker thread stole tasks from - /// another worker thread. + /// Returns the number of tasks scheduled from **within** the runtime on the + /// given worker's local queue. /// - /// This metric only applies to the **multi-threaded** runtime and will - /// always return `0` when using the current thread runtime. - /// - /// The worker steal count starts at zero when the runtime is created and - /// increases by one each time the worker has processed its scheduled queue - /// and successfully steals more pending tasks from another worker. + /// The local schedule count starts at zero when the runtime is created and + /// increases by one each time a task is woken from **inside** of the + /// runtime on the given worker. This usually means that a task is spawned + /// or notified from within a runtime thread and will be queued on the + /// worker-local queue. /// /// The counter is monotonically increasing. It is never decremented or /// reset to zero. @@ -328,22 +464,27 @@ impl RuntimeMetrics { /// async fn main() { /// let metrics = Handle::current().metrics(); /// - /// let n = metrics.worker_steal_operations(0); - /// println!("worker 0 has stolen tasks {} times", n); + /// let n = metrics.worker_local_schedule_count(0); + /// println!("{} tasks were scheduled on the worker's local queue", n); /// } /// ``` - pub fn worker_steal_operations(&self, worker: usize) -> u64 { + pub fn worker_local_schedule_count(&self, worker: usize) -> u64 { self.handle .inner .worker_metrics(worker) - .steal_operations + .local_schedule_count .load(Relaxed) } - /// Returns the number of tasks the given worker thread has polled. + /// Returns the number of times the given worker thread saturated its local + /// queue. + /// + /// This metric only applies to the **multi-threaded** scheduler. /// - /// The worker poll count starts at zero when the runtime is created and - /// increases by one each time the worker polls a scheduled task. + /// The worker overflow count starts at zero when the runtime is created and + /// increases by one each time the worker attempts to schedule a task + /// locally, but its local queue is full. When this happens, half of the + /// local queue is moved to the injection queue. /// /// The counter is monotonically increasing. It is never decremented or /// reset to zero. @@ -369,40 +510,27 @@ impl RuntimeMetrics { /// async fn main() { /// let metrics = Handle::current().metrics(); /// - /// let n = metrics.worker_poll_count(0); - /// println!("worker 0 has polled {} tasks", n); + /// let n = metrics.worker_overflow_count(0); + /// println!("worker 0 has overflowed its queue {} times", n); /// } /// ``` - pub fn worker_poll_count(&self, worker: usize) -> u64 { + pub fn worker_overflow_count(&self, worker: usize) -> u64 { self.handle .inner .worker_metrics(worker) - .poll_count + .overflow_count .load(Relaxed) } + } - /// Returns the amount of time the given worker thread has been busy. - /// - /// The worker busy duration starts at zero when the runtime is created and - /// increases whenever the worker is spending time processing work. Using - /// this value can indicate the load of the given worker. If a lot of time - /// is spent busy, then the worker is under load and will check for inbound - /// events less often. - /// - /// The timer is monotonically increasing. It is never decremented or reset - /// to zero. + /// Returns the number of tasks currently scheduled in the runtime's + /// injection queue. /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. - /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()`. + /// Tasks that are spawned or notified from a non-runtime thread are + /// scheduled using the runtime's injection queue. This metric returns the + /// **current** number of tasks pending in the injection queue. As such, the + /// returned value may increase or decrease as new tasks are scheduled and + /// processed. /// /// # Examples /// @@ -413,31 +541,22 @@ impl RuntimeMetrics { /// async fn main() { /// let metrics = Handle::current().metrics(); /// - /// let n = metrics.worker_total_busy_duration(0); - /// println!("worker 0 was busy for a total of {:?}", n); + /// let n = metrics.injection_queue_depth(); + /// println!("{} tasks currently pending in the runtime's injection queue", n); /// } /// ``` - pub fn worker_total_busy_duration(&self, worker: usize) -> Duration { - let nanos = self - .handle - .inner - .worker_metrics(worker) - .busy_duration_total - .load(Relaxed); - Duration::from_nanos(nanos) + pub fn injection_queue_depth(&self) -> usize { + self.handle.inner.injection_queue_depth() } - /// Returns the number of tasks scheduled from **within** the runtime on the - /// given worker's local queue. - /// - /// The local schedule count starts at zero when the runtime is created and - /// increases by one each time a task is woken from **inside** of the - /// runtime on the given worker. This usually means that a task is spawned - /// or notified from within a runtime thread and will be queued on the - /// worker-local queue. + /// Returns the number of tasks currently scheduled in the given worker's + /// local queue. /// - /// The counter is monotonically increasing. It is never decremented or - /// reset to zero. + /// Tasks that are spawned or notified from within a runtime thread are + /// scheduled using that worker's local queue. This metric returns the + /// **current** number of tasks pending in the worker's local queue. As + /// such, the returned value may increase or decrease as new tasks are + /// scheduled and processed. /// /// # Arguments /// @@ -460,283 +579,56 @@ impl RuntimeMetrics { /// async fn main() { /// let metrics = Handle::current().metrics(); /// - /// let n = metrics.worker_local_schedule_count(0); - /// println!("{} tasks were scheduled on the worker's local queue", n); + /// let n = metrics.worker_local_queue_depth(0); + /// println!("{} tasks currently pending in worker 0's local queue", n); /// } /// ``` - pub fn worker_local_schedule_count(&self, worker: usize) -> u64 { - self.handle - .inner - .worker_metrics(worker) - .local_schedule_count - .load(Relaxed) + pub fn worker_local_queue_depth(&self, worker: usize) -> usize { + self.handle.inner.worker_local_queue_depth(worker) } - /// Returns the number of times the given worker thread saturated its local - /// queue. - /// - /// This metric only applies to the **multi-threaded** scheduler. - /// - /// The worker overflow count starts at zero when the runtime is created and - /// increases by one each time the worker attempts to schedule a task - /// locally, but its local queue is full. When this happens, half of the - /// local queue is moved to the injection queue. - /// - /// The counter is monotonically increasing. It is never decremented or - /// reset to zero. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. + /// Returns `true` if the runtime is tracking the distribution of task poll + /// times. /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()`. + /// Task poll times are not instrumented by default as doing so requires + /// calling [`Instant::now()`] twice per task poll. The feature is enabled + /// by calling [`enable_metrics_poll_count_histogram()`] when building the + /// runtime. /// /// # Examples /// /// ``` - /// use tokio::runtime::Handle; + /// use tokio::runtime::{self, Handle}; /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_poll_count_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let enabled = metrics.poll_count_histogram_enabled(); /// - /// let n = metrics.worker_overflow_count(0); - /// println!("worker 0 has overflowed its queue {} times", n); + /// println!("Tracking task poll time distribution: {:?}", enabled); + /// }); /// } /// ``` - pub fn worker_overflow_count(&self, worker: usize) -> u64 { + /// + /// [`enable_metrics_poll_count_histogram()`]: crate::runtime::Builder::enable_metrics_poll_count_histogram + /// [`Instant::now()`]: std::time::Instant::now + pub fn poll_count_histogram_enabled(&self) -> bool { self.handle .inner - .worker_metrics(worker) - .overflow_count - .load(Relaxed) + .worker_metrics(0) + .poll_count_histogram + .is_some() } - } - /// Returns the number of tasks currently scheduled in the runtime's - /// injection queue. - /// - /// Tasks that are spawned or notified from a non-runtime thread are - /// scheduled using the runtime's injection queue. This metric returns the - /// **current** number of tasks pending in the injection queue. As such, the - /// returned value may increase or decrease as new tasks are scheduled and - /// processed. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.injection_queue_depth(); - /// println!("{} tasks currently pending in the runtime's injection queue", n); - /// } - /// ``` - pub fn injection_queue_depth(&self) -> usize { - self.handle.inner.injection_queue_depth() - } - - /// Returns the number of tasks currently scheduled in the given worker's - /// local queue. - /// - /// Tasks that are spawned or notified from within a runtime thread are - /// scheduled using that worker's local queue. This metric returns the - /// **current** number of tasks pending in the worker's local queue. As - /// such, the returned value may increase or decrease as new tasks are - /// scheduled and processed. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. - /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()`. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.worker_local_queue_depth(0); - /// println!("{} tasks currently pending in worker 0's local queue", n); - /// } - /// ``` - pub fn worker_local_queue_depth(&self, worker: usize) -> usize { - self.handle.inner.worker_local_queue_depth(worker) - } - - /// Returns `true` if the runtime is tracking the distribution of task poll - /// times. - /// - /// Task poll times are not instrumented by default as doing so requires - /// calling [`Instant::now()`] twice per task poll. The feature is enabled - /// by calling [`enable_metrics_poll_count_histogram()`] when building the - /// runtime. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::{self, Handle}; - /// - /// fn main() { - /// runtime::Builder::new_current_thread() - /// .enable_metrics_poll_count_histogram() - /// .build() - /// .unwrap() - /// .block_on(async { - /// let metrics = Handle::current().metrics(); - /// let enabled = metrics.poll_count_histogram_enabled(); - /// - /// println!("Tracking task poll time distribution: {:?}", enabled); - /// }); - /// } - /// ``` - /// - /// [`enable_metrics_poll_count_histogram()`]: crate::runtime::Builder::enable_metrics_poll_count_histogram - /// [`Instant::now()`]: std::time::Instant::now - pub fn poll_count_histogram_enabled(&self) -> bool { - self.handle - .inner - .worker_metrics(0) - .poll_count_histogram - .is_some() - } - - /// Returns the number of histogram buckets tracking the distribution of - /// task poll times. - /// - /// This value is configured by calling - /// [`metrics_poll_count_histogram_buckets()`] when building the runtime. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::{self, Handle}; - /// - /// fn main() { - /// runtime::Builder::new_current_thread() - /// .enable_metrics_poll_count_histogram() - /// .build() - /// .unwrap() - /// .block_on(async { - /// let metrics = Handle::current().metrics(); - /// let buckets = metrics.poll_count_histogram_num_buckets(); - /// - /// println!("Histogram buckets: {:?}", buckets); - /// }); - /// } - /// ``` - /// - /// [`metrics_poll_count_histogram_buckets()`]: - /// crate::runtime::Builder::metrics_poll_count_histogram_buckets - pub fn poll_count_histogram_num_buckets(&self) -> usize { - self.handle - .inner - .worker_metrics(0) - .poll_count_histogram - .as_ref() - .map(|histogram| histogram.num_buckets()) - .unwrap_or_default() - } - - /// Returns the range of task poll times tracked by the given bucket. - /// - /// This value is configured by calling - /// [`metrics_poll_count_histogram_resolution()`] when building the runtime. - /// - /// # Panics - /// - /// The method panics if `bucket` represents an invalid bucket index, i.e. - /// is greater than or equal to `poll_count_histogram_num_buckets()`. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::{self, Handle}; - /// - /// fn main() { - /// runtime::Builder::new_current_thread() - /// .enable_metrics_poll_count_histogram() - /// .build() - /// .unwrap() - /// .block_on(async { - /// let metrics = Handle::current().metrics(); - /// let buckets = metrics.poll_count_histogram_num_buckets(); - /// - /// for i in 0..buckets { - /// let range = metrics.poll_count_histogram_bucket_range(i); - /// println!("Histogram bucket {} range: {:?}", i, range); - /// } - /// }); - /// } - /// ``` - /// - /// [`metrics_poll_count_histogram_resolution()`]: - /// crate::runtime::Builder::metrics_poll_count_histogram_resolution - #[track_caller] - pub fn poll_count_histogram_bucket_range(&self, bucket: usize) -> Range { - self.handle - .inner - .worker_metrics(0) - .poll_count_histogram - .as_ref() - .map(|histogram| { - let range = histogram.bucket_range(bucket); - std::ops::Range { - start: Duration::from_nanos(range.start), - end: Duration::from_nanos(range.end), - } - }) - .unwrap_or_default() - } - - cfg_64bit_metrics! { - /// Returns the number of times the given worker polled tasks with a poll - /// duration within the given bucket's range. - /// - /// Each worker maintains its own histogram and the counts for each bucket - /// starts at zero when the runtime is created. Each time the worker polls a - /// task, it tracks the duration the task poll time took and increments the - /// associated bucket by 1. - /// - /// Each bucket is a monotonically increasing counter. It is never - /// decremented or reset to zero. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. - /// - /// `bucket` is the index of the bucket being queried. The bucket is scoped - /// to the worker. The range represented by the bucket can be queried by - /// calling [`poll_count_histogram_bucket_range()`]. Each worker maintains - /// identical bucket ranges. + /// Returns the number of histogram buckets tracking the distribution of + /// task poll times. /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()` or if `bucket` represents an - /// invalid bucket. + /// This value is configured by calling + /// [`metrics_poll_count_histogram_buckets()`] when building the runtime. /// /// # Examples /// @@ -752,149 +644,156 @@ impl RuntimeMetrics { /// let metrics = Handle::current().metrics(); /// let buckets = metrics.poll_count_histogram_num_buckets(); /// - /// for worker in 0..metrics.num_workers() { - /// for i in 0..buckets { - /// let count = metrics.poll_count_histogram_bucket_count(worker, i); - /// println!("Poll count {}", count); - /// } - /// } + /// println!("Histogram buckets: {:?}", buckets); /// }); /// } /// ``` /// - /// [`poll_count_histogram_bucket_range()`]: crate::runtime::RuntimeMetrics::poll_count_histogram_bucket_range - #[track_caller] - pub fn poll_count_histogram_bucket_count(&self, worker: usize, bucket: usize) -> u64 { + /// [`metrics_poll_count_histogram_buckets()`]: + /// crate::runtime::Builder::metrics_poll_count_histogram_buckets + pub fn poll_count_histogram_num_buckets(&self) -> usize { self.handle .inner - .worker_metrics(worker) + .worker_metrics(0) .poll_count_histogram .as_ref() - .map(|histogram| histogram.get(bucket)) + .map(|histogram| histogram.num_buckets()) .unwrap_or_default() } - /// Returns the mean duration of task polls, in nanoseconds. + /// Returns the range of task poll times tracked by the given bucket. /// - /// This is an exponentially weighted moving average. Currently, this metric - /// is only provided by the multi-threaded runtime. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. + /// This value is configured by calling + /// [`metrics_poll_count_histogram_resolution()`] when building the runtime. /// /// # Panics /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()`. + /// The method panics if `bucket` represents an invalid bucket index, i.e. + /// is greater than or equal to `poll_count_histogram_num_buckets()`. /// /// # Examples /// /// ``` - /// use tokio::runtime::Handle; + /// use tokio::runtime::{self, Handle}; /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_poll_count_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let buckets = metrics.poll_count_histogram_num_buckets(); /// - /// let n = metrics.worker_mean_poll_time(0); - /// println!("worker 0 has a mean poll time of {:?}", n); + /// for i in 0..buckets { + /// let range = metrics.poll_count_histogram_bucket_range(i); + /// println!("Histogram bucket {} range: {:?}", i, range); + /// } + /// }); /// } /// ``` + /// + /// [`metrics_poll_count_histogram_resolution()`]: + /// crate::runtime::Builder::metrics_poll_count_histogram_resolution #[track_caller] - pub fn worker_mean_poll_time(&self, worker: usize) -> Duration { - let nanos = self - .handle + pub fn poll_count_histogram_bucket_range(&self, bucket: usize) -> Range { + self.handle .inner - .worker_metrics(worker) - .mean_poll_time - .load(Relaxed); - Duration::from_nanos(nanos) + .worker_metrics(0) + .poll_count_histogram + .as_ref() + .map(|histogram| { + let range = histogram.bucket_range(bucket); + std::ops::Range { + start: Duration::from_nanos(range.start), + end: Duration::from_nanos(range.end), + } + }) + .unwrap_or_default() } - } - - /// Returns the number of tasks currently scheduled in the blocking - /// thread pool, spawned using `spawn_blocking`. - /// - /// This metric returns the **current** number of tasks pending in - /// blocking thread pool. As such, the returned value may increase - /// or decrease as new tasks are scheduled and processed. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.blocking_queue_depth(); - /// println!("{} tasks currently pending in the blocking thread pool", n); - /// } - /// ``` - pub fn blocking_queue_depth(&self) -> usize { - self.handle.inner.blocking_queue_depth() - } -} -cfg_net! { - impl RuntimeMetrics { cfg_64bit_metrics! { - /// Returns the number of file descriptors that have been registered with the - /// runtime's I/O driver. + /// Returns the number of times the given worker polled tasks with a poll + /// duration within the given bucket's range. /// - /// # Examples + /// Each worker maintains its own histogram and the counts for each bucket + /// starts at zero when the runtime is created. Each time the worker polls a + /// task, it tracks the duration the task poll time took and increments the + /// associated bucket by 1. /// - /// ``` - /// use tokio::runtime::Handle; + /// Each bucket is a monotonically increasing counter. It is never + /// decremented or reset to zero. /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); + /// # Arguments /// - /// let registered_fds = metrics.io_driver_fd_registered_count(); - /// println!("{} fds have been registered with the runtime's I/O driver.", registered_fds); + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. /// - /// let deregistered_fds = metrics.io_driver_fd_deregistered_count(); + /// `bucket` is the index of the bucket being queried. The bucket is scoped + /// to the worker. The range represented by the bucket can be queried by + /// calling [`poll_count_histogram_bucket_range()`]. Each worker maintains + /// identical bucket ranges. /// - /// let current_fd_count = registered_fds - deregistered_fds; - /// println!("{} fds are currently registered by the runtime's I/O driver.", current_fd_count); - /// } - /// ``` - pub fn io_driver_fd_registered_count(&self) -> u64 { - self.with_io_driver_metrics(|m| { - m.fd_registered_count.load(Relaxed) - }) - } - - /// Returns the number of file descriptors that have been deregistered by the - /// runtime's I/O driver. + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()` or if `bucket` represents an + /// invalid bucket. /// /// # Examples /// /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.io_driver_fd_deregistered_count(); - /// println!("{} fds have been deregistered by the runtime's I/O driver.", n); + /// use tokio::runtime::{self, Handle}; + /// + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_poll_count_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let buckets = metrics.poll_count_histogram_num_buckets(); + /// + /// for worker in 0..metrics.num_workers() { + /// for i in 0..buckets { + /// let count = metrics.poll_count_histogram_bucket_count(worker, i); + /// println!("Poll count {}", count); + /// } + /// } + /// }); /// } /// ``` - pub fn io_driver_fd_deregistered_count(&self) -> u64 { - self.with_io_driver_metrics(|m| { - m.fd_deregistered_count.load(Relaxed) - }) + /// + /// [`poll_count_histogram_bucket_range()`]: crate::runtime::RuntimeMetrics::poll_count_histogram_bucket_range + #[track_caller] + pub fn poll_count_histogram_bucket_count(&self, worker: usize, bucket: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .poll_count_histogram + .as_ref() + .map(|histogram| histogram.get(bucket)) + .unwrap_or_default() } - /// Returns the number of ready events processed by the runtime's - /// I/O driver. + /// Returns the mean duration of task polls, in nanoseconds. + /// + /// This is an exponentially weighted moving average. Currently, this metric + /// is only provided by the multi-threaded runtime. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. /// /// # Examples /// @@ -905,27 +804,131 @@ cfg_net! { /// async fn main() { /// let metrics = Handle::current().metrics(); /// - /// let n = metrics.io_driver_ready_count(); - /// println!("{} ready events processed by the runtime's I/O driver.", n); + /// let n = metrics.worker_mean_poll_time(0); + /// println!("worker 0 has a mean poll time of {:?}", n); /// } /// ``` - pub fn io_driver_ready_count(&self) -> u64 { - self.with_io_driver_metrics(|m| m.ready_count.load(Relaxed)) + #[track_caller] + pub fn worker_mean_poll_time(&self, worker: usize) -> Duration { + let nanos = self + .handle + .inner + .worker_metrics(worker) + .mean_poll_time + .load(Relaxed); + Duration::from_nanos(nanos) } + } - fn with_io_driver_metrics(&self, f: F) -> u64 - where - F: Fn(&super::IoDriverMetrics) -> u64, - { - // TODO: Investigate if this should return 0, most of our metrics always increase - // thus this breaks that guarantee. - self.handle - .inner - .driver() - .io - .as_ref() - .map(|h| f(&h.metrics)) - .unwrap_or(0) + /// Returns the number of tasks currently scheduled in the blocking + /// thread pool, spawned using `spawn_blocking`. + /// + /// This metric returns the **current** number of tasks pending in + /// blocking thread pool. As such, the returned value may increase + /// or decrease as new tasks are scheduled and processed. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.blocking_queue_depth(); + /// println!("{} tasks currently pending in the blocking thread pool", n); + /// } + /// ``` + pub fn blocking_queue_depth(&self) -> usize { + self.handle.inner.blocking_queue_depth() + } + + cfg_net! { + cfg_64bit_metrics! { + /// Returns the number of file descriptors that have been registered with the + /// runtime's I/O driver. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let registered_fds = metrics.io_driver_fd_registered_count(); + /// println!("{} fds have been registered with the runtime's I/O driver.", registered_fds); + /// + /// let deregistered_fds = metrics.io_driver_fd_deregistered_count(); + /// + /// let current_fd_count = registered_fds - deregistered_fds; + /// println!("{} fds are currently registered by the runtime's I/O driver.", current_fd_count); + /// } + /// ``` + pub fn io_driver_fd_registered_count(&self) -> u64 { + self.with_io_driver_metrics(|m| { + m.fd_registered_count.load(Relaxed) + }) + } + + /// Returns the number of file descriptors that have been deregistered by the + /// runtime's I/O driver. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.io_driver_fd_deregistered_count(); + /// println!("{} fds have been deregistered by the runtime's I/O driver.", n); + /// } + /// ``` + pub fn io_driver_fd_deregistered_count(&self) -> u64 { + self.with_io_driver_metrics(|m| { + m.fd_deregistered_count.load(Relaxed) + }) + } + + /// Returns the number of ready events processed by the runtime's + /// I/O driver. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.io_driver_ready_count(); + /// println!("{} ready events processed by the runtime's I/O driver.", n); + /// } + /// ``` + pub fn io_driver_ready_count(&self) -> u64 { + self.with_io_driver_metrics(|m| m.ready_count.load(Relaxed)) + } + + fn with_io_driver_metrics(&self, f: F) -> u64 + where + F: Fn(&super::IoDriverMetrics) -> u64, + { + // TODO: Investigate if this should return 0, most of our metrics always increase + // thus this breaks that guarantee. + self.handle + .inner + .driver() + .io + .as_ref() + .map(|h| f(&h.metrics)) + .unwrap_or(0) + } } } } diff --git a/tokio/src/runtime/metrics/worker.rs b/tokio/src/runtime/metrics/worker.rs index fc7c4e6dfe4..a396bf5a391 100644 --- a/tokio/src/runtime/metrics/worker.rs +++ b/tokio/src/runtime/metrics/worker.rs @@ -1,8 +1,10 @@ -use crate::loom::sync::atomic::AtomicUsize; -use crate::loom::sync::atomic::Ordering::Relaxed; use crate::runtime::metrics::Histogram; use crate::runtime::Config; use crate::util::metric_atomics::MetricAtomicU64; +// This is NOT the Loom atomic. To avoid an unnecessary state explosion in loom, +// all metrics use regular atomics. +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::Relaxed; /// Retrieve runtime worker metrics. /// diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 3d333960f3d..3fcde75b54e 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -388,21 +388,18 @@ cfg_rt! { mod thread_id; pub(crate) use thread_id::ThreadId; - cfg_metrics! { - mod metrics; - pub use metrics::{RuntimeMetrics, HistogramScale}; + pub(crate) mod metrics; + pub use metrics::RuntimeMetrics; - pub(crate) use metrics::{MetricsBatch, SchedulerMetrics, WorkerMetrics, HistogramBuilder}; + cfg_unstable_metrics! { + pub use metrics::HistogramScale; cfg_net! { - pub(crate) use metrics::IoDriverMetrics; + pub(crate) use metrics::IoDriverMetrics; } } - cfg_not_metrics! { - pub(crate) mod metrics; - pub(crate) use metrics::{SchedulerMetrics, WorkerMetrics, MetricsBatch, HistogramBuilder}; - } + pub(crate) use metrics::{MetricsBatch, SchedulerMetrics, WorkerMetrics, HistogramBuilder}; /// After thread starts / before thread stops type Callback = std::sync::Arc; diff --git a/tokio/src/runtime/runtime.rs b/tokio/src/runtime/runtime.rs index 7cf2cebeffc..d904af50458 100644 --- a/tokio/src/runtime/runtime.rs +++ b/tokio/src/runtime/runtime.rs @@ -455,6 +455,12 @@ impl Runtime { pub fn shutdown_background(self) { self.shutdown_timeout(Duration::from_nanos(0)); } + + /// Returns a view that lets you get information about how the runtime + /// is performing. + pub fn metrics(&self) -> crate::runtime::RuntimeMetrics { + self.handle.metrics() + } } #[allow(clippy::single_match)] // there are comments in the error branch, so we don't want if-let @@ -486,13 +492,3 @@ impl Drop for Runtime { impl std::panic::UnwindSafe for Runtime {} impl std::panic::RefUnwindSafe for Runtime {} - -cfg_metrics! { - impl Runtime { - /// Returns a view that lets you get information about how the runtime - /// is performing. - pub fn metrics(&self) -> crate::runtime::RuntimeMetrics { - self.handle.metrics() - } - } -} diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index 36bcefc4406..b9c23837a58 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -502,7 +502,7 @@ impl Handle { } } -cfg_metrics! { +cfg_unstable_metrics! { impl Handle { pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { &self.shared.scheduler_metrics diff --git a/tokio/src/runtime/scheduler/inject.rs b/tokio/src/runtime/scheduler/inject.rs index 39976fcd7a2..811b02c136c 100644 --- a/tokio/src/runtime/scheduler/inject.rs +++ b/tokio/src/runtime/scheduler/inject.rs @@ -16,7 +16,7 @@ cfg_rt_multi_thread! { mod rt_multi_thread; } -cfg_metrics! { +cfg_unstable_metrics! { mod metrics; } diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index 04fbff39e47..3cbba11b752 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -163,20 +163,22 @@ cfg_rt! { } } - cfg_metrics! { + impl Handle { + pub(crate) fn num_workers(&self) -> usize { + match self { + Handle::CurrentThread(_) => 1, + #[cfg(feature = "rt-multi-thread")] + Handle::MultiThread(handle) => handle.num_workers(), + #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] + Handle::MultiThreadAlt(handle) => handle.num_workers(), + } + } + } + + cfg_unstable_metrics! { use crate::runtime::{SchedulerMetrics, WorkerMetrics}; impl Handle { - pub(crate) fn num_workers(&self) -> usize { - match self { - Handle::CurrentThread(_) => 1, - #[cfg(feature = "rt-multi-thread")] - Handle::MultiThread(handle) => handle.num_workers(), - #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] - Handle::MultiThreadAlt(handle) => handle.num_workers(), - } - } - pub(crate) fn num_blocking_threads(&self) -> usize { match_flavor!(self, Handle(handle) => handle.num_blocking_threads()) } diff --git a/tokio/src/runtime/scheduler/multi_thread/handle.rs b/tokio/src/runtime/scheduler/multi_thread/handle.rs index 568eb80af8b..72f776e47fa 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle.rs @@ -9,9 +9,7 @@ use crate::util::RngSeedGenerator; use std::fmt; -cfg_metrics! { - mod metrics; -} +mod metrics; cfg_taskdump! { mod taskdump; diff --git a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs index 3d614b478c5..6ced245ee5b 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs @@ -1,44 +1,48 @@ use super::Handle; -use crate::runtime::{SchedulerMetrics, WorkerMetrics}; +cfg_unstable_metrics! { + use crate::runtime::{SchedulerMetrics, WorkerMetrics}; +} impl Handle { pub(crate) fn num_workers(&self) -> usize { self.shared.worker_metrics.len() } - pub(crate) fn num_blocking_threads(&self) -> usize { - // workers are currently spawned using spawn_blocking - self.blocking_spawner - .num_threads() - .saturating_sub(self.num_workers()) - } + cfg_unstable_metrics! { + pub(crate) fn num_blocking_threads(&self) -> usize { + // workers are currently spawned using spawn_blocking + self.blocking_spawner + .num_threads() + .saturating_sub(self.num_workers()) + } - pub(crate) fn num_idle_blocking_threads(&self) -> usize { - self.blocking_spawner.num_idle_threads() - } + pub(crate) fn num_idle_blocking_threads(&self) -> usize { + self.blocking_spawner.num_idle_threads() + } - pub(crate) fn active_tasks_count(&self) -> usize { - self.shared.owned.active_tasks_count() - } + pub(crate) fn active_tasks_count(&self) -> usize { + self.shared.owned.active_tasks_count() + } - pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { - &self.shared.scheduler_metrics - } + pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { + &self.shared.scheduler_metrics + } - pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { - &self.shared.worker_metrics[worker] - } + pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { + &self.shared.worker_metrics[worker] + } - pub(crate) fn injection_queue_depth(&self) -> usize { - self.shared.injection_queue_depth() - } + pub(crate) fn injection_queue_depth(&self) -> usize { + self.shared.injection_queue_depth() + } - pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { - self.shared.worker_local_queue_depth(worker) - } + pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { + self.shared.worker_local_queue_depth(worker) + } - pub(crate) fn blocking_queue_depth(&self) -> usize { - self.blocking_spawner.queue_depth() + pub(crate) fn blocking_queue_depth(&self) -> usize { + self.blocking_spawner.queue_depth() + } } } diff --git a/tokio/src/runtime/scheduler/multi_thread/queue.rs b/tokio/src/runtime/scheduler/multi_thread/queue.rs index 35223289870..99ee31ba15b 100644 --- a/tokio/src/runtime/scheduler/multi_thread/queue.rs +++ b/tokio/src/runtime/scheduler/multi_thread/queue.rs @@ -546,7 +546,7 @@ impl Steal { } } -cfg_metrics! { +cfg_unstable_metrics! { impl Steal { pub(crate) fn len(&self) -> usize { self.0.len() as _ diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 9f0dd98dfdc..65851b21e82 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -74,7 +74,7 @@ use std::cell::RefCell; use std::task::Waker; use std::time::Duration; -cfg_metrics! { +cfg_unstable_metrics! { mod metrics; } diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/handle.rs b/tokio/src/runtime/scheduler/multi_thread_alt/handle.rs index d746bca1a18..1f5b7818521 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/handle.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/handle.rs @@ -9,7 +9,7 @@ use crate::util::RngSeedGenerator; use std::fmt; -cfg_metrics! { +cfg_unstable_metrics! { mod metrics; } diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/queue.rs b/tokio/src/runtime/scheduler/multi_thread_alt/queue.rs index 2694d27cbdf..c8293fdc845 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/queue.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/queue.rs @@ -538,7 +538,7 @@ impl Steal { } } -cfg_metrics! { +cfg_unstable_metrics! { impl Steal { pub(crate) fn len(&self) -> usize { self.0.len() as _ diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs index 63ae0a49743..9ceb7815a53 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs @@ -74,7 +74,7 @@ use std::cmp; use std::task::Waker; use std::time::Duration; -cfg_metrics! { +cfg_unstable_metrics! { mod metrics; } diff --git a/tokio/src/runtime/tests/queue.rs b/tokio/src/runtime/tests/queue.rs index 55429b1b11b..f463355f0d3 100644 --- a/tokio/src/runtime/tests/queue.rs +++ b/tokio/src/runtime/tests/queue.rs @@ -40,7 +40,7 @@ fn fits_256_one_at_a_time() { local.push_back_or_overflow(task, &inject, &mut stats); } - cfg_metrics! { + cfg_unstable_metrics! { assert_metrics!(stats, overflow_count == 0); } @@ -98,7 +98,7 @@ fn overflow() { local.push_back_or_overflow(task, &inject, &mut stats); } - cfg_metrics! { + cfg_unstable_metrics! { assert_metrics!(stats, overflow_count == 1); } @@ -128,7 +128,7 @@ fn steal_batch() { assert!(steal1.steal_into(&mut local2, &mut stats).is_some()); - cfg_metrics! { + cfg_unstable_metrics! { assert_metrics!(stats, steal_count == 2); } @@ -184,7 +184,7 @@ fn stress1() { thread::yield_now(); } - cfg_metrics! { + cfg_unstable_metrics! { assert_metrics!(stats, steal_count == n as _); }