Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-11045: [Rust] Fix performance issues of allocator #9027

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions rust/arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,7 @@ harness = false
[[bench]]
name = "mutable_array"
harness = false

[[bench]]
name = "buffer_create"
harness = false
2 changes: 1 addition & 1 deletion rust/arrow/benches/buffer_bit_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ fn create_buffer(size: usize) -> Buffer {
let mut result = MutableBuffer::new(size).with_bitset(size, false);

for i in 0..size {
result.data_mut()[i] = 0b01010101 << i << (i % 4);
result.as_slice_mut()[i] = 0b01010101 << i << (i % 4);
}

result.freeze()
Expand Down
71 changes: 71 additions & 0 deletions rust/arrow/benches/buffer_create.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#[macro_use]
extern crate criterion;
use criterion::Criterion;

extern crate arrow;

use arrow::{
buffer::{Buffer, MutableBuffer},
datatypes::ToByteSlice,
};

/// Builds a [`Buffer`] by appending the byte representation of `bytes` to a
/// [`MutableBuffer`] `size` times and freezing the result.
///
/// `capacity` is forwarded unchanged to `MutableBuffer::new`. The finished buffer
/// is routed through `criterion::black_box` so the optimizer cannot discard the work.
fn mutable_buffer(bytes: &[u32], size: usize, capacity: usize) -> Buffer {
    let mut builder = MutableBuffer::new(capacity);
    (0..size).for_each(|_| builder.extend_from_slice(bytes.to_byte_slice()));
    let frozen = builder.freeze();
    criterion::black_box(frozen)
}

/// Builds a [`Buffer`] by first accumulating `size` copies of `bytes` in a
/// `Vec<u32>` and then converting the vector's byte view into a buffer.
///
/// `capacity` is the number of `u32` elements reserved up front in the vector.
/// The finished buffer is routed through `criterion::black_box` so the optimizer
/// cannot discard the work.
fn from_slice(bytes: &[u32], size: usize, capacity: usize) -> Buffer {
    let mut values: Vec<u32> = Vec::with_capacity(capacity);
    (0..size).for_each(|_| values.extend_from_slice(bytes));
    let buffer = Buffer::from(values.to_byte_slice());
    criterion::black_box(buffer)
}

/// Registers the buffer-creation benchmarks with criterion.
///
/// Every benchmark appends the same 1025-element `u32` array 1024 times, either
/// through `mutable_buffer` or through `from_slice`. The "prepared" variants
/// pass the full final capacity up front; the others start from a capacity of 0.
fn benchmark(c: &mut Criterion) {
    let bytes = &[128u32; 1025];
    let size = 1usize << 10;

    // The `Vec<u32>` variant takes its capacity in elements; the mutable-buffer
    // variant additionally scales it by the element width.
    let total_elements = size * bytes.len();
    let total_bytes = total_elements * std::mem::size_of::<u32>();

    c.bench_function("mutable", |b| b.iter(|| mutable_buffer(bytes, size, 0)));
    c.bench_function("mutable prepared", |b| {
        b.iter(|| mutable_buffer(bytes, size, total_bytes))
    });

    c.bench_function("from_slice", |b| b.iter(|| from_slice(bytes, size, 0)));
    c.bench_function("from_slice prepared", |b| {
        b.iter(|| from_slice(bytes, size, total_elements))
    });
}

criterion_group!(benches, benchmark);
criterion_main!(benches);
159 changes: 159 additions & 0 deletions rust/arrow/src/alloc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! This module is largely copied (with minor simplifications) from the implementation of Rust's std alloc.
//! Rust's current allocator API is `unstable`, but it solves our main use-case: a container with a custom alignment.
//! The problem that this module solves is to offer an API to allocate, reallocate and deallocate
//! memory blocks. Main responsibilities:
//! * safeguard against null pointers
//! * panic on out of memory (OOM)
//! * global thread-safe tracking of allocations
//! * only allocate initialized (zeroed) regions
//!
//! This module makes no assumptions about type alignments or buffer sizes. Consumers use [`std::alloc::Layout`] to
//! share this information with this module.
use std::alloc::handle_alloc_error;
use std::ptr::NonNull;
use std::{alloc::Layout, sync::atomic::AtomicIsize};

/// Net number of bytes currently allocated through this module.
///
/// `alloc` and `grow` add to this counter; `dealloc` and `shrink` subtract from it.
/// If this number is not zero after all objects have been dropped, there is a memory leak.
// NOTE(review): `AtomicIsize` already has interior mutability, so this could presumably be a
// plain `static` instead of `static mut` — confirm that no caller depends on the `unsafe` access.
pub static mut ALLOCATIONS: AtomicIsize = AtomicIsize::new(0);

/// A memory region, described by a pointer to its first byte and its size.
///
/// The value is a plain `Copy` descriptor; copying it does not copy the region it points to.
#[derive(Debug, Copy, Clone)]
pub struct MemoryBlock {
/// Non-null pointer to the start of the region. The functions of this module
/// store a dangling (but aligned) pointer here when `size` is 0.
pub ptr: NonNull<u8>,
/// Size of the region in bytes.
pub size: usize,
}

/// Produces a non-null pointer that points to no allocation but whose address
/// equals `layout.align()`, and is therefore aligned for `layout`.
///
/// The returned pointer must never be dereferenced.
#[inline]
pub fn dangling(layout: Layout) -> NonNull<u8> {
    let address = layout.align();
    let raw = address as *mut u8;
    // SAFETY: a `Layout`'s alignment is always non-zero, so `raw` is never null.
    unsafe { NonNull::new_unchecked(raw) }
}

/// Allocates a new zero-initialized memory region described by `layout`.
///
/// When `layout.size() == 0` nothing is allocated: the returned block holds a
/// dangling pointer aligned to `layout.align()` and has size 0. Otherwise the
/// allocated size is added to [`ALLOCATIONS`].
///
/// # Panics
/// If the allocator cannot provide the region (e.g. OOM), this function does not
/// return: it hands control to [`handle_alloc_error`].
#[inline]
pub fn alloc(layout: Layout) -> MemoryBlock {
    debug_assert!(layout.align() > 0);

    let size = layout.size();
    if size == 0 {
        // Zero-sized requests never reach the allocator.
        return MemoryBlock {
            ptr: dangling(layout),
            size: 0,
        };
    }

    // SAFETY: `layout` has a non-zero size, as required by `alloc_zeroed`.
    let raw_ptr = unsafe { std::alloc::alloc_zeroed(layout) };
    let ptr = match NonNull::new(raw_ptr) {
        Some(non_null) => non_null,
        None => handle_alloc_error(layout),
    };

    // SAFETY: the counter is only touched through atomic operations; the `unsafe`
    // is needed solely because it is declared as a `static mut`.
    unsafe {
        ALLOCATIONS.fetch_add(size as isize, std::sync::atomic::Ordering::SeqCst);
    }

    MemoryBlock { ptr, size }
}

/// Releases a region previously obtained from this module and subtracts its size
/// from [`ALLOCATIONS`].
///
/// A zero-sized `layout` makes this a no-op, so it may be called with the dangling
/// pointer that zero-sized allocations carry.
///
/// # Safety
/// Unless `layout.size() == 0`, `ptr` must denote a region that is currently
/// allocated with exactly this `layout`.
#[inline]
pub unsafe fn dealloc(ptr: NonNull<u8>, layout: Layout) {
    let size = layout.size();
    if size == 0 {
        // Nothing was ever allocated for zero-sized layouts.
        return;
    }

    std::alloc::dealloc(ptr.as_ptr(), layout);
    ALLOCATIONS.fetch_sub(size as isize, std::sync::atomic::Ordering::SeqCst);
}

/// Overwrites the tail of `memory` with zeros, from byte `offset` up to
/// `memory.size`.
///
/// # Safety
/// `offset` must not exceed `memory.size`, and `memory` must describe a region
/// that is valid for writes over its whole size.
#[inline]
unsafe fn init_zero(memory: &mut MemoryBlock, offset: usize) {
    let tail_start = memory.ptr.as_ptr().add(offset);
    let tail_len = memory.size - offset;
    std::ptr::write_bytes(tail_start, 0, tail_len);
}

/// Grows a memory region to `new_size` bytes, potentially reallocating it.
///
/// The bytes in `[layout.size(), new_size)` of the returned block are zeroed, and
/// [`ALLOCATIONS`] is increased by the number of bytes added. If `new_size`
/// equals `layout.size()`, the region is returned untouched.
///
/// This is similar to `AllocRef::grow`, but without placement, since it is a no-op,
/// and without init, since we always allocate initialized to 0.
///
/// # Panics
/// If the allocator cannot provide the larger region, this function does not
/// return: it hands the layout of the failed request to [`handle_alloc_error`].
///
/// # Safety
/// * unless `layout.size() == 0`, `ptr` must denote a region currently allocated
///   with exactly this `layout`;
/// * `new_size` must be greater than or equal to `layout.size()`;
/// * `new_size`, rounded up to `layout.align()`, must not overflow.
#[inline]
pub unsafe fn grow(ptr: NonNull<u8>, layout: Layout, new_size: usize) -> MemoryBlock {
    let size = layout.size();
    debug_assert!(
        new_size >= size,
        "`new_size` must be greater than or equal to `layout.size()`"
    );

    if size == new_size {
        return MemoryBlock { ptr, size };
    }

    // Layout of the region being requested; used both for a fresh allocation and
    // for reporting an allocation failure.
    let new_layout = Layout::from_size_align_unchecked(new_size, layout.align());

    if size == 0 {
        // Nothing was allocated before (dangling pointer): allocate from scratch.
        alloc(new_layout)
    } else {
        let raw_ptr = std::alloc::realloc(ptr.as_ptr(), layout, new_size);
        let mut memory = MemoryBlock {
            // Report the layout that failed (the new one), not the old one.
            ptr: NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(new_layout)),
            size: new_size,
        };
        ALLOCATIONS.fetch_add(
            (new_size - size) as isize,
            std::sync::atomic::Ordering::SeqCst,
        );
        // `realloc` leaves the newly added tail uninitialized; zero it.
        init_zero(&mut memory, size);
        memory
    }
}

