Skip to content

Commit

Permalink
Auto merge of #23820 - sfackler:fast_read_to_end, r=alexcrichton
Browse files Browse the repository at this point in the history
with_end_to_cap is enormously expensive now that it's initializing
memory since it involves 64k allocation + memset on every call. This is
most noticable when calling read_to_end on very small readers, where the
new version if **4 orders of magnitude** faster.

BufReader also depended on with_end_to_cap so I've rewritten it in its
original form.

As a bonus, converted the buffered IO struct Debug impls to use the
debug builders.

I first came across this in sfackler/rust-postgres#106 where a user reported a 10x performance regression. A call to read_to_end turned out to be the culprit: sfackler/rust-postgres@9cd413d.

The new version differs from the old in a couple of ways. The buffer size used is now adaptive. It starts at 32 bytes and doubles each time EOF hasn't been reached up to a limit of 64k. In addition, the buffer is only truncated when EOF or an error has been reached, rather than after every call to read as was the case for the old implementation.

I wrote up a benchmark to compare the old version and new version: https://gist.github.com/sfackler/e979711b0ee2f2063462

It tests a couple of different cases: a high bandwidth reader, a low bandwidth reader, and a low bandwidth reader that won't return more than 10k per call to `read`. The high bandwidth reader should be analagous to use cases when reading from e.g. a `BufReader` or `Vec`, and the low bandwidth readers should be analogous to reading from something like a `TcpStream`.

Of special note, reads from a high bandwith reader containing 4 bytes are now *4,495 times faster*. 
```
~/foo ❯ cargo bench
   Compiling foo v0.0.1 (file:///home/sfackler/foo)
     Running target/release/foo-7498d7dd7faecf5c

running 13 tests
test test_new ... ignored
test new_delay_4      ... bench:    230768 ns/iter (+/- 14812)
test new_delay_4_cap  ... bench:    231421 ns/iter (+/- 7211)
test new_delay_5m     ... bench:  14495370 ns/iter (+/- 4008648)
test new_delay_5m_cap ... bench:  73127954 ns/iter (+/- 59908587)
test new_nodelay_4    ... bench:        83 ns/iter (+/- 2)
test new_nodelay_5m   ... bench:  12527237 ns/iter (+/- 335243)
test std_delay_4      ... bench:    373095 ns/iter (+/- 12613)
test std_delay_4_cap  ... bench:    374190 ns/iter (+/- 19611)
test std_delay_5m     ... bench:  17356012 ns/iter (+/- 15906588)
test std_delay_5m_cap ... bench: 883555035 ns/iter (+/- 205559857)
test std_nodelay_4    ... bench:    144937 ns/iter (+/- 2448)
test std_nodelay_5m   ... bench:  16095893 ns/iter (+/- 3315116)

test result: ok. 0 passed; 0 failed; 1 ignored; 12 measured
```

r? @alexcrichton
  • Loading branch information
bors committed Mar 29, 2015
2 parents c5370be + ccb4e84 commit 92f3d9a
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 59 deletions.
71 changes: 43 additions & 28 deletions src/libstd/io/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ use io::prelude::*;
use cmp;
use error::{self, FromError};
use fmt;
use io::{self, Cursor, DEFAULT_BUF_SIZE, Error, ErrorKind};
use io::{self, DEFAULT_BUF_SIZE, Error, ErrorKind};
use ptr;
use iter;

/// Wraps a `Read` and buffers input from it
///
Expand All @@ -30,7 +31,9 @@ use ptr;
#[stable(feature = "rust1", since = "1.0.0")]
pub struct BufReader<R> {
inner: R,
buf: Cursor<Vec<u8>>,
buf: Vec<u8>,
pos: usize,
cap: usize,
}

