From afabc5ebfcfa0330653051f8ac6f6c6968d29c62 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Fri, 27 Nov 2020 21:33:42 +0100 Subject: [PATCH 1/3] Removed lots of unsafe code. --- rust/arrow/src/array/array_binary.rs | 10 +- rust/arrow/src/array/array_list.rs | 38 +-- rust/arrow/src/array/array_primitive.rs | 19 -- rust/arrow/src/array/array_string.rs | 9 +- rust/arrow/src/array/array_union.rs | 10 - rust/arrow/src/array/builder.rs | 90 +------- rust/arrow/src/array/data.rs | 4 +- rust/arrow/src/array/raw_pointer.rs | 10 - rust/arrow/src/bitmap.rs | 4 +- rust/arrow/src/buffer.rs | 293 +++--------------------- rust/arrow/src/bytes.rs | 29 +-- rust/arrow/src/lib.rs | 1 - rust/arrow/src/memory.rs | 273 ---------------------- 13 files changed, 52 insertions(+), 738 deletions(-) delete mode 100644 rust/arrow/src/memory.rs diff --git a/rust/arrow/src/array/array_binary.rs b/rust/arrow/src/array/array_binary.rs index da640723f12d8..9be9cc600e273 100644 --- a/rust/arrow/src/array/array_binary.rs +++ b/rust/arrow/src/array/array_binary.rs @@ -24,9 +24,9 @@ use std::{ }; use super::{ - array::print_long_array, raw_pointer::as_aligned_pointer, raw_pointer::RawPtrBox, - Array, ArrayData, ArrayDataRef, FixedSizeListArray, GenericBinaryIter, - GenericListArray, LargeListArray, ListArray, OffsetSizeTrait, + array::print_long_array, raw_pointer::RawPtrBox, Array, ArrayData, ArrayDataRef, + FixedSizeListArray, GenericBinaryIter, GenericListArray, LargeListArray, ListArray, + OffsetSizeTrait, }; use crate::util::bit_util; use crate::{buffer::Buffer, datatypes::ToByteSlice}; @@ -209,9 +209,7 @@ impl From let value_data = data.buffers()[1].raw_data(); Self { data, - value_offsets: RawPtrBox::new(as_aligned_pointer::( - raw_value_offsets, - )), + value_offsets: RawPtrBox::new(raw_value_offsets as *const OffsetSize), value_data: RawPtrBox::new(value_data), } } diff --git a/rust/arrow/src/array/array_list.rs b/rust/arrow/src/array/array_list.rs index 4eb8dc56640f0..8e11c025a21a5 100644 --- a/rust/arrow/src/array/array_list.rs +++ b/rust/arrow/src/array/array_list.rs @@ -23,8 +23,8 @@ use std::mem; use num::Num; use super::{ - array::print_long_array, make_array, raw_pointer::as_aligned_pointer, - raw_pointer::RawPtrBox, Array, ArrayDataRef, ArrayRef, + array::print_long_array, make_array, raw_pointer::RawPtrBox, Array, ArrayDataRef, + ArrayRef, }; use crate::datatypes::ArrowNativeType; use crate::datatypes::DataType; @@ -118,7 +118,7 @@ impl From for GenericListArray From for PrimitiveArray { "PrimitiveArray data should contain a single buffer only (values buffer)" ); let raw_values = data.buffers()[0].raw_data(); - assert!( - memory::is_aligned::(raw_values, mem::align_of::()), - "memory is not aligned" - ); Self { data, raw_values: RawPtrBox::new(raw_values as *const T::Native), @@ -466,13 +461,6 @@ mod tests { assert!(arr.is_valid(i)); assert_eq!(i as i32, arr.value(i)); } - - assert_eq!(64, arr.get_buffer_memory_size()); - let internals_of_primitive_array = 8 + 72; // RawPtrBox & Arc combined. - assert_eq!( - arr.get_buffer_memory_size() + internals_of_primitive_array, - arr.get_array_memory_size() - ); } #[test] @@ -492,13 +480,6 @@ mod tests { assert!(!arr.is_valid(i)); } } - - assert_eq!(128, arr.get_buffer_memory_size()); - let internals_of_primitive_array = 8 + 72 + 16; // RawPtrBox & Arc and it's null_bitmap combined. - assert_eq!( - arr.get_buffer_memory_size() + internals_of_primitive_array, - arr.get_array_memory_size() - ); } #[test] diff --git a/rust/arrow/src/array/array_string.rs b/rust/arrow/src/array/array_string.rs index 5f871b8f595de..85043fdff4cd4 100644 --- a/rust/arrow/src/array/array_string.rs +++ b/rust/arrow/src/array/array_string.rs @@ -21,9 +21,8 @@ use std::mem; use std::{any::Any, iter::FromIterator}; use super::{ - array::print_long_array, raw_pointer::as_aligned_pointer, raw_pointer::RawPtrBox, - Array, ArrayData, ArrayDataRef, GenericListArray, GenericStringIter, LargeListArray, - ListArray, OffsetSizeTrait, + array::print_long_array, raw_pointer::RawPtrBox, Array, ArrayData, ArrayDataRef, + GenericListArray, GenericStringIter, LargeListArray, ListArray, OffsetSizeTrait, }; use crate::util::bit_util; use crate::{buffer::Buffer, datatypes::ToByteSlice}; @@ -258,9 +257,7 @@ impl From let value_data = data.buffers()[1].raw_data(); Self { data, - value_offsets: RawPtrBox::new(as_aligned_pointer::( - raw_value_offsets, - )), + value_offsets: RawPtrBox::new(raw_value_offsets as *const OffsetSize), value_data: RawPtrBox::new(value_data), } } diff --git a/rust/arrow/src/array/array_union.rs b/rust/arrow/src/array/array_union.rs index ea42843589fee..a497a2f3f57c3 100644 --- a/rust/arrow/src/array/array_union.rs +++ b/rust/arrow/src/array/array_union.rs @@ -414,16 +414,6 @@ mod tests { let value = slot.value(0); assert_eq!(expected_value, &value); } - - assert_eq!( - 4 * 8 * 4 * mem::size_of::(), - union.get_buffer_memory_size() - ); - let internals_of_union_array = (8 + 72) + (union.boxed_fields.len() * 144); // Arc & Vec combined. - assert_eq!( - union.get_buffer_memory_size() + internals_of_union_array, - union.get_array_memory_size() - ); } #[test] diff --git a/rust/arrow/src/array/builder.rs b/rust/arrow/src/array/builder.rs index 97f2978a3b8b1..972b961352fa3 100644 --- a/rust/arrow/src/array/builder.rs +++ b/rust/arrow/src/array/builder.rs @@ -150,13 +150,6 @@ pub trait BufferBuilderTrait { /// ``` fn is_empty(&self) -> bool; - /// Returns the actual capacity (number of elements) of the internal buffer. - /// - /// Note: the internal capacity returned by this method might be larger than - /// what you'd expect after setting the capacity in the `new()` or `reserve()` - /// functions. - fn capacity(&self) -> usize; - /// Increases the number of elements in the internal buffer by `n` /// and resizes the buffer as needed. /// @@ -271,11 +264,6 @@ impl BufferBuilderTrait for BufferBuilder { self.len == 0 } - fn capacity(&self) -> usize { - let byte_capacity = self.buffer.capacity(); - byte_capacity / T::get_byte_width() - } - #[inline] fn advance(&mut self, i: usize) -> Result<()> { let new_buffer_len = (self.len + i) * mem::size_of::(); @@ -286,9 +274,7 @@ impl BufferBuilderTrait for BufferBuilder { #[inline] fn reserve(&mut self, n: usize) { - let new_capacity = self.len + n; - let byte_capacity = mem::size_of::() * new_capacity; - self.buffer.reserve(byte_capacity); + self.buffer.reserve(mem::size_of::() * n); } #[inline] @@ -349,10 +335,6 @@ impl BooleanBufferBuilder { self.len == 0 } - pub fn capacity(&self) -> usize { - self.buffer.capacity() * 8 - } - #[inline] pub fn advance(&mut self, i: usize) -> Result<()> { let new_buffer_len = bit_util::ceil(self.len + i, 8); @@ -363,27 +345,15 @@ impl BooleanBufferBuilder { #[inline] pub fn reserve(&mut self, n: usize) { - let new_capacity = self.len + n; - if new_capacity > self.capacity() { - let new_byte_capacity = bit_util::ceil(new_capacity, 8); - let existing_capacity = self.buffer.capacity(); - let new_capacity = self.buffer.reserve(new_byte_capacity); - self.buffer - .set_null_bits(existing_capacity, new_capacity - existing_capacity); - } + let new_byte_capacity = bit_util::ceil(self.len + n, 8); + self.buffer.resize(new_byte_capacity); } #[inline] pub fn append(&mut self, v: bool) -> Result<()> { self.reserve(1); if v { - let data = unsafe { - std::slice::from_raw_parts_mut( - self.buffer.raw_data_mut(), - self.buffer.capacity(), - ) - }; - bit_util::set_bit(data, self.len); + bit_util::set_bit(self.buffer.data_mut(), self.len); } self.len += 1; Ok(()) @@ -393,13 +363,7 @@ impl BooleanBufferBuilder { pub fn append_n(&mut self, n: usize, v: bool) -> Result<()> { self.reserve(n); if n != 0 && v { - let data = unsafe { - std::slice::from_raw_parts_mut( - self.buffer.raw_data_mut(), - self.buffer.capacity(), - ) - }; - (self.len..self.len + n).for_each(|i| bit_util::set_bit(data, i)) + let data = self.buffer.data_mut(); } self.len += n; Ok(()) @@ -412,12 +376,7 @@ impl BooleanBufferBuilder { for v in slice { if *v { - // For performance the `len` of the buffer is not - // updated on each append but is updated in the - // `freeze` method instead. - unsafe { - bit_util::set_bit_raw(self.buffer.raw_data_mut(), self.len); - } + bit_util::set_bit(self.buffer.data_mut(), self.len); } self.len += 1; } @@ -491,11 +450,6 @@ impl BooleanBuilder { } } - /// Returns the capacity of this builder measured in slots of type `T` - pub fn capacity(&self) -> usize { - self.values_builder.capacity() - } - /// Appends a value of type `T` into the builder pub fn append_value(&mut self, v: bool) -> Result<()> { self.bitmap_builder.append(true)?; @@ -635,11 +589,6 @@ impl PrimitiveBuilder { } } - /// Returns the capacity of this builder measured in slots of type `T` - pub fn capacity(&self) -> usize { - self.values_builder.capacity() - } - /// Appends a value of type `T` into the builder pub fn append_value(&mut self, v: T::Native) -> Result<()> { self.bitmap_builder.append(true)?; @@ -2365,7 +2314,6 @@ mod tests { fn test_builder_i32_empty() { let mut b = Int32BufferBuilder::new(5); assert_eq!(0, b.len()); - assert_eq!(16, b.capacity()); let a = b.finish(); assert_eq!(0, a.len()); } @@ -2384,7 +2332,6 @@ mod tests { for i in 0..5 { b.append(i).unwrap(); } - assert_eq!(16, b.capacity()); let a = b.finish(); assert_eq!(20, a.len()); } @@ -2392,11 +2339,9 @@ mod tests { #[test] fn test_builder_i32_grow_buffer() { let mut b = Int32BufferBuilder::new(2); - assert_eq!(16, b.capacity()); for i in 0..20 { b.append(i).unwrap(); } - assert_eq!(32, b.capacity()); let a = b.finish(); assert_eq!(80, a.len()); } @@ -2404,41 +2349,21 @@ mod tests { #[test] fn test_builder_finish() { let mut b = Int32BufferBuilder::new(5); - assert_eq!(16, b.capacity()); for i in 0..10 { b.append(i).unwrap(); } let mut a = b.finish(); assert_eq!(40, a.len()); assert_eq!(0, b.len()); - assert_eq!(0, b.capacity()); // Try build another buffer after cleaning up. for i in 0..20 { b.append(i).unwrap() } - assert_eq!(32, b.capacity()); a = b.finish(); assert_eq!(80, a.len()); } - #[test] - fn test_reserve() { - let mut b = UInt8BufferBuilder::new(2); - assert_eq!(64, b.capacity()); - b.reserve(64); - assert_eq!(64, b.capacity()); - b.reserve(65); - assert_eq!(128, b.capacity()); - - let mut b = Int32BufferBuilder::new(2); - assert_eq!(16, b.capacity()); - b.reserve(16); - assert_eq!(16, b.capacity()); - b.reserve(17); - assert_eq!(32, b.capacity()); - } - #[test] fn test_append_slice() { let mut b = UInt8BufferBuilder::new(0); @@ -2487,14 +2412,12 @@ mod tests { b.append(false).unwrap(); b.append(true).unwrap(); assert_eq!(4, b.len()); - assert_eq!(512, b.capacity()); let buffer = b.finish(); assert_eq!(1, buffer.len()); let mut b = BooleanBufferBuilder::new(4); b.append_slice(&[false, true, false, true]).unwrap(); assert_eq!(4, b.len()); - assert_eq!(512, b.capacity()); let buffer = b.finish(); assert_eq!(1, buffer.len()); } @@ -2505,7 +2428,6 @@ mod tests { let bytes = [8, 16, 32, 64].to_byte_slice(); b.write_bytes(bytes, 4); assert_eq!(4, b.len()); - assert_eq!(16, b.capacity()); let buffer = b.finish(); assert_eq!(16, buffer.len()); } diff --git a/rust/arrow/src/array/data.rs b/rust/arrow/src/array/data.rs index d634ed18d5cf6..b86bb92456885 100644 --- a/rust/arrow/src/array/data.rs +++ b/rust/arrow/src/array/data.rs @@ -177,7 +177,7 @@ impl ArrayData { pub fn get_buffer_memory_size(&self) -> usize { let mut size = 0; for buffer in &self.buffers { - size += buffer.capacity(); + size += buffer.len(); } if let Some(bitmap) = &self.null_bitmap { size += bitmap.get_buffer_memory_size() @@ -200,7 +200,7 @@ impl ArrayData { // Calculate rest of the fields top down which contain actual data for buffer in &self.buffers { size += mem::size_of_val(&buffer); - size += buffer.capacity(); + size += buffer.len(); } if let Some(bitmap) = &self.null_bitmap { size += bitmap.get_array_memory_size() diff --git a/rust/arrow/src/array/raw_pointer.rs b/rust/arrow/src/array/raw_pointer.rs index 8eeadfe93900d..96526d092552c 100644 --- a/rust/arrow/src/array/raw_pointer.rs +++ b/rust/arrow/src/array/raw_pointer.rs @@ -15,8 +15,6 @@ // specific language governing permissions and limitations // under the License. -use crate::memory; - pub(super) struct RawPtrBox { inner: *const T, } @@ -33,11 +31,3 @@ impl RawPtrBox { unsafe impl Send for RawPtrBox {} unsafe impl Sync for RawPtrBox {} - -pub(super) fn as_aligned_pointer(p: *const u8) -> *const T { - assert!( - memory::is_aligned(p, std::mem::align_of::()), - "memory is not aligned" - ); - p as *const T -} diff --git a/rust/arrow/src/bitmap.rs b/rust/arrow/src/bitmap.rs index 7df609fb88be2..c19864bb3863e 100644 --- a/rust/arrow/src/bitmap.rs +++ b/rust/arrow/src/bitmap.rs @@ -67,12 +67,12 @@ impl Bitmap { /// Returns the total number of bytes of memory occupied by the buffers owned by this [Bitmap]. pub fn get_buffer_memory_size(&self) -> usize { - self.bits.capacity() + self.bits.len() } /// Returns the total number of bytes of memory occupied physically by this [Bitmap]. pub fn get_array_memory_size(&self) -> usize { - self.bits.capacity() + mem::size_of_val(self) + self.bits.len() + mem::size_of_val(self) } } diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index 92b42d52606a2..cf89a7a00ff5f 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -38,7 +38,6 @@ use std::sync::Arc; use crate::arch::avx512::*; use crate::datatypes::ArrowNativeType; use crate::error::{ArrowError, Result}; -use crate::memory; use crate::util::bit_chunk_iterator::BitChunks; use crate::util::bit_util; use crate::util::bit_util::ceil; @@ -57,68 +56,11 @@ pub struct Buffer { } impl Buffer { - /// Creates a buffer from an existing memory region (must already be byte-aligned), this - /// `Buffer` will free this piece of memory when dropped. - /// - /// # Arguments - /// - /// * `ptr` - Pointer to raw parts - /// * `len` - Length of raw parts in **bytes** - /// * `capacity` - Total allocated memory for the pointer `ptr`, in **bytes** - /// - /// # Safety - /// - /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` - /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed. - pub unsafe fn from_raw_parts(ptr: *const u8, len: usize, capacity: usize) -> Self { - Buffer::build_with_arguments(ptr, len, Deallocation::Native(capacity)) - } - - /// Creates a buffer from an existing memory region (must already be byte-aligned), this - /// `Buffer` **does not** free this piece of memory when dropped. - /// - /// # Arguments - /// - /// * `ptr` - Pointer to raw parts - /// * `len` - Length of raw parts in **bytes** - /// * `data` - An [ffi::FFI_ArrowArray] with the data - /// - /// # Safety - /// - /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` - /// bytes and that the foreign deallocator frees the region. - pub unsafe fn from_unowned( - ptr: *const u8, - len: usize, - data: Arc, - ) -> Self { - Buffer::build_with_arguments(ptr, len, Deallocation::Foreign(data)) - } - - /// Auxiliary method to create a new Buffer - unsafe fn build_with_arguments( - ptr: *const u8, - len: usize, - deallocation: Deallocation, - ) -> Self { - let bytes = Bytes::new(ptr, len, deallocation); - Buffer { - data: Arc::new(bytes), - offset: 0, - } - } - /// Returns the number of bytes in the buffer pub fn len(&self) -> usize { self.data.len() - self.offset } - /// Returns the capacity of this buffer. - /// For exernally owned buffers, this returns zero - pub fn capacity(&self) -> usize { - self.data.capacity() - } - /// Returns whether the buffer is empty. pub fn is_empty(&self) -> bool { self.data.len() - self.offset == 0 @@ -161,7 +103,6 @@ impl Buffer { /// `bool` in Rust. However, `bool` arrays in Arrow are bit-packed which breaks this condition. pub unsafe fn typed_data(&self) -> &[T] { assert_eq!(self.len() % mem::size_of::(), 0); - assert!(memory::is_ptr_aligned::(self.raw_data() as *const T)); from_raw_parts( self.raw_data() as *const T, self.len() / mem::size_of::(), @@ -208,14 +149,10 @@ impl Buffer { /// allocated memory region. impl> From for Buffer { fn from(p: T) -> Self { - // allocate aligned memory buffer - let slice = p.as_ref(); - let len = slice.len() * mem::size_of::(); - let capacity = bit_util::round_upto_multiple_of_64(len); - let buffer = memory::allocate_aligned(capacity); - unsafe { - memory::memcpy(buffer, slice.as_ptr(), len); - Buffer::build_with_arguments(buffer, len, Deallocation::Native(capacity)) + let a = p.as_ref(); + Self { + data: Arc::new(a.to_vec()), + offset: 0, } } } @@ -667,20 +604,14 @@ unsafe impl Send for Buffer {} /// converted into a immutable buffer via the `freeze` method. #[derive(Debug)] pub struct MutableBuffer { - data: *mut u8, - len: usize, - capacity: usize, + data: Vec, } impl MutableBuffer { /// Allocate a new mutable buffer with initial capacity to be `capacity`. pub fn new(capacity: usize) -> Self { - let new_capacity = bit_util::round_upto_multiple_of_64(capacity); - let ptr = memory::allocate_aligned(new_capacity); Self { - data: ptr, - len: 0, - capacity: new_capacity, + data: Vec::with_capacity(capacity), } } @@ -697,11 +628,11 @@ impl MutableBuffer { /// the buffer directly (e.g., modifying the buffer by holding a mutable reference /// from `data_mut()`). pub fn with_bitset(mut self, end: usize, val: bool) -> Self { - assert!(end <= self.capacity); + assert!(end <= self.data.capacity()); let v = if val { 255 } else { 0 }; unsafe { - std::ptr::write_bytes(self.data, v, end); - self.len = end; + std::ptr::write_bytes(self.data.as_mut_ptr(), v, end); + self.data.set_len(end); } self } @@ -712,9 +643,9 @@ impl MutableBuffer { /// `len` of the buffer and so can be used to initialize the memory region from /// `len` to `capacity`. pub fn set_null_bits(&mut self, start: usize, count: usize) { - assert!(start + count <= self.capacity); + assert!(start + count <= self.data.capacity()); unsafe { - std::ptr::write_bytes(self.data.add(start), 0, count); + std::ptr::write_bytes(self.data.as_mut_ptr().add(start), 0, count); } } @@ -722,16 +653,10 @@ impl MutableBuffer { /// also ensure the new capacity will be a multiple of 64 bytes. /// /// Returns the new capacity for this buffer. - pub fn reserve(&mut self, capacity: usize) -> usize { - if capacity > self.capacity { - let new_capacity = bit_util::round_upto_multiple_of_64(capacity); - let new_capacity = cmp::max(new_capacity, self.capacity * 2); - let new_data = - unsafe { memory::reallocate(self.data, self.capacity, new_capacity) }; - self.data = new_data as *mut u8; - self.capacity = new_capacity; + pub fn reserve(&mut self, capacity: usize) { + if capacity < self.data.capacity() { + self.data.reserve(self.data.capacity() - capacity); } - self.capacity } /// Resizes the buffer so that the `len` will equal to the `new_len`. @@ -742,83 +667,40 @@ impl MutableBuffer { /// /// If `new_len` is less than `len`, the buffer will be truncated. pub fn resize(&mut self, new_len: usize) { - if new_len > self.len { - self.reserve(new_len); - } else { - let new_capacity = bit_util::round_upto_multiple_of_64(new_len); - if new_capacity < self.capacity { - let new_data = - unsafe { memory::reallocate(self.data, self.capacity, new_capacity) }; - self.data = new_data as *mut u8; - self.capacity = new_capacity; - } - } - self.len = new_len; + self.data.resize(new_len, 0); } /// Returns whether this buffer is empty or not. #[inline] - pub const fn is_empty(&self) -> bool { - self.len == 0 + pub fn is_empty(&self) -> bool { + self.data.is_empty() } /// Returns the length (the number of bytes written) in this buffer. #[inline] - pub const fn len(&self) -> usize { - self.len - } - - /// Returns the total capacity in this buffer. - #[inline] - pub const fn capacity(&self) -> usize { - self.capacity + pub fn len(&self) -> usize { + self.data.len() } /// Clear all existing data from this buffer. pub fn clear(&mut self) { - self.len = 0 + self.data.clear() } /// Returns the data stored in this buffer as a slice. pub fn data(&self) -> &[u8] { - if self.data.is_null() { - &[] - } else { - unsafe { std::slice::from_raw_parts(self.raw_data(), self.len()) } - } + &self.data } /// Returns the data stored in this buffer as a mutable slice. pub fn data_mut(&mut self) -> &mut [u8] { - if self.data.is_null() { - &mut [] - } else { - unsafe { std::slice::from_raw_parts_mut(self.raw_data_mut(), self.len()) } - } - } - - /// Returns a raw pointer for this buffer. - /// - /// Note that this should be used cautiously, and the returned pointer should not be - /// stored anywhere, to avoid dangling pointers. - #[inline] - pub const fn raw_data(&self) -> *const u8 { - self.data - } - - #[inline] - pub fn raw_data_mut(&mut self) -> *mut u8 { - self.data + &mut self.data } /// Freezes this buffer and return an immutable version of it. pub fn freeze(self) -> Buffer { - let buffer_data = unsafe { - Bytes::new(self.data, self.len, Deallocation::Native(self.capacity)) - }; - std::mem::forget(self); Buffer { - data: Arc::new(buffer_data), + data: Arc::new(self.data), offset: 0, } } @@ -826,10 +708,9 @@ impl MutableBuffer { /// View buffer as typed slice. pub fn typed_data_mut(&mut self) -> &mut [T] { assert_eq!(self.len() % mem::size_of::(), 0); - assert!(memory::is_ptr_aligned::(self.raw_data() as *const T)); unsafe { from_raw_parts_mut( - self.raw_data() as *mut T, + self.data.as_mut_ptr() as *mut T, self.len() / mem::size_of::(), ) } @@ -838,43 +719,12 @@ impl MutableBuffer { /// Extends the buffer from a byte slice, incrementing its capacity if needed. #[inline] pub fn extend_from_slice(&mut self, bytes: &[u8]) { - let new_len = self.len + bytes.len(); - if new_len > self.capacity { - self.reserve(new_len); - } - unsafe { - memory::memcpy(self.data.add(self.len), bytes.as_ptr(), bytes.len()); - } - self.len = new_len; + self.data.extend_from_slice(bytes) } /// Extends the buffer by `len` with all bytes equal to `0u8`, incrementing its capacity if needed. pub fn extend(&mut self, len: usize) { - let remaining_capacity = self.capacity - self.len; - if len > remaining_capacity { - self.reserve(self.len + len); - } - self.len += len; - } -} - -impl Drop for MutableBuffer { - fn drop(&mut self) { - if !self.data.is_null() { - unsafe { memory::free_aligned(self.data, self.capacity) }; - } - } -} - -impl PartialEq for MutableBuffer { - fn eq(&self, other: &MutableBuffer) -> bool { - if self.len != other.len { - return false; - } - if self.capacity != other.capacity { - return false; - } - unsafe { memory::memcmp(self.data, other.data, self.len) == 0 } + self.data.resize(self.data.len() + len, 0); } } @@ -883,7 +733,6 @@ unsafe impl Send for MutableBuffer {} #[cfg(test)] mod tests { - use std::ptr::null_mut; use std::thread; use super::*; @@ -917,20 +766,6 @@ mod tests { assert_ne!(buf1, buf2); } - #[test] - fn test_from_raw_parts() { - let buf = unsafe { Buffer::from_raw_parts(null_mut(), 0, 0) }; - assert_eq!(0, buf.len()); - assert_eq!(0, buf.data().len()); - assert_eq!(0, buf.capacity()); - assert!(buf.raw_data().is_null()); - - let buf = Buffer::from(&[0, 1, 2, 3, 4]); - assert_eq!(5, buf.len()); - assert!(!buf.raw_data().is_null()); - assert_eq!([0, 1, 2, 3, 4], buf.data()); - } - #[test] fn test_from_vec() { let buf = Buffer::from(&[0, 1, 2, 3, 4]); @@ -944,7 +779,6 @@ mod tests { let buf = Buffer::from(&[0, 1, 2, 3, 4]); let buf2 = buf; assert_eq!(5, buf2.len()); - assert_eq!(64, buf2.capacity()); assert!(!buf2.raw_data().is_null()); assert_eq!([0, 1, 2, 3, 4], buf2.data()); } @@ -1035,7 +869,6 @@ mod tests { #[test] fn test_mutable_new() { let buf = MutableBuffer::new(63); - assert_eq!(64, buf.capacity()); assert_eq!(0, buf.len()); assert!(buf.is_empty()); } @@ -1058,80 +891,6 @@ mod tests { assert_eq!(b"hello arrow", buf.data()); } - #[test] - fn test_mutable_reserve() { - let mut buf = MutableBuffer::new(1); - assert_eq!(64, buf.capacity()); - - // Reserving a smaller capacity should have no effect. - let mut new_cap = buf.reserve(10); - assert_eq!(64, new_cap); - assert_eq!(64, buf.capacity()); - - new_cap = buf.reserve(100); - assert_eq!(128, new_cap); - assert_eq!(128, buf.capacity()); - } - - #[test] - fn test_mutable_resize() { - let mut buf = MutableBuffer::new(1); - assert_eq!(64, buf.capacity()); - assert_eq!(0, buf.len()); - - buf.resize(20); - assert_eq!(64, buf.capacity()); - assert_eq!(20, buf.len()); - - buf.resize(10); - assert_eq!(64, buf.capacity()); - assert_eq!(10, buf.len()); - - buf.resize(100); - assert_eq!(128, buf.capacity()); - assert_eq!(100, buf.len()); - - buf.resize(30); - assert_eq!(64, buf.capacity()); - assert_eq!(30, buf.len()); - - buf.resize(0); - assert_eq!(0, buf.capacity()); - assert_eq!(0, buf.len()); - } - - #[test] - fn test_mutable_freeze() { - let mut buf = MutableBuffer::new(1); - buf.extend_from_slice(b"aaaa bbbb cccc dddd"); - assert_eq!(19, buf.len()); - assert_eq!(64, buf.capacity()); - assert_eq!(b"aaaa bbbb cccc dddd", buf.data()); - - let immutable_buf = buf.freeze(); - assert_eq!(19, immutable_buf.len()); - assert_eq!(64, immutable_buf.capacity()); - assert_eq!(b"aaaa bbbb cccc dddd", immutable_buf.data()); - } - - #[test] - fn test_mutable_equal() -> Result<()> { - let mut buf = MutableBuffer::new(1); - let mut buf2 = MutableBuffer::new(1); - - buf.extend_from_slice(&[0xaa]); - buf2.extend_from_slice(&[0xaa, 0xbb]); - assert!(buf != buf2); - - buf.extend_from_slice(&[0xbb]); - assert_eq!(buf, buf2); - - buf2.reserve(65); - assert!(buf != buf2); - - Ok(()) - } - #[test] fn test_access_concurrently() { let buffer = Buffer::from(vec![1, 2, 3, 4, 5]); diff --git a/rust/arrow/src/bytes.rs b/rust/arrow/src/bytes.rs index 0363d8735a527..4d65da92a83a7 100644 --- a/rust/arrow/src/bytes.rs +++ b/rust/arrow/src/bytes.rs @@ -23,7 +23,7 @@ use core::slice; use std::sync::Arc; use std::{fmt::Debug, fmt::Formatter}; -use crate::{ffi, memory}; +use crate::ffi; /// Mode of deallocating memory regions pub enum Deallocation { @@ -56,10 +56,7 @@ impl Debug for Deallocation { /// foreign deallocator to deallocate the region when it is no longer needed. pub struct Bytes { /// The raw pointer to be begining of the region - ptr: *const u8, - - /// The number of bytes visible to this region. This is always smaller than its capacity (when avaliable). - len: usize, + ptr: Vec, /// how to deallocate this region deallocation: Deallocation, @@ -78,10 +75,9 @@ impl Bytes { /// /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed. - pub unsafe fn new(ptr: *const u8, len: usize, deallocation: Deallocation) -> Bytes { + pub unsafe fn new(ptr: Vec, deallocation: Deallocation) -> Bytes { Bytes { ptr, - len, deallocation, } } @@ -93,12 +89,12 @@ impl Bytes { #[inline] pub fn len(&self) -> usize { - self.len + self.data.len() } #[inline] pub fn is_empty(&self) -> bool { - self.len == 0 + self.len() == 0 } #[inline] @@ -144,23 +140,10 @@ impl PartialEq for Bytes { impl Debug for Bytes { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "Bytes {{ ptr: {:?}, len: {}, data: ", self.ptr, self.len,)?; + write!(f, "Bytes {{ data: ")?; f.debug_list().entries(self.as_slice().iter()).finish()?; write!(f, " }}") } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_dealloc_native() { - let capacity = 5; - let a = memory::allocate_aligned(capacity); - // create Bytes and release it. This will make `a` be an invalid pointer, but it is defined behavior - unsafe { Bytes::new(a, 3, Deallocation::Native(capacity)) }; - } -} diff --git a/rust/arrow/src/lib.rs b/rust/arrow/src/lib.rs index 9c91d38566fd2..cd0310d2f683b 100644 --- a/rust/arrow/src/lib.rs +++ b/rust/arrow/src/lib.rs @@ -147,7 +147,6 @@ pub mod error; pub mod ffi; pub mod ipc; pub mod json; -pub mod memory; pub mod record_batch; pub mod tensor; pub mod util; diff --git a/rust/arrow/src/memory.rs b/rust/arrow/src/memory.rs deleted file mode 100644 index 8bd334469ee44..0000000000000 --- a/rust/arrow/src/memory.rs +++ /dev/null @@ -1,273 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Defines memory-related functions, such as allocate/deallocate/reallocate memory -//! regions, cache and allocation alignments. - -use std::mem::align_of; -use std::ptr::NonNull; -use std::{alloc::Layout, sync::atomic::AtomicIsize}; - -// NOTE: Below code is written for spatial/temporal prefetcher optimizations. Memory allocation -// should align well with usage pattern of cache access and block sizes on layers of storage levels from -// registers to non-volatile memory. These alignments are all cache aware alignments incorporated -// from [cuneiform](https://crates.io/crates/cuneiform) crate. This approach mimicks Intel TBB's -// cache_aligned_allocator which exploits cache locality and minimizes prefetch signals -// resulting in less round trip time between the layers of storage. -// For further info: https://software.intel.com/en-us/node/506094 - -// 32-bit architecture and things other than netburst microarchitecture are using 64 bytes. -/// Cache and allocation multiple alignment size -#[cfg(target_arch = "x86")] -pub const ALIGNMENT: usize = 1 << 6; - -// Intel x86_64: -// L2D streamer from L1: -// Loads data or instructions from memory to the second-level cache. To use the streamer, -// organize the data or instructions in blocks of 128 bytes, aligned on 128 bytes. -// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf -/// Cache and allocation multiple alignment size -#[cfg(target_arch = "x86_64")] -pub const ALIGNMENT: usize = 1 << 7; - -// 24Kc: -// Data Line Size -// - https://s3-eu-west-1.amazonaws.com/downloads-mips/documents/MD00346-2B-24K-DTS-04.00.pdf -// - https://gitlab.e.foundation/e/devices/samsung/n7100/stable_android_kernel_samsung_smdk4412/commit/2dbac10263b2f3c561de68b4c369bc679352ccee -/// Cache and allocation multiple alignment size -#[cfg(target_arch = "mips")] -pub const ALIGNMENT: usize = 1 << 5; -/// Cache and allocation multiple alignment size -#[cfg(target_arch = "mips64")] -pub const ALIGNMENT: usize = 1 << 5; - -// Defaults for powerpc -/// Cache and allocation multiple alignment size -#[cfg(target_arch = "powerpc")] -pub const ALIGNMENT: usize = 1 << 5; - -// Defaults for the ppc 64 -/// Cache and allocation multiple alignment size -#[cfg(target_arch = "powerpc64")] -pub const ALIGNMENT: usize = 1 << 6; - -// e.g.: sifive -// - https://github.com/torvalds/linux/blob/master/Documentation/devicetree/bindings/riscv/sifive-l2-cache.txt#L41 -// in general all of them are the same. -/// Cache and allocation multiple alignment size -#[cfg(target_arch = "riscv")] -pub const ALIGNMENT: usize = 1 << 6; - -// This size is same across all hardware for this architecture. -// - https://docs.huihoo.com/doxygen/linux/kernel/3.7/arch_2s390_2include_2asm_2cache_8h.html -/// Cache and allocation multiple alignment size -#[cfg(target_arch = "s390x")] -pub const ALIGNMENT: usize = 1 << 8; - -// This size is same across all hardware for this architecture. -// - https://docs.huihoo.com/doxygen/linux/kernel/3.7/arch_2sparc_2include_2asm_2cache_8h.html#a9400cc2ba37e33279bdbc510a6311fb4 -/// Cache and allocation multiple alignment size -#[cfg(target_arch = "sparc")] -pub const ALIGNMENT: usize = 1 << 5; -/// Cache and allocation multiple alignment size -#[cfg(target_arch = "sparc64")] -pub const ALIGNMENT: usize = 1 << 6; - -// On ARM cache line sizes are fixed. both v6 and v7. -// Need to add board specific or platform specific things later. -/// Cache and allocation multiple alignment size -#[cfg(target_arch = "thumbv6")] -pub const ALIGNMENT: usize = 1 << 5; -/// Cache and allocation multiple alignment size -#[cfg(target_arch = "thumbv7")] -pub const ALIGNMENT: usize = 1 << 5; - -// Operating Systems cache size determines this. -// Currently no way to determine this without runtime inference. -/// Cache and allocation multiple alignment size -#[cfg(target_arch = "wasm32")] -pub const ALIGNMENT: usize = FALLBACK_ALIGNMENT; - -// Same as v6 and v7. -// List goes like that: -// Cortex A, M, R, ARM v7, v7-M, Krait and NeoverseN uses this size. -/// Cache and allocation multiple alignment size -#[cfg(target_arch = "arm")] -pub const ALIGNMENT: usize = 1 << 5; - -// Combined from 4 sectors. Volta says 128. -// Prevent chunk optimizations better to go to the default size. -// If you have smaller data with less padded functionality then use 32 with force option. -// - https://devtalk.nvidia.com/default/topic/803600/variable-cache-line-width-/ -/// Cache and allocation multiple alignment size -#[cfg(target_arch = "nvptx")] -pub const ALIGNMENT: usize = 1 << 7; -/// Cache and allocation multiple alignment size -#[cfg(target_arch = "nvptx64")] -pub const ALIGNMENT: usize = 1 << 7; - -// This size is same across all hardware for this architecture. -/// Cache and allocation multiple alignment size -#[cfg(target_arch = "aarch64")] -pub const ALIGNMENT: usize = 1 << 6; - -#[doc(hidden)] -/// Fallback cache and allocation multiple alignment size -const FALLBACK_ALIGNMENT: usize = 1 << 6; - -/// -/// As you can see this is global and lives as long as the program lives. -/// Be careful to not write anything to this pointer in any scenario. -/// If you use allocation methods shown here you won't have any problems. -const BYPASS_PTR: NonNull = unsafe { NonNull::new_unchecked(ALIGNMENT as *mut u8) }; - -// If this number is not zero after all objects have been `drop`, there is a memory leak -pub static mut ALLOCATIONS: AtomicIsize = AtomicIsize::new(0); - -pub fn allocate_aligned(size: usize) -> *mut u8 { - unsafe { - if size == 0 { - // In a perfect world, there is no need to request zero size allocation. - // Currently, passing zero sized layout to alloc is UB. - // This will dodge allocator api for any type. - BYPASS_PTR.as_ptr() - } else { - ALLOCATIONS.fetch_add(size as isize, std::sync::atomic::Ordering::SeqCst); - - let layout = Layout::from_size_align_unchecked(size, ALIGNMENT); - std::alloc::alloc_zeroed(layout) - } - } -} - -/// # Safety -/// -/// This function is unsafe because undefined behavior can result if the caller does not ensure all -/// of the following: -/// -/// * ptr must denote a block of memory currently allocated via this allocator, -/// -/// * size must be the same size that was used to allocate that block of memory, -pub unsafe fn free_aligned(ptr: *mut u8, size: usize) { - if ptr != BYPASS_PTR.as_ptr() { - ALLOCATIONS.fetch_sub(size as isize, std::sync::atomic::Ordering::SeqCst); - std::alloc::dealloc(ptr, Layout::from_size_align_unchecked(size, ALIGNMENT)); - } -} - -/// # Safety -/// -/// This function is unsafe because undefined behavior can result if the caller does not ensure all -/// of the following: -/// -/// * ptr must be currently allocated via this allocator, -/// -/// * new_size must be greater than zero. -/// -/// * new_size, when rounded up to the nearest multiple of [ALIGNMENT], must not overflow (i.e., -/// the rounded value must be less than usize::MAX). -pub unsafe fn reallocate(ptr: *mut u8, old_size: usize, new_size: usize) -> *mut u8 { - if ptr == BYPASS_PTR.as_ptr() { - return allocate_aligned(new_size); - } - - if new_size == 0 { - free_aligned(ptr, old_size); - return BYPASS_PTR.as_ptr(); - } - - ALLOCATIONS.fetch_add( - new_size as isize - old_size as isize, - std::sync::atomic::Ordering::SeqCst, - ); - let new_ptr = std::alloc::realloc( - ptr, - Layout::from_size_align_unchecked(old_size, ALIGNMENT), - new_size, - ); - - if !new_ptr.is_null() && new_size > old_size { - new_ptr.add(old_size).write_bytes(0, new_size - old_size); - } - - new_ptr -} - -/// # Safety -/// -/// Behavior is undefined if any of the following conditions are violated: -/// -/// * `src` must be valid for reads of `len * size_of::()` bytes. -/// -/// * `dst` must be valid for writes of `len * size_of::()` bytes. -/// -/// * Both `src` and `dst` must be properly aligned. -/// -/// `memcpy` creates a bitwise copy of `T`, regardless of whether `T` is [`Copy`]. If `T` is not -/// [`Copy`], using both the values in the region beginning at `*src` and the region beginning at -/// `*dst` can [violate memory safety][read-ownership]. -pub unsafe fn memcpy(dst: *mut u8, src: *const u8, len: usize) { - if len != 0x00 && src != BYPASS_PTR.as_ptr() { - std::ptr::copy_nonoverlapping(src, dst, len) - } -} - -extern "C" { - pub fn memcmp(p1: *const u8, p2: *const u8, len: usize) -> i32; -} - -/// Check if the pointer `p` is aligned to offset `a`. -pub fn is_aligned(p: *const T, a: usize) -> bool { - let a_minus_one = a.wrapping_sub(1); - let pmoda = p as usize & a_minus_one; - pmoda == 0 -} - -pub fn is_ptr_aligned(p: *const T) -> bool { - p.align_offset(align_of::()) == 0 -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_allocate() { - for _ in 0..10 { - let p = allocate_aligned(1024); - // make sure this is 64-byte aligned - assert_eq!(0, (p as usize) % 64); - unsafe { free_aligned(p, 1024) }; - } - } - - #[test] - fn test_is_aligned() { - // allocate memory aligned to 64-byte - let mut ptr = allocate_aligned(10); - assert_eq!(true, is_aligned::(ptr, 1)); - assert_eq!(true, is_aligned::(ptr, 2)); - assert_eq!(true, is_aligned::(ptr, 4)); - - // now make the memory aligned to 63-byte - ptr = unsafe { ptr.offset(1) }; - assert_eq!(true, is_aligned::(ptr, 1)); - assert_eq!(false, is_aligned::(ptr, 2)); - assert_eq!(false, is_aligned::(ptr, 4)); - unsafe { free_aligned(ptr.offset(-1), 10) }; - } -} From 666ac94db352ae89d90c04074f7068b1e4fd9262 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sat, 28 Nov 2020 21:45:38 +0100 Subject: [PATCH 2/3] Simplified API for buffer. --- rust/arrow/src/array/array_boolean.rs | 9 +-- rust/arrow/src/array/builder.rs | 5 +- rust/arrow/src/array/transform/list.rs | 2 +- rust/arrow/src/array/transform/utils.rs | 2 +- .../src/array/transform/variable_size.rs | 4 +- rust/arrow/src/buffer.rs | 66 ++++++++++--------- rust/arrow/src/bytes.rs | 65 ++++++------------ rust/arrow/src/ffi.rs | 11 +++- 8 files changed, 70 insertions(+), 94 deletions(-) diff --git a/rust/arrow/src/array/array_boolean.rs b/rust/arrow/src/array/array_boolean.rs index 82deca9bd9f27..e4a70ba4e5fdd 100644 --- a/rust/arrow/src/array/array_boolean.rs +++ b/rust/arrow/src/array/array_boolean.rs @@ -24,7 +24,6 @@ use std::{convert::From, sync::Arc}; use super::*; use super::{array::print_long_array, raw_pointer::RawPtrBox}; use crate::buffer::{Buffer, MutableBuffer}; -use crate::memory; use crate::util::bit_util; /// Array of bools @@ -148,10 +147,6 @@ impl From for BooleanArray { "BooleanArray data should contain a single buffer only (values buffer)" ); let raw_values = data.buffers()[0].raw_data(); - assert!( - memory::is_aligned::(raw_values, mem::align_of::()), - "memory is not aligned" - ); Self { data, raw_values: RawPtrBox::new(raw_values as *const u8), @@ -185,9 +180,7 @@ impl>> FromIterator for BooleanArray { let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); let mut val_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); - let data = unsafe { - std::slice::from_raw_parts_mut(val_buf.raw_data_mut(), val_buf.capacity()) - }; + let data = val_buf.data_mut(); let null_slice = null_buf.data_mut(); iter.enumerate().for_each(|(i, item)| { diff --git a/rust/arrow/src/array/builder.rs b/rust/arrow/src/array/builder.rs index 972b961352fa3..074cdf522fdbf 100644 --- a/rust/arrow/src/array/builder.rs +++ b/rust/arrow/src/array/builder.rs @@ -117,8 +117,6 @@ pub trait BufferBuilderTrait { /// use arrow::array::{UInt8BufferBuilder, BufferBuilderTrait}; /// /// let mut builder = UInt8BufferBuilder::new(10); - /// - /// assert!(builder.capacity() >= 10); /// ``` fn new(capacity: usize) -> Self; @@ -178,8 +176,6 @@ pub trait BufferBuilderTrait { /// /// let mut builder = UInt8BufferBuilder::new(10); /// builder.reserve(10); - /// - /// assert!(builder.capacity() >= 20); /// ``` fn reserve(&mut self, n: usize); @@ -364,6 +360,7 @@ impl BooleanBufferBuilder { self.reserve(n); if n != 0 && v { let data = self.buffer.data_mut(); + (self.len..self.len + n).for_each(|i| bit_util::set_bit(data, i)) } self.len += n; Ok(()) diff --git a/rust/arrow/src/array/transform/list.rs b/rust/arrow/src/array/transform/list.rs index 43c7287fcf277..849fc95538427 100644 --- a/rust/arrow/src/array/transform/list.rs +++ b/rust/arrow/src/array/transform/list.rs @@ -59,7 +59,7 @@ pub(super) fn build_extend(array: &ArrayData) -> Extend { let buffer = &mut mutable.buffers[0]; let delta_len = array.len() - array.null_count(); - buffer.reserve(buffer.len() + delta_len * std::mem::size_of::()); + buffer.reserve(delta_len * std::mem::size_of::()); let child = &mut mutable.child_data[0]; (start..start + len).for_each(|i| { diff --git a/rust/arrow/src/array/transform/utils.rs b/rust/arrow/src/array/transform/utils.rs index df9ce2453be14..548937e5910e7 100644 --- a/rust/arrow/src/array/transform/utils.rs +++ b/rust/arrow/src/array/transform/utils.rs @@ -53,7 +53,7 @@ pub(super) fn extend_offsets( mut last_offset: T, offsets: &[T], ) { - buffer.reserve(buffer.len() + offsets.len() * std::mem::size_of::()); + buffer.reserve(offsets.len() * std::mem::size_of::()); offsets.windows(2).for_each(|offsets| { // compute the new offset let length = offsets[1] - offsets[0]; diff --git a/rust/arrow/src/array/transform/variable_size.rs b/rust/arrow/src/array/transform/variable_size.rs index 6735c8471ff2a..772828146f5fa 100644 --- a/rust/arrow/src/array/transform/variable_size.rs +++ b/rust/arrow/src/array/transform/variable_size.rs @@ -67,9 +67,7 @@ pub(super) fn build_extend(array: &ArrayData) -> Extend { let (offset_buffer, values_buffer) = mutable.buffers.split_at_mut(1); let offset_buffer = &mut offset_buffer[0]; let values_buffer = &mut values_buffer[0]; - offset_buffer.reserve( - offset_buffer.len() + array.len() * std::mem::size_of::(), - ); + offset_buffer.reserve(array.len() * std::mem::size_of::()); (start..start + len).for_each(|i| { if array.is_valid(i) { diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index cf89a7a00ff5f..8f2a029bb8043 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -21,12 +21,8 @@ #[cfg(feature = "simd")] use packed_simd::u8x64; -use crate::{ - bytes::{Bytes, Deallocation}, - ffi, -}; +use crate::bytes::{Bytes, Deallocation}; -use std::cmp; use std::convert::AsRef; use std::fmt::Debug; use std::mem; @@ -44,34 +40,44 @@ use crate::util::bit_util::ceil; #[cfg(any(feature = "simd", feature = "avx512"))] use std::borrow::BorrowMut; -/// Buffer is a contiguous memory region of fixed size and is aligned at a 64-byte -/// boundary. Buffer is immutable. +/// Buffer is a shared contiguous memory region. #[derive(Clone, PartialEq, Debug)] pub struct Buffer { /// Reference-counted pointer to the internal byte buffer. data: Arc, - /// The offset into the buffer. + /// The offset into contiguous memory region offset: usize, } impl Buffer { + pub(crate) fn new(data: Bytes) -> Self { + Self { + data: Arc::new(data), + offset: 0, + } + } + /// Returns the number of bytes in the buffer + #[inline] pub fn len(&self) -> usize { self.data.len() - self.offset } /// Returns whether the buffer is empty. + #[inline] pub fn is_empty(&self) -> bool { - self.data.len() - self.offset == 0 + self.len() == 0 } /// Returns the byte slice stored in this buffer + #[inline] pub fn data(&self) -> &[u8] { &self.data.as_slice()[self.offset..] } /// Returns a slice of this buffer, starting from `offset`. + #[inline] pub fn slice(&self, offset: usize) -> Self { assert!( offset <= self.len(), @@ -87,8 +93,9 @@ impl Buffer { /// /// Note that this should be used cautiously, and the returned pointer should not be /// stored anywhere, to avoid dangling pointers. + #[inline] pub fn raw_data(&self) -> *const u8 { - unsafe { self.data.raw_data().add(self.offset) } + self.data.as_slice()[self.offset..].as_ptr() } /// View buffer as typed slice. @@ -101,6 +108,7 @@ impl Buffer { /// /// Also `typed_data::` is unsafe as `0x00` and `0x01` are the only valid values for /// `bool` in Rust. However, `bool` arrays in Arrow are bit-packed which breaks this condition. + #[inline] pub unsafe fn typed_data(&self) -> &[T] { assert_eq!(self.len() % mem::size_of::(), 0); from_raw_parts( @@ -112,6 +120,7 @@ impl Buffer { /// Returns a slice of this buffer starting at a certain bit offset. /// If the offset is byte-aligned the returned buffer is a shallow clone, /// otherwise a new buffer is allocated and filled with a copy of the bits in the range. + #[inline] pub fn bit_slice(&self, offset: usize, len: usize) -> Self { if offset % 8 == 0 && len % 8 == 0 { return self.slice(offset / 8); @@ -149,11 +158,8 @@ impl Buffer { /// allocated memory region. impl> From for Buffer { fn from(p: T) -> Self { - let a = p.as_ref(); - Self { - data: Arc::new(a.to_vec()), - offset: 0, - } + let bytes = unsafe { Bytes::new(p.as_ref().to_vec(), Deallocation::Native) }; + Buffer::new(bytes) } } @@ -597,9 +603,6 @@ impl Not for &Buffer { } } -unsafe impl Sync for Buffer {} -unsafe impl Send for Buffer {} - /// Similar to `Buffer`, but is growable and can be mutated. A mutable buffer can be /// converted into a immutable buffer via the `freeze` method. #[derive(Debug)] @@ -609,6 +612,7 @@ pub struct MutableBuffer { impl MutableBuffer { /// Allocate a new mutable buffer with initial capacity to be `capacity`. + #[inline] pub fn new(capacity: usize) -> Self { Self { data: Vec::with_capacity(capacity), @@ -616,6 +620,7 @@ impl MutableBuffer { } /// creates a new [MutableBuffer] where every bit is initialized to `0` + #[inline] pub fn new_null(len: usize) -> Self { let num_bytes = bit_util::ceil(len, 8); MutableBuffer::new(num_bytes).with_bitset(num_bytes, false) @@ -627,6 +632,7 @@ impl MutableBuffer { /// This is useful when one wants to clear (or set) the bits and then manipulate /// the buffer directly (e.g., modifying the buffer by holding a mutable reference /// from `data_mut()`). + #[inline] pub fn with_bitset(mut self, end: usize, val: bool) -> Self { assert!(end <= self.data.capacity()); let v = if val { 255 } else { 0 }; @@ -642,6 +648,7 @@ impl MutableBuffer { /// This is used to initialize the bits in a buffer, however, it has no impact on the /// `len` of the buffer and so can be used to initialize the memory region from /// `len` to `capacity`. + #[inline] pub fn set_null_bits(&mut self, start: usize, count: usize) { assert!(start + count <= self.data.capacity()); unsafe { @@ -649,14 +656,10 @@ impl MutableBuffer { } } - /// Ensures that this buffer has at least `capacity` slots in this buffer. This will - /// also ensure the new capacity will be a multiple of 64 bytes. - /// - /// Returns the new capacity for this buffer. - pub fn reserve(&mut self, capacity: usize) { - if capacity < self.data.capacity() { - self.data.reserve(self.data.capacity() - capacity); - } + /// Reserves capacity for at least `additional` more bytes + #[inline] + pub fn reserve(&mut self, additional: usize) { + self.data.reserve(additional); } /// Resizes the buffer so that the `len` will equal to the `new_len`. @@ -666,6 +669,7 @@ impl MutableBuffer { /// `new_len` will be zeroed out. /// /// If `new_len` is less than `len`, the buffer will be truncated. + #[inline] pub fn resize(&mut self, new_len: usize) { self.data.resize(new_len, 0); } @@ -683,26 +687,28 @@ impl MutableBuffer { } /// Clear all existing data from this buffer. + #[inline] pub fn clear(&mut self) { self.data.clear() } /// Returns the data stored in this buffer as a slice. + #[inline] pub fn data(&self) -> &[u8] { &self.data } /// Returns the data stored in this buffer as a mutable slice. + #[inline] pub fn data_mut(&mut self) -> &mut [u8] { &mut self.data } /// Freezes this buffer and return an immutable version of it. + #[inline] pub fn freeze(self) -> Buffer { - Buffer { - data: Arc::new(self.data), - offset: 0, - } + let data = unsafe { Bytes::new(self.data.to_vec(), Deallocation::Native) }; + Buffer::new(data) } /// View buffer as typed slice. diff --git a/rust/arrow/src/bytes.rs b/rust/arrow/src/bytes.rs index 4d65da92a83a7..46f14a3a6f9d2 100644 --- a/rust/arrow/src/bytes.rs +++ b/rust/arrow/src/bytes.rs @@ -19,16 +19,15 @@ //! how to de-allocate itself, [`Bytes`]. //! Note that this is a low-level functionality of this crate. -use core::slice; -use std::sync::Arc; use std::{fmt::Debug, fmt::Formatter}; +use std::{mem::ManuallyDrop, sync::Arc}; use crate::ffi; /// Mode of deallocating memory regions pub enum Deallocation { - /// Native deallocation, using Rust deallocator with Arrow-specific memory aligment - Native(usize), + /// Native deallocation, using the native deallocator + Native, /// Foreign interface, via a callback Foreign(Arc), } @@ -36,8 +35,8 @@ pub enum Deallocation { impl Debug for Deallocation { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { match self { - Deallocation::Native(capacity) => { - write!(f, "Deallocation::Native {{ capacity: {} }}", capacity) + Deallocation::Native => { + write!(f, "Deallocation::Native") } Deallocation::Foreign(_) => { write!(f, "Deallocation::Foreign {{ capacity: unknown }}") @@ -48,43 +47,33 @@ impl Debug for Deallocation { /// A continuous, fixed-size, immutable memory region that knows how to de-allocate itself. /// This structs' API is inspired by the `bytes::Bytes`, but it is not limited to using rust's -/// global allocator nor u8 aligmnent. +/// global allocator, which is useful for FFI. /// -/// In the most common case, this buffer is allocated using [`allocate_aligned`](memory::allocate_aligned) -/// and deallocated accordingly [`free_aligned`](memory::free_aligned). +/// In the most common case, this buffer's memory is managed by the Global allocator. /// When the region is allocated by an foreign allocator, [Deallocation::Foreign], this calls the /// foreign deallocator to deallocate the region when it is no longer needed. pub struct Bytes { - /// The raw pointer to be begining of the region - ptr: Vec, + /// The bytes + data: ManuallyDrop>, /// how to deallocate this region deallocation: Deallocation, } +unsafe impl Send for Bytes {} +unsafe impl Sync for Bytes {} + impl Bytes { - /// Takes ownership of an allocated memory region, - /// - /// # Arguments - /// - /// * `ptr` - Pointer to raw parts - /// * `len` - Length of raw parts in **bytes** - /// * `capacity` - Total allocated memory for the pointer `ptr`, in **bytes** - /// - /// # Safety - /// - /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` - /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed. - pub unsafe fn new(ptr: Vec, deallocation: Deallocation) -> Bytes { - Bytes { - ptr, + pub unsafe fn new(data: Vec, deallocation: Deallocation) -> Self { + Self { + data: ManuallyDrop::new(data), deallocation, } } #[inline] pub fn as_slice(&self) -> &[u8] { - unsafe { slice::from_raw_parts(self.ptr, self.len) } + &self.data[..] } #[inline] @@ -97,19 +86,9 @@ impl Bytes { self.len() == 0 } - #[inline] - pub fn raw_data(&self) -> *const u8 { - self.ptr - } - - #[inline] - pub fn raw_data_mut(&mut self) -> *mut u8 { - self.ptr as *mut u8 - } - pub fn capacity(&self) -> usize { match self.deallocation { - Deallocation::Native(capacity) => capacity, + Deallocation::Native => self.data.capacity(), // we cannot determine this in general, // and thus we state that this is externally-owned memory Deallocation::Foreign(_) => 0, @@ -121,13 +100,11 @@ impl Drop for Bytes { #[inline] fn drop(&mut self) { match &self.deallocation { - Deallocation::Native(capacity) => { - if !self.ptr.is_null() { - unsafe { memory::free_aligned(self.ptr as *mut u8, *capacity) }; - } + Deallocation::Native => { + unsafe { ManuallyDrop::drop(&mut self.data) }; } - // foreign interface knows how to deallocate itself. - Deallocation::Foreign(_) => (), + // foreign interface deallocates itself. As such, we forget the vector + Deallocation::Foreign(_) => {} } } } diff --git a/rust/arrow/src/ffi.rs b/rust/arrow/src/ffi.rs index 1d8d36da6d952..815b694b81feb 100644 --- a/rust/arrow/src/ffi.rs +++ b/rust/arrow/src/ffi.rs @@ -78,10 +78,13 @@ To export an array, create an `ArrowArray` using [ArrowArray::try_new]. use std::{ffi::CStr, ffi::CString, iter, mem::size_of, ptr, sync::Arc}; -use crate::buffer::Buffer; use crate::datatypes::DataType; use crate::error::{ArrowError, Result}; use crate::util::bit_util; +use crate::{ + buffer::Buffer, + bytes::{Bytes, Deallocation}, +}; /// ABI-compatible struct for `ArrowSchema` from C Data Interface /// See https://arrow.apache.org/docs/format/CDataInterface.html#structure-definitions @@ -391,12 +394,14 @@ unsafe fn create_buffer( let buffers = array.buffers as *mut *const u8; assert!(index < array.n_buffers as usize); - let ptr = *buffers.add(index); + let ptr = *buffers.add(index) as *mut u8; if ptr.is_null() { None } else { - Some(Buffer::from_unowned(ptr, len, array)) + let data = Vec::from_raw_parts(ptr, len, len); + let bytes = Bytes::new(data, Deallocation::Foreign(array)); + Some(Buffer::new(bytes)) } } From 8a1c52c186fc967a3656b4f8639217dcbaacd974 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sat, 19 Dec 2020 09:01:24 +0000 Subject: [PATCH 3/3] Replaced deprecated API. --- rust/arrow/src/compute/kernels/comparison.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rust/arrow/src/compute/kernels/comparison.rs b/rust/arrow/src/compute/kernels/comparison.rs index 6c9d575b5b680..f584eaa816125 100644 --- a/rust/arrow/src/compute/kernels/comparison.rs +++ b/rust/arrow/src/compute/kernels/comparison.rs @@ -51,14 +51,14 @@ macro_rules! compare_op { let actual_capacity = bit_util::round_upto_multiple_of_64(byte_capacity); let mut buffer = MutableBuffer::new(actual_capacity); buffer.resize(byte_capacity); - let data = buffer.raw_data_mut(); + let data = buffer.data_mut(); for i in 0..$left.len() { if $op($left.value(i), $right.value(i)) { // SAFETY: this is safe as `data` has at least $left.len() elements. // and `i` is bound by $left.len() unsafe { - bit_util::set_bit_raw(data, i); + bit_util::set_bit_raw(data.as_mut_ptr(), i); } } } @@ -84,14 +84,14 @@ macro_rules! compare_op_scalar { let actual_capacity = bit_util::round_upto_multiple_of_64(byte_capacity); let mut buffer = MutableBuffer::new(actual_capacity); buffer.resize(byte_capacity); - let data = buffer.raw_data_mut(); + let data = buffer.data_mut(); for i in 0..$left.len() { if $op($left.value(i), $right) { // SAFETY: this is safe as `data` has at least $left.len() elements // and `i` is bound by $left.len() unsafe { - bit_util::set_bit_raw(data, i); + bit_util::set_bit_raw(data.as_mut_ptr(), i); } } }