/// Shrinks a memory region to `new_size` bytes, potentially reallocating it.
///
/// [`ALLOCATIONS`] is decreased by the number of bytes released. If `new_size`
/// equals `layout.size()`, the region is returned untouched. If `new_size` is 0,
/// the region is deallocated and the returned block holds a dangling pointer
/// aligned to `layout.align()`.
///
/// This is similar to `AllocRef::shrink`, but without placement, since it is a no-op.
///
/// # Panics
/// If the allocator fails to reallocate, this function does not return: it hands
/// the layout of the failed request to [`handle_alloc_error`].
///
/// # Safety
/// * unless `layout.size() == 0`, `ptr` must denote a region currently allocated
///   with exactly this `layout`;
/// * `new_size` must be smaller than or equal to `layout.size()`.
#[inline]
pub unsafe fn shrink(ptr: NonNull<u8>, layout: Layout, new_size: usize) -> MemoryBlock {
    let size = layout.size();
    debug_assert!(
        new_size <= size,
        "`new_size` must be smaller than or equal to `layout.size()`"
    );

    if size == new_size {
        return MemoryBlock { ptr, size };
    }

    if new_size == 0 {
        // `dealloc` also takes care of updating `ALLOCATIONS`.
        dealloc(ptr, layout);
        MemoryBlock {
            ptr: dangling(layout),
            size: 0,
        }
    } else {
        // `realloc` probably checks for `new_size < size` or something similar.
        let raw_ptr = std::alloc::realloc(ptr.as_ptr(), layout, new_size);
        let ptr = NonNull::new(raw_ptr).unwrap_or_else(|| {
            // Report the layout that failed (the new one), not the old one.
            handle_alloc_error(Layout::from_size_align_unchecked(
                new_size,
                layout.align(),
            ))
        });
        ALLOCATIONS.fetch_sub(
            (size - new_size) as isize,
            std::sync::atomic::Ordering::SeqCst,
        );
        MemoryBlock {
            ptr,
            size: new_size,
        }
    }
}
36 changes: 17 additions & 19 deletions rust/arrow/src/array/array_binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ use std::{
};

