From eddbdaaced54cc9b6d940c9496f3f4ab12a74a11 Mon Sep 17 00:00:00 2001 From: Fuyang Liu Date: Wed, 6 Jan 2021 17:01:33 +0100 Subject: [PATCH 01/19] tokio-stream: add wrapper for broadcast Signed-off-by: Fuyang Liu --- tokio-stream/Cargo.toml | 2 +- tokio-stream/src/wrappers.rs | 3 ++ tokio-stream/src/wrappers/broadcast.rs | 38 ++++++++++++++++++++++++++ 3 files changed, 42 insertions(+), 1 deletion(-) create mode 100644 tokio-stream/src/wrappers/broadcast.rs diff --git a/tokio-stream/Cargo.toml b/tokio-stream/Cargo.toml index d662c387fec..90dd4d1ab5a 100644 --- a/tokio-stream/Cargo.toml +++ b/tokio-stream/Cargo.toml @@ -28,13 +28,13 @@ fs = ["tokio/fs"] [dependencies] futures-core = { version = "0.3.0" } +async-stream = "0.3" pin-project-lite = "0.2.0" tokio = { version = "1.0", features = ["sync"] } [dev-dependencies] tokio = { version = "1.0", features = ["full", "test-util"] } tokio-test = { path = "../tokio-test" } -async-stream = "0.3" futures = { version = "0.3", default-features = false } proptest = "0.10.0" diff --git a/tokio-stream/src/wrappers.rs b/tokio-stream/src/wrappers.rs index c0ffb234a09..6d1d7fe8694 100644 --- a/tokio-stream/src/wrappers.rs +++ b/tokio-stream/src/wrappers.rs @@ -6,6 +6,9 @@ pub use mpsc_bounded::ReceiverStream; mod mpsc_unbounded; pub use mpsc_unbounded::UnboundedReceiverStream; +mod broadcast; +pub use broadcast::BroadcastStream; + cfg_time! { mod interval; pub use interval::IntervalStream; diff --git a/tokio-stream/src/wrappers/broadcast.rs b/tokio-stream/src/wrappers/broadcast.rs new file mode 100644 index 00000000000..268e479b992 --- /dev/null +++ b/tokio-stream/src/wrappers/broadcast.rs @@ -0,0 +1,38 @@ +use crate::Stream; +use async_stream::try_stream; +use std::pin::Pin; +use tokio::sync::broadcast::error::RecvError; +use tokio::sync::broadcast::Receiver; + +use std::task::{Context, Poll}; + +/// A wrapper around [`Receiver`] that implements [`Stream`]. Achieved by using the [`async-stream`] crate. +/// +/// [`Receiver`]: struct@tokio::sync::broadcast::Receiver +/// [`Stream`]: trait@crate::Stream +/// [`async-stream`]: https://docs.rs/async-stream +#[cfg_attr(docsrs, doc(cfg(feature = "net")))] +pub struct BroadcastStream { + inner: Pin>>>, +} + +impl BroadcastStream { + /// Create a new `BroadcastStream`. + pub fn new(mut rx: Receiver) -> Self { + let stream = try_stream! { + loop { + let item = rx.recv().await?; + yield item; + } + }; + Self { inner: Box::pin(stream) } + } +} + +impl Stream for BroadcastStream { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_next(cx) + } +} From f1094aa4b89ea5bc31dd3c12bee116072aea5f51 Mon Sep 17 00:00:00 2001 From: Fuyang Liu Date: Wed, 6 Jan 2021 17:18:07 +0100 Subject: [PATCH 02/19] Fix up Signed-off-by: Fuyang Liu --- tokio-stream/src/wrappers/broadcast.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tokio-stream/src/wrappers/broadcast.rs b/tokio-stream/src/wrappers/broadcast.rs index 268e479b992..eea23b1a7e1 100644 --- a/tokio-stream/src/wrappers/broadcast.rs +++ b/tokio-stream/src/wrappers/broadcast.rs @@ -13,10 +13,10 @@ use std::task::{Context, Poll}; /// [`async-stream`]: https://docs.rs/async-stream #[cfg_attr(docsrs, doc(cfg(feature = "net")))] pub struct BroadcastStream { - inner: Pin>>>, + inner: Pin> + Send + Sync >>, } -impl BroadcastStream { +impl BroadcastStream { /// Create a new `BroadcastStream`. pub fn new(mut rx: Receiver) -> Self { let stream = try_stream! { @@ -30,9 +30,9 @@ impl BroadcastStream { } impl Stream for BroadcastStream { - type Item = T; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_next(cx) + Pin::new(&mut self.inner).poll_next(cx) } } From 699591450b26ba2843a2d0d6bae6d3c11bdc3551 Mon Sep 17 00:00:00 2001 From: Fuyang Liu Date: Wed, 6 Jan 2021 17:26:40 +0100 Subject: [PATCH 03/19] Minor fixup - remove cfg_attr --- tokio-stream/src/wrappers/broadcast.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/tokio-stream/src/wrappers/broadcast.rs b/tokio-stream/src/wrappers/broadcast.rs index eea23b1a7e1..3aa73e11fdc 100644 --- a/tokio-stream/src/wrappers/broadcast.rs +++ b/tokio-stream/src/wrappers/broadcast.rs @@ -11,7 +11,6 @@ use std::task::{Context, Poll}; /// [`Receiver`]: struct@tokio::sync::broadcast::Receiver /// [`Stream`]: trait@crate::Stream /// [`async-stream`]: https://docs.rs/async-stream -#[cfg_attr(docsrs, doc(cfg(feature = "net")))] pub struct BroadcastStream { inner: Pin> + Send + Sync >>, } From 5229cad8256f476791fcaaa5b7ccc5ca0404483a Mon Sep 17 00:00:00 2001 From: Fuyang Liu Date: Sat, 23 Jan 2021 13:31:04 +0100 Subject: [PATCH 04/19] Add #[derive(Debug)] --- tokio-stream/src/wrappers/broadcast.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tokio-stream/src/wrappers/broadcast.rs b/tokio-stream/src/wrappers/broadcast.rs index 3aa73e11fdc..bfd2fe2b9ae 100644 --- a/tokio-stream/src/wrappers/broadcast.rs +++ b/tokio-stream/src/wrappers/broadcast.rs @@ -11,6 +11,7 @@ use std::task::{Context, Poll}; /// [`Receiver`]: struct@tokio::sync::broadcast::Receiver /// [`Stream`]: trait@crate::Stream /// [`async-stream`]: https://docs.rs/async-stream +#[derive(Debug)] pub struct BroadcastStream { inner: Pin> + Send + Sync >>, } From c611d5c6a20238777af22b74ff773a0d21924442 Mon Sep 17 00:00:00 2001 From: Fuyang Liu Date: Sun, 24 Jan 2021 08:03:58 +0100 Subject: [PATCH 05/19] Update tokio-stream/src/wrappers/broadcast.rs Co-authored-by: Alice Ryhl --- tokio-stream/src/wrappers/broadcast.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tokio-stream/src/wrappers/broadcast.rs b/tokio-stream/src/wrappers/broadcast.rs index bfd2fe2b9ae..fbfa9a4c78d 100644 --- a/tokio-stream/src/wrappers/broadcast.rs +++ b/tokio-stream/src/wrappers/broadcast.rs @@ -6,11 +6,10 @@ use tokio::sync::broadcast::Receiver; use std::task::{Context, Poll}; -/// A wrapper around [`Receiver`] that implements [`Stream`]. Achieved by using the [`async-stream`] crate. +/// A wrapper around [`Receiver`] that implements [`Stream`]. /// /// [`Receiver`]: struct@tokio::sync::broadcast::Receiver /// [`Stream`]: trait@crate::Stream -/// [`async-stream`]: https://docs.rs/async-stream #[derive(Debug)] pub struct BroadcastStream { inner: Pin> + Send + Sync >>, From 1977be3621c05230d4ec679fe05b9b61b1706e8a Mon Sep 17 00:00:00 2001 From: Fuyang Liu Date: Sun, 24 Jan 2021 11:04:40 +0100 Subject: [PATCH 06/19] return Lagged error --- tokio-stream/src/wrappers/broadcast.rs | 40 ++++++++++++++++++++------ 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/tokio-stream/src/wrappers/broadcast.rs b/tokio-stream/src/wrappers/broadcast.rs index fbfa9a4c78d..b2ea85407df 100644 --- a/tokio-stream/src/wrappers/broadcast.rs +++ b/tokio-stream/src/wrappers/broadcast.rs @@ -1,37 +1,61 @@ use crate::Stream; -use async_stream::try_stream; +use async_stream::stream; use std::pin::Pin; use tokio::sync::broadcast::error::RecvError; use tokio::sync::broadcast::Receiver; +use std::fmt; use std::task::{Context, Poll}; /// A wrapper around [`Receiver`] that implements [`Stream`]. /// /// [`Receiver`]: struct@tokio::sync::broadcast::Receiver /// [`Stream`]: trait@crate::Stream -#[derive(Debug)] pub struct BroadcastStream { - inner: Pin> + Send + Sync >>, + inner: Pin> + Send + Sync>>, +} + +/// An error returned from the inner stream of a [`BroadcastStream`]. +#[derive(Debug, PartialEq)] +pub enum BroadcastStreamRecvError { + /// The receiver lagged too far behind. Attempting to receive again will + /// return the oldest message still retained by the channel. + /// + /// Includes the number of skipped messages. + Lagged(u64), } impl BroadcastStream { /// Create a new `BroadcastStream`. pub fn new(mut rx: Receiver) -> Self { - let stream = try_stream! { + let stream = stream! { loop { - let item = rx.recv().await?; - yield item; + match rx.recv().await { + Ok(item) => yield Ok(item), + Err(err) => + match err { + RecvError::Closed => break, + RecvError::Lagged(n) => yield Err(BroadcastStreamRecvError::Lagged(n)) + } + } } }; - Self { inner: Box::pin(stream) } + Self { + inner: Box::pin(stream), + } } } impl Stream for BroadcastStream { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.inner).poll_next(cx) } } + +impl fmt::Debug for BroadcastStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BroadcastStream").finish() + } +} From d9fd015d04844c5550bfd32b943917e6755a4dc7 Mon Sep 17 00:00:00 2001 From: Fuyang Liu Date: Sun, 24 Jan 2021 11:44:45 +0100 Subject: [PATCH 07/19] expose BroadcastStreamRecvError --- tokio-stream/src/wrappers.rs | 1 + tokio-stream/src/wrappers/broadcast.rs | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/tokio-stream/src/wrappers.rs b/tokio-stream/src/wrappers.rs index 6d1d7fe8694..62cb8aef9bb 100644 --- a/tokio-stream/src/wrappers.rs +++ b/tokio-stream/src/wrappers.rs @@ -8,6 +8,7 @@ pub use mpsc_unbounded::UnboundedReceiverStream; mod broadcast; pub use broadcast::BroadcastStream; +pub use broadcast::BroadcastStreamRecvError; cfg_time! { mod interval; diff --git a/tokio-stream/src/wrappers/broadcast.rs b/tokio-stream/src/wrappers/broadcast.rs index b2ea85407df..8eec81e52db 100644 --- a/tokio-stream/src/wrappers/broadcast.rs +++ b/tokio-stream/src/wrappers/broadcast.rs @@ -11,7 +11,7 @@ use std::task::{Context, Poll}; /// /// [`Receiver`]: struct@tokio::sync::broadcast::Receiver /// [`Stream`]: trait@crate::Stream -pub struct BroadcastStream { +pub struct BroadcastStream { inner: Pin> + Send + Sync>>, } @@ -35,7 +35,8 @@ impl BroadcastStream { Err(err) => match err { RecvError::Closed => break, - RecvError::Lagged(n) => yield Err(BroadcastStreamRecvError::Lagged(n)) + RecvError::Lagged(n) => + yield Err(BroadcastStreamRecvError::Lagged(n)) } } } From 9513b719ff9d40d0ef9347dfe5439aae2baf9456 Mon Sep 17 00:00:00 2001 From: Fuyang Liu Date: Sun, 24 Jan 2021 11:45:57 +0100 Subject: [PATCH 08/19] reformat --- tokio-stream/src/wrappers/broadcast.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/tokio-stream/src/wrappers/broadcast.rs b/tokio-stream/src/wrappers/broadcast.rs index 8eec81e52db..08ef1c247b3 100644 --- a/tokio-stream/src/wrappers/broadcast.rs +++ b/tokio-stream/src/wrappers/broadcast.rs @@ -31,13 +31,11 @@ impl BroadcastStream { let stream = stream! { loop { match rx.recv().await { - Ok(item) => yield Ok(item), - Err(err) => - match err { - RecvError::Closed => break, - RecvError::Lagged(n) => - yield Err(BroadcastStreamRecvError::Lagged(n)) - } + Ok(item) => yield Ok(item), + Err(err) => match err { + RecvError::Closed => break, + RecvError::Lagged(n) => yield Err(BroadcastStreamRecvError::Lagged(n)), + }, } } }; From ed008036fa09961bba20f1e64e3e3adc4392a836 Mon Sep 17 00:00:00 2001 From: Fuyang Liu Date: Mon, 25 Jan 2021 11:10:11 +0100 Subject: [PATCH 09/19] add watch (#1) * add watch * Fix up --- tokio-stream/src/wrappers.rs | 3 ++ tokio-stream/src/wrappers/broadcast.rs | 4 +-- tokio-stream/src/wrappers/watch.rs | 50 ++++++++++++++++++++++++++ 3 files changed, 55 insertions(+), 2 deletions(-) create mode 100644 tokio-stream/src/wrappers/watch.rs diff --git a/tokio-stream/src/wrappers.rs b/tokio-stream/src/wrappers.rs index 62cb8aef9bb..405f35a5b61 100644 --- a/tokio-stream/src/wrappers.rs +++ b/tokio-stream/src/wrappers.rs @@ -10,6 +10,9 @@ mod broadcast; pub use broadcast::BroadcastStream; pub use broadcast::BroadcastStreamRecvError; +mod watch; +pub use watch::WatchStream; + cfg_time! { mod interval; pub use interval::IntervalStream; diff --git a/tokio-stream/src/wrappers/broadcast.rs b/tokio-stream/src/wrappers/broadcast.rs index 08ef1c247b3..435d361dca4 100644 --- a/tokio-stream/src/wrappers/broadcast.rs +++ b/tokio-stream/src/wrappers/broadcast.rs @@ -7,9 +7,9 @@ use tokio::sync::broadcast::Receiver; use std::fmt; use std::task::{Context, Poll}; -/// A wrapper around [`Receiver`] that implements [`Stream`]. +/// A wrapper around [`tokio::sync::broadcast::Receiver`] that implements [`Stream`]. /// -/// [`Receiver`]: struct@tokio::sync::broadcast::Receiver +/// [`tokio::sync::broadcast::Receiver`]: struct@tokio::sync::broadcast::Receiver /// [`Stream`]: trait@crate::Stream pub struct BroadcastStream { inner: Pin> + Send + Sync>>, diff --git a/tokio-stream/src/wrappers/watch.rs b/tokio-stream/src/wrappers/watch.rs new file mode 100644 index 00000000000..96e1a744fc4 --- /dev/null +++ b/tokio-stream/src/wrappers/watch.rs @@ -0,0 +1,50 @@ +use crate::Stream; +use async_stream::stream; +use std::pin::Pin; +use tokio::sync::watch::Receiver; + +use std::fmt; +use std::task::{Context, Poll}; + +/// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`]. +/// +/// [`tokio::sync::watch::Receiver`]: struct@tokio::sync::watch::Receiver +/// [`Stream`]: trait@crate::Stream +pub struct WatchStream { + inner: Pin>>, + _marker: std::marker::PhantomData, +} + +impl WatchStream { + /// Create a new `WatchStream`. + pub fn new(mut rx: Receiver) -> Self { + let stream = stream! { + loop { + match rx.changed().await { + Ok(item) => yield item, + Err(_) => break, + } + } + }; + Self { + inner: Box::pin(stream), + _marker: std::marker::PhantomData, + } + } +} + +impl Stream for WatchStream { + type Item = (); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_next(cx) + } +} + +impl Unpin for WatchStream {} + +impl fmt::Debug for WatchStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WatchStream").finish() + } +} From ef1228685c239dfdc6ea0c74bd2d5258cf47eae1 Mon Sep 17 00:00:00 2001 From: Fuyang Liu Date: Tue, 26 Jan 2021 09:53:14 +0100 Subject: [PATCH 10/19] return cloned value when warpped recv changed --- tokio-stream/src/wrappers/watch.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/tokio-stream/src/wrappers/watch.rs b/tokio-stream/src/wrappers/watch.rs index 96e1a744fc4..6e4ae71cc6d 100644 --- a/tokio-stream/src/wrappers/watch.rs +++ b/tokio-stream/src/wrappers/watch.rs @@ -11,30 +11,28 @@ use std::task::{Context, Poll}; /// [`tokio::sync::watch::Receiver`]: struct@tokio::sync::watch::Receiver /// [`Stream`]: trait@crate::Stream pub struct WatchStream { - inner: Pin>>, - _marker: std::marker::PhantomData, + inner: Pin>>, } -impl WatchStream { +impl WatchStream { /// Create a new `WatchStream`. pub fn new(mut rx: Receiver) -> Self { let stream = stream! { loop { match rx.changed().await { - Ok(item) => yield item, + Ok(_) => yield (*rx.borrow()).clone(), Err(_) => break, } } }; Self { inner: Box::pin(stream), - _marker: std::marker::PhantomData, } } } impl Stream for WatchStream { - type Item = (); + type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.inner).poll_next(cx) From 34eab7361a533fb9efc8d1c889ae1b2f8a16cc71 Mon Sep 17 00:00:00 2001 From: Fuyang Liu Date: Sun, 31 Jan 2021 16:28:28 +0100 Subject: [PATCH 11/19] remove dependency of async_stream --- tokio-stream/Cargo.toml | 2 +- tokio-stream/src/wrappers/broadcast.rs | 43 ++++++++++++++------------ tokio-stream/src/wrappers/watch.rs | 40 ++++++++++++++---------- 3 files changed, 49 insertions(+), 36 deletions(-) diff --git a/tokio-stream/Cargo.toml b/tokio-stream/Cargo.toml index 90dd4d1ab5a..f39edc614b1 100644 --- a/tokio-stream/Cargo.toml +++ b/tokio-stream/Cargo.toml @@ -28,9 +28,9 @@ fs = ["tokio/fs"] [dependencies] futures-core = { version = "0.3.0" } -async-stream = "0.3" pin-project-lite = "0.2.0" tokio = { version = "1.0", features = ["sync"] } +tokio-util = { path = "../tokio-util" } [dev-dependencies] tokio = { version = "1.0", features = ["full", "test-util"] } diff --git a/tokio-stream/src/wrappers/broadcast.rs b/tokio-stream/src/wrappers/broadcast.rs index 435d361dca4..4af3644426a 100644 --- a/tokio-stream/src/wrappers/broadcast.rs +++ b/tokio-stream/src/wrappers/broadcast.rs @@ -1,9 +1,10 @@ -use crate::Stream; -use async_stream::stream; use std::pin::Pin; use tokio::sync::broadcast::error::RecvError; use tokio::sync::broadcast::Receiver; +use futures_core::Stream; +use tokio_util::sync::ReusableBoxFuture; + use std::fmt; use std::task::{Context, Poll}; @@ -12,7 +13,7 @@ use std::task::{Context, Poll}; /// [`tokio::sync::broadcast::Receiver`]: struct@tokio::sync::broadcast::Receiver /// [`Stream`]: trait@crate::Stream pub struct BroadcastStream { - inner: Pin> + Send + Sync>>, + inner: ReusableBoxFuture), RecvError>>, } /// An error returned from the inner stream of a [`BroadcastStream`]. @@ -25,31 +26,35 @@ pub enum BroadcastStreamRecvError { Lagged(u64), } +async fn make_future( + mut rx: Receiver, +) -> Result<(T, Receiver), RecvError> { + let item = rx.recv().await?; + Ok((item, rx)) +} + impl BroadcastStream { /// Create a new `BroadcastStream`. - pub fn new(mut rx: Receiver) -> Self { - let stream = stream! { - loop { - match rx.recv().await { - Ok(item) => yield Ok(item), - Err(err) => match err { - RecvError::Closed => break, - RecvError::Lagged(n) => yield Err(BroadcastStreamRecvError::Lagged(n)), - }, - } - } - }; + pub fn new(rx: Receiver) -> Self { Self { - inner: Box::pin(stream), + inner: ReusableBoxFuture::new(make_future(rx)), } } } -impl Stream for BroadcastStream { +impl Stream for BroadcastStream { type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.inner).poll_next(cx) + match ready!(self.inner.poll(cx)) { + Ok((item, rx)) => { + self.inner.set(make_future(rx)); + Poll::Ready(Some(Ok(item))) + } + Err(err) => match err { + RecvError::Closed => Poll::Ready(None), + RecvError::Lagged(n) => Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n)))), + }, + } } } diff --git a/tokio-stream/src/wrappers/watch.rs b/tokio-stream/src/wrappers/watch.rs index 6e4ae71cc6d..b68d057cd56 100644 --- a/tokio-stream/src/wrappers/watch.rs +++ b/tokio-stream/src/wrappers/watch.rs @@ -1,41 +1,49 @@ -use crate::Stream; -use async_stream::stream; use std::pin::Pin; use tokio::sync::watch::Receiver; +use futures_core::Stream; +use tokio_util::sync::ReusableBoxFuture; + use std::fmt; use std::task::{Context, Poll}; +use tokio::sync::watch::error::RecvError; /// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`]. /// /// [`tokio::sync::watch::Receiver`]: struct@tokio::sync::watch::Receiver /// [`Stream`]: trait@crate::Stream pub struct WatchStream { - inner: Pin>>, + inner: ReusableBoxFuture), RecvError>>, +} + +async fn make_future( + mut rx: Receiver, +) -> Result<((), Receiver), RecvError> { + let signal = rx.changed().await?; + Ok((signal, rx)) } -impl WatchStream { +impl WatchStream { /// Create a new `WatchStream`. - pub fn new(mut rx: Receiver) -> Self { - let stream = stream! { - loop { - match rx.changed().await { - Ok(_) => yield (*rx.borrow()).clone(), - Err(_) => break, - } - } - }; + pub fn new(rx: Receiver) -> Self { Self { - inner: Box::pin(stream), + inner: ReusableBoxFuture::new(make_future(rx)), } } } -impl Stream for WatchStream { +impl Stream for WatchStream { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.inner).poll_next(cx) + match ready!(self.inner.poll(cx)) { + Ok((_, rx)) => { + let received = (*rx.borrow()).clone(); + self.inner.set(make_future(rx)); + Poll::Ready(Some(received)) + } + Err(_) => Poll::Ready(None), + } } } From c6f9ee9a0b6498765f637952adf868d9d5ba7420 Mon Sep 17 00:00:00 2001 From: Fuyang Liu Date: Sun, 31 Jan 2021 16:51:23 +0100 Subject: [PATCH 12/19] Update tokio-stream/Cargo.toml Co-authored-by: Alice Ryhl --- tokio-stream/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-stream/Cargo.toml b/tokio-stream/Cargo.toml index f39edc614b1..8933572d2a4 100644 --- a/tokio-stream/Cargo.toml +++ b/tokio-stream/Cargo.toml @@ -30,7 +30,7 @@ fs = ["tokio/fs"] futures-core = { version = "0.3.0" } pin-project-lite = "0.2.0" tokio = { version = "1.0", features = ["sync"] } -tokio-util = { path = "../tokio-util" } +tokio-util = { version = "0.6.3" } [dev-dependencies] tokio = { version = "1.0", features = ["full", "test-util"] } From 35a8502f12e8b3942a8b4ccaf7faf9dc05e23d72 Mon Sep 17 00:00:00 2001 From: Fuyang Liu Date: Sun, 31 Jan 2021 17:11:12 +0100 Subject: [PATCH 13/19] wrap the receiver in an inner error --- tokio-stream/src/wrappers/broadcast.rs | 28 ++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/tokio-stream/src/wrappers/broadcast.rs b/tokio-stream/src/wrappers/broadcast.rs index 4af3644426a..2ae2753c58a 100644 --- a/tokio-stream/src/wrappers/broadcast.rs +++ b/tokio-stream/src/wrappers/broadcast.rs @@ -26,14 +26,23 @@ pub enum BroadcastStreamRecvError { Lagged(u64), } -async fn make_future( +#[derive(Debug, PartialEq)] +enum WrappedRecvError { + Lagged(u64, Receiver), + Closed, +} + +async fn make_future( mut rx: Receiver, -) -> Result<(T, Receiver), RecvError> { - let item = rx.recv().await?; - Ok((item, rx)) +) -> Result<(T, Receiver), WrappedRecvError> { + match rx.recv().await { + Ok(item) => Ok((item, rx)), + Err(RecvError::Lagged(n)) => WrappedRecvError::Lagged(n, rx), + Err(RecvError::Closed) => WrappedRecvError::Closed, + } } -impl BroadcastStream { +impl BroadcastStream { /// Create a new `BroadcastStream`. pub fn new(rx: Receiver) -> Self { Self { @@ -42,7 +51,7 @@ impl BroadcastStream { } } -impl Stream for BroadcastStream { +impl Stream for BroadcastStream { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match ready!(self.inner.poll(cx)) { @@ -51,8 +60,11 @@ impl Stream for BroadcastStream { Poll::Ready(Some(Ok(item))) } Err(err) => match err { - RecvError::Closed => Poll::Ready(None), - RecvError::Lagged(n) => Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n)))), + WrappedRecvError::Closed => Poll::Ready(None), + WrappedRecvError::Lagged(n, rx) => { + self.inner.set(make_future(rx)); + Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n)))) + } }, } } From 2cbf56039173f2795067a385e044da062bb812e6 Mon Sep 17 00:00:00 2001 From: Fuyang Liu Date: Sun, 31 Jan 2021 17:23:59 +0100 Subject: [PATCH 14/19] fix up --- examples/Cargo.toml | 2 +- tokio-stream/src/wrappers/broadcast.rs | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 1a86c81ef72..802930d820a 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -8,7 +8,7 @@ edition = "2018" # [dependencies] instead. [dev-dependencies] tokio = { version = "1.0.0", features = ["full", "tracing"] } -tokio-util = { version = "0.6.1", features = ["full"] } +tokio-util = { version = "0.6.3", features = ["full"] } tokio-stream = { version = "0.1" } async-stream = "0.3" diff --git a/tokio-stream/src/wrappers/broadcast.rs b/tokio-stream/src/wrappers/broadcast.rs index 2ae2753c58a..aad73e8d337 100644 --- a/tokio-stream/src/wrappers/broadcast.rs +++ b/tokio-stream/src/wrappers/broadcast.rs @@ -13,7 +13,7 @@ use std::task::{Context, Poll}; /// [`tokio::sync::broadcast::Receiver`]: struct@tokio::sync::broadcast::Receiver /// [`Stream`]: trait@crate::Stream pub struct BroadcastStream { - inner: ReusableBoxFuture), RecvError>>, + inner: ReusableBoxFuture), WrappedRecvError>>, } /// An error returned from the inner stream of a [`BroadcastStream`]. @@ -26,7 +26,7 @@ pub enum BroadcastStreamRecvError { Lagged(u64), } -#[derive(Debug, PartialEq)] +#[derive(Debug)] enum WrappedRecvError { Lagged(u64, Receiver), Closed, @@ -37,12 +37,12 @@ async fn make_future( ) -> Result<(T, Receiver), WrappedRecvError> { match rx.recv().await { Ok(item) => Ok((item, rx)), - Err(RecvError::Lagged(n)) => WrappedRecvError::Lagged(n, rx), - Err(RecvError::Closed) => WrappedRecvError::Closed, + Err(RecvError::Lagged(n)) => Err(WrappedRecvError::Lagged(n, rx)), + Err(RecvError::Closed) => Err(WrappedRecvError::Closed), } } -impl BroadcastStream { +impl BroadcastStream { /// Create a new `BroadcastStream`. pub fn new(rx: Receiver) -> Self { Self { @@ -51,7 +51,7 @@ impl BroadcastStream { } } -impl Stream for BroadcastStream { +impl Stream for BroadcastStream { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match ready!(self.inner.poll(cx)) { From e2d8214414099ad97376372ff58bd05236058ad6 Mon Sep 17 00:00:00 2001 From: Fuyang Liu Date: Sun, 31 Jan 2021 18:19:27 +0100 Subject: [PATCH 15/19] simplify code --- tokio-stream/src/wrappers/broadcast.rs | 27 ++++++++------------------ 1 file changed, 8 insertions(+), 19 deletions(-) diff --git a/tokio-stream/src/wrappers/broadcast.rs b/tokio-stream/src/wrappers/broadcast.rs index aad73e8d337..48c4d23bd39 100644 --- a/tokio-stream/src/wrappers/broadcast.rs +++ b/tokio-stream/src/wrappers/broadcast.rs @@ -13,7 +13,7 @@ use std::task::{Context, Poll}; /// [`tokio::sync::broadcast::Receiver`]: struct@tokio::sync::broadcast::Receiver /// [`Stream`]: trait@crate::Stream pub struct BroadcastStream { - inner: ReusableBoxFuture), WrappedRecvError>>, + inner: ReusableBoxFuture<(Result, Receiver)>, } /// An error returned from the inner stream of a [`BroadcastStream`]. @@ -26,20 +26,9 @@ pub enum BroadcastStreamRecvError { Lagged(u64), } -#[derive(Debug)] -enum WrappedRecvError { - Lagged(u64, Receiver), - Closed, -} - -async fn make_future( - mut rx: Receiver, -) -> Result<(T, Receiver), WrappedRecvError> { - match rx.recv().await { - Ok(item) => Ok((item, rx)), - Err(RecvError::Lagged(n)) => Err(WrappedRecvError::Lagged(n, rx)), - Err(RecvError::Closed) => Err(WrappedRecvError::Closed), - } +async fn make_future(mut rx: Receiver) -> (Result, Receiver) { + let result = rx.recv().await; + (result, rx) } impl BroadcastStream { @@ -55,13 +44,13 @@ impl Stream for BroadcastStream { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match ready!(self.inner.poll(cx)) { - Ok((item, rx)) => { + (Ok(item), rx) => { self.inner.set(make_future(rx)); Poll::Ready(Some(Ok(item))) } - Err(err) => match err { - WrappedRecvError::Closed => Poll::Ready(None), - WrappedRecvError::Lagged(n, rx) => { + (Err(err), rx) => match err { + RecvError::Closed => Poll::Ready(None), + RecvError::Lagged(n) => { self.inner.set(make_future(rx)); Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n)))) } From f7f2f00763fa8a6595f5d67c40bec814f5d24780 Mon Sep 17 00:00:00 2001 From: Fuyang Liu Date: Sun, 31 Jan 2021 18:36:54 +0100 Subject: [PATCH 16/19] alwasy refresh inner future --- tokio-stream/src/wrappers/broadcast.rs | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/tokio-stream/src/wrappers/broadcast.rs b/tokio-stream/src/wrappers/broadcast.rs index 48c4d23bd39..7aafd8312d8 100644 --- a/tokio-stream/src/wrappers/broadcast.rs +++ b/tokio-stream/src/wrappers/broadcast.rs @@ -43,17 +43,13 @@ impl BroadcastStream { impl Stream for BroadcastStream { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match ready!(self.inner.poll(cx)) { - (Ok(item), rx) => { - self.inner.set(make_future(rx)); - Poll::Ready(Some(Ok(item))) - } - (Err(err), rx) => match err { + let (result, rx) = ready!(self.inner.poll(cx)); + self.inner.set(make_future(rx)); + match result { + Ok(item) => Poll::Ready(Some(Ok(item))), + Err(err) => match err { RecvError::Closed => Poll::Ready(None), - RecvError::Lagged(n) => { - self.inner.set(make_future(rx)); - Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n)))) - } + RecvError::Lagged(n) => Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n)))), }, } } From fb244b1b234383d83d04188b4acd1f4fef506e46 Mon Sep 17 00:00:00 2001 From: Fuyang Liu Date: Sun, 31 Jan 2021 18:39:10 +0100 Subject: [PATCH 17/19] add async-stream back for dev dep --- tokio-stream/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/tokio-stream/Cargo.toml b/tokio-stream/Cargo.toml index 8933572d2a4..0bc03ac0331 100644 --- a/tokio-stream/Cargo.toml +++ b/tokio-stream/Cargo.toml @@ -34,6 +34,7 @@ tokio-util = { version = "0.6.3" } [dev-dependencies] tokio = { version = "1.0", features = ["full", "test-util"] } +async-stream = "0.3" tokio-test = { path = "../tokio-test" } futures = { version = "0.3", default-features = false } From 3b045dcb876865d50f956f268aea4d05eac9afd1 Mon Sep 17 00:00:00 2001 From: Fuyang Liu Date: Sun, 31 Jan 2021 19:09:03 +0100 Subject: [PATCH 18/19] simplify code --- tokio-stream/src/wrappers/broadcast.rs | 10 +++++----- tokio-stream/src/wrappers/watch.rs | 13 +++++++------ 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/tokio-stream/src/wrappers/broadcast.rs b/tokio-stream/src/wrappers/broadcast.rs index 7aafd8312d8..f3ff002355c 100644 --- a/tokio-stream/src/wrappers/broadcast.rs +++ b/tokio-stream/src/wrappers/broadcast.rs @@ -47,15 +47,15 @@ impl Stream for BroadcastStream { self.inner.set(make_future(rx)); match result { Ok(item) => Poll::Ready(Some(Ok(item))), - Err(err) => match err { - RecvError::Closed => Poll::Ready(None), - RecvError::Lagged(n) => Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n)))), - }, + Err(RecvError::Closed) => Poll::Ready(None), + Err(RecvError::Lagged(n)) => { + Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n)))) + } } } } -impl fmt::Debug for BroadcastStream { +impl fmt::Debug for BroadcastStream { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("BroadcastStream").finish() } diff --git a/tokio-stream/src/wrappers/watch.rs b/tokio-stream/src/wrappers/watch.rs index b68d057cd56..308412574f7 100644 --- a/tokio-stream/src/wrappers/watch.rs +++ b/tokio-stream/src/wrappers/watch.rs @@ -13,14 +13,14 @@ use tokio::sync::watch::error::RecvError; /// [`tokio::sync::watch::Receiver`]: struct@tokio::sync::watch::Receiver /// [`Stream`]: trait@crate::Stream pub struct WatchStream { - inner: ReusableBoxFuture), RecvError>>, + inner: ReusableBoxFuture<(Result<(), RecvError>, Receiver)>, } async fn make_future( mut rx: Receiver, -) -> Result<((), Receiver), RecvError> { +) -> (Result<(), RecvError>, Receiver) { let signal = rx.changed().await?; - Ok((signal, rx)) + (Ok(signal), rx) } impl WatchStream { @@ -36,10 +36,11 @@ impl Stream for WatchStream { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match ready!(self.inner.poll(cx)) { - Ok((_, rx)) => { + let (result, rx) = ready!(self.inner.poll(cx)); + self.inner.set(make_future(rx)); + match result { + Ok(_) => { let received = (*rx.borrow()).clone(); - self.inner.set(make_future(rx)); Poll::Ready(Some(received)) } Err(_) => Poll::Ready(None), From d44d54a7778c143dcaa2fd8a9721286dd124efcc Mon Sep 17 00:00:00 2001 From: Fuyang Liu Date: Sun, 31 Jan 2021 19:23:11 +0100 Subject: [PATCH 19/19] fix up --- tokio-stream/src/wrappers/watch.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tokio-stream/src/wrappers/watch.rs b/tokio-stream/src/wrappers/watch.rs index 308412574f7..e58de918d29 100644 --- a/tokio-stream/src/wrappers/watch.rs +++ b/tokio-stream/src/wrappers/watch.rs @@ -19,8 +19,8 @@ pub struct WatchStream { async fn make_future( mut rx: Receiver, ) -> (Result<(), RecvError>, Receiver) { - let signal = rx.changed().await?; - (Ok(signal), rx) + let result = rx.changed().await; + (result, rx) } impl WatchStream { @@ -37,13 +37,16 @@ impl Stream for WatchStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let (result, rx) = ready!(self.inner.poll(cx)); - self.inner.set(make_future(rx)); match result { Ok(_) => { let received = (*rx.borrow()).clone(); + self.inner.set(make_future(rx)); Poll::Ready(Some(received)) } - Err(_) => Poll::Ready(None), + Err(_) => { + self.inner.set(make_future(rx)); + Poll::Ready(None) + } } } }