Skip to content

Commit

Permalink
io: add SimplexStream (#6589)
Browse files Browse the repository at this point in the history
  • Loading branch information
wutchzone authored Aug 19, 2024
1 parent 5b9a290 commit ff3f2a8
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 17 deletions.
4 changes: 2 additions & 2 deletions tokio/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,8 @@ cfg_io_util! {
pub(crate) mod seek;
pub(crate) mod util;
pub use util::{
copy, copy_bidirectional, copy_bidirectional_with_sizes, copy_buf, duplex, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt,
BufReader, BufStream, BufWriter, DuplexStream, Empty, Lines, Repeat, Sink, Split, Take,
copy, copy_bidirectional, copy_bidirectional_with_sizes, copy_buf, duplex, empty, repeat, sink, simplex, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt,
BufReader, BufStream, BufWriter, DuplexStream, Empty, Lines, Repeat, Sink, Split, Take, SimplexStream,
};
}

Expand Down
83 changes: 69 additions & 14 deletions tokio/src/io/util/mem.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! In-process memory IO types.
use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
use crate::io::{split, AsyncRead, AsyncWrite, ReadBuf, ReadHalf, WriteHalf};
use crate::loom::sync::Mutex;

use bytes::{Buf, BytesMut};
Expand Down Expand Up @@ -47,15 +47,34 @@ use std::{
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub struct DuplexStream {
read: Arc<Mutex<Pipe>>,
write: Arc<Mutex<Pipe>>,
read: Arc<Mutex<SimplexStream>>,
write: Arc<Mutex<SimplexStream>>,
}

/// A unidirectional IO over a piece of memory.
/// A unidirectional pipe to read and write bytes in memory.
///
/// Data can be written to the pipe, and reading will return that data.
/// It can be constructed by [`simplex`] function which will create a pair of
/// reader and writer or by calling [`SimplexStream::new_unsplit`] that will
/// create a handle for both reading and writing.
///
/// # Example
///
/// ```
/// # async fn ex() -> std::io::Result<()> {
/// # use tokio::io::{AsyncReadExt, AsyncWriteExt};
/// let (mut receiver, mut sender) = tokio::io::simplex(64);
///
/// sender.write_all(b"ping").await?;
///
/// let mut buf = [0u8; 4];
/// receiver.read_exact(&mut buf).await?;
/// assert_eq!(&buf, b"ping");
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
struct Pipe {
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub struct SimplexStream {
/// The buffer storing the bytes written, also read from.
///
/// Using a `BytesMut` because it has efficient `Buf` and `BufMut`
Expand Down Expand Up @@ -83,8 +102,8 @@ struct Pipe {
/// written to a side before the write returns `Poll::Pending`.
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub fn duplex(max_buf_size: usize) -> (DuplexStream, DuplexStream) {
let one = Arc::new(Mutex::new(Pipe::new(max_buf_size)));
let two = Arc::new(Mutex::new(Pipe::new(max_buf_size)));
let one = Arc::new(Mutex::new(SimplexStream::new_unsplit(max_buf_size)));
let two = Arc::new(Mutex::new(SimplexStream::new_unsplit(max_buf_size)));

(
DuplexStream {
Expand Down Expand Up @@ -161,11 +180,47 @@ impl Drop for DuplexStream {
}
}

// ===== impl Pipe =====
// ===== impl SimplexStream =====

/// Creates unidirectional buffer that acts like in memory pipe.
///
/// The `max_buf_size` argument is the maximum amount of bytes that can be
/// written to a buffer before the it returns `Poll::Pending`.
///
/// # Unify reader and writer
///
/// The reader and writer half can be unified into a single structure
/// of `SimplexStream` that supports both reading and writing or
/// the `SimplexStream` can be already created as unified structure
/// using [`SimplexStream::new_unsplit()`].
///
/// ```
/// # async fn ex() -> std::io::Result<()> {
/// # use tokio::io::{AsyncReadExt, AsyncWriteExt};
/// let (writer, reader) = tokio::io::simplex(64);
/// let mut simplex_stream = writer.unsplit(reader);
/// simplex_stream.write_all(b"hello").await?;
///
/// let mut buf = [0u8; 5];
/// simplex_stream.read_exact(&mut buf).await?;
/// assert_eq!(&buf, b"hello");
/// # Ok(())
/// # }
/// ```
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub fn simplex(max_buf_size: usize) -> (ReadHalf<SimplexStream>, WriteHalf<SimplexStream>) {
split(SimplexStream::new_unsplit(max_buf_size))
}

impl Pipe {
fn new(max_buf_size: usize) -> Self {
Pipe {
impl SimplexStream {
/// Creates unidirectional buffer that acts like in memory pipe. To create split
/// version with separate reader and writer you can use [`simplex`] function.
///
/// The `max_buf_size` argument is the maximum amount of bytes that can be
/// written to a buffer before the it returns `Poll::Pending`.
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub fn new_unsplit(max_buf_size: usize) -> SimplexStream {
SimplexStream {
buffer: BytesMut::new(),
is_closed: false,
max_buf_size,
Expand Down Expand Up @@ -269,7 +324,7 @@ impl Pipe {
}
}

impl AsyncRead for Pipe {
impl AsyncRead for SimplexStream {
cfg_coop! {
fn poll_read(
self: Pin<&mut Self>,
Expand Down Expand Up @@ -299,7 +354,7 @@ impl AsyncRead for Pipe {
}
}

impl AsyncWrite for Pipe {
impl AsyncWrite for SimplexStream {
cfg_coop! {
fn poll_write(
self: Pin<&mut Self>,
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/io/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ cfg_io_util! {
pub use lines::Lines;

mod mem;
pub use mem::{duplex, DuplexStream};
pub use mem::{duplex, simplex, DuplexStream, SimplexStream};

mod read;
mod read_buf;
Expand Down

0 comments on commit ff3f2a8

Please sign in to comment.