Skip to content

Commit

Permalink
ARROW-11291: [Rust] Add extend to MutableBuffer (-20% for arithmetic,…
Browse files Browse the repository at this point in the history
… -97% for length)

# Rational

Rust forbids safely accessing uninitialized memory because it is undefined behavior. However, when building `Buffer`s, it is important to be able to _write_ to uninitialized memory regions, thereby avoiding the need to write _something_ to it before using it.

Currently, all our initializations are zeroed, which is expensive. apache#9076 modifies our allocator to allocate uninitialized regions. However, by itself, this is not useful if we do not offer any methods to write to those (uninitialized) regions.

# This PR

This PR is built on top of apache#9076 and introduces methods to extend a `MutableBuffer` from an iterator, thereby offering an API to efficiently grow `MutableBuffer` without having to initialize memory regions with zeros (i.e. without `with_bitset` and the like).

This PR also introduces methods to create a `Buffer` from an iterator and a `trusted_len` iterator.

The design is inspired in `Vec`, with the catch that we use stable Rust (i.e. no trait specialization, no `TrustedLen`), and thus have to expose a bit more methods than what `Vec` exposes. This means that we can't use that (nicer) API for trustedlen iterators based on `collect()`.

Note that, as before, there are still `unsafe` uses of the `MutableBuffer` derived from the fact that it is not a generic over a type `T` (and thus people can mix types and grow the buffer in unsound ways).

Special thanks to @mbrubeck for all the help on this, originally discussed [here](https://users.rust-lang.org/t/collect-for-exactsizediterator/54367/6).

```bash
git checkout master
cargo bench --bench arithmetic_kernels
git checkout length_faster
cargo bench --bench arithmetic_kernels

git checkout 16bc720
cargo bench --bench length_kernel
git checkout length_faster
```

```
Switched to branch 'length_faster'
  (use "git pull" to merge the remote branch into yours)
   Compiling arrow v3.0.0-SNAPSHOT (/Users/jorgecarleitao/projects/arrow/rust/arrow)
    Finished bench [optimized] target(s) in 1m 02s
     Running /Users/jorgecarleitao/projects/arrow/rust/target/release/deps/arithmetic_kernels-ec2cc20ce07d9b83
Gnuplot not found, using plotters backend
add 512                 time:   [522.24 ns 523.67 ns 525.26 ns]
                        change: [-21.738% -20.960% -20.233%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 12 outliers among 100 measurements (12.00%)
  9 (9.00%) high mild
  3 (3.00%) high severe

subtract 512            time:   [503.18 ns 504.93 ns 506.81 ns]
                        change: [-21.741% -21.075% -20.308%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 4 outliers among 100 measurements (4.00%)
  2 (2.00%) high mild
  2 (2.00%) high severe

multiply 512            time:   [508.25 ns 512.04 ns 516.06 ns]
                        change: [-22.569% -21.946% -21.305%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

divide 512              time:   [1.4711 us 1.4753 us 1.4799 us]
                        change: [-24.783% -23.305% -22.176%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 5 outliers among 100 measurements (5.00%)
  2 (2.00%) high mild
  3 (3.00%) high severe

limit 512, 512          time:   [373.47 ns 377.76 ns 382.21 ns]
                        change: [+3.3055% +4.4193% +5.5923%] (p = 0.00 < 0.05)
                        Performance has regressed.

add_nulls_512           time:   [502.94 ns 504.51 ns 506.28 ns]
                        change: [-24.876% -24.299% -23.709%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high severe

divide_nulls_512        time:   [1.4843 us 1.4931 us 1.5053 us]
                        change: [-22.968% -22.243% -21.420%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 24 outliers among 100 measurements (24.00%)
  15 (15.00%) low mild
  1 (1.00%) high mild
  8 (8.00%) high severe
```

Length (against the commit that fixes the bench, `16bc7200f3baa6e526aea7135c60dcc949c9b592`, not master):

```
length                  time:   [1.5379 us 1.5408 us 1.5437 us]
                        change: [-97.311% -97.295% -97.278%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 12 outliers among 100 measurements (12.00%)
  1 (1.00%) low severe
  4 (4.00%) low mild
  3 (3.00%) high mild
  4 (4.00%) high severe
```

Closes apache#9235 from jorgecarleitao/length_faster

Lead-authored-by: Jorge C. Leitao <jorgecarleitao@gmail.com>
Co-authored-by: Matt Brubeck <mbrubeck@limpet.net>
Signed-off-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
2 people authored and GeorgeAp committed Jun 7, 2021
1 parent 3062c6d commit 5751cc1
Show file tree
Hide file tree
Showing 8 changed files with 298 additions and 61 deletions.
15 changes: 15 additions & 0 deletions rust/arrow/benches/buffer_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ fn mutable_buffer(data: &[Vec<u32>], capacity: usize) -> Buffer {
})
}

