Skip to content

Commit

Permalink
io: add with_capacity for ReaderStream (#4086)
Browse files Browse the repository at this point in the history
  • Loading branch information
ttys3 authored Sep 2, 2021
1 parent 6778a7d commit d0dd74a
Showing 1 changed file with 18 additions and 2 deletions.
20 changes: 18 additions & 2 deletions tokio-util/src/io/reader_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncRead;

const CAPACITY: usize = 4096;
const DEFAULT_CAPACITY: usize = 4096;

pin_project! {
/// Convert an [`AsyncRead`] into a [`Stream`] of byte chunks.
Expand Down Expand Up @@ -50,6 +50,7 @@ pin_project! {
reader: Option<R>,
// Working buffer, used to optimize allocations.
buf: BytesMut,
capacity: usize,
}
}

Expand All @@ -63,6 +64,21 @@ impl<R: AsyncRead> ReaderStream<R> {
ReaderStream {
reader: Some(reader),
buf: BytesMut::new(),
capacity: DEFAULT_CAPACITY,
}
}

/// Convert an [`AsyncRead`] into a [`Stream`] with item type
/// `Result<Bytes, std::io::Error>`,
/// with a specific read buffer initial capacity.
///
/// [`AsyncRead`]: tokio::io::AsyncRead
/// [`Stream`]: futures_core::Stream
pub fn with_capacity(reader: R, capacity: usize) -> Self {
ReaderStream {
reader: Some(reader),
buf: BytesMut::with_capacity(capacity),
capacity,
}
}
}
Expand All @@ -80,7 +96,7 @@ impl<R: AsyncRead> Stream for ReaderStream<R> {
};

if this.buf.capacity() == 0 {
this.buf.reserve(CAPACITY);
this.buf.reserve(*this.capacity);
}

match poll_read_buf(reader, cx, &mut this.buf) {
Expand Down

0 comments on commit d0dd74a

Please sign in to comment.