Skip to content

Commit

Permalink
Add caller tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
Finomnis committed Nov 2, 2024
1 parent f25f8fe commit bc5b151
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 45 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ exclude = [
]

[features]
# Enable task naming and task caller location.
tracing = ["tokio/tracing"]

[[example]]
Expand Down
97 changes: 52 additions & 45 deletions src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub(crate) struct SubsystemRunner {
}

impl SubsystemRunner {
#[track_caller]
pub(crate) fn new<Fut, Subsys, ErrType: ErrTypeTraits, Err>(
name: Arc<str>,
subsystem: Subsys,
Expand All @@ -32,7 +33,7 @@ impl SubsystemRunner {
Fut: 'static + Future<Output = Result<(), Err>> + Send,
Err: Into<ErrType>,
{
let future = async { run_subsystem(name, subsystem, subsystem_handle, guard).await };
let future = run_subsystem(name, subsystem, subsystem_handle, guard);
let aborthandle = crate::tokio_task::spawn(future, "subsystem_runner").abort_handle();
SubsystemRunner { aborthandle }
}
Expand All @@ -44,12 +45,14 @@ impl Drop for SubsystemRunner {
}
}

async fn run_subsystem<Fut, Subsys, ErrType: ErrTypeTraits, Err>(
#[track_caller]
fn run_subsystem<Fut, Subsys, ErrType: ErrTypeTraits, Err>(
name: Arc<str>,
subsystem: Subsys,
mut subsystem_handle: SubsystemHandle<ErrType>,
guard: AliveGuard,
) where
) -> impl Future<Output = ()> + 'static
where
Subsys: 'static + FnOnce(SubsystemHandle<ErrType>) -> Fut + Send,
Fut: 'static + Future<Output = Result<(), Err>> + Send,
Err: Into<ErrType>,
Expand All @@ -59,52 +62,56 @@ async fn run_subsystem<Fut, Subsys, ErrType: ErrTypeTraits, Err>(
let future = async { subsystem(subsystem_handle).await.map_err(|e| e.into()) };
let join_handle = crate::tokio_task::spawn(future, &name);

// Abort on drop
guard.on_cancel({
let abort_handle = join_handle.abort_handle();
let name = Arc::clone(&name);
move || {
if !abort_handle.is_finished() {
tracing::warn!("Subsystem cancelled: '{}'", name);
async move {
// Abort on drop
guard.on_cancel({
let abort_handle = join_handle.abort_handle();
let name = Arc::clone(&name);
move || {
if !abort_handle.is_finished() {
tracing::warn!("Subsystem cancelled: '{}'", name);
}
abort_handle.abort();
}
abort_handle.abort();
}
});
});

let failure = match join_handle.await {
Ok(Ok(())) => None,
Ok(Err(e)) => Some(SubsystemError::Failed(name, SubsystemFailure(e))),
Err(e) => {
// We can assume that this is a panic, because a cancellation
// can never happen as long as we still hold `guard`.
assert!(e.is_panic());
Some(SubsystemError::Panicked(name))
}
};
let failure = match join_handle.await {
Ok(Ok(())) => None,
Ok(Err(e)) => Some(SubsystemError::Failed(name, SubsystemFailure(e))),
Err(e) => {
// We can assume that this is a panic, because a cancellation
// can never happen as long as we still hold `guard`.
assert!(e.is_panic());
Some(SubsystemError::Panicked(name))
}
};

// Retrieve the handle that was passed into the subsystem.
// Originally it was intended to pass the handle as reference, but due
// to complications (https://stackoverflow.com/questions/77172947/async-lifetime-issues-of-pass-by-reference-parameters)
// it was decided to pass ownership instead.
//
// It is still important that the handle does not leak out of the subsystem.
let subsystem_handle = match redirected_subsystem_handle.try_recv() {
Ok(s) => s,
Err(_) => {
tracing::error!("The SubsystemHandle object must not be leaked out of the subsystem!");
panic!("The SubsystemHandle object must not be leaked out of the subsystem!");
// Retrieve the handle that was passed into the subsystem.
// Originally it was intended to pass the handle as reference, but due
// to complications (https://stackoverflow.com/questions/77172947/async-lifetime-issues-of-pass-by-reference-parameters)
// it was decided to pass ownership instead.
//
// It is still important that the handle does not leak out of the subsystem.
let subsystem_handle = match redirected_subsystem_handle.try_recv() {
Ok(s) => s,
Err(_) => {
tracing::error!(
"The SubsystemHandle object must not be leaked out of the subsystem!"
);
panic!("The SubsystemHandle object must not be leaked out of the subsystem!");
}
};

// Raise potential errors
let joiner_token = subsystem_handle.joiner_token;
if let Some(failure) = failure {
joiner_token.raise_failure(failure);
}
};

// Raise potential errors
let joiner_token = subsystem_handle.joiner_token;
if let Some(failure) = failure {
joiner_token.raise_failure(failure);
// Wait for children to finish before we destroy the `SubsystemHandle` object.
// Otherwise the children would be cancelled immediately.
//
// This is the main mechanism that forwards a cancellation to all the children.
joiner_token.downgrade().join().await;
}

// Wait for children to finish before we destroy the `SubsystemHandle` object.
// Otherwise the children would be cancelled immediately.
//
// This is the main mechanism that forwards a cancellation to all the children.
joiner_token.downgrade().join().await;
}
2 changes: 2 additions & 0 deletions src/subsystem/subsystem_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ impl<ErrType: ErrTypeTraits> SubsystemHandle<ErrType> {
/// Ok(())
/// }
/// ```
#[track_caller]
pub fn start<Err, Fut, Subsys>(
&self,
builder: SubsystemBuilder<ErrType, Err, Fut, Subsys>,
Expand All @@ -96,6 +97,7 @@ impl<ErrType: ErrTypeTraits> SubsystemHandle<ErrType> {
)
}

#[track_caller]
pub(crate) fn start_with_abs_name<Err, Fut, Subsys>(
&self,
name: Arc<str>,
Expand Down
2 changes: 2 additions & 0 deletions src/tokio_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::future::Future;
use tokio::task::JoinHandle;

#[cfg(not(all(tokio_unstable, feature = "tracing")))]
#[track_caller]
pub(crate) fn spawn<F: Future + Send + 'static>(f: F, _name: &str) -> JoinHandle<F::Output>
where
<F as Future>::Output: Send + 'static,
Expand All @@ -10,6 +11,7 @@ where
}

#[cfg(all(tokio_unstable, feature = "tracing"))]
#[track_caller]
pub(crate) fn spawn<F: Future + Send + 'static>(f: F, name: &str) -> JoinHandle<F::Output>
where
<F as Future>::Output: Send + 'static,
Expand Down
2 changes: 2 additions & 0 deletions src/toplevel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl<ErrType: ErrTypeTraits> Toplevel<ErrType> {
/// * `subsystem` - The subsystem that should be spawned as the root node.
/// Usually the job of this subsystem is to spawn further subsystems.
#[allow(clippy::new_without_default)]
#[track_caller]
pub fn new<Fut, Subsys>(subsystem: Subsys) -> Self
where
Subsys: 'static + FnOnce(SubsystemHandle<ErrType>) -> Fut + Send,
Expand Down Expand Up @@ -118,6 +119,7 @@ impl<ErrType: ErrTypeTraits> Toplevel<ErrType> {
///
/// Especially the caveats from [tokio::signal::unix::Signal] are important for Unix targets.
///
#[track_caller]
pub fn catch_signals(self) -> Self {
let shutdown_token = self.root_handle.get_cancellation_token().clone();

Expand Down

0 comments on commit bc5b151

Please sign in to comment.