fn mutable_buffer_extend(data: &[Vec<u32>], capacity: usize) -> Buffer {
criterion::black_box({
let mut result = MutableBuffer::new(capacity);

data.iter()
.for_each(|vec| result.extend(vec.iter().copied()));

result.into()
})
}

fn from_slice(data: &[Vec<u32>], capacity: usize) -> Buffer {
criterion::black_box({
let mut a = Vec::<u32>::with_capacity(capacity);
Expand Down Expand Up @@ -72,6 +83,10 @@ fn benchmark(c: &mut Criterion) {

c.bench_function("mutable", |b| b.iter(|| mutable_buffer(&data, 0)));

c.bench_function("mutable extend", |b| {
b.iter(|| mutable_buffer_extend(&data, 0))
});

c.bench_function("mutable prepared", |b| {
b.iter(|| mutable_buffer(&data, byte_cap))
});
Expand Down
14 changes: 6 additions & 8 deletions rust/arrow/benches/length_kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,23 @@ extern crate arrow;
use arrow::array::*;
use arrow::compute::kernels::length::length;

fn bench_length() {
fn bench_length(array: &StringArray) {
criterion::black_box(length(array).unwrap());
}

fn add_benchmark(c: &mut Criterion) {
fn double_vec<T: Clone>(v: Vec<T>) -> Vec<T> {
[&v[..], &v[..]].concat()
}

// double ["hello", " ", "world", "!"] 10 times
let mut values = vec!["one", "on", "o", ""];
let mut expected = vec![3, 2, 1, 0];
for _ in 0..10 {
values = double_vec(values);
expected = double_vec(expected);
}
let array = StringArray::from(values);

criterion::black_box(length(&array).unwrap());
}

fn add_benchmark(c: &mut Criterion) {
c.bench_function("length", |b| b.iter(bench_length));
c.bench_function("length", |b| b.iter(|| bench_length(&array)));
}

criterion_group!(benches, add_benchmark);
Expand Down
4 changes: 2 additions & 2 deletions rust/arrow/src/array/transform/fixed_binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub(super) fn build_extend(array: &ArrayData) -> Extend {
let bytes = &values[i * size..(i + 1) * size];
values_buffer.extend_from_slice(bytes);
} else {
values_buffer.extend(size);
values_buffer.extend_zeros(size);
}
})
},
Expand All @@ -61,5 +61,5 @@ pub(super) fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) {
};

let values_buffer = &mut mutable.buffer1;
values_buffer.extend(len * size);
values_buffer.extend_zeros(len * size);
}
2 changes: 1 addition & 1 deletion rust/arrow/src/array/transform/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@ pub(super) fn extend_nulls<T: ArrowNativeType>(
mutable: &mut _MutableArrayData,
len: usize,
) {
mutable.buffer1.extend(len * size_of::<T>());
mutable.buffer1.extend_zeros(len * size_of::<T>());
}
230 changes: 219 additions & 11 deletions rust/arrow/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,19 @@ use packed_simd::u8x64;

use crate::{
bytes::{Bytes, Deallocation},
datatypes::ToByteSlice,
datatypes::{ArrowNativeType, ToByteSlice},
ffi,
};

use std::convert::AsRef;
use std::fmt::Debug;
use std::iter::FromIterator;
use std::ops::{BitAnd, BitOr, Not};
use std::ptr::NonNull;
use std::sync::Arc;
use std::{convert::AsRef, usize};

#[cfg(feature = "avx512")]
use crate::arch::avx512::*;
use crate::datatypes::ArrowNativeType;
use crate::error::{ArrowError, Result};
use crate::memory;
use crate::util::bit_chunk_iterator::BitChunks;
Expand Down Expand Up @@ -697,6 +697,7 @@ unsafe impl Sync for Buffer {}
unsafe impl Send for Buffer {}

