diff --git a/rust/arrow/benches/buffer_create.rs b/rust/arrow/benches/buffer_create.rs index c5b9a4e69a9b2..17c3423948ec0 100644 --- a/rust/arrow/benches/buffer_create.rs +++ b/rust/arrow/benches/buffer_create.rs @@ -39,6 +39,17 @@ fn mutable_buffer(data: &[Vec], capacity: usize) -> Buffer { }) } +fn mutable_buffer_extend(data: &[Vec], capacity: usize) -> Buffer { + criterion::black_box({ + let mut result = MutableBuffer::new(capacity); + + data.iter() + .for_each(|vec| result.extend(vec.iter().copied())); + + result.into() + }) +} + fn from_slice(data: &[Vec], capacity: usize) -> Buffer { criterion::black_box({ let mut a = Vec::::with_capacity(capacity); @@ -72,6 +83,10 @@ fn benchmark(c: &mut Criterion) { c.bench_function("mutable", |b| b.iter(|| mutable_buffer(&data, 0))); + c.bench_function("mutable extend", |b| { + b.iter(|| mutable_buffer_extend(&data, 0)) + }); + c.bench_function("mutable prepared", |b| { b.iter(|| mutable_buffer(&data, byte_cap)) }); diff --git a/rust/arrow/benches/length_kernel.rs b/rust/arrow/benches/length_kernel.rs index cdc338acee4c0..b70f6374f8ffc 100644 --- a/rust/arrow/benches/length_kernel.rs +++ b/rust/arrow/benches/length_kernel.rs @@ -24,25 +24,23 @@ extern crate arrow; use arrow::array::*; use arrow::compute::kernels::length::length; -fn bench_length() { +fn bench_length(array: &StringArray) { + criterion::black_box(length(array).unwrap()); +} + +fn add_benchmark(c: &mut Criterion) { fn double_vec(v: Vec) -> Vec { [&v[..], &v[..]].concat() } // double ["hello", " ", "world", "!"] 10 times let mut values = vec!["one", "on", "o", ""]; - let mut expected = vec![3, 2, 1, 0]; for _ in 0..10 { values = double_vec(values); - expected = double_vec(expected); } let array = StringArray::from(values); - criterion::black_box(length(&array).unwrap()); -} - -fn add_benchmark(c: &mut Criterion) { - c.bench_function("length", |b| b.iter(bench_length)); + c.bench_function("length", |b| b.iter(|| bench_length(&array))); } criterion_group!(benches, add_benchmark); diff --git a/rust/arrow/src/array/transform/fixed_binary.rs b/rust/arrow/src/array/transform/fixed_binary.rs index 477d2ad277c1a..36952d46a4d6a 100644 --- a/rust/arrow/src/array/transform/fixed_binary.rs +++ b/rust/arrow/src/array/transform/fixed_binary.rs @@ -46,7 +46,7 @@ pub(super) fn build_extend(array: &ArrayData) -> Extend { let bytes = &values[i * size..(i + 1) * size]; values_buffer.extend_from_slice(bytes); } else { - values_buffer.extend(size); + values_buffer.extend_zeros(size); } }) }, @@ -61,5 +61,5 @@ pub(super) fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) { }; let values_buffer = &mut mutable.buffer1; - values_buffer.extend(len * size); + values_buffer.extend_zeros(len * size); } diff --git a/rust/arrow/src/array/transform/primitive.rs b/rust/arrow/src/array/transform/primitive.rs index 77038432459c1..032bb4a877940 100644 --- a/rust/arrow/src/array/transform/primitive.rs +++ b/rust/arrow/src/array/transform/primitive.rs @@ -36,5 +36,5 @@ pub(super) fn extend_nulls( mutable: &mut _MutableArrayData, len: usize, ) { - mutable.buffer1.extend(len * size_of::()); + mutable.buffer1.extend_zeros(len * size_of::()); } diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index cf300fe3fd3d3..97f9b1f9c8dcd 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -23,19 +23,19 @@ use packed_simd::u8x64; use crate::{ bytes::{Bytes, Deallocation}, - datatypes::ToByteSlice, + datatypes::{ArrowNativeType, ToByteSlice}, ffi, }; -use std::convert::AsRef; use std::fmt::Debug; +use std::iter::FromIterator; use std::ops::{BitAnd, BitOr, Not}; use std::ptr::NonNull; use std::sync::Arc; +use std::{convert::AsRef, usize}; #[cfg(feature = "avx512")] use crate::arch::avx512::*; -use crate::datatypes::ArrowNativeType; use crate::error::{ArrowError, Result}; use crate::memory; use crate::util::bit_chunk_iterator::BitChunks; @@ -697,6 +697,7 @@ unsafe impl Sync for Buffer {} unsafe impl Send for Buffer {} impl From for Buffer { + #[inline] fn from(buffer: MutableBuffer) -> Self { buffer.into_buffer() } @@ -727,13 +728,14 @@ pub struct MutableBuffer { impl MutableBuffer { /// Allocate a new [MutableBuffer] with initial capacity to be at least `capacity`. + #[inline] pub fn new(capacity: usize) -> Self { - let new_capacity = bit_util::round_upto_multiple_of_64(capacity); - let ptr = memory::allocate_aligned(new_capacity); + let capacity = bit_util::round_upto_multiple_of_64(capacity); + let ptr = memory::allocate_aligned(capacity); Self { data: ptr, len: 0, - capacity: new_capacity, + capacity, } } @@ -810,10 +812,14 @@ impl MutableBuffer { pub fn reserve(&mut self, additional: usize) { let required_cap = self.len + additional; if required_cap > self.capacity { - let new_capacity = bit_util::round_upto_multiple_of_64(required_cap); - let new_capacity = std::cmp::max(new_capacity, self.capacity * 2); - self.data = - unsafe { memory::reallocate(self.data, self.capacity, new_capacity) }; + // JUSTIFICATION + // Benefit + // necessity + // Soundness + // `self.data` is valid for `self.capacity`. + let (ptr, new_capacity) = + unsafe { reallocate(self.data, self.capacity, required_cap) }; + self.data = ptr; self.capacity = new_capacity; } } @@ -899,6 +905,7 @@ impl MutableBuffer { self.into_buffer() } + #[inline] fn into_buffer(self) -> Buffer { let buffer_data = unsafe { Bytes::new(self.data, self.len, Deallocation::Native(self.capacity)) @@ -963,11 +970,167 @@ impl MutableBuffer { /// Extends the buffer by `additional` bytes equal to `0u8`, incrementing its capacity if needed. #[inline] - pub fn extend(&mut self, additional: usize) { + pub fn extend_zeros(&mut self, additional: usize) { self.resize(self.len + additional, 0); } } +/// # Safety +/// `ptr` must be allocated for `old_capacity`. +#[inline] +unsafe fn reallocate( + ptr: NonNull, + old_capacity: usize, + new_capacity: usize, +) -> (NonNull, usize) { + let new_capacity = bit_util::round_upto_multiple_of_64(new_capacity); + let new_capacity = std::cmp::max(new_capacity, old_capacity * 2); + let ptr = memory::reallocate(ptr, old_capacity, new_capacity); + (ptr, new_capacity) +} + +impl Extend for MutableBuffer { + #[inline] + fn extend>(&mut self, iter: T) { + let iterator = iter.into_iter(); + self.extend_from_iter(iterator) + } +} + +impl MutableBuffer { + #[inline] + fn extend_from_iter>( + &mut self, + mut iterator: I, + ) { + let size = std::mem::size_of::(); + let (lower, _) = iterator.size_hint(); + let additional = lower * size; + self.reserve(additional); + + // this is necessary because of https://github.com/rust-lang/rust/issues/32155 + let mut len = SetLenOnDrop::new(&mut self.len); + let mut dst = unsafe { self.data.as_ptr().add(len.local_len) as *mut T }; + let capacity = self.capacity; + + while len.local_len + size <= capacity { + if let Some(item) = iterator.next() { + unsafe { + std::ptr::write(dst, item); + dst = dst.add(1); + } + len.local_len += size; + } else { + break; + } + } + drop(len); + + iterator.for_each(|item| self.push(item)); + } +} + +impl Buffer { + /// Creates a [`Buffer`] from an [`Iterator`] with a trusted (upper) length. + /// Prefer this to `collect` whenever possible, as it is faster ~60% faster. + /// # Example + /// ``` + /// # use arrow::buffer::Buffer; + /// let v = vec![1u32]; + /// let iter = v.iter().map(|x| x * 2); + /// let buffer = unsafe { Buffer::from_trusted_len_iter(iter) }; + /// assert_eq!(buffer.len(), 4) // u32 has 4 bytes + /// ``` + /// # Safety + /// This method assumes that the iterator's size is correct and is undefined behavior + /// to use it on an iterator that reports an incorrect length. + // This implementation is required for two reasons: + // 1. there is no trait `TrustedLen` in stable rust and therefore + // we can't specialize `extend` for `TrustedLen` like `Vec` does. + // 2. `from_trusted_len_iter` is faster. + pub unsafe fn from_trusted_len_iter>( + iterator: I, + ) -> Self { + let (_, upper) = iterator.size_hint(); + let upper = upper.expect("from_trusted_len_iter requires an upper limit"); + let len = upper * std::mem::size_of::(); + + let mut buffer = MutableBuffer::new(len); + + let mut dst = buffer.data.as_ptr() as *mut T; + for item in iterator { + // note how there is no reserve here (compared with `extend_from_iter`) + std::ptr::write(dst, item); + dst = dst.add(1); + } + assert_eq!( + dst.offset_from(buffer.data.as_ptr() as *mut T) as usize, + upper, + "Trusted iterator length was not accurately reported" + ); + buffer.len = len; + buffer.into() + } + + /// Creates a [`Buffer`] from an [`Iterator`] with a trusted (upper) length or errors + /// if any of the items of the iterator is an error. + /// Prefer this to `collect` whenever possible, as it is faster ~60% faster. + /// # Safety + /// This method assumes that the iterator's size is correct and is undefined behavior + /// to use it on an iterator that reports an incorrect length. + pub unsafe fn try_from_trusted_len_iter< + E, + T: ArrowNativeType, + I: Iterator>, + >( + iterator: I, + ) -> std::result::Result { + let (_, upper) = iterator.size_hint(); + let upper = upper.expect("try_from_trusted_len_iter requires an upper limit"); + let len = upper * std::mem::size_of::(); + + let mut buffer = MutableBuffer::new(len); + + let mut dst = buffer.data.as_ptr() as *mut T; + for item in iterator { + // note how there is no reserve here (compared with `extend_from_iter`) + std::ptr::write(dst, item?); + dst = dst.add(1); + } + assert_eq!( + dst.offset_from(buffer.data.as_ptr() as *mut T) as usize, + upper, + "Trusted iterator length was not accurately reported" + ); + buffer.len = len; + Ok(buffer.into()) + } +} + +impl FromIterator for Buffer { + fn from_iter>(iter: I) -> Self { + let mut iterator = iter.into_iter(); + let size = std::mem::size_of::(); + + // first iteration, which will likely reserve sufficient space for the buffer. + let mut buffer = match iterator.next() { + None => MutableBuffer::new(0), + Some(element) => { + let (lower, _) = iterator.size_hint(); + let mut buffer = MutableBuffer::new(lower.saturating_add(1) * size); + unsafe { + std::ptr::write(buffer.as_mut_ptr() as *mut T, element); + buffer.len = size; + } + buffer + } + }; + + buffer.extend_from_iter(iterator); + buffer.into() + } +} + impl std::ops::Deref for MutableBuffer { type Target = [u8]; @@ -1003,6 +1166,28 @@ impl PartialEq for MutableBuffer { unsafe impl Sync for MutableBuffer {} unsafe impl Send for MutableBuffer {} +struct SetLenOnDrop<'a> { + len: &'a mut usize, + local_len: usize, +} + +impl<'a> SetLenOnDrop<'a> { + #[inline] + fn new(len: &'a mut usize) -> Self { + SetLenOnDrop { + local_len: *len, + len, + } + } +} + +impl Drop for SetLenOnDrop<'_> { + #[inline] + fn drop(&mut self) { + *self.len = self.local_len; + } +} + #[cfg(test)] mod tests { use std::thread; @@ -1172,6 +1357,29 @@ mod tests { assert_eq!(b"hello arrow", buf.as_slice()); } + #[test] + fn mutable_extend_from_iter() { + let mut buf = MutableBuffer::new(0); + buf.extend(vec![1u32, 2]); + assert_eq!(8, buf.len()); + assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice()); + + buf.extend(vec![3u32, 4]); + assert_eq!(16, buf.len()); + assert_eq!( + &[1u8, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0], + buf.as_slice() + ); + } + + #[test] + fn test_from_trusted_len_iter() { + let iter = vec![1u32, 2].into_iter(); + let buf = unsafe { Buffer::from_trusted_len_iter(iter) }; + assert_eq!(8, buf.len()); + assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice()); + } + #[test] fn test_mutable_reserve() { let mut buf = MutableBuffer::new(1); diff --git a/rust/arrow/src/bytes.rs b/rust/arrow/src/bytes.rs index 61d52ef572ff8..323654954f802 100644 --- a/rust/arrow/src/bytes.rs +++ b/rust/arrow/src/bytes.rs @@ -79,6 +79,7 @@ impl Bytes { /// /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed. + #[inline] pub unsafe fn new( ptr: std::ptr::NonNull, len: usize, diff --git a/rust/arrow/src/compute/kernels/arithmetic.rs b/rust/arrow/src/compute/kernels/arithmetic.rs index 98c0660ba2b7c..06d7c90f6a5de 100644 --- a/rust/arrow/src/compute/kernels/arithmetic.rs +++ b/rust/arrow/src/compute/kernels/arithmetic.rs @@ -28,7 +28,7 @@ use std::sync::Arc; use num::{One, Zero}; use crate::buffer::Buffer; -#[cfg(feature = "simd")] +#[cfg(simd)] use crate::buffer::MutableBuffer; use crate::compute::util::combine_option_bitmap; use crate::datatypes; @@ -51,11 +51,13 @@ where T::Native: Neg, F: Fn(T::Native) -> T::Native, { - let values = array - .values() - .iter() - .map(|v| op(*v)) - .collect::>(); + let values = array.values().iter().map(|v| op(*v)); + // JUSTIFICATION + // Benefit + // ~60% speedup + // Soundness + // `values` is an iterator with a known size. + let buffer = unsafe { Buffer::from_trusted_len_iter(values) }; let data = ArrayData::new( T::DATA_TYPE, @@ -63,7 +65,7 @@ where None, array.data_ref().null_buffer().cloned(), 0, - vec![Buffer::from_slice_ref(&values)], + vec![buffer], vec![], ); Ok(PrimitiveArray::::from(Arc::new(data))) @@ -147,8 +149,13 @@ where .values() .iter() .zip(right.values().iter()) - .map(|(l, r)| op(*l, *r)) - .collect::>(); + .map(|(l, r)| op(*l, *r)); + // JUSTIFICATION + // Benefit + // ~60% speedup + // Soundness + // `values` is an iterator with a known size. + let buffer = unsafe { Buffer::from_trusted_len_iter(values) }; let data = ArrayData::new( T::DATA_TYPE, @@ -156,7 +163,7 @@ where None, null_bit_buffer, 0, - vec![Buffer::from_slice_ref(&values)], + vec![buffer], vec![], ); Ok(PrimitiveArray::::from(Arc::new(data))) @@ -186,33 +193,37 @@ where let null_bit_buffer = combine_option_bitmap(left.data_ref(), right.data_ref(), left.len())?; - let mut values = Vec::with_capacity(left.len()); - if let Some(b) = &null_bit_buffer { - // some value is null - for i in 0..left.len() { - let is_valid = unsafe { bit_util::get_bit_raw(b.as_ptr(), i) }; - values.push(if is_valid { - let right_value = right.value(i); - if right_value.is_zero() { - return Err(ArrowError::DivideByZero); + let buffer = if let Some(b) = &null_bit_buffer { + let values = left.values().iter().zip(right.values()).enumerate().map( + |(i, (left, right))| { + let is_valid = unsafe { bit_util::get_bit_raw(b.as_ptr(), i) }; + if is_valid { + if right.is_zero() { + Err(ArrowError::DivideByZero) + } else { + Ok(*left / *right) + } } else { - left.value(i) / right_value + Ok(T::default_value()) } - } else { - T::default_value() - }); - } + }, + ); + unsafe { Buffer::try_from_trusted_len_iter(values) } } else { // no value is null - for i in 0..left.len() { - let right_value = right.value(i); - values.push(if right_value.is_zero() { - return Err(ArrowError::DivideByZero); - } else { - left.value(i) / right_value + let values = left + .values() + .iter() + .zip(right.values()) + .map(|(left, right)| { + if right.is_zero() { + Err(ArrowError::DivideByZero) + } else { + Ok(*left / *right) + } }); - } - }; + unsafe { Buffer::try_from_trusted_len_iter(values) } + }?; let data = ArrayData::new( T::DATA_TYPE, @@ -220,7 +231,7 @@ where None, null_bit_buffer, 0, - vec![Buffer::from_slice_ref(&values)], + vec![buffer], vec![], ); Ok(PrimitiveArray::::from(Arc::new(data))) diff --git a/rust/arrow/src/compute/kernels/length.rs b/rust/arrow/src/compute/kernels/length.rs index a0107c35f911b..5f2ed59e2629d 100644 --- a/rust/arrow/src/compute/kernels/length.rs +++ b/rust/arrow/src/compute/kernels/length.rs @@ -29,16 +29,20 @@ where OffsetSize: OffsetSizeTrait, { // note: offsets are stored as u8, but they can be interpreted as OffsetSize - let offsets = array.data_ref().clone().buffers()[0].clone(); + let offsets = &array.data_ref().buffers()[0]; // this is a 30% improvement over iterating over u8s and building OffsetSize, which // justifies the usage of `unsafe`. let slice: &[OffsetSize] = &unsafe { offsets.typed_data::() }[array.offset()..]; - let lengths: Vec = slice - .windows(2) - .map(|offset| offset[1] - offset[0]) - .collect(); + let lengths = slice.windows(2).map(|offset| offset[1] - offset[0]); + + // JUSTIFICATION + // Benefit + // ~60% speedup + // Soundness + // `values` is an iterator with a known size. + let buffer = unsafe { Buffer::from_trusted_len_iter(lengths) }; let null_bit_buffer = array .data_ref() @@ -52,7 +56,7 @@ where None, null_bit_buffer, 0, - vec![Buffer::from_slice_ref(&lengths)], + vec![buffer], vec![], ); Ok(make_array(Arc::new(data)))