use super::{
array::print_long_array, raw_pointer::as_aligned_pointer, raw_pointer::RawPtrBox,
Array, ArrayData, ArrayDataRef, FixedSizeListArray, GenericBinaryIter,
GenericListArray, LargeListArray, ListArray, OffsetSizeTrait,
array::print_long_array, raw_pointer::RawPtrBox, Array, ArrayData, ArrayDataRef,
FixedSizeListArray, GenericBinaryIter, GenericListArray, LargeListArray, ListArray,
OffsetSizeTrait,
};
use crate::util::bit_util;
use crate::{buffer::Buffer, datatypes::ToByteSlice};
Expand Down Expand Up @@ -82,7 +82,7 @@ impl<OffsetSize: BinaryOffsetSizeTrait> GenericBinaryArray<OffsetSize> {

#[inline]
fn value_offset_at(&self, i: usize) -> OffsetSize {
unsafe { *self.value_offsets.get().add(i) }
unsafe { *self.value_offsets.as_ptr().add(i) }
}

/// Returns the element at index `i` as a byte slice.
Expand All @@ -92,7 +92,7 @@ impl<OffsetSize: BinaryOffsetSizeTrait> GenericBinaryArray<OffsetSize> {
unsafe {
let pos = self.value_offset_at(offset);
std::slice::from_raw_parts(
self.value_data.get().offset(pos.to_isize()),
self.value_data.as_ptr().offset(pos.to_isize()),
(self.value_offset_at(offset + 1) - pos).to_usize().unwrap(),
)
}
Expand Down Expand Up @@ -205,14 +205,12 @@ impl<OffsetSize: BinaryOffsetSizeTrait> From<ArrayDataRef>
2,
"BinaryArray data should contain 2 buffers only (offsets and values)"
);
let raw_value_offsets = data.buffers()[0].raw_data();
let value_data = data.buffers()[1].raw_data();
let offsets = data.buffers()[0].as_ptr();
let values = data.buffers()[1].as_ptr();
Self {
data,
value_offsets: RawPtrBox::new(as_aligned_pointer::<OffsetSize>(
raw_value_offsets,
)),
value_data: RawPtrBox::new(value_data),
value_offsets: unsafe { RawPtrBox::new(offsets) },
value_data: unsafe { RawPtrBox::new(values) },
}
}
}
Expand All @@ -234,7 +232,7 @@ where
offsets.push(length_so_far);