impl From<MutableBuffer> for Buffer {
#[inline]
fn from(buffer: MutableBuffer) -> Self {
buffer.into_buffer()
}
Expand Down Expand Up @@ -727,13 +728,14 @@ pub struct MutableBuffer {

impl MutableBuffer {
/// Allocate a new [MutableBuffer] with initial capacity to be at least `capacity`.
#[inline]
pub fn new(capacity: usize) -> Self {
let new_capacity = bit_util::round_upto_multiple_of_64(capacity);
let ptr = memory::allocate_aligned(new_capacity);
let capacity = bit_util::round_upto_multiple_of_64(capacity);
let ptr = memory::allocate_aligned(capacity);
Self {
data: ptr,
len: 0,
capacity: new_capacity,
capacity,
}
}

Expand Down Expand Up @@ -810,10 +812,14 @@ impl MutableBuffer {
pub fn reserve(&mut self, additional: usize) {
let required_cap = self.len + additional;
if required_cap > self.capacity {
let new_capacity = bit_util::round_upto_multiple_of_64(required_cap);
let new_capacity = std::cmp::max(new_capacity, self.capacity * 2);
self.data =
unsafe { memory::reallocate(self.data, self.capacity, new_capacity) };
// JUSTIFICATION
// Benefit
// necessity
// Soundness
// `self.data` is valid for `self.capacity`.
let (ptr, new_capacity) =
unsafe { reallocate(self.data, self.capacity, required_cap) };
self.data = ptr;
self.capacity = new_capacity;
}
}
Expand Down Expand Up @@ -899,6 +905,7 @@ impl MutableBuffer {
self.into_buffer()
}

#[inline]
fn into_buffer(self) -> Buffer {
let buffer_data = unsafe {
Bytes::new(self.data, self.len, Deallocation::Native(self.capacity))
Expand Down Expand Up @@ -963,11 +970,167 @@ impl MutableBuffer {

/// Extends the buffer by `additional` bytes equal to `0u8`, incrementing its capacity if needed.
#[inline]
pub fn extend(&mut self, additional: usize) {
pub fn extend_zeros(&mut self, additional: usize) {
self.resize(self.len + additional, 0);
}
}

/// # Safety
/// `ptr` must be allocated for `old_capacity`.
#[inline]
unsafe fn reallocate(
ptr: NonNull<u8>,
old_capacity: usize,
new_capacity: usize,
) -> (NonNull<u8>, usize) {
let new_capacity = bit_util::round_upto_multiple_of_64(new_capacity);
let new_capacity = std::cmp::max(new_capacity, old_capacity * 2);
let ptr = memory::reallocate(ptr, old_capacity, new_capacity);
(ptr, new_capacity)
}

impl<A: ArrowNativeType> Extend<A> for MutableBuffer {
#[inline]
fn extend<T: IntoIterator<Item = A>>(&mut self, iter: T) {
let iterator = iter.into_iter();
self.extend_from_iter(iterator)
}
}

impl MutableBuffer {
#[inline]
fn extend_from_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
&mut self,
mut iterator: I,
) {
let size = std::mem::size_of::<T>();
let (lower, _) = iterator.size_hint();
let additional = lower * size;
self.reserve(additional);

// this is necessary because of https://github.com/rust-lang/rust/issues/32155
let mut len = SetLenOnDrop::new(&mut self.len);
let mut dst = unsafe { self.data.as_ptr().add(len.local_len) as *mut T };
let capacity = self.capacity;

while len.local_len + size <= capacity {
if let Some(item) = iterator.next() {
unsafe {
std::ptr::write(dst, item);
dst = dst.add(1);
}
len.local_len += size;
} else {
break;
}
}
drop(len);

iterator.for_each(|item| self.push(item));
}
}

impl Buffer {
/// Creates a [`Buffer`] from an [`Iterator`] with a trusted (upper) length.
/// Prefer this to `collect` whenever possible, as it is faster ~60% faster.
/// # Example
/// ```
/// # use arrow::buffer::Buffer;
/// let v = vec![1u32];
/// let iter = v.iter().map(|x| x * 2);
/// let buffer = unsafe { Buffer::from_trusted_len_iter(iter) };
/// assert_eq!(buffer.len(), 4) // u32 has 4 bytes
/// ```
/// # Safety
/// This method assumes that the iterator's size is correct and is undefined behavior
/// to use it on an iterator that reports an incorrect length.
// This implementation is required for two reasons:
// 1. there is no trait `TrustedLen` in stable rust and therefore
// we can't specialize `extend` for `TrustedLen` like `Vec` does.
// 2. `from_trusted_len_iter` is faster.
pub unsafe fn from_trusted_len_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
iterator: I,
) -> Self {
let (_, upper) = iterator.size_hint();
let upper = upper.expect("from_trusted_len_iter requires an upper limit");
let len = upper * std::mem::size_of::<T>();

let mut buffer = MutableBuffer::new(len);

let mut dst = buffer.data.as_ptr() as *mut T;
for item in iterator {
// note how there is no reserve here (compared with `extend_from_iter`)
std::ptr::write(dst, item);
dst = dst.add(1);
}
assert_eq!(
dst.offset_from(buffer.data.as_ptr() as *mut T) as usize,
upper,
"Trusted iterator length was not accurately reported"
);
buffer.len = len;
buffer.into()
}

/// Creates a [`Buffer`] from an [`Iterator`] with a trusted (upper) length or errors
/// if any of the items of the iterator is an error.
/// Prefer this to `collect` whenever possible, as it is faster ~60% faster.
/// # Safety
/// This method assumes that the iterator's size is correct and is undefined behavior
/// to use it on an iterator that reports an incorrect length.
pub unsafe fn try_from_trusted_len_iter<
E,
T: ArrowNativeType,
I: Iterator<Item = std::result::Result<T, E>>,
>(
iterator: I,
) -> std::result::Result<Self, E> {
let (_, upper) = iterator.size_hint();
let upper = upper.expect("try_from_trusted_len_iter requires an upper limit");
let len = upper * std::mem::size_of::<T>();

let mut buffer = MutableBuffer::new(len);

let mut dst = buffer.data.as_ptr() as *mut T;
for item in iterator {
// note how there is no reserve here (compared with `extend_from_iter`)
std::ptr::write(dst, item?);
dst = dst.add(1);
}
assert_eq!(
dst.offset_from(buffer.data.as_ptr() as *mut T) as usize,
upper,
"Trusted iterator length was not accurately reported"
);
buffer.len = len;
Ok(buffer.into())
}
}

impl<T: ArrowNativeType> FromIterator<T> for Buffer {
fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
let mut iterator = iter.into_iter();
let size = std::mem::size_of::<T>();

// first iteration, which will likely reserve sufficient space for the buffer.
let mut buffer = match iterator.next() {
None => MutableBuffer::new(0),
Some(element) => {
let (lower, _) = iterator.size_hint();
let mut buffer = MutableBuffer::new(lower.saturating_add(1) * size);
unsafe {
std::ptr::write(buffer.as_mut_ptr() as *mut T, element);
buffer.len = size;
}
buffer
}
};

buffer.extend_from_iter(iterator);
buffer.into()
}
}

impl std::ops::Deref for MutableBuffer {
type Target = [u8];

Expand Down Expand Up @@ -1003,6 +1166,28 @@ impl PartialEq for MutableBuffer {
unsafe impl Sync for MutableBuffer {}
unsafe impl Send for MutableBuffer {}

struct SetLenOnDrop<'a> {
len: &'a mut usize,
local_len: usize,
}

impl<'a> SetLenOnDrop<'a> {
#[inline]
fn new(len: &'a mut usize) -> Self {
SetLenOnDrop {
local_len: *len,
len,
}
}
}

impl Drop for SetLenOnDrop<'_> {
#[inline]
fn drop(&mut self) {
*self.len = self.local_len;
}
}

#[cfg(test)]
mod tests {
use std::thread;
Expand Down Expand Up @@ -1172,6 +1357,29 @@ mod tests {
assert_eq!(b"hello arrow", buf.as_slice());
}

#[test]
fn mutable_extend_from_iter() {
let mut buf = MutableBuffer::new(0);
buf.extend(vec![1u32, 2]);
assert_eq!(8, buf.len());
assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice());

buf.extend(vec![3u32, 4]);
assert_eq!(16, buf.len());
assert_eq!(
&[1u8, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0],
buf.as_slice()
);
}

#[test]
fn test_from_trusted_len_iter() {
let iter = vec![1u32, 2].into_iter();
let buf = unsafe { Buffer::from_trusted_len_iter(iter) };
assert_eq!(8, buf.len());
assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice());
}

#[test]
fn test_mutable_reserve() {
let mut buf = MutableBuffer::new(1);
Expand Down
1 change: 1 addition & 0 deletions rust/arrow/src/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ impl Bytes {
///
/// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
/// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed.
#[inline]
pub unsafe fn new(
ptr: std::ptr::NonNull<u8>,
len: usize,
Expand Down
Loading

0 comments on commit 5751cc1

Please sign in to comment.