Skip to content

Commit

Permalink
Fmt.
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Dec 13, 2020
1 parent 1bdabf6 commit 5ca2fb1
Showing 1 changed file with 41 additions and 53 deletions.
94 changes: 41 additions & 53 deletions rust/parquet/src/arrow/record_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,55 +295,46 @@ impl<T: DataType> RecordReader<T> {
let values_written = self.values_written;

// Convert mutable buffer spaces to mutable slices
let (prefix, values, suffix) = unsafe { self.records.data_mut().align_to_mut::<T::T>() };
let (prefix, values, suffix) =
unsafe { self.records.data_mut().align_to_mut::<T::T>() };
assert!(prefix.is_empty() && suffix.is_empty());
let values = &mut values[values_written..];

let def_levels = self
.def_levels
let def_levels = self.def_levels.as_mut().map(|buf| {
let (prefix, def_levels, suffix) =
unsafe { buf.data_mut().align_to_mut::<i16>() };
assert!(prefix.is_empty() && suffix.is_empty());
&mut def_levels[values_written..]
});

let rep_levels = self.rep_levels.as_mut().map(|buf| {
let (prefix, rep_levels, suffix) =
unsafe { buf.data_mut().align_to_mut::<i16>() };
assert!(prefix.is_empty() && suffix.is_empty());
&mut rep_levels[values_written..]
});

let (values_read, levels_read) = self
.column_reader
.as_mut()
.map(|buf| {
let (prefix, def_levels, suffix) = unsafe { buf.data_mut().align_to_mut::<i16>() };
assert!(prefix.is_empty() && suffix.is_empty());
&mut def_levels[values_written..]
});

let rep_levels = self
.rep_levels
.as_mut()
.map(|buf| {
let (prefix, rep_levels, suffix) = unsafe { buf.data_mut().align_to_mut::<i16>() };
assert!(prefix.is_empty() && suffix.is_empty());
&mut rep_levels[values_written..]
});

let (values_read, levels_read) =
self.column_reader.as_mut().unwrap().read_batch(
batch_size,
def_levels,
rep_levels,
values,
)?;
.unwrap()
.read_batch(batch_size, def_levels, rep_levels, values)?;

// get new references for the def levels.
let def_levels = self
.def_levels
.as_ref()
.map(|buf| {
let (prefix, def_levels, suffix) = unsafe { buf.data().align_to::<i16>() };
assert!(prefix.is_empty() && suffix.is_empty());
&def_levels[values_written..]
});
let def_levels = self.def_levels.as_ref().map(|buf| {
let (prefix, def_levels, suffix) = unsafe { buf.data().align_to::<i16>() };
assert!(prefix.is_empty() && suffix.is_empty());
&def_levels[values_written..]
});

let max_def_level = self.column_desc.max_def_level();

if values_read < levels_read {
let def_levels = def_levels
.ok_or_else(|| {
general_err!(
"Definition levels should exist when data is less than levels!"
)
})?;
let def_levels = def_levels.ok_or_else(|| {
general_err!(
"Definition levels should exist when data is less than levels!"
)
})?;

// Fill spaces in column data with default values
let mut values_pos = values_read;
Expand All @@ -366,12 +357,11 @@ impl<T: DataType> RecordReader<T> {

// Fill in bitmap data
if let Some(null_buffer) = self.null_bitmap.as_mut() {
let def_levels = def_levels
.ok_or_else(|| {
general_err!(
"Definition levels should exist when data is less than levels!"
)
})?;
let def_levels = def_levels.ok_or_else(|| {
general_err!(
"Definition levels should exist when data is less than levels!"
)
})?;
(0..levels_read).try_for_each(|idx| {
null_buffer.append(def_levels[idx] == max_def_level)
})?;
Expand All @@ -385,14 +375,12 @@ impl<T: DataType> RecordReader<T> {
/// Split values into records according repetition definition and returns number of
/// records read.
fn split_records(&mut self, records_to_read: usize) -> Result<usize> {
let rep_levels = self
.rep_levels
.as_mut()
.map(|buf| {
let (prefix, rep_levels, suffix) = unsafe { buf.data_mut().align_to_mut::<i16>() };
assert!(prefix.is_empty() && suffix.is_empty());
rep_levels
});
let rep_levels = self.rep_levels.as_mut().map(|buf| {
let (prefix, rep_levels, suffix) =
unsafe { buf.data_mut().align_to_mut::<i16>() };
assert!(prefix.is_empty() && suffix.is_empty());
rep_levels
});

match rep_levels {
Some(buf) => {
Expand Down

0 comments on commit 5ca2fb1

Please sign in to comment.