Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-10804: [Rust] Removed some unsafe code from the parquet crate #8829

Closed
wants to merge 4 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 85 additions & 139 deletions rust/parquet/src/arrow/record_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
// under the License.

use std::cmp::{max, min};
use std::mem::align_of;
use std::mem::size_of;
use std::mem::{replace, swap};
use std::slice;
use std::mem::{replace, size_of};

use crate::column::{page::PageReader, reader::ColumnReaderImpl};
use crate::data_type::DataType;
Expand All @@ -28,7 +25,6 @@ use crate::schema::types::ColumnDescPtr;
use arrow::array::BooleanBufferBuilder;
use arrow::bitmap::Bitmap;
use arrow::buffer::{Buffer, MutableBuffer};
use arrow::memory;

const MIN_BATCH_SIZE: usize = 1024;

Expand All @@ -53,45 +49,6 @@ pub struct RecordReader<T: DataType> {
in_middle_of_record: bool,
}

/// A mutable slice view over (part of) a `MutableBuffer`, reinterpreted as `[T]`.
///
/// NOTE(review): this abstraction is inherently unsafe — it exposes the buffer's
/// *capacity* (not its initialized length) as a `&mut [T]`, so callers may read
/// uninitialized memory through `to_slice`. This type was removed in favor of
/// safe `align_to` / `data_mut` slicing for exactly that reason.
#[derive(Debug)]
struct FatPtr<'a, T> {
// The borrowed slice; lifetime 'a ties it to the source MutableBuffer.
ptr: &'a mut [T],
}

impl<'a, T> FatPtr<'a, T> {
/// Wraps an existing mutable slice without any reinterpretation.
fn new(ptr: &'a mut [T]) -> Self {
Self { ptr }
}

/// Views `buf` as `[T]` starting `offset` elements (of `T`) into the buffer.
/// Element size is taken from `size_of::<T>()`.
fn with_offset(buf: &'a mut MutableBuffer, offset: usize) -> Self {
FatPtr::<T>::with_offset_and_size(buf, offset, size_of::<T>())
}

/// Views `buf` as `[T]` starting at element `offset`, using an explicitly
/// supplied `type_size` (callers pass `T::get_type_size()` for parquet
/// physical types, which may differ from `size_of::<T>()`).
///
/// Safety relies on:
/// - the alignment assert below (buffer allocations are `memory::ALIGNMENT`-aligned,
///   so any `T` with smaller-or-equal alignment is correctly aligned);
/// - `T` being a plain primitive — the TODO notes this is NOT enforced, so a
///   `T` with a destructor/ownership (like `Box<A>`) would be unsound;
/// - the resulting length `capacity / type_size - offset` covering capacity,
///   which includes bytes never written — a latent UB hazard.
fn with_offset_and_size(
buf: &'a mut MutableBuffer,
offset: usize,
type_size: usize,
) -> Self {
assert!(align_of::<T>() <= memory::ALIGNMENT);
// TODO Prevent this from being called with non primitive types (like `Box<A>`)
unsafe {
FatPtr::new(slice::from_raw_parts_mut(
// Advance `offset` elements of `T` past the buffer's base pointer.
&mut *(buf.raw_data() as *mut T).add(offset),
// Length in elements: whole capacity minus the skipped prefix.
buf.capacity() / type_size - offset,
))
}
}

/// Immutable view of the wrapped slice.
fn to_slice(&self) -> &[T] {
self.ptr
}

/// Mutable view of the wrapped slice.
/// (`to_` normally implies a conversion; lint silenced because this borrows.)
#[allow(clippy::wrong_self_convention)]
fn to_slice_mut(&mut self) -> &mut [T] {
self.ptr
}
}

