From ee9c5910f2a417ac298e64815d8e3f4c74579a75 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 26 Jun 2019 12:39:40 -0700 Subject: [PATCH] tokio: move I/O helpers to ext traits Refs: #1203 --- tokio/src/io/async_read_ext.rs | 64 +++++++++++++ tokio/src/io/async_write_ext.rs | 24 +++++ tokio/src/io/copy.rs | 38 ++------ tokio/src/io/mod.rs | 21 ++--- tokio/src/io/read.rs | 2 +- tokio/src/io/read_exact.rs | 12 +-- tokio/src/io/write.rs | 2 +- tokio/src/lib.rs | 9 ++ tokio/tests/io.rs | 158 -------------------------------- tokio/tests/io_copy.rs | 71 ++++++++++++++ tokio/tests/io_read.rs | 41 +++++++++ tokio/tests/io_read_exact.rs | 46 ++++++++++ tokio/tests/io_write.rs | 48 ++++++++++ 13 files changed, 322 insertions(+), 214 deletions(-) create mode 100644 tokio/src/io/async_read_ext.rs create mode 100644 tokio/src/io/async_write_ext.rs create mode 100644 tokio/tests/io_copy.rs create mode 100644 tokio/tests/io_read.rs create mode 100644 tokio/tests/io_read_exact.rs create mode 100644 tokio/tests/io_write.rs diff --git a/tokio/src/io/async_read_ext.rs b/tokio/src/io/async_read_ext.rs new file mode 100644 index 00000000000..cca0906a191 --- /dev/null +++ b/tokio/src/io/async_read_ext.rs @@ -0,0 +1,64 @@ +use crate::io::copy::{copy, Copy}; +use crate::io::read::{read, Read}; +use crate::io::read_exact::{read_exact, ReadExact}; + +use tokio_io::{AsyncRead, AsyncWrite}; + +/// An extension trait which adds utility methods to `AsyncRead` types. +pub trait AsyncReadExt: AsyncRead { + + /// Copy all data from `self` into the provided `AsyncWrite`. + /// + /// The returned future will copy all the bytes read from `reader` into the + /// `writer` specified. This future will only complete once the `reader` + /// has hit EOF and all bytes have been written to and flushed from the + /// `writer` provided. + /// + /// On success the number of bytes is returned and the `reader` and `writer` + /// are consumed. On error the error is returned and the I/O objects are + /// consumed as well. + /// + /// # Examples + /// + /// ``` + /// unimplemented!(); + /// ``` + fn copy<'a, W>(&'a mut self, dst: &'a mut W) -> Copy<'a, Self, W> + where + Self: Unpin, + W: AsyncWrite + Unpin + ?Sized, + { + copy(self, dst) + } + + /// Read data into the provided buffer. + /// + /// The returned future will resolve to the number of bytes read once the + /// read operation is completed. + /// + /// # Examples + /// + /// ``` + /// unimplemented!(); + /// ``` + fn read<'a>(&'a mut self, dst: &'a mut [u8]) -> Read<'a, Self> + where Self: Unpin, + { + read(self, dst) + } + + /// Read exactly the amount of data needed to fill the provided buffer. + /// + /// # Examples + /// + /// ``` + /// unimplemented!(); + /// ``` + fn read_exact<'a>(&'a mut self, dst: &'a mut [u8]) -> ReadExact<'a, Self> + where Self: Unpin, + { + read_exact(self, dst) + } +} + +impl AsyncReadExt for R {} diff --git a/tokio/src/io/async_write_ext.rs b/tokio/src/io/async_write_ext.rs new file mode 100644 index 00000000000..759dac991d1 --- /dev/null +++ b/tokio/src/io/async_write_ext.rs @@ -0,0 +1,24 @@ +use crate::io::write::{write, Write}; + +use tokio_io::AsyncWrite; + +/// An extension trait which adds utility methods to `AsyncWrite` types. +pub trait AsyncWriteExt: AsyncWrite { + /// Write the provided data into `self`. + /// + /// The returned future will resolve to the number of bytes written once the + /// write operation is completed. + /// + /// # Examples + /// + /// ``` + /// unimplemented!(); + /// ```` + fn write<'a>(&'a mut self, src: &'a [u8]) -> Write<'a, Self> + where Self: Unpin, + { + write(self, src) + } +} + +impl AsyncWriteExt for W {} diff --git a/tokio/src/io/copy.rs b/tokio/src/io/copy.rs index b407c193b85..cc8e71dd6d0 100644 --- a/tokio/src/io/copy.rs +++ b/tokio/src/io/copy.rs @@ -4,23 +4,8 @@ use std::pin::Pin; use std::task::{Context, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; -macro_rules! ready { - ($e:expr) => { - match $e { - ::std::task::Poll::Ready(t) => t, - ::std::task::Poll::Pending => return ::std::task::Poll::Pending, - } - }; -} - -/// A future which will copy all data from a reader into a writer. -/// -/// Created by the [`copy`] function, this future will resolve to the number of -/// bytes copied or an error if one happens. -/// -/// [`copy`]: fn.copy.html #[derive(Debug)] -pub struct Copy<'a, R, W> { +pub struct Copy<'a, R: ?Sized, W: ?Sized> { reader: &'a mut R, read_done: bool, writer: &'a mut W, @@ -30,21 +15,10 @@ pub struct Copy<'a, R, W> { buf: Box<[u8]>, } -/// Creates a future which represents copying all the bytes from one object to -/// another. -/// -/// The returned future will copy all the bytes read from `reader` into the -/// `writer` specified. This future will only complete once the `reader` has hit -/// EOF and all bytes have been written to and flushed from the `writer` -/// provided. -/// -/// On success the number of bytes is returned and the `reader` and `writer` are -/// consumed. On error the error is returned and the I/O objects are consumed as -/// well. -pub fn copy<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> Copy<'a, R, W> +pub(crate) fn copy<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> Copy<'a, R, W> where - R: AsyncRead + Unpin, - W: AsyncWrite + Unpin, + R: AsyncRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, { Copy { reader, @@ -59,8 +33,8 @@ where impl<'a, R, W> Future for Copy<'a, R, W> where - R: AsyncRead + Unpin, - W: AsyncWrite + Unpin, + R: AsyncRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, { type Output = io::Result; diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index 9b3d854ff2f..20730811c55 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -36,22 +36,21 @@ //! [`ErrorKind`]: enum.ErrorKind.html //! [`Result`]: type.Result.html -pub use tokio_io::{AsyncRead, AsyncWrite}; +mod async_read_ext; +mod async_write_ext; +mod copy; +mod read; +mod write; +mod read_exact; + +pub use self::async_read_ext::AsyncReadExt; +pub use self::async_write_ext::AsyncWriteExt; // standard input, output, and error #[cfg(feature = "fs")] pub use tokio_fs::{stderr, stdin, stdout, Stderr, Stdin, Stdout}; +pub use tokio_io::{AsyncRead, AsyncWrite}; // Re-export io::Error so that users don't have to deal // with conflicts when `use`ing `futures::io` and `std::io`. pub use std::io::{Error, ErrorKind, Result}; - -mod copy; -mod read; -mod write; -mod read_exact; - -pub use self::copy::{copy, Copy}; -pub use self::read::{read, Read}; -pub use self::write::{write, Write}; -pub use self::read_exact::{read_exact, ReadExact}; diff --git a/tokio/src/io/read.rs b/tokio/src/io/read.rs index 5713757150c..320c5edd8e7 100644 --- a/tokio/src/io/read.rs +++ b/tokio/src/io/read.rs @@ -10,7 +10,7 @@ use tokio_io::AsyncRead; /// /// The returned future will resolve to both the I/O stream and the buffer /// as well as the number of bytes read once the read operation is completed. -pub fn read<'a, R>(reader: &'a mut R, buf: &'a mut [u8]) -> Read<'a, R> +pub(crate) fn read<'a, R>(reader: &'a mut R, buf: &'a mut [u8]) -> Read<'a, R> where R: AsyncRead + Unpin + ?Sized, { diff --git a/tokio/src/io/read_exact.rs b/tokio/src/io/read_exact.rs index e8a66a47dd1..c8b2f88460e 100644 --- a/tokio/src/io/read_exact.rs +++ b/tokio/src/io/read_exact.rs @@ -5,23 +5,13 @@ use std::pin::Pin; use std::task::{Context, Poll}; use tokio_io::AsyncRead; -macro_rules! ready { - ($e:expr) => { - match $e { - ::std::task::Poll::Ready(t) => t, - ::std::task::Poll::Pending => return ::std::task::Poll::Pending, - } - }; -} - - /// A future which can be used to easily read exactly enough bytes to fill /// a buffer. /// /// Created by the [`read_exact`] function. /// /// [`read_exact`]: fn.read_exact.html -pub fn read_exact<'a, A>(reader: &'a mut A, buf: &'a mut[u8]) -> ReadExact<'a, A> +pub(crate) fn read_exact<'a, A>(reader: &'a mut A, buf: &'a mut[u8]) -> ReadExact<'a, A> where A: AsyncRead + Unpin + ?Sized { diff --git a/tokio/src/io/write.rs b/tokio/src/io/write.rs index 424f478ded6..2ccc61b5bea 100644 --- a/tokio/src/io/write.rs +++ b/tokio/src/io/write.rs @@ -14,7 +14,7 @@ pub struct Write<'a, W: ?Sized> { /// Tries to write some bytes from the given `buf` to the writer in an /// asynchronous manner, returning a future. -pub fn write<'a, W>(writer: &'a mut W, buf: &'a [u8]) -> Write<'a, W> +pub(crate) fn write<'a, W>(writer: &'a mut W, buf: &'a [u8]) -> Write<'a, W> where W: AsyncWrite + Unpin + ?Sized, { diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 89f051334f1..d554ab00779 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -74,6 +74,15 @@ macro_rules! if_runtime { )*) } +macro_rules! ready { + ($e:expr) => { + match $e { + ::std::task::Poll::Ready(t) => t, + ::std::task::Poll::Pending => return ::std::task::Poll::Pending, + } + }; +} + #[cfg(feature = "timer")] pub mod clock; #[cfg(feature = "codec")] diff --git a/tokio/tests/io.rs b/tokio/tests/io.rs index 5bf52d60f52..8224c59825e 100644 --- a/tokio/tests/io.rs +++ b/tokio/tests/io.rs @@ -8,162 +8,4 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio_test::assert_ready_ok; use tokio_test::task::MockTask; -#[test] -fn write() { - struct Wr(BytesMut); - impl AsyncWrite for Wr { - fn poll_write( - mut self: Pin<&mut Self>, - _cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - self.0.extend(buf); - Ok(buf.len()).into() - } - - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Ok(()).into() - } - - fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Ok(()).into() - } - } - - let mut task = MockTask::new(); - - task.enter(|cx| { - let mut wr = Wr(BytesMut::with_capacity(64)); - - let write = tokio::io::write(&mut wr, "hello world".as_bytes()); - pin_mut!(write); - - let n = assert_ready_ok!(write.poll(cx)); - assert_eq!(n, 11); - }); -} - -#[test] -fn read() { - struct Rd; - - impl AsyncRead for Rd { - fn poll_read( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - buf[0..11].copy_from_slice(b"hello world"); - Poll::Ready(Ok(11)) - } - } - - let mut buf = Box::new([0; 11]); - let mut task = MockTask::new(); - - task.enter(|cx| { - let mut rd = Rd; - - let read = tokio::io::read(&mut rd, &mut buf[..]); - pin_mut!(read); - - let n = assert_ready_ok!(read.poll(cx)); - assert_eq!(n, 11); - assert_eq!(buf[..], b"hello world"[..]); - }); -} - -#[test] -fn copy() { - struct Rd(bool); - - impl AsyncRead for Rd { - fn poll_read( - mut self: Pin<&mut Self>, - _cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - if self.0 { - buf[0..11].copy_from_slice(b"hello world"); - self.0 = false; - Poll::Ready(Ok(11)) - } else { - Poll::Ready(Ok(0)) - } - } - } - - struct Wr(BytesMut); - - impl Unpin for Wr {} - impl AsyncWrite for Wr { - fn poll_write( - mut self: Pin<&mut Self>, - _cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - self.0.extend(buf); - Ok(buf.len()).into() - } - - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Ok(()).into() - } - - fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Ok(()).into() - } - } - - let buf = BytesMut::with_capacity(64); - let mut task = MockTask::new(); - - task.enter(|cx| { - let mut rd = Rd(true); - let mut wr = Wr(buf); - - let copy = tokio::io::copy(&mut rd, &mut wr); - pin_mut!(copy); - - let n = assert_ready_ok!(copy.poll(cx)); - - assert_eq!(n, 11); - assert_eq!(wr.0[..], b"hello world"[..]); - }); -} - -#[test] -fn read_exact() { - struct Rd { - val: &'static [u8; 11], - } - - impl AsyncRead for Rd { - fn poll_read( - mut self: Pin<&mut Self>, - _cx: &mut Context<'_>, - buf: &mut [u8] - ) -> Poll> { - let me = &mut *self; - let len = buf.len(); - - buf[..].copy_from_slice(&me.val[..len]); - Poll::Ready(Ok(buf.len())) - } - } - - let mut buf = Box::new([0; 8]); - let mut task = MockTask::new(); - - task.enter(|cx| { - let mut rd = Rd { val: b"hello world" }; - - let read = tokio::io::read_exact(&mut rd, &mut buf[..]); - pin_mut!(read); - - let n = assert_ready_ok!(read.poll(cx)); - assert_eq!(n, 8); - assert_eq!(buf[..], b"hello wo"[..]); - }); -} diff --git a/tokio/tests/io_copy.rs b/tokio/tests/io_copy.rs new file mode 100644 index 00000000000..ee9726812fb --- /dev/null +++ b/tokio/tests/io_copy.rs @@ -0,0 +1,71 @@ +#![deny(warnings, rust_2018_idioms)] + +use tokio::io::{AsyncRead, AsyncWrite, AsyncReadExt}; +use tokio_test::assert_ready_ok; +use tokio_test::task::MockTask; + +use bytes::BytesMut; +use pin_utils::pin_mut; +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[test] +fn copy() { + struct Rd(bool); + + impl AsyncRead for Rd { + fn poll_read( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + if self.0 { + buf[0..11].copy_from_slice(b"hello world"); + self.0 = false; + Poll::Ready(Ok(11)) + } else { + Poll::Ready(Ok(0)) + } + } + } + + struct Wr(BytesMut); + + impl Unpin for Wr {} + impl AsyncWrite for Wr { + fn poll_write( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.0.extend(buf); + Ok(buf.len()).into() + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Ok(()).into() + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Ok(()).into() + } + } + + let buf = BytesMut::with_capacity(64); + let mut task = MockTask::new(); + + task.enter(|cx| { + let mut rd = Rd(true); + let mut wr = Wr(buf); + + let copy = rd.copy(&mut wr); + pin_mut!(copy); + + let n = assert_ready_ok!(copy.poll(cx)); + + assert_eq!(n, 11); + assert_eq!(wr.0[..], b"hello world"[..]); + }); +} diff --git a/tokio/tests/io_read.rs b/tokio/tests/io_read.rs new file mode 100644 index 00000000000..8544a6c614c --- /dev/null +++ b/tokio/tests/io_read.rs @@ -0,0 +1,41 @@ +#![deny(warnings, rust_2018_idioms)] + +use tokio::io::{AsyncRead, AsyncReadExt}; +use tokio_test::assert_ready_ok; +use tokio_test::task::MockTask; + +use pin_utils::pin_mut; +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[test] +fn read() { + struct Rd; + + impl AsyncRead for Rd { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + buf[0..11].copy_from_slice(b"hello world"); + Poll::Ready(Ok(11)) + } + } + + let mut buf = Box::new([0; 11]); + let mut task = MockTask::new(); + + task.enter(|cx| { + let mut rd = Rd; + + let read = rd.read(&mut buf[..]); + pin_mut!(read); + + let n = assert_ready_ok!(read.poll(cx)); + assert_eq!(n, 11); + assert_eq!(buf[..], b"hello world"[..]); + }); +} diff --git a/tokio/tests/io_read_exact.rs b/tokio/tests/io_read_exact.rs new file mode 100644 index 00000000000..94e355140dc --- /dev/null +++ b/tokio/tests/io_read_exact.rs @@ -0,0 +1,46 @@ +#![deny(warnings, rust_2018_idioms)] + +use tokio::io::{AsyncRead, AsyncReadExt}; +use tokio_test::assert_ready_ok; +use tokio_test::task::MockTask; + +use pin_utils::pin_mut; +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[test] +fn read_exact() { + struct Rd { + val: &'static [u8; 11], + } + + impl AsyncRead for Rd { + fn poll_read( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8] + ) -> Poll> { + let me = &mut *self; + let len = buf.len(); + + buf[..].copy_from_slice(&me.val[..len]); + Poll::Ready(Ok(buf.len())) + } + } + + let mut buf = Box::new([0; 8]); + let mut task = MockTask::new(); + + task.enter(|cx| { + let mut rd = Rd { val: b"hello world" }; + + let read = rd.read_exact(&mut buf[..]); + pin_mut!(read); + + let n = assert_ready_ok!(read.poll(cx)); + assert_eq!(n, 8); + assert_eq!(buf[..], b"hello wo"[..]); + }); +} diff --git a/tokio/tests/io_write.rs b/tokio/tests/io_write.rs new file mode 100644 index 00000000000..7d80ca55268 --- /dev/null +++ b/tokio/tests/io_write.rs @@ -0,0 +1,48 @@ +#![deny(warnings, rust_2018_idioms)] + +use tokio::io::{AsyncWrite, AsyncWriteExt}; +use tokio_test::assert_ready_ok; +use tokio_test::task::MockTask; + +use bytes::BytesMut; +use pin_utils::pin_mut; +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[test] +fn write() { + struct Wr(BytesMut); + + impl AsyncWrite for Wr { + fn poll_write( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.0.extend(buf); + Ok(buf.len()).into() + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Ok(()).into() + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Ok(()).into() + } + } + + let mut task = MockTask::new(); + + task.enter(|cx| { + let mut wr = Wr(BytesMut::with_capacity(64)); + + let write = wr.write(b"hello world"); + pin_mut!(write); + + let n = assert_ready_ok!(write.poll(cx)); + assert_eq!(n, 11); + }); +}