From bc5b151e059af4ea7df94535855e0dd650a5ee21 Mon Sep 17 00:00:00 2001 From: Finomnis Date: Sat, 2 Nov 2024 16:32:19 +0100 Subject: [PATCH] Add caller tracking --- Cargo.toml | 1 + src/runner.rs | 97 +++++++++++++++++-------------- src/subsystem/subsystem_handle.rs | 2 + src/tokio_task.rs | 2 + src/toplevel.rs | 2 + 5 files changed, 59 insertions(+), 45 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 24a0bac..01d8810 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ exclude = [ ] [features] +# Enable task naming and task caller location. tracing = ["tokio/tracing"] [[example]] diff --git a/src/runner.rs b/src/runner.rs index 4811a1a..c7f7dc8 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -21,6 +21,7 @@ pub(crate) struct SubsystemRunner { } impl SubsystemRunner { + #[track_caller] pub(crate) fn new( name: Arc, subsystem: Subsys, @@ -32,7 +33,7 @@ impl SubsystemRunner { Fut: 'static + Future> + Send, Err: Into, { - let future = async { run_subsystem(name, subsystem, subsystem_handle, guard).await }; + let future = run_subsystem(name, subsystem, subsystem_handle, guard); let aborthandle = crate::tokio_task::spawn(future, "subsystem_runner").abort_handle(); SubsystemRunner { aborthandle } } @@ -44,12 +45,14 @@ impl Drop for SubsystemRunner { } } -async fn run_subsystem( +#[track_caller] +fn run_subsystem( name: Arc, subsystem: Subsys, mut subsystem_handle: SubsystemHandle, guard: AliveGuard, -) where +) -> impl Future + 'static +where Subsys: 'static + FnOnce(SubsystemHandle) -> Fut + Send, Fut: 'static + Future> + Send, Err: Into, @@ -59,52 +62,56 @@ async fn run_subsystem( let future = async { subsystem(subsystem_handle).await.map_err(|e| e.into()) }; let join_handle = crate::tokio_task::spawn(future, &name); - // Abort on drop - guard.on_cancel({ - let abort_handle = join_handle.abort_handle(); - let name = Arc::clone(&name); - move || { - if !abort_handle.is_finished() { - tracing::warn!("Subsystem cancelled: '{}'", name); + async move { + // Abort on drop + guard.on_cancel({ + let abort_handle = join_handle.abort_handle(); + let name = Arc::clone(&name); + move || { + if !abort_handle.is_finished() { + tracing::warn!("Subsystem cancelled: '{}'", name); + } + abort_handle.abort(); } - abort_handle.abort(); - } - }); + }); - let failure = match join_handle.await { - Ok(Ok(())) => None, - Ok(Err(e)) => Some(SubsystemError::Failed(name, SubsystemFailure(e))), - Err(e) => { - // We can assume that this is a panic, because a cancellation - // can never happen as long as we still hold `guard`. - assert!(e.is_panic()); - Some(SubsystemError::Panicked(name)) - } - }; + let failure = match join_handle.await { + Ok(Ok(())) => None, + Ok(Err(e)) => Some(SubsystemError::Failed(name, SubsystemFailure(e))), + Err(e) => { + // We can assume that this is a panic, because a cancellation + // can never happen as long as we still hold `guard`. + assert!(e.is_panic()); + Some(SubsystemError::Panicked(name)) + } + }; - // Retrieve the handle that was passed into the subsystem. - // Originally it was intended to pass the handle as reference, but due - // to complications (https://stackoverflow.com/questions/77172947/async-lifetime-issues-of-pass-by-reference-parameters) - // it was decided to pass ownership instead. - // - // It is still important that the handle does not leak out of the subsystem. - let subsystem_handle = match redirected_subsystem_handle.try_recv() { - Ok(s) => s, - Err(_) => { - tracing::error!("The SubsystemHandle object must not be leaked out of the subsystem!"); - panic!("The SubsystemHandle object must not be leaked out of the subsystem!"); + // Retrieve the handle that was passed into the subsystem. + // Originally it was intended to pass the handle as reference, but due + // to complications (https://stackoverflow.com/questions/77172947/async-lifetime-issues-of-pass-by-reference-parameters) + // it was decided to pass ownership instead. + // + // It is still important that the handle does not leak out of the subsystem. + let subsystem_handle = match redirected_subsystem_handle.try_recv() { + Ok(s) => s, + Err(_) => { + tracing::error!( + "The SubsystemHandle object must not be leaked out of the subsystem!" + ); + panic!("The SubsystemHandle object must not be leaked out of the subsystem!"); + } + }; + + // Raise potential errors + let joiner_token = subsystem_handle.joiner_token; + if let Some(failure) = failure { + joiner_token.raise_failure(failure); } - }; - // Raise potential errors - let joiner_token = subsystem_handle.joiner_token; - if let Some(failure) = failure { - joiner_token.raise_failure(failure); + // Wait for children to finish before we destroy the `SubsystemHandle` object. + // Otherwise the children would be cancelled immediately. + // + // This is the main mechanism that forwards a cancellation to all the children. + joiner_token.downgrade().join().await; } - - // Wait for children to finish before we destroy the `SubsystemHandle` object. - // Otherwise the children would be cancelled immediately. - // - // This is the main mechanism that forwards a cancellation to all the children. - joiner_token.downgrade().join().await; } diff --git a/src/subsystem/subsystem_handle.rs b/src/subsystem/subsystem_handle.rs index c75901f..1547029 100644 --- a/src/subsystem/subsystem_handle.rs +++ b/src/subsystem/subsystem_handle.rs @@ -72,6 +72,7 @@ impl SubsystemHandle { /// Ok(()) /// } /// ``` + #[track_caller] pub fn start( &self, builder: SubsystemBuilder, @@ -96,6 +97,7 @@ impl SubsystemHandle { ) } + #[track_caller] pub(crate) fn start_with_abs_name( &self, name: Arc, diff --git a/src/tokio_task.rs b/src/tokio_task.rs index 80dd1b2..75ed1eb 100644 --- a/src/tokio_task.rs +++ b/src/tokio_task.rs @@ -2,6 +2,7 @@ use std::future::Future; use tokio::task::JoinHandle; #[cfg(not(all(tokio_unstable, feature = "tracing")))] +#[track_caller] pub(crate) fn spawn(f: F, _name: &str) -> JoinHandle where ::Output: Send + 'static, @@ -10,6 +11,7 @@ where } #[cfg(all(tokio_unstable, feature = "tracing"))] +#[track_caller] pub(crate) fn spawn(f: F, name: &str) -> JoinHandle where ::Output: Send + 'static, diff --git a/src/toplevel.rs b/src/toplevel.rs index c799c52..d252bc3 100644 --- a/src/toplevel.rs +++ b/src/toplevel.rs @@ -57,6 +57,7 @@ impl Toplevel { /// * `subsystem` - The subsystem that should be spawned as the root node. /// Usually the job of this subsystem is to spawn further subsystems. #[allow(clippy::new_without_default)] + #[track_caller] pub fn new(subsystem: Subsys) -> Self where Subsys: 'static + FnOnce(SubsystemHandle) -> Fut + Send, @@ -118,6 +119,7 @@ impl Toplevel { /// /// Especially the caveats from [tokio::signal::unix::Signal] are important for Unix targets. /// + #[track_caller] pub fn catch_signals(self) -> Self { let shutdown_token = self.root_handle.get_cancellation_token().clone();