From d0dd74a0583b3ed7c8c7f68a4279fe9709bbf158 Mon Sep 17 00:00:00 2001 From: ttys3 <41882455+ttys3@users.noreply.github.com> Date: Thu, 2 Sep 2021 15:53:23 +0800 Subject: [PATCH] io: add with_capacity for ReaderStream (#4086) --- tokio-util/src/io/reader_stream.rs | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/tokio-util/src/io/reader_stream.rs b/tokio-util/src/io/reader_stream.rs index 1f234c9f68f..866c11408d5 100644 --- a/tokio-util/src/io/reader_stream.rs +++ b/tokio-util/src/io/reader_stream.rs @@ -5,7 +5,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use tokio::io::AsyncRead; -const CAPACITY: usize = 4096; +const DEFAULT_CAPACITY: usize = 4096; pin_project! { /// Convert an [`AsyncRead`] into a [`Stream`] of byte chunks. @@ -50,6 +50,7 @@ pin_project! { reader: Option, // Working buffer, used to optimize allocations. buf: BytesMut, + capacity: usize, } } @@ -63,6 +64,21 @@ impl ReaderStream { ReaderStream { reader: Some(reader), buf: BytesMut::new(), + capacity: DEFAULT_CAPACITY, + } + } + + /// Convert an [`AsyncRead`] into a [`Stream`] with item type + /// `Result`, + /// with a specific read buffer initial capacity. + /// + /// [`AsyncRead`]: tokio::io::AsyncRead + /// [`Stream`]: futures_core::Stream + pub fn with_capacity(reader: R, capacity: usize) -> Self { + ReaderStream { + reader: Some(reader), + buf: BytesMut::with_capacity(capacity), + capacity, } } } @@ -80,7 +96,7 @@ impl Stream for ReaderStream { }; if this.buf.capacity() == 0 { - this.buf.reserve(CAPACITY); + this.buf.reserve(*this.capacity); } match poll_read_buf(reader, cx, &mut this.buf) {