diff --git a/rust/parquet/src/arrow/record_reader.rs b/rust/parquet/src/arrow/record_reader.rs index 16b084698b8af..14908de6d4350 100644 --- a/rust/parquet/src/arrow/record_reader.rs +++ b/rust/parquet/src/arrow/record_reader.rs @@ -16,10 +16,7 @@ // under the License. use std::cmp::{max, min}; -use std::mem::align_of; -use std::mem::size_of; -use std::mem::{replace, swap}; -use std::slice; +use std::mem::{replace, size_of}; use crate::column::{page::PageReader, reader::ColumnReaderImpl}; use crate::data_type::DataType; @@ -28,7 +25,6 @@ use crate::schema::types::ColumnDescPtr; use arrow::array::BooleanBufferBuilder; use arrow::bitmap::Bitmap; use arrow::buffer::{Buffer, MutableBuffer}; -use arrow::memory; const MIN_BATCH_SIZE: usize = 1024; @@ -53,45 +49,6 @@ pub struct RecordReader { in_middle_of_record: bool, } -#[derive(Debug)] -struct FatPtr<'a, T> { - ptr: &'a mut [T], -} - -impl<'a, T> FatPtr<'a, T> { - fn new(ptr: &'a mut [T]) -> Self { - Self { ptr } - } - - fn with_offset(buf: &'a mut MutableBuffer, offset: usize) -> Self { - FatPtr::::with_offset_and_size(buf, offset, size_of::()) - } - - fn with_offset_and_size( - buf: &'a mut MutableBuffer, - offset: usize, - type_size: usize, - ) -> Self { - assert!(align_of::() <= memory::ALIGNMENT); - // TODO Prevent this from being called with non primitive types (like `Box`) - unsafe { - FatPtr::new(slice::from_raw_parts_mut( - &mut *(buf.raw_data() as *mut T).add(offset), - buf.capacity() / type_size - offset, - )) - } - } - - fn to_slice(&self) -> &[T] { - self.ptr - } - - #[allow(clippy::wrong_self_convention)] - fn to_slice_mut(&mut self) -> &mut [T] { - self.ptr - } -} - impl RecordReader { pub fn new(column_schema: ColumnDescPtr) -> Self { let (def_levels, null_map) = if column_schema.max_def_level() > 0 { @@ -199,21 +156,19 @@ impl RecordReader { pub fn consume_def_levels(&mut self) -> Result> { let new_buffer = if let Some(ref mut def_levels_buf) = &mut self.def_levels { let num_left_values = self.values_written - self.num_values; - let mut new_buffer = MutableBuffer::new( - size_of::() * max(MIN_BATCH_SIZE, num_left_values), - ); - new_buffer.resize(num_left_values * size_of::()); + // create an empty buffer, as it will be resized below + let mut new_buffer = MutableBuffer::new(0); + let num_bytes = num_left_values * size_of::(); + let new_len = self.num_values * size_of::(); + + new_buffer.resize(num_bytes); - let mut new_def_levels = FatPtr::::with_offset(&mut new_buffer, 0); - let new_def_levels = new_def_levels.to_slice_mut(); - let left_def_levels = - FatPtr::::with_offset(def_levels_buf, self.num_values); - let left_def_levels = left_def_levels.to_slice(); + let new_def_levels = new_buffer.data_mut(); + let left_def_levels = &def_levels_buf.data_mut()[new_len..]; - new_def_levels[0..num_left_values] - .copy_from_slice(&left_def_levels[0..num_left_values]); + new_def_levels[0..num_bytes].copy_from_slice(&left_def_levels[0..num_bytes]); - def_levels_buf.resize(self.num_values * size_of::()); + def_levels_buf.resize(new_len); Some(new_buffer) } else { None @@ -228,21 +183,19 @@ impl RecordReader { // TODO: Optimize to reduce the copy let new_buffer = if let Some(ref mut rep_levels_buf) = &mut self.rep_levels { let num_left_values = self.values_written - self.num_values; - let mut new_buffer = MutableBuffer::new( - size_of::() * max(MIN_BATCH_SIZE, num_left_values), - ); - new_buffer.resize(num_left_values * size_of::()); + // create an empty buffer, as it will be resized below + let mut new_buffer = MutableBuffer::new(0); + let num_bytes = num_left_values * size_of::(); + let new_len = self.num_values * size_of::(); - let mut new_rep_levels = FatPtr::::with_offset(&mut new_buffer, 0); - let new_rep_levels = new_rep_levels.to_slice_mut(); - let left_rep_levels = - FatPtr::::with_offset(rep_levels_buf, self.num_values); - let left_rep_levels = left_rep_levels.to_slice(); + new_buffer.resize(num_bytes); - new_rep_levels[0..num_left_values] - .copy_from_slice(&left_rep_levels[0..num_left_values]); + let new_rep_levels = new_buffer.data_mut(); + let left_rep_levels = &rep_levels_buf.data_mut()[new_len..]; - rep_levels_buf.resize(self.num_values * size_of::()); + new_rep_levels[0..num_bytes].copy_from_slice(&left_rep_levels[0..num_bytes]); + + rep_levels_buf.resize(new_len); Some(new_buffer) } else { @@ -257,24 +210,19 @@ impl RecordReader { pub fn consume_record_data(&mut self) -> Result { // TODO: Optimize to reduce the copy let num_left_values = self.values_written - self.num_values; - let mut new_buffer = MutableBuffer::new(max(MIN_BATCH_SIZE, num_left_values)); - new_buffer.resize(num_left_values * T::get_type_size()); - - let mut new_records = - FatPtr::::with_offset_and_size(&mut new_buffer, 0, T::get_type_size()); - let new_records = new_records.to_slice_mut(); - let mut left_records = FatPtr::::with_offset_and_size( - &mut self.records, - self.num_values, - T::get_type_size(), - ); - let left_records = left_records.to_slice_mut(); + // create an empty buffer, as it will be resized below + let mut new_buffer = MutableBuffer::new(0); + let num_bytes = num_left_values * T::get_type_size(); + let new_len = self.num_values * T::get_type_size(); - for idx in 0..num_left_values { - swap(&mut new_records[idx], &mut left_records[idx]); - } + new_buffer.resize(num_bytes); + + let new_records = new_buffer.data_mut(); + let left_records = &mut self.records.data_mut()[new_len..]; - self.records.resize(self.num_values * T::get_type_size()); + new_records[0..num_bytes].copy_from_slice(&left_records[0..num_bytes]); + + self.records.resize(new_len); Ok(replace(&mut self.records, new_buffer).freeze()) } @@ -331,70 +279,71 @@ impl RecordReader { fn read_one_batch(&mut self, batch_size: usize) -> Result { // Reserve spaces self.records - .reserve(self.records.len() + batch_size * T::get_type_size()); + .resize(self.records.len() + batch_size * T::get_type_size()); if let Some(ref mut buf) = self.rep_levels { - buf.reserve(buf.len() + batch_size * size_of::()); + buf.resize(buf.len() + batch_size * size_of::()); } if let Some(ref mut buf) = self.def_levels { - buf.reserve(buf.len() + batch_size * size_of::()); + buf.resize(buf.len() + batch_size * size_of::()); } - // Convert mutable buffer spaces to mutable slices - let mut values_buf = FatPtr::::with_offset_and_size( - &mut self.records, - self.values_written, - T::get_type_size(), - ); - let values_written = self.values_written; - let mut def_levels_buf = self - .def_levels - .as_mut() - .map(|buf| FatPtr::::with_offset(buf, values_written)); - let mut rep_levels_buf = self - .rep_levels + // Convert mutable buffer spaces to mutable slices + let (prefix, values, suffix) = + unsafe { self.records.data_mut().align_to_mut::() }; + assert!(prefix.is_empty() && suffix.is_empty()); + let values = &mut values[values_written..]; + + let def_levels = self.def_levels.as_mut().map(|buf| { + let (prefix, def_levels, suffix) = + unsafe { buf.data_mut().align_to_mut::() }; + assert!(prefix.is_empty() && suffix.is_empty()); + &mut def_levels[values_written..] + }); + + let rep_levels = self.rep_levels.as_mut().map(|buf| { + let (prefix, rep_levels, suffix) = + unsafe { buf.data_mut().align_to_mut::() }; + assert!(prefix.is_empty() && suffix.is_empty()); + &mut rep_levels[values_written..] + }); + + let (values_read, levels_read) = self + .column_reader .as_mut() - .map(|buf| FatPtr::::with_offset(buf, values_written)); + .unwrap() + .read_batch(batch_size, def_levels, rep_levels, values)?; - let (values_read, levels_read) = - self.column_reader.as_mut().unwrap().read_batch( - batch_size, - def_levels_buf.as_mut().map(|ptr| ptr.to_slice_mut()), - rep_levels_buf.as_mut().map(|ptr| ptr.to_slice_mut()), - values_buf.to_slice_mut(), - )?; + // get new references for the def levels. + let def_levels = self.def_levels.as_ref().map(|buf| { + let (prefix, def_levels, suffix) = unsafe { buf.data().align_to::() }; + assert!(prefix.is_empty() && suffix.is_empty()); + &def_levels[values_written..] + }); let max_def_level = self.column_desc.max_def_level(); if values_read < levels_read { - // This means that there are null values in column data - // TODO: Move this into ColumnReader - - let values_buf = values_buf.to_slice_mut(); - - let def_levels_buf = def_levels_buf - .as_mut() - .map(|ptr| ptr.to_slice_mut()) - .ok_or_else(|| { - general_err!( - "Definition levels should exist when data is less than levels!" - ) - })?; + let def_levels = def_levels.ok_or_else(|| { + general_err!( + "Definition levels should exist when data is less than levels!" + ) + })?; // Fill spaces in column data with default values let mut values_pos = values_read; let mut level_pos = levels_read; while level_pos > values_pos { - if def_levels_buf[level_pos - 1] == max_def_level { + if def_levels[level_pos - 1] == max_def_level { // This values is not empty // We use swap rather than assign here because T::T doesn't // implement Copy - values_buf.swap(level_pos - 1, values_pos - 1); + values.swap(level_pos - 1, values_pos - 1); values_pos -= 1; } else { - values_buf[level_pos - 1] = T::T::default(); + values[level_pos - 1] = T::T::default(); } level_pos -= 1; @@ -403,16 +352,13 @@ impl RecordReader { // Fill in bitmap data if let Some(null_buffer) = self.null_bitmap.as_mut() { - let def_levels_buf = def_levels_buf - .as_mut() - .map(|ptr| ptr.to_slice_mut()) - .ok_or_else(|| { - general_err!( - "Definition levels should exist when data is less than levels!" - ) - })?; + let def_levels = def_levels.ok_or_else(|| { + general_err!( + "Definition levels should exist when data is less than levels!" + ) + })?; (0..levels_read).try_for_each(|idx| { - null_buffer.append(def_levels_buf[idx] == max_def_level) + null_buffer.append(def_levels[idx] == max_def_level) })?; } @@ -424,13 +370,13 @@ impl RecordReader { /// Split values into records according repetition definition and returns number of /// records read. fn split_records(&mut self, records_to_read: usize) -> Result { - let rep_levels_buf = self - .rep_levels - .as_mut() - .map(|buf| FatPtr::::with_offset(buf, 0)); - let rep_levels_buf = rep_levels_buf.as_ref().map(|x| x.to_slice()); + let rep_levels = self.rep_levels.as_ref().map(|buf| { + let (prefix, rep_levels, suffix) = unsafe { buf.data().align_to::() }; + assert!(prefix.is_empty() && suffix.is_empty()); + rep_levels + }); - match rep_levels_buf { + match rep_levels { Some(buf) => { let mut records_read = 0;