impl<R: Read> BufReader<R> {
Expand All @@ -43,9 +46,13 @@ impl<R: Read> BufReader<R> {
/// Creates a new `BufReader` with the specified buffer capacity
#[stable(feature = "rust1", since = "1.0.0")]
pub fn with_capacity(cap: usize, inner: R) -> BufReader<R> {
let mut buf = Vec::with_capacity(cap);
buf.extend(iter::repeat(0).take(cap));
BufReader {
inner: inner,
buf: Cursor::new(Vec::with_capacity(cap)),
buf: buf,
pos: 0,
cap: 0,
}
}

Expand Down Expand Up @@ -74,12 +81,15 @@ impl<R: Read> Read for BufReader<R> {
// If we don't have any buffered data and we're doing a massive read
// (larger than our internal buffer), bypass our internal buffer
// entirely.
if self.buf.get_ref().len() == self.buf.position() as usize &&
buf.len() >= self.buf.get_ref().capacity() {
if self.pos == self.cap && buf.len() >= self.buf.len() {
return self.inner.read(buf);
}
try!(self.fill_buf());
self.buf.read(buf)
let nread = {
let mut rem = try!(self.fill_buf());
try!(rem.read(buf))
};
self.consume(nread);
Ok(nread)
}
}

Expand All @@ -88,26 +98,25 @@ impl<R: Read> BufRead for BufReader<R> {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
// If we've reached the end of our internal buffer then we need to fetch
// some more data from the underlying reader.
if self.buf.position() as usize == self.buf.get_ref().len() {
self.buf.set_position(0);
let v = self.buf.get_mut();
v.truncate(0);
let inner = &mut self.inner;
try!(super::with_end_to_cap(v, |b| inner.read(b)));
if self.pos == self.cap {
self.cap = try!(self.inner.read(&mut self.buf));
self.pos = 0;
}
self.buf.fill_buf()
Ok(&self.buf[self.pos..self.cap])
}

fn consume(&mut self, amt: usize) {
self.buf.consume(amt)
self.pos = cmp::min(self.pos + amt, self.cap);
}
}

#[stable(feature = "rust1", since = "1.0.0")]
impl<R> fmt::Debug for BufReader<R> where R: fmt::Debug {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "BufReader {{ reader: {:?}, buffer: {}/{} }}",
self.inner, self.buf.position(), self.buf.get_ref().len())
fmt.debug_struct("BufReader")
.field("reader", &self.inner)
.field("buffer", &format_args!("{}/{}", self.cap - self.pos, self.buf.len()))
.finish()
}
}

Expand Down Expand Up @@ -222,8 +231,10 @@ impl<W: Write> Write for BufWriter<W> {
#[stable(feature = "rust1", since = "1.0.0")]
impl<W: Write> fmt::Debug for BufWriter<W> where W: fmt::Debug {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "BufWriter {{ writer: {:?}, buffer: {}/{} }}",
self.inner.as_ref().unwrap(), self.buf.len(), self.buf.capacity())
fmt.debug_struct("BufWriter")
.field("writer", &self.inner.as_ref().unwrap())
.field("buffer", &format_args!("{}/{}", self.buf.len(), self.buf.capacity()))
.finish()
}
}

Expand Down Expand Up @@ -337,9 +348,11 @@ impl<W: Write> Write for LineWriter<W> {
#[stable(feature = "rust1", since = "1.0.0")]
impl<W: Write> fmt::Debug for LineWriter<W> where W: fmt::Debug {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "LineWriter {{ writer: {:?}, buffer: {}/{} }}",
self.inner.inner, self.inner.buf.len(),
self.inner.buf.capacity())
fmt.debug_struct("LineWriter")
.field("writer", &self.inner.inner)
.field("buffer",
&format_args!("{}/{}", self.inner.buf.len(), self.inner.buf.capacity()))
.finish()
}
}