impl<T: DataType> RecordReader<T> {
pub fn new(column_schema: ColumnDescPtr) -> Self {
let (def_levels, null_map) = if column_schema.max_def_level() > 0 {
Expand Down Expand Up @@ -199,21 +156,19 @@ impl<T: DataType> RecordReader<T> {
pub fn consume_def_levels(&mut self) -> Result<Option<Buffer>> {
let new_buffer = if let Some(ref mut def_levels_buf) = &mut self.def_levels {
let num_left_values = self.values_written - self.num_values;
let mut new_buffer = MutableBuffer::new(
size_of::<i16>() * max(MIN_BATCH_SIZE, num_left_values),
);
new_buffer.resize(num_left_values * size_of::<i16>());
// create an empty buffer, as it will be resized below
let mut new_buffer = MutableBuffer::new(0);
let num_bytes = num_left_values * size_of::<i16>();
let new_len = self.num_values * size_of::<i16>();

new_buffer.resize(num_bytes);

let mut new_def_levels = FatPtr::<i16>::with_offset(&mut new_buffer, 0);
let new_def_levels = new_def_levels.to_slice_mut();
let left_def_levels =
FatPtr::<i16>::with_offset(def_levels_buf, self.num_values);
let left_def_levels = left_def_levels.to_slice();
let new_def_levels = new_buffer.data_mut();
let left_def_levels = &def_levels_buf.data_mut()[new_len..];

new_def_levels[0..num_left_values]
.copy_from_slice(&left_def_levels[0..num_left_values]);
new_def_levels[0..num_bytes].copy_from_slice(&left_def_levels[0..num_bytes]);

def_levels_buf.resize(self.num_values * size_of::<i16>());
def_levels_buf.resize(new_len);
Some(new_buffer)
} else {
None
Expand All @@ -228,21 +183,19 @@ impl<T: DataType> RecordReader<T> {
// TODO: Optimize to reduce the copy
let new_buffer = if let Some(ref mut rep_levels_buf) = &mut self.rep_levels {
let num_left_values = self.values_written - self.num_values;
let mut new_buffer = MutableBuffer::new(
size_of::<i16>() * max(MIN_BATCH_SIZE, num_left_values),
);
new_buffer.resize(num_left_values * size_of::<i16>());
// create an empty buffer, as it will be resized below
let mut new_buffer = MutableBuffer::new(0);
let num_bytes = num_left_values * size_of::<i16>();
let new_len = self.num_values * size_of::<i16>();

let mut new_rep_levels = FatPtr::<i16>::with_offset(&mut new_buffer, 0);
let new_rep_levels = new_rep_levels.to_slice_mut();
let left_rep_levels =
FatPtr::<i16>::with_offset(rep_levels_buf, self.num_values);
let left_rep_levels = left_rep_levels.to_slice();
new_buffer.resize(num_bytes);

new_rep_levels[0..num_left_values]
.copy_from_slice(&left_rep_levels[0..num_left_values]);
let new_rep_levels = new_buffer.data_mut();
let left_rep_levels = &rep_levels_buf.data_mut()[new_len..];

rep_levels_buf.resize(self.num_values * size_of::<i16>());
new_rep_levels[0..num_bytes].copy_from_slice(&left_rep_levels[0..num_bytes]);

rep_levels_buf.resize(new_len);

Some(new_buffer)
} else {
Expand All @@ -257,24 +210,19 @@ impl<T: DataType> RecordReader<T> {
pub fn consume_record_data(&mut self) -> Result<Buffer> {
// TODO: Optimize to reduce the copy
let num_left_values = self.values_written - self.num_values;
let mut new_buffer = MutableBuffer::new(max(MIN_BATCH_SIZE, num_left_values));
new_buffer.resize(num_left_values * T::get_type_size());

let mut new_records =
FatPtr::<T::T>::with_offset_and_size(&mut new_buffer, 0, T::get_type_size());
let new_records = new_records.to_slice_mut();
let mut left_records = FatPtr::<T::T>::with_offset_and_size(
&mut self.records,
self.num_values,
T::get_type_size(),
);
let left_records = left_records.to_slice_mut();
// create an empty buffer, as it will be resized below
let mut new_buffer = MutableBuffer::new(0);
let num_bytes = num_left_values * T::get_type_size();
let new_len = self.num_values * T::get_type_size();

for idx in 0..num_left_values {
swap(&mut new_records[idx], &mut left_records[idx]);
}
new_buffer.resize(num_bytes);

let new_records = new_buffer.data_mut();
let left_records = &mut self.records.data_mut()[new_len..];

self.records.resize(self.num_values * T::get_type_size());
new_records[0..num_bytes].copy_from_slice(&left_records[0..num_bytes]);

self.records.resize(new_len);

Ok(replace(&mut self.records, new_buffer).freeze())
}
Expand Down Expand Up @@ -331,70 +279,71 @@ impl<T: DataType> RecordReader<T> {
fn read_one_batch(&mut self, batch_size: usize) -> Result<usize> {
// Reserve spaces
self.records
.reserve(self.records.len() + batch_size * T::get_type_size());
.resize(self.records.len() + batch_size * T::get_type_size());
if let Some(ref mut buf) = self.rep_levels {
buf.reserve(buf.len() + batch_size * size_of::<i16>());
buf.resize(buf.len() + batch_size * size_of::<i16>());
}
if let Some(ref mut buf) = self.def_levels {
buf.reserve(buf.len() + batch_size * size_of::<i16>());
buf.resize(buf.len() + batch_size * size_of::<i16>());
}

// Convert mutable buffer spaces to mutable slices
let mut values_buf = FatPtr::<T::T>::with_offset_and_size(
&mut self.records,
self.values_written,
T::get_type_size(),
);

let values_written = self.values_written;
let mut def_levels_buf = self
.def_levels
.as_mut()
.map(|buf| FatPtr::<i16>::with_offset(buf, values_written));

let mut rep_levels_buf = self
.rep_levels
// Convert mutable buffer spaces to mutable slices
let (prefix, values, suffix) =
unsafe { self.records.data_mut().align_to_mut::<T::T>() };
assert!(prefix.is_empty() && suffix.is_empty());
let values = &mut values[values_written..];

let def_levels = self.def_levels.as_mut().map(|buf| {
let (prefix, def_levels, suffix) =
unsafe { buf.data_mut().align_to_mut::<i16>() };
assert!(prefix.is_empty() && suffix.is_empty());
&mut def_levels[values_written..]
});

let rep_levels = self.rep_levels.as_mut().map(|buf| {
let (prefix, rep_levels, suffix) =
unsafe { buf.data_mut().align_to_mut::<i16>() };
assert!(prefix.is_empty() && suffix.is_empty());
&mut rep_levels[values_written..]
});

let (values_read, levels_read) = self
.column_reader
.as_mut()
.map(|buf| FatPtr::<i16>::with_offset(buf, values_written));
.unwrap()
.read_batch(batch_size, def_levels, rep_levels, values)?;

let (values_read, levels_read) =
self.column_reader.as_mut().unwrap().read_batch(
batch_size,
def_levels_buf.as_mut().map(|ptr| ptr.to_slice_mut()),
rep_levels_buf.as_mut().map(|ptr| ptr.to_slice_mut()),
values_buf.to_slice_mut(),
)?;
// get new references for the def levels.
let def_levels = self.def_levels.as_ref().map(|buf| {
let (prefix, def_levels, suffix) = unsafe { buf.data().align_to::<i16>() };
assert!(prefix.is_empty() && suffix.is_empty());
&def_levels[values_written..]
});

let max_def_level = self.column_desc.max_def_level();

if values_read < levels_read {
// This means that there are null values in column data
// TODO: Move this into ColumnReader

let values_buf = values_buf.to_slice_mut();

let def_levels_buf = def_levels_buf
.as_mut()
.map(|ptr| ptr.to_slice_mut())
.ok_or_else(|| {
general_err!(
"Definition levels should exist when data is less than levels!"
)
})?;
let def_levels = def_levels.ok_or_else(|| {
general_err!(
"Definition levels should exist when data is less than levels!"
)
})?;

// Fill spaces in column data with default values
let mut values_pos = values_read;
let mut level_pos = levels_read;

while level_pos > values_pos {
if def_levels_buf[level_pos - 1] == max_def_level {
if def_levels[level_pos - 1] == max_def_level {
// This values is not empty
// We use swap rather than assign here because T::T doesn't
// implement Copy
values_buf.swap(level_pos - 1, values_pos - 1);
values.swap(level_pos - 1, values_pos - 1);
values_pos -= 1;
} else {
values_buf[level_pos - 1] = T::T::default();
values[level_pos - 1] = T::T::default();
}

level_pos -= 1;
Expand All @@ -403,16 +352,13 @@ impl<T: DataType> RecordReader<T> {

// Fill in bitmap data
if let Some(null_buffer) = self.null_bitmap.as_mut() {
let def_levels_buf = def_levels_buf
.as_mut()
.map(|ptr| ptr.to_slice_mut())
.ok_or_else(|| {
general_err!(
"Definition levels should exist when data is less than levels!"
)
})?;
let def_levels = def_levels.ok_or_else(|| {
general_err!(
"Definition levels should exist when data is less than levels!"
)
})?;
(0..levels_read).try_for_each(|idx| {
null_buffer.append(def_levels_buf[idx] == max_def_level)
null_buffer.append(def_levels[idx] == max_def_level)
})?;
}

Expand All @@ -424,13 +370,13 @@ impl<T: DataType> RecordReader<T> {
/// Split values into records according repetition definition and returns number of
/// records read.
fn split_records(&mut self, records_to_read: usize) -> Result<usize> {
let rep_levels_buf = self
.rep_levels
.as_mut()
.map(|buf| FatPtr::<i16>::with_offset(buf, 0));
let rep_levels_buf = rep_levels_buf.as_ref().map(|x| x.to_slice());
let rep_levels = self.rep_levels.as_ref().map(|buf| {
let (prefix, rep_levels, suffix) = unsafe { buf.data().align_to::<i16>() };
assert!(prefix.is_empty() && suffix.is_empty());
rep_levels
});

match rep_levels_buf {
match rep_levels {
Some(buf) => {
let mut records_read = 0;

Expand Down