Skip to content

Commit

Permalink
Increase test coverage (#67)
Browse files Browse the repository at this point in the history
... and do some refactoring along the way
  • Loading branch information
Finomnis authored Oct 22, 2023
1 parent c8722d7 commit 374e241
Show file tree
Hide file tree
Showing 10 changed files with 294 additions and 54 deletions.
34 changes: 31 additions & 3 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,21 @@ use std::sync::Arc;

use miette::Diagnostic;
use thiserror::Error;
use tokio::sync::mpsc;

use crate::ErrTypeTraits;

/// This enum contains all the possible errors that could be returned
/// by [`handle_shutdown_requests()`](crate::Toplevel::handle_shutdown_requests).
#[derive(Error, Debug, Diagnostic)]
#[derive(Debug, Error, Diagnostic)]
pub enum GracefulShutdownError<ErrType: ErrTypeTraits = crate::BoxedError> {
/// At least one subsystem caused an error.
#[error("at least one subsystem returned an error")]
#[diagnostic(code(graceful_shutdown::failed))]
#[error("at least one subsystem returned an error")]
SubsystemsFailed(#[related] Box<[SubsystemError<ErrType>]>),
/// The shutdown did not finish within the given timeout.
#[error("shutdown timed out")]
#[diagnostic(code(graceful_shutdown::timeout))]
#[error("shutdown timed out")]
ShutdownTimeout(#[related] Box<[SubsystemError<ErrType>]>),
}

Expand Down Expand Up @@ -124,7 +125,34 @@ impl<ErrType: ErrTypeTraits> SubsystemError<ErrType> {
/// [`cancel_on_shutdown()`](crate::FutureExt::cancel_on_shutdown).
#[derive(Error, Debug, Diagnostic)]
#[error("A shutdown request caused this task to be cancelled")]
#[diagnostic(code(graceful_shutdown::future::cancelled_by_shutdown))]
pub struct CancelledByShutdown;

// This function contains code that stems from the principle
// of defensive coding - meaning, handle potential errors
// gracefully, even if they should not happen.
// Therefore it is in this special function, so we don't
// get coverage problems.
pub(crate) fn handle_dropped_error<ErrType: ErrTypeTraits>(
result: Result<(), mpsc::error::SendError<ErrType>>,
) {
if let Err(mpsc::error::SendError(e)) = result {
tracing::warn!("An error got dropped: {e:?}");
}
}

// This function contains code that stems from the principle
// of defensive coding - meaning, handle potential errors
// gracefully, even if they should not happen.
// Therefore it is in this special function, so we don't
// get coverage problems.
pub(crate) fn handle_unhandled_stopreason<ErrType: ErrTypeTraits>(
maybe_stop_reason: Option<SubsystemError<ErrType>>,
) {
if let Some(stop_reason) = maybe_stop_reason {
tracing::warn!("Unhandled stop reason: {:?}", stop_reason);
}
}

#[cfg(test)]
mod tests;
55 changes: 46 additions & 9 deletions src/errors/tests.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
use tracing_test::traced_test;

use crate::BoxedError;

use super::*;

fn examine_report(report: miette::Report) {
fn examine_report(
error: impl miette::Diagnostic + std::error::Error + std::fmt::Debug + Sync + Send + 'static,
) {
println!("{}", error);
println!("{:?}", error);
println!("{:?}", error.source());
println!("{}", error.code().unwrap());
// Convert to report
let report: miette::Report = error.into();
println!("{}", report);
println!("{:?}", report);
// Convert to std::error::Error
Expand All @@ -13,14 +23,21 @@ fn examine_report(report: miette::Report) {

#[test]
fn errors_can_be_converted_to_diagnostic() {
examine_report(GracefulShutdownError::ShutdownTimeout::<BoxedError>(Box::new([])).into());
examine_report(GracefulShutdownError::SubsystemsFailed::<BoxedError>(Box::new([])).into());
examine_report(SubsystemJoinError::SubsystemsFailed::<BoxedError>(Arc::new([])).into());
examine_report(SubsystemError::Panicked::<BoxedError>("".into()).into());
examine_report(
SubsystemError::Failed::<BoxedError>("".into(), SubsystemFailure("".into())).into(),
);
examine_report(CancelledByShutdown.into());
examine_report(GracefulShutdownError::ShutdownTimeout::<BoxedError>(
Box::new([]),
));
examine_report(GracefulShutdownError::SubsystemsFailed::<BoxedError>(
Box::new([]),
));
examine_report(SubsystemJoinError::SubsystemsFailed::<BoxedError>(
Arc::new([]),
));
examine_report(SubsystemError::Panicked::<BoxedError>("".into()));
examine_report(SubsystemError::Failed::<BoxedError>(
"".into(),
SubsystemFailure("".into()),
));
examine_report(CancelledByShutdown);
}

#[test]
Expand Down Expand Up @@ -61,3 +78,23 @@ fn extract_contained_error_from_convert_subsystem_failure() {
assert_eq!(msg, *failure);
assert_eq!(msg, failure.into_error());
}

#[test]
#[traced_test]
fn handle_dropped_errors() {
handle_dropped_error(Err(mpsc::error::SendError(BoxedError::from(String::from(
"ABC",
)))));

assert!(logs_contain("An error got dropped: \"ABC\""));
}

#[test]
#[traced_test]
fn handle_unhandled_stopreasons() {
handle_unhandled_stopreason(Some(SubsystemError::<BoxedError>::Panicked(Arc::from(
"def",
))));

assert!(logs_contain("Unhandled stop reason: Panicked(\"def\")"));
}
18 changes: 8 additions & 10 deletions src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,10 @@ async fn run_subsystem<Fut, Subsys, ErrType: ErrTypeTraits, Err>(
Ok(Ok(())) => None,
Ok(Err(e)) => Some(SubsystemError::Failed(name, SubsystemFailure(e))),
Err(e) => {
if e.is_panic() {
Some(SubsystemError::Panicked(name))
} else {
// Don't do anything in case of a cancellation;
// cancellations can't be forwarded (because the
// current function we are in will be cancelled
// simultaneously)
None
}
// We can assume that this is a panic, because a cancellation
// can never happen as long as we still hold `guard`.
assert!(e.is_panic());
Some(SubsystemError::Panicked(name))
}
};

Expand All @@ -95,7 +90,10 @@ async fn run_subsystem<Fut, Subsys, ErrType: ErrTypeTraits, Err>(
// It is still important that the handle does not leak out of the subsystem.
let subsystem_handle = match redirected_subsystem_handle.try_recv() {
Ok(s) => s,
Err(_) => panic!("The SubsystemHandle object must not be leaked out of the subsystem!"),
Err(_) => {
tracing::error!("The SubsystemHandle object must not be leaked out of the subsystem!");
panic!("The SubsystemHandle object must not be leaked out of the subsystem!");
}
};

// Raise potential errors
Expand Down
8 changes: 3 additions & 5 deletions src/subsystem/subsystem_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;

use crate::{
errors::SubsystemError,
errors::{handle_dropped_error, SubsystemError},
runner::{AliveGuard, SubsystemRunner},
utils::{remote_drop_collection::RemotelyDroppableItems, JoinerToken},
BoxedError, ErrTypeTraits, ErrorAction, NestedSubsystem, SubsystemBuilder,
Expand Down Expand Up @@ -124,9 +124,7 @@ impl<ErrType: ErrTypeTraits> SubsystemHandle<ErrType> {
match error_action {
ErrorAction::Forward => Some(e),
ErrorAction::CatchAndLocalShutdown => {
if let Err(mpsc::error::SendError(e)) = error_sender.send(e) {
tracing::warn!("An error got dropped: {e:?}");
};
handle_dropped_error(error_sender.send(e));
cancellation_token.cancel();
None
}
Expand Down Expand Up @@ -167,7 +165,7 @@ impl<ErrType: ErrTypeTraits> SubsystemHandle<ErrType> {
}

/// Waits until all the children of this subsystem are finished.
pub async fn wait_for_children(&mut self) {
pub async fn wait_for_children(&self) {
self.inner.joiner_token.join_children().await
}

Expand Down
17 changes: 8 additions & 9 deletions src/toplevel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

use crate::{
errors::{GracefulShutdownError, SubsystemError},
errors::{handle_dropped_error, GracefulShutdownError, SubsystemError},
signal_handling::wait_for_signal,
subsystem::{self, ErrorActions},
BoxedError, ErrTypeTraits, ErrorAction, NestedSubsystem, SubsystemHandle,
Expand Down Expand Up @@ -74,9 +74,7 @@ impl<ErrType: ErrTypeTraits> Toplevel<ErrType> {
}
};

if let Err(mpsc::error::SendError(e)) = error_sender.send(e) {
tracing::warn!("An error got dropped: {e:?}");
};
handle_dropped_error(error_sender.send(e));
});

let toplevel_subsys = root_handle.start_with_abs_name(
Expand Down Expand Up @@ -181,7 +179,12 @@ impl<ErrType: ErrTypeTraits> Toplevel<ErrType> {
);

match tokio::time::timeout(shutdown_timeout, self.toplevel_subsys.join()).await {
Ok(Ok(())) => {
Ok(result) => {
// An `Err` here would indicate a programming error,
// because the toplevel subsys doesn't catch any errors;
// it only forwards them.
assert!(result.is_ok());

let errors = collect_errors();
if errors.is_empty() {
tracing::info!("Shutdown finished.");
Expand All @@ -191,10 +194,6 @@ impl<ErrType: ErrTypeTraits> Toplevel<ErrType> {
Err(GracefulShutdownError::SubsystemsFailed(errors))
}
}
Ok(Err(_)) => {
// This can't happen because the toplevel subsys doesn't catch any errors; it only forwards them.
unreachable!();
}
Err(_) => {
tracing::error!("Shutdown timed out!");
Err(GracefulShutdownError::ShutdownTimeout(collect_errors()))
Expand Down
13 changes: 6 additions & 7 deletions src/utils/joiner_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use std::{fmt::Debug, sync::Arc};

use tokio::sync::watch;

use crate::{errors::SubsystemError, ErrTypeTraits};
use crate::{
errors::{handle_unhandled_stopreason, SubsystemError},
ErrTypeTraits,
};

struct Inner<ErrType: ErrTypeTraits> {
counter: watch::Sender<(bool, u32)>,
Expand Down Expand Up @@ -67,9 +70,7 @@ impl<ErrType: ErrTypeTraits> JoinerToken<ErrType> {
(Self { inner }, weak_ref)
}

// Requires `mut` access to prevent children from being spawned
// while waiting
pub(crate) async fn join_children(&mut self) {
pub(crate) async fn join_children(&self) {
let mut subscriber = self.inner.counter.subscribe();

// Ignore errors; if the channel got closed, that definitely means
Expand Down Expand Up @@ -126,9 +127,7 @@ impl<ErrType: ErrTypeTraits> JoinerToken<ErrType> {
maybe_parent = parent.parent.as_ref();
}

if let Some(stop_reason) = maybe_stop_reason {
tracing::warn!("Unhandled stop reason: {:?}", stop_reason);
}
handle_unhandled_stopreason(maybe_stop_reason);
}

pub(crate) fn downgrade(self) -> JoinerTokenRef {
Expand Down
2 changes: 1 addition & 1 deletion src/utils/joiner_token/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ fn counters_weak() {
async fn join() {
let (superroot, _) = JoinerToken::<BoxedError>::new(|_| None);

let (mut root, _) = superroot.child_token(|_| None);
let (root, _) = superroot.child_token(|_| None);

let (child1, _) = root.child_token(|_| None);
let (child2, _) = child1.child_token(|_| None);
Expand Down
24 changes: 14 additions & 10 deletions src/utils/remote_drop_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,22 @@ impl<T> Drop for RemoteDrop<T> {
// Important: lock first, then read the offset.
let mut data = data.lock().unwrap();

if let Some(offset) = self.offset.upgrade() {
let offset = offset.load(Ordering::Acquire);
let offset = self
.offset
.upgrade()
.expect("Trying to delete non-existent item! Please report this.")
.load(Ordering::Acquire);

if let Some(last_item) = data.pop() {
if offset != data.len() {
// There must have been at least two items, and we are not at the end.
// So swap first before dropping.
let last_item = data
.pop()
.expect("Trying to delete non-existent item! Please report this.");

last_item.offset.store(offset, Ordering::Release);
data[offset] = last_item;
}
}
if offset != data.len() {
// There must have been at least two items, and we are not at the end.
// So swap first before dropping.

last_item.offset.store(offset, Ordering::Release);
data[offset] = last_item;
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions src/utils/remote_drop_collection/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
use super::*;
use crate::{utils::JoinerToken, BoxedError};

#[test]
fn single_item() {
let items = RemotelyDroppableItems::new();

let (count1, _) = JoinerToken::<BoxedError>::new(|_| None);
assert_eq!(0, count1.count());

let token1 = items.insert(count1.child_token(|_| None));
assert_eq!(1, count1.count());

drop(token1);
assert_eq!(0, count1.count());
}

#[test]
fn insert_and_drop() {
let items = RemotelyDroppableItems::new();
Expand Down
Loading

0 comments on commit 374e241

Please sign in to comment.