Expand Down Expand Up @@ -415,10 +428,10 @@ impl<S: Read + Write> BufStream<S> {
/// Any leftover data in the read buffer is lost.
#[stable(feature = "rust1", since = "1.0.0")]
pub fn into_inner(self) -> Result<S, IntoInnerError<BufStream<S>>> {
let BufReader { inner: InternalBufWriter(w), buf } = self.inner;
let BufReader { inner: InternalBufWriter(w), buf, pos, cap } = self.inner;
w.into_inner().map_err(|IntoInnerError(w, e)| {
IntoInnerError(BufStream {
inner: BufReader { inner: InternalBufWriter(w), buf: buf },
inner: BufReader { inner: InternalBufWriter(w), buf: buf, pos: pos, cap: cap },
}, e)
})
}
Expand Down Expand Up @@ -452,10 +465,12 @@ impl<S: Write> fmt::Debug for BufStream<S> where S: fmt::Debug {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let reader = &self.inner;
let writer = &self.inner.inner.0;
write!(fmt, "BufStream {{ stream: {:?}, write_buffer: {}/{}, read_buffer: {}/{} }}",
writer.inner,
writer.buf.len(), writer.buf.capacity(),
reader.buf.position(), reader.buf.get_ref().len())
fmt.debug_struct("BufStream")
.field("stream", &writer.inner)
.field("write_buffer", &format_args!("{}/{}", writer.buf.len(), writer.buf.capacity()))
.field("read_buffer",
&format_args!("{}/{}", reader.cap - reader.pos, reader.buf.len()))
.finish()
}
}

Expand Down
64 changes: 33 additions & 31 deletions src/libstd/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,30 +48,6 @@ mod stdio;

const DEFAULT_BUF_SIZE: usize = 64 * 1024;

// Acquires a slice of the vector `v` from its length to its capacity
// (after initializing the data), reads into it, and then updates the length.
//
// This function is leveraged to efficiently read some bytes into a destination
// vector without extra copying and taking advantage of the space that's already
// in `v`.
fn with_end_to_cap<F>(v: &mut Vec<u8>, f: F) -> Result<usize>
where F: FnOnce(&mut [u8]) -> Result<usize>
{
let len = v.len();
let new_area = v.capacity() - len;
v.extend(iter::repeat(0).take(new_area));
match f(&mut v[len..]) {
Ok(n) => {
v.truncate(len + n);
Ok(n)
}
Err(e) => {
v.truncate(len);
Err(e)
}
}
}

// A few methods below (read_to_string, read_line) will append data into a
// `String` buffer, but we need to be pretty careful when doing this. The
// implementation will just call `.as_mut_vec()` and then delegate to a
Expand Down Expand Up @@ -116,19 +92,45 @@ fn append_to_string<F>(buf: &mut String, f: F) -> Result<usize>
}
}

// This uses an adaptive system to extend the vector when it fills. We want to
// avoid paying to allocate and zero a huge chunk of memory if the reader only
// has 4 bytes while still making large reads if the reader does have a ton
// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
// time is 4,500 times (!) slower than this if the reader has a very small
// amount of data to return.
fn read_to_end<R: Read + ?Sized>(r: &mut R, buf: &mut Vec<u8>) -> Result<usize> {
let mut read = 0;
let start_len = buf.len();
let mut len = start_len;
let mut cap_bump = 16;
let ret;
loop {
if buf.capacity() == buf.len() {
buf.reserve(DEFAULT_BUF_SIZE);
if len == buf.len() {
if buf.capacity() == buf.len() {
if cap_bump < DEFAULT_BUF_SIZE {
cap_bump *= 2;
}
buf.reserve(cap_bump);
}
let new_area = buf.capacity() - buf.len();
buf.extend(iter::repeat(0).take(new_area));
}
match with_end_to_cap(buf, |b| r.read(b)) {
Ok(0) => return Ok(read),
Ok(n) => read += n,

match r.read(&mut buf[len..]) {
Ok(0) => {
ret = Ok(len - start_len);
break;
}
Ok(n) => len += n,
Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
Err(e) => return Err(e),
Err(e) => {
ret = Err(e);
break;
}
}
}

buf.truncate(len);
ret
}

/// A trait for objects which are byte-oriented sources.
Expand Down

0 comments on commit 92f3d9a

Please sign in to comment.