Skip to content

Commit

Permalink
Remove unnecessary null buffer construction when converting arrays to…
Browse files Browse the repository at this point in the history
… a different type (#6244)

* create primitive array from iter and nulls

* clippy

* speed up some more decimals

* add optimizations for byte_stream_split

* decimal256

* Revert "add optimizations for byte_stream_split"

This reverts commit 5d4ae0d.

* add comments
  • Loading branch information
etseidl authored Aug 14, 2024
1 parent 43b29b9 commit 3e5c76f
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 64 deletions.
14 changes: 14 additions & 0 deletions arrow-array/src/array/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,20 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
}
}

/// Creates a PrimitiveArray based on an iterator of values with provided nulls
pub fn from_iter_values_with_nulls<I: IntoIterator<Item = T::Native>>(
iter: I,
nulls: Option<NullBuffer>,
) -> Self {
let val_buf: Buffer = iter.into_iter().collect();
let len = val_buf.len() / std::mem::size_of::<T::Native>();
Self {
data_type: T::DATA_TYPE,
values: ScalarBuffer::new(val_buf, 0, len),
nulls,
}
}

/// Creates a PrimitiveArray based on a constant value with `count` elements
pub fn from_value(value: T::Native, count: usize) -> Self {
unsafe {
Expand Down
83 changes: 47 additions & 36 deletions parquet/src/arrow/array_reader/fixed_len_byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::column::reader::decoder::ColumnValueDecoder;
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use arrow_array::{
ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array,
Array, ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array,
IntervalDayTimeArray, IntervalYearMonthArray,
};
use arrow_buffer::{i256, Buffer, IntervalDayTime};
Expand Down Expand Up @@ -165,57 +165,68 @@ impl ArrayReader for FixedLenByteArrayReader {
// TODO: An improvement might be to do this conversion on read
let array: ArrayRef = match &self.data_type {
ArrowType::Decimal128(p, s) => {
let decimal = binary
.iter()
.map(|opt| Some(i128::from_be_bytes(sign_extend_be(opt?))))
.collect::<Decimal128Array>()
// We can simply reuse the null buffer from `binary` rather than recomputing it
// (as was the case when we simply used `collect` to produce the new array).
// The same applies to the transformations below.
let nulls = binary.nulls().cloned();
let decimal = binary.iter().map(|o| match o {
Some(b) => i128::from_be_bytes(sign_extend_be(b)),
None => i128::default(),
});
let decimal = Decimal128Array::from_iter_values_with_nulls(decimal, nulls)
.with_precision_and_scale(*p, *s)?;

Arc::new(decimal)
}
ArrowType::Decimal256(p, s) => {
let decimal = binary
.iter()
.map(|opt| Some(i256::from_be_bytes(sign_extend_be(opt?))))
.collect::<Decimal256Array>()
let nulls = binary.nulls().cloned();
let decimal = binary.iter().map(|o| match o {
Some(b) => i256::from_be_bytes(sign_extend_be(b)),
None => i256::default(),
});
let decimal = Decimal256Array::from_iter_values_with_nulls(decimal, nulls)
.with_precision_and_scale(*p, *s)?;

Arc::new(decimal)
}
ArrowType::Interval(unit) => {
let nulls = binary.nulls().cloned();
// An interval is stored as 3x 32-bit unsigned integers storing months, days,
// and milliseconds
match unit {
IntervalUnit::YearMonth => Arc::new(
binary
.iter()
.map(|o| o.map(|b| i32::from_le_bytes(b[0..4].try_into().unwrap())))
.collect::<IntervalYearMonthArray>(),
) as ArrayRef,
IntervalUnit::DayTime => Arc::new(
binary
.iter()
.map(|o| {
o.map(|b| {
IntervalDayTime::new(
i32::from_le_bytes(b[4..8].try_into().unwrap()),
i32::from_le_bytes(b[8..12].try_into().unwrap()),
)
})
})
.collect::<IntervalDayTimeArray>(),
) as ArrayRef,
IntervalUnit::YearMonth => {
let iter = binary.iter().map(|o| match o {
Some(b) => i32::from_le_bytes(b[0..4].try_into().unwrap()),
None => i32::default(),
});
let interval =
IntervalYearMonthArray::from_iter_values_with_nulls(iter, nulls);
Arc::new(interval) as ArrayRef
}
IntervalUnit::DayTime => {
let iter = binary.iter().map(|o| match o {
Some(b) => IntervalDayTime::new(
i32::from_le_bytes(b[4..8].try_into().unwrap()),
i32::from_le_bytes(b[8..12].try_into().unwrap()),
),
None => IntervalDayTime::default(),
});
let interval =
IntervalDayTimeArray::from_iter_values_with_nulls(iter, nulls);
Arc::new(interval) as ArrayRef
}
IntervalUnit::MonthDayNano => {
return Err(nyi_err!("MonthDayNano intervals not supported"));
}
}
}
ArrowType::Float16 => Arc::new(
binary
.iter()
.map(|o| o.map(|b| f16::from_le_bytes(b[..2].try_into().unwrap())))
.collect::<Float16Array>(),
) as ArrayRef,
ArrowType::Float16 => {
let nulls = binary.nulls().cloned();
let f16s = binary.iter().map(|o| match o {
Some(b) => f16::from_le_bytes(b[..2].try_into().unwrap()),
None => f16::default(),
});
let f16s = Float16Array::from_iter_values_with_nulls(f16s, nulls);
Arc::new(f16s) as ArrayRef
}
_ => Arc::new(binary) as ArrayRef,
};

Expand Down
82 changes: 54 additions & 28 deletions parquet/src/arrow/array_reader/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,22 +217,35 @@ where
arrow_cast::cast(&a, target_type)?
}
ArrowType::Decimal128(p, s) => {
// We can simply reuse the null buffer from `array` rather than recomputing it
// (as was the case when we simply used `collect` to produce the new array).
let nulls = array.nulls().cloned();
let array = match array.data_type() {
ArrowType::Int32 => array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.iter()
.map(|v| v.map(|v| v as i128))
.collect::<Decimal128Array>(),
ArrowType::Int32 => {
let decimal = array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.iter()
.map(|v| match v {
Some(i) => i as i128,
None => i128::default(),
});
Decimal128Array::from_iter_values_with_nulls(decimal, nulls)
}

ArrowType::Int64 => array
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.iter()
.map(|v| v.map(|v| v as i128))
.collect::<Decimal128Array>(),
ArrowType::Int64 => {
let decimal = array
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.iter()
.map(|v| match v {
Some(i) => i as i128,
None => i128::default(),
});
Decimal128Array::from_iter_values_with_nulls(decimal, nulls)
}
_ => {
return Err(arrow_err!(
"Cannot convert {:?} to decimal",
Expand All @@ -245,22 +258,35 @@ where
Arc::new(array) as ArrayRef
}
ArrowType::Decimal256(p, s) => {
// We can simply reuse the null buffer from `array` rather than recomputing it
// (as was the case when we simply used `collect` to produce the new array).
let nulls = array.nulls().cloned();
let array = match array.data_type() {
ArrowType::Int32 => array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.iter()
.map(|v| v.map(|v| i256::from_i128(v as i128)))
.collect::<Decimal256Array>(),
ArrowType::Int32 => {
let decimal = array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.iter()
.map(|v| match v {
Some(i) => i256::from_i128(i as i128),
None => i256::default(),
});
Decimal256Array::from_iter_values_with_nulls(decimal, nulls)
}

ArrowType::Int64 => array
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.iter()
.map(|v| v.map(|v| i256::from_i128(v as i128)))
.collect::<Decimal256Array>(),
ArrowType::Int64 => {
let decimal = array
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.iter()
.map(|v| match v {
Some(i) => i256::from_i128(i as i128),
None => i256::default(),
});
Decimal256Array::from_iter_values_with_nulls(decimal, nulls)
}
_ => {
return Err(arrow_err!(
"Cannot convert {:?} to decimal",
Expand Down

0 comments on commit 3e5c76f

Please sign in to comment.