Skip to content

Commit

Permalink
attempt to optimise vectored write
Browse files Browse the repository at this point in the history
  • Loading branch information
conradludgate committed Jun 26, 2022
1 parent 0887113 commit 803083a
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 25 deletions.
126 changes: 101 additions & 25 deletions library/std/src/io/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,38 +396,99 @@ fn slice_write_vectored(
Ok(nwritten)
}

// Resizing write implementation
fn vec_write<A>(pos_mut: &mut u64, vec: &mut Vec<u8, A>, buf: &[u8]) -> io::Result<usize>
where
A: Allocator,
{
/// Reserves the required space, and pads the vec with 0s if necessary.
fn reserve_and_pad<A: Allocator>(
pos_mut: &mut u64,
vec: &mut Vec<u8, A>,
buf_len: usize,
) -> io::Result<usize> {
let pos: usize = (*pos_mut).try_into().map_err(|_| {
io::const_io_error!(
ErrorKind::InvalidInput,
"cursor position exceeds maximum possible vector length",
)
})?;
// Make sure the internal buffer is as least as big as where we
// currently are
let len = vec.len();
if len < pos {
// use `resize` so that the zero filling is as efficient as possible
vec.resize(pos, 0);
}
// Figure out what bytes will be used to overwrite what's currently
// there (left), and what will be appended on the end (right)
{
let space = vec.len() - pos;
let (left, right) = buf.split_at(cmp::min(space, buf.len()));
vec[pos..pos + left.len()].copy_from_slice(left);
vec.extend_from_slice(right);

// For safety reasons, we don't want these numbers to overflow
// otherwise our allocation won't be enough
let desired_cap = pos.saturating_add(buf_len);
if desired_cap > vec.capacity() {
// We want our vec's total capacity
// to have room for (pos+buf_len) bytes. Reserve allocates
// based on additional elements from the length, so we need to
// reserve the difference
vec.reserve(desired_cap - vec.len());
}
// Pad if pos is above the current len.
if pos > vec.len() {
let diff = pos - vec.len();
// Unfortunately, `resize()` would suffice but the optimiser does not
// realise the `reserve` it does can be eliminated. So we do it manually
// to eliminate that extra branch
let spare = vec.spare_capacity_mut();
debug_assert!(spare.len() >= diff);
// Safety: we have allocated enough capacity for this.
// And we are only writing, not reading
unsafe {
spare.get_unchecked_mut(..diff).fill(core::mem::MaybeUninit::new(0));
vec.set_len(pos);
}
}

Ok(pos)
}

/// Writes the slice to the vec without allocating
/// # Safety: vec must have buf.len() spare capacity
unsafe fn vec_write_unchecked<A>(pos: usize, vec: &mut Vec<u8, A>, buf: &[u8]) -> usize
where
A: Allocator,
{
debug_assert!(vec.capacity() >= pos + buf.len());
vec.as_mut_ptr().add(pos).copy_from(buf.as_ptr(), buf.len());
pos + buf.len()
}

/// Resizing write implementation for [`Cursor`]
///
/// Cursor is allowed to have a pre-allocated and initialised
/// vector body, but with a position of 0. This means the [`Write`]
/// will overwrite the contents of the vec.
///
/// This also allows for the vec body to be empty, but with a position of N.
/// This means that [`Write`] will pad the vec with 0 initially,
/// before writing anything from that point
fn vec_write<A>(pos_mut: &mut u64, vec: &mut Vec<u8, A>, buf: &[u8]) -> io::Result<usize>
where
A: Allocator,
{
let buf_len = buf.len();
let mut pos = reserve_and_pad(pos_mut, vec, buf_len)?;

// Write the buf then progress the vec forward if necessary
// Safety: we have ensured that the capacity is available
// and that all bytes get written up to pos
unsafe {
pos = vec_write_unchecked(pos, vec, buf);
if pos > vec.len() {
vec.set_len(pos);
}
};

// Bump us forward
*pos_mut = (pos + buf.len()) as u64;
Ok(buf.len())
*pos_mut += buf_len as u64;
Ok(buf_len)
}

/// Resizing write_vectored implementation for [`Cursor`]
///
/// Cursor is allowed to have a pre-allocated and initialised
/// vector body, but with a position of 0. This means the [`Write`]
/// will overwrite the contents of the vec.
///
/// This also allows for the vec body to be empty, but with a position of N.
/// This means that [`Write`] will pad the vec with 0 initially,
/// before writing anything from that point
fn vec_write_vectored<A>(
pos_mut: &mut u64,
vec: &mut Vec<u8, A>,
Expand All @@ -436,11 +497,26 @@ fn vec_write_vectored<A>(
where
A: Allocator,
{
let mut nwritten = 0;
for buf in bufs {
nwritten += vec_write(pos_mut, vec, buf)?;
// For safety reasons, we don't want this sum to overflow ever.
// If this saturates, the reserve should panic to avoid any unsound writing.
let buf_len = bufs.iter().fold(0usize, |a, b| a.saturating_add(b.len()));
let mut pos = reserve_and_pad(pos_mut, vec, buf_len)?;

// Write the buf then progress the vec forward if necessary
// Safety: we have ensured that the capacity is available
// and that all bytes get written up to the last pos
unsafe {
for buf in bufs {
pos = vec_write_unchecked(pos, vec, buf);
}
if pos > vec.len() {
vec.set_len(pos);
}
}
Ok(nwritten)

// Bump us forward
*pos_mut += buf_len as u64;
Ok(buf_len)
}

#[stable(feature = "rust1", since = "1.0.0")]
Expand Down
48 changes: 48 additions & 0 deletions library/std/src/io/cursor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ fn test_vec_writer() {
#[test]
fn test_mem_writer() {
let mut writer = Cursor::new(Vec::new());
writer.set_position(10);
assert_eq!(writer.write(&[0]).unwrap(), 1);
assert_eq!(writer.write(&[1, 2, 3]).unwrap(), 3);
assert_eq!(writer.write(&[4, 5, 6, 7]).unwrap(), 4);
Expand All @@ -30,6 +31,17 @@ fn test_mem_writer() {
3
);
let b: &[_] = &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
assert_eq!(&writer.get_ref()[..10], &[0; 10]);
assert_eq!(&writer.get_ref()[10..], b);
}

#[test]
fn test_mem_writer_preallocated() {
let mut writer = Cursor::new(vec![0, 0, 0, 0, 0, 0, 0, 0, 8, 9, 10]);
assert_eq!(writer.write(&[0]).unwrap(), 1);
assert_eq!(writer.write(&[1, 2, 3]).unwrap(), 3);
assert_eq!(writer.write(&[4, 5, 6, 7]).unwrap(), 4);
let b: &[_] = &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
assert_eq!(&writer.get_ref()[..], b);
}

Expand Down Expand Up @@ -517,3 +529,39 @@ fn const_cursor() {
const _: &&[u8] = CURSOR.get_ref();
const _: u64 = CURSOR.position();
}

#[bench]
fn bench_write_vec(b: &mut test::Bencher) {
let slice = &[1; 128];

b.iter(|| {
let mut buf = b"some random data to overwrite".to_vec();
let mut cursor = Cursor::new(&mut buf);

let _ = cursor.write_all(slice);
test::black_box(&cursor);
})
}

#[bench]
fn bench_write_vec_vectored(b: &mut test::Bencher) {
let slices = [
IoSlice::new(&[1; 128]),
IoSlice::new(&[2; 256]),
IoSlice::new(&[3; 512]),
IoSlice::new(&[4; 1024]),
IoSlice::new(&[5; 2048]),
IoSlice::new(&[6; 4096]),
IoSlice::new(&[7; 8192]),
IoSlice::new(&[8; 8192 * 2]),
];

b.iter(|| {
let mut buf = b"some random data to overwrite".to_vec();
let mut cursor = Cursor::new(&mut buf);

let mut slices = slices;
let _ = cursor.write_all_vectored(&mut slices);
test::black_box(&cursor);
})
}

0 comments on commit 803083a

Please sign in to comment.