{
let null_slice = null_buf.data_mut();
let null_slice = null_buf.as_slice_mut();

for (i, s) in iter.enumerate() {
if let Some(s) = s {
Expand Down Expand Up @@ -328,7 +326,7 @@ impl FixedSizeBinaryArray {
unsafe {
let pos = self.value_offset_at(offset);
std::slice::from_raw_parts(
self.value_data.get().offset(pos as isize),
self.value_data.as_ptr().offset(pos as isize),
(self.value_offset_at(offset + 1) - pos) as usize,
)
}
Expand Down Expand Up @@ -389,7 +387,7 @@ impl From<Vec<Option<Vec<u8>>>> for FixedSizeBinaryArray {

let num_bytes = bit_util::ceil(len, 8);
let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false);
let null_slice = null_buf.data_mut();
let null_slice = null_buf.as_slice_mut();

data.iter().enumerate().for_each(|(i, entry)| {
if entry.is_some() {
Expand Down Expand Up @@ -421,14 +419,14 @@ impl From<ArrayDataRef> for FixedSizeBinaryArray {
1,
"FixedSizeBinaryArray data should contain 1 buffer only (values)"
);
let value_data = data.buffers()[0].raw_data();
let value_data = data.buffers()[0].as_ptr();
let length = match data.data_type() {
DataType::FixedSizeBinary(len) => *len,
_ => panic!("Expected data type to be FixedSizeBinary"),
};
Self {
data,
value_data: RawPtrBox::new(value_data),
value_data: unsafe { RawPtrBox::new(value_data) },
length,
}
}
Expand Down Expand Up @@ -514,7 +512,7 @@ impl DecimalArray {
let raw_val = unsafe {
let pos = self.value_offset_at(offset);
std::slice::from_raw_parts(
self.value_data.get().offset(pos as isize),
self.value_data.as_ptr().offset(pos as isize),
(self.value_offset_at(offset + 1) - pos) as usize,
)
};
Expand Down Expand Up @@ -589,15 +587,15 @@ impl From<ArrayDataRef> for DecimalArray {
1,
"DecimalArray data should contain 1 buffer only (values)"
);
let value_data = data.buffers()[0].raw_data();
let values = data.buffers()[0].as_ptr();
let (precision, scale) = match data.data_type() {
DataType::Decimal(precision, scale) => (*precision, *scale),
_ => panic!("Expected data type to be Decimal"),
};
let length = 16;
Self {
data,
value_data: RawPtrBox::new(value_data),
value_data: unsafe { RawPtrBox::new(values) },
precision,
scale,
length,
Expand Down
Loading