diff --git a/tokio-stream/src/wrappers/broadcast.rs b/tokio-stream/src/wrappers/broadcast.rs index b3900db8ff6..3474cff7718 100644 --- a/tokio-stream/src/wrappers/broadcast.rs +++ b/tokio-stream/src/wrappers/broadcast.rs @@ -10,6 +10,29 @@ use std::task::{ready, Context, Poll}; /// A wrapper around [`tokio::sync::broadcast::Receiver`] that implements [`Stream`]. /// +/// # Example +/// +/// ``` +/// use tokio::sync::broadcast; +/// use tokio_stream::wrappers::BroadcastStream; +/// use tokio_stream::StreamExt; +/// +/// # #[tokio::main(flavor = "current_thread")] +/// # async fn main() -> Result<(), tokio::sync::broadcast::error::SendError> { +/// let (tx, rx) = broadcast::channel(16); +/// tx.send(10)?; +/// tx.send(20)?; +/// # // prevent the doc test from hanging +/// drop(tx); +/// +/// let mut stream = BroadcastStream::new(rx); +/// assert_eq!(stream.next().await, Some(Ok(10))); +/// assert_eq!(stream.next().await, Some(Ok(20))); +/// assert_eq!(stream.next().await, None); +/// # Ok(()) +/// # } +/// ``` +/// /// [`tokio::sync::broadcast::Receiver`]: struct@tokio::sync::broadcast::Receiver /// [`Stream`]: trait@futures_core::Stream #[cfg_attr(docsrs, doc(cfg(feature = "sync")))] diff --git a/tokio-stream/src/wrappers/interval.rs b/tokio-stream/src/wrappers/interval.rs index c7a0b1f1e2a..faac5e78a3e 100644 --- a/tokio-stream/src/wrappers/interval.rs +++ b/tokio-stream/src/wrappers/interval.rs @@ -5,6 +5,26 @@ use tokio::time::{Instant, Interval}; /// A wrapper around [`Interval`] that implements [`Stream`]. /// +/// # Example +/// +/// ``` +/// use tokio::time::{Duration, Instant, interval}; +/// use tokio_stream::wrappers::IntervalStream; +/// use tokio_stream::StreamExt; +/// +/// # #[tokio::main(flavor = "current_thread")] +/// # async fn main() { +/// let start = Instant::now(); +/// let interval = interval(Duration::from_millis(10)); +/// let mut stream = IntervalStream::new(interval); +/// for _ in 0..3 { +/// if let Some(instant) = stream.next().await { +/// println!("elapsed: {:.1?}", instant.duration_since(start)); +/// } +/// } +/// # } +/// ``` +/// /// [`Interval`]: struct@tokio::time::Interval /// [`Stream`]: trait@crate::Stream #[derive(Debug)] diff --git a/tokio-stream/src/wrappers/lines.rs b/tokio-stream/src/wrappers/lines.rs index 4850429a72d..57b41fbc736 100644 --- a/tokio-stream/src/wrappers/lines.rs +++ b/tokio-stream/src/wrappers/lines.rs @@ -8,6 +8,24 @@ use tokio::io::{AsyncBufRead, Lines}; pin_project! { /// A wrapper around [`tokio::io::Lines`] that implements [`Stream`]. /// + /// # Example + /// + /// ``` + /// use tokio::io::AsyncBufReadExt; + /// use tokio_stream::wrappers::LinesStream; + /// use tokio_stream::StreamExt; + /// + /// # #[tokio::main(flavor = "current_thread")] + /// # async fn main() -> std::io::Result<()> { + /// let input = b"Hello\nWorld\n"; + /// let mut stream = LinesStream::new(input.lines()); + /// while let Some(line) = stream.next().await { + /// println!("{}", line?); + /// } + /// # Ok(()) + /// # } + /// ``` + /// /// [`tokio::io::Lines`]: struct@tokio::io::Lines /// [`Stream`]: trait@crate::Stream #[derive(Debug)] diff --git a/tokio-stream/src/wrappers/mpsc_bounded.rs b/tokio-stream/src/wrappers/mpsc_bounded.rs index 18d799e98b1..34b2e020d75 100644 --- a/tokio-stream/src/wrappers/mpsc_bounded.rs +++ b/tokio-stream/src/wrappers/mpsc_bounded.rs @@ -5,6 +5,29 @@ use tokio::sync::mpsc::Receiver; /// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`]. /// +/// # Example +/// +/// ``` +/// use tokio::sync::mpsc; +/// use tokio_stream::wrappers::ReceiverStream; +/// use tokio_stream::StreamExt; +/// +/// # #[tokio::main(flavor = "current_thread")] +/// # async fn main() -> Result<(), tokio::sync::mpsc::error::SendError> { +/// let (tx, rx) = mpsc::channel(2); +/// tx.send(10).await?; +/// tx.send(20).await?; +/// # // prevent the doc test from hanging +/// drop(tx); +/// +/// let mut stream = ReceiverStream::new(rx); +/// assert_eq!(stream.next().await, Some(10)); +/// assert_eq!(stream.next().await, Some(20)); +/// assert_eq!(stream.next().await, None); +/// # Ok(()) +/// # } +/// ``` +/// /// [`tokio::sync::mpsc::Receiver`]: struct@tokio::sync::mpsc::Receiver /// [`Stream`]: trait@crate::Stream #[derive(Debug)] diff --git a/tokio-stream/src/wrappers/mpsc_unbounded.rs b/tokio-stream/src/wrappers/mpsc_unbounded.rs index 6945b08717c..904c98bef95 100644 --- a/tokio-stream/src/wrappers/mpsc_unbounded.rs +++ b/tokio-stream/src/wrappers/mpsc_unbounded.rs @@ -5,6 +5,29 @@ use tokio::sync::mpsc::UnboundedReceiver; /// A wrapper around [`tokio::sync::mpsc::UnboundedReceiver`] that implements [`Stream`]. /// +/// # Example +/// +/// ``` +/// use tokio::sync::mpsc; +/// use tokio_stream::wrappers::UnboundedReceiverStream; +/// use tokio_stream::StreamExt; +/// +/// # #[tokio::main(flavor = "current_thread")] +/// # async fn main() -> Result<(), tokio::sync::mpsc::error::SendError> { +/// let (tx, rx) = mpsc::unbounded_channel(); +/// tx.send(10)?; +/// tx.send(20)?; +/// # // prevent the doc test from hanging +/// drop(tx); +/// +/// let mut stream = UnboundedReceiverStream::new(rx); +/// assert_eq!(stream.next().await, Some(10)); +/// assert_eq!(stream.next().await, Some(20)); +/// assert_eq!(stream.next().await, None); +/// # Ok(()) +/// # } +/// ``` +/// /// [`tokio::sync::mpsc::UnboundedReceiver`]: struct@tokio::sync::mpsc::UnboundedReceiver /// [`Stream`]: trait@crate::Stream #[derive(Debug)] diff --git a/tokio-stream/src/wrappers/read_dir.rs b/tokio-stream/src/wrappers/read_dir.rs index b5cf54f79e1..21522c03d2d 100644 --- a/tokio-stream/src/wrappers/read_dir.rs +++ b/tokio-stream/src/wrappers/read_dir.rs @@ -6,6 +6,24 @@ use tokio::fs::{DirEntry, ReadDir}; /// A wrapper around [`tokio::fs::ReadDir`] that implements [`Stream`]. /// +/// # Example +/// +/// ``` +/// use tokio::fs::read_dir; +/// use tokio_stream::{StreamExt, wrappers::ReadDirStream}; +/// +/// # #[tokio::main(flavor = "current_thread")] +/// # async fn main() -> std::io::Result<()> { +/// let dirs = read_dir(".").await?; +/// let mut dirs = ReadDirStream::new(dirs); +/// while let Some(dir) = dirs.next().await { +/// let dir = dir?; +/// println!("{}", dir.path().display()); +/// } +/// # Ok(()) +/// # } +/// ``` +/// /// [`tokio::fs::ReadDir`]: struct@tokio::fs::ReadDir /// [`Stream`]: trait@crate::Stream #[derive(Debug)] diff --git a/tokio-stream/src/wrappers/signal_unix.rs b/tokio-stream/src/wrappers/signal_unix.rs index 6dcdff7fc55..ac873f92b3f 100644 --- a/tokio-stream/src/wrappers/signal_unix.rs +++ b/tokio-stream/src/wrappers/signal_unix.rs @@ -5,6 +5,22 @@ use tokio::signal::unix::Signal; /// A wrapper around [`Signal`] that implements [`Stream`]. /// +/// # Example +/// +/// ```no_run +/// use tokio::signal::unix::{signal, SignalKind}; +/// use tokio_stream::{StreamExt, wrappers::SignalStream}; +/// +/// # #[tokio::main(flavor = "current_thread")] +/// # async fn main() -> std::io::Result<()> { +/// let signals = signal(SignalKind::hangup())?; +/// let mut stream = SignalStream::new(signals); +/// while stream.next().await.is_some() { +/// println!("hangup signal received"); +/// } +/// # Ok(()) +/// # } +/// ``` /// [`Signal`]: struct@tokio::signal::unix::Signal /// [`Stream`]: trait@crate::Stream #[derive(Debug)] diff --git a/tokio-stream/src/wrappers/signal_windows.rs b/tokio-stream/src/wrappers/signal_windows.rs index 4631fbad8dc..168ed181d34 100644 --- a/tokio-stream/src/wrappers/signal_windows.rs +++ b/tokio-stream/src/wrappers/signal_windows.rs @@ -7,6 +7,23 @@ use tokio::signal::windows::{CtrlBreak, CtrlC}; /// /// [`CtrlC`]: struct@tokio::signal::windows::CtrlC /// [`Stream`]: trait@crate::Stream +/// +/// # Example +/// +/// ```no_run +/// use tokio::signal::windows::ctrl_c; +/// use tokio_stream::{StreamExt, wrappers::CtrlCStream}; +/// +/// # #[tokio::main(flavor = "current_thread")] +/// # async fn main() -> std::io::Result<()> { +/// let signals = ctrl_c()?; +/// let mut stream = CtrlCStream::new(signals); +/// while stream.next().await.is_some() { +/// println!("ctrl-c received"); +/// } +/// # Ok(()) +/// # } +/// ``` #[derive(Debug)] #[cfg_attr(docsrs, doc(cfg(all(windows, feature = "signal"))))] pub struct CtrlCStream { @@ -47,6 +64,23 @@ impl AsMut for CtrlCStream { /// A wrapper around [`CtrlBreak`] that implements [`Stream`]. /// +/// # Example +/// +/// ```no_run +/// use tokio::signal::windows::ctrl_break; +/// use tokio_stream::{StreamExt, wrappers::CtrlBreakStream}; +/// +/// # #[tokio::main(flavor = "current_thread")] +/// # async fn main() -> std::io::Result<()> { +/// let signals = ctrl_break()?; +/// let mut stream = CtrlBreakStream::new(signals); +/// while stream.next().await.is_some() { +/// println!("ctrl-break received"); +/// } +/// # Ok(()) +/// # } +/// ``` +/// /// [`CtrlBreak`]: struct@tokio::signal::windows::CtrlBreak /// [`Stream`]: trait@crate::Stream #[derive(Debug)] diff --git a/tokio-stream/src/wrappers/split.rs b/tokio-stream/src/wrappers/split.rs index ac46a8ba6ff..5d6d77b6787 100644 --- a/tokio-stream/src/wrappers/split.rs +++ b/tokio-stream/src/wrappers/split.rs @@ -8,6 +8,24 @@ use tokio::io::{AsyncBufRead, Split}; pin_project! { /// A wrapper around [`tokio::io::Split`] that implements [`Stream`]. /// + /// # Example + /// + /// ``` + /// use tokio::io::AsyncBufReadExt; + /// use tokio_stream::{StreamExt, wrappers::SplitStream}; + /// + /// # #[tokio::main(flavor = "current_thread")] + /// # async fn main() -> std::io::Result<()> { + /// let input = "Hello\nWorld\n".as_bytes(); + /// let lines = AsyncBufReadExt::split(input, b'\n'); + /// + /// let mut stream = SplitStream::new(lines); + /// while let Some(line) = stream.next().await { + /// println!("length = {}", line?.len()) + /// } + /// # Ok(()) + /// # } + /// ``` /// [`tokio::io::Split`]: struct@tokio::io::Split /// [`Stream`]: trait@crate::Stream #[derive(Debug)] diff --git a/tokio-stream/src/wrappers/tcp_listener.rs b/tokio-stream/src/wrappers/tcp_listener.rs index ce7cb163507..c463ef1426c 100644 --- a/tokio-stream/src/wrappers/tcp_listener.rs +++ b/tokio-stream/src/wrappers/tcp_listener.rs @@ -6,6 +6,33 @@ use tokio::net::{TcpListener, TcpStream}; /// A wrapper around [`TcpListener`] that implements [`Stream`]. /// +/// # Example +/// +/// Accept connections from both IPv4 and IPv6 listeners in the same loop: +/// +/// ```no_run +/// use std::net::{Ipv4Addr, Ipv6Addr}; +/// +/// use tokio::net::TcpListener; +/// use tokio_stream::{StreamExt, wrappers::TcpListenerStream}; +/// +/// # #[tokio::main(flavor = "current_thread")] +/// # async fn main() -> std::io::Result<()> { +/// let ipv4_listener = TcpListener::bind((Ipv6Addr::LOCALHOST, 8080)).await?; +/// let ipv6_listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 8080)).await?; +/// let ipv4_connections = TcpListenerStream::new(ipv4_listener); +/// let ipv6_connections = TcpListenerStream::new(ipv6_listener); +/// +/// let mut connections = ipv4_connections.chain(ipv6_connections); +/// while let Some(tcp_stream) = connections.next().await { +/// let stream = tcp_stream?; +/// let peer_addr = stream.peer_addr()?; +/// println!("accepted connection; peer address = {peer_addr}"); +/// } +/// # Ok(()) +/// # } +/// ``` +/// /// [`TcpListener`]: struct@tokio::net::TcpListener /// [`Stream`]: trait@crate::Stream #[derive(Debug)] diff --git a/tokio-stream/src/wrappers/unix_listener.rs b/tokio-stream/src/wrappers/unix_listener.rs index 0beba588c20..6c4cd43eb12 100644 --- a/tokio-stream/src/wrappers/unix_listener.rs +++ b/tokio-stream/src/wrappers/unix_listener.rs @@ -6,6 +6,25 @@ use tokio::net::{UnixListener, UnixStream}; /// A wrapper around [`UnixListener`] that implements [`Stream`]. /// +/// # Example +/// +/// ```no_run +/// use tokio::net::UnixListener; +/// use tokio_stream::{StreamExt, wrappers::UnixListenerStream}; +/// +/// # #[tokio::main(flavor = "current_thread")] +/// # async fn main() -> std::io::Result<()> { +/// let listener = UnixListener::bind("/tmp/sock")?; +/// let mut incoming = UnixListenerStream::new(listener); +/// +/// while let Some(stream) = incoming.next().await { +/// let stream = stream?; +/// let peer_addr = stream.peer_addr()?; +/// println!("Accepted connection from: {peer_addr:?}"); +/// } +/// # Ok(()) +/// # } +/// ``` /// [`UnixListener`]: struct@tokio::net::UnixListener /// [`Stream`]: trait@crate::Stream #[derive(Debug)]