From 88f01793bec5830370cb88f74a64a2e20a440c17 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 7 Feb 2018 13:12:33 -0800 Subject: [PATCH] fix(client): send an `Error::Cancel` if a queued request is dropped Adds `Error::Cancel` variant. --- src/client/cancel.rs | 8 ++++++- src/client/dispatch.rs | 53 ++++++++++++++++++++++++++++++++++++++---- src/error.rs | 48 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 104 insertions(+), 5 deletions(-) diff --git a/src/client/cancel.rs b/src/client/cancel.rs index 6ebc01a902..bc29dbf60c 100644 --- a/src/client/cancel.rs +++ b/src/client/cancel.rs @@ -57,6 +57,12 @@ impl Cancel { } } +impl Canceled { + pub fn cancel(&self) { + self.inner.is_canceled.store(true, Ordering::SeqCst); + } +} + impl Future for Canceled { type Item = (); type Error = Never; @@ -87,7 +93,7 @@ impl Future for Canceled { impl Drop for Canceled { fn drop(&mut self) { - self.inner.is_canceled.store(true, Ordering::SeqCst); + self.cancel(); } } diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index c34665cf9e..12ce64bb28 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -69,21 +69,66 @@ impl Stream for Receiver { } } -//TODO: Drop for Receiver should consume inner +impl Drop for Receiver { + fn drop(&mut self) { + self.canceled.cancel(); + self.inner.close(); + + // This poll() is safe to call in `Drop`, because we've + // called, `close`, which promises that no new messages + // will arrive, and thus, once we reach the end, we won't + // see a `NotReady` (and try to park), but a Ready(None). + // + // All other variants: + // - Ready(None): the end. we want to stop looping + // - NotReady: unreachable + // - Err: unreachable + while let Ok(Async::Ready(Some((_val, cb)))) = self.inner.poll() { + // maybe in future, we pass the value along with the error? + let _ = cb.send(Err(::Error::new_canceled())); + } + } + +} #[cfg(test)] mod tests { - + extern crate pretty_env_logger; #[cfg(feature = "nightly")] extern crate test; + use futures::{future, Future}; + #[cfg(feature = "nightly")] - use futures::{Future, Stream}; + use futures::{Stream}; + + #[test] + fn drop_receiver_sends_cancel_errors() { + let _ = pretty_env_logger::try_init(); + + future::lazy(|| { + #[derive(Debug)] + struct Custom(i32); + let (tx, rx) = super::channel::(); + + let promise = tx.send(Custom(43)).unwrap(); + drop(rx); + + promise.then(|fulfilled| { + let res = fulfilled.expect("fulfilled"); + match res.unwrap_err() { + ::Error::Cancel(_) => (), + e => panic!("expected Error::Cancel(_), found {:?}", e), + } + + Ok::<(), ()>(()) + }) + }).wait().unwrap(); + } #[cfg(feature = "nightly")] #[bench] fn cancelable_queue_throughput(b: &mut test::Bencher) { - let (tx, mut rx) = super::channel::(); b.iter(move || { diff --git a/src/error.rs b/src/error.rs index 5a2d448869..a7f32c0d62 100644 --- a/src/error.rs +++ b/src/error.rs @@ -17,6 +17,7 @@ use self::Error::{ Status, Timeout, Upgrade, + Cancel, Io, TooLarge, Incomplete, @@ -47,6 +48,8 @@ pub enum Error { Timeout, /// A protocol upgrade was encountered, but not yet supported in hyper. Upgrade, + /// A pending item was dropped before ever being processed. + Cancel(Canceled), /// An `io::Error` that occurred while trying to read or write to a network stream. Io(IoError), /// Parsing a field as string failed @@ -56,6 +59,45 @@ pub enum Error { __Nonexhaustive(Void) } +impl Error { + pub(crate) fn new_canceled() -> Error { + Error::Cancel(Canceled { + _inner: (), + }) + } +} + +/// A pending item was dropped before ever being processed. +/// +/// For example, a `Request` could be queued in the `Client`, *just* +/// as the related connection gets closed by the remote. In that case, +/// when the connection drops, the pending response future will be +/// fulfilled with this error, signaling the `Request` was never started. +pub struct Canceled { + // maybe in the future this contains an optional value of + // what was canceled? + _inner: (), +} + +impl Canceled { + fn description(&self) -> &str { + "an operation was canceled internally before starting" + } +} + +impl fmt::Debug for Canceled { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Canceled") + .finish() + } +} + +impl fmt::Display for Canceled { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad(self.description()) + } +} + #[doc(hidden)] pub struct Void(()); @@ -87,6 +129,7 @@ impl StdError for Error { Incomplete => "message is incomplete", Timeout => "timeout", Upgrade => "unsupported protocol upgrade", + Cancel(ref e) => e.description(), Uri(ref e) => e.description(), Io(ref e) => e.description(), Utf8(ref e) => e.description(), @@ -143,6 +186,11 @@ impl From for Error { } } +#[doc(hidden)] +trait AssertSendSync: Send + Sync + 'static {} +#[doc(hidden)] +impl AssertSendSync for Error {} + #[cfg(test)] mod tests { use std::error::Error as StdError;