Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance of FixedLengthBinary decoding #6220

Merged
merged 3 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions parquet/benches/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use criterion::*;
use half::f16;
use parquet::basic::Encoding;
use parquet::basic::{Encoding, Type as ParquetType};
use parquet::data_type::{
DataType, DoubleType, FixedLenByteArray, FixedLenByteArrayType, FloatType,
};
Expand All @@ -35,7 +35,10 @@ fn bench_typed<T: DataType>(
) {
let name = format!(
"dtype={}, encoding={:?}",
std::any::type_name::<T::T>(),
match T::get_physical_type() {
ParquetType::FIXED_LEN_BYTE_ARRAY => format!("FixedLenByteArray({type_length})"),
_ => std::any::type_name::<T::T>().to_string(),
},
encoding
);
let column_desc_ptr = ColumnDescPtr::new(ColumnDescriptor::new(
Expand Down
21 changes: 18 additions & 3 deletions parquet/src/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,13 @@ pub(crate) mod private {

/// Return the value as an mutable Any to allow for downcasts without transmutation
fn as_mut_any(&mut self) -> &mut dyn std::any::Any;

/// Sets the value of this object from the provided [`Bytes`]
///
/// Only implemented for `ByteArray` and `FixedLenByteArray`. Will panic for other types.
fn set_from_bytes(&mut self, _data: Bytes) {
unimplemented!();
}
}

impl ParquetValueType for bool {
Expand Down Expand Up @@ -953,9 +960,7 @@ pub(crate) mod private {
return Err(eof_err!("Not enough bytes to decode"));
}

let val: &mut Self = val_array.as_mut_any().downcast_mut().unwrap();

val.set_data(data.slice(decoder.start..decoder.start + len));
val_array.set_data(data.slice(decoder.start..decoder.start + len));
decoder.start += len;
}
decoder.num_values -= num_values;
Expand Down Expand Up @@ -998,6 +1003,11 @@ pub(crate) mod private {
fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
self
}

#[inline]
fn set_from_bytes(&mut self, data: Bytes) {
self.set_data(data);
}
}

impl HeapSize for super::ByteArray {
Expand Down Expand Up @@ -1093,6 +1103,11 @@ pub(crate) mod private {
fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
self
}

#[inline]
fn set_from_bytes(&mut self, data: Bytes) {
self.set_data(data);
}
}

impl HeapSize for super::FixedLenByteArray {
Expand Down
23 changes: 3 additions & 20 deletions parquet/src/encodings/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -901,11 +901,7 @@ impl<T: DataType> Decoder<T> for DeltaLengthByteArrayDecoder<T> {

for item in buffer.iter_mut().take(num_values) {
let len = self.lengths[self.current_idx] as usize;

item.as_mut_any()
.downcast_mut::<ByteArray>()
.unwrap()
.set_data(data.slice(self.offset..self.offset + len));
item.set_from_bytes(data.slice(self.offset..self.offset + len));

self.offset += len;
self.current_idx += 1;
Expand Down Expand Up @@ -1029,7 +1025,7 @@ impl<T: DataType> Decoder<T> for DeltaByteArrayDecoder<T> {

fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
match T::get_physical_type() {
ty @ Type::BYTE_ARRAY | ty @ Type::FIXED_LEN_BYTE_ARRAY => {
Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
let num_values = cmp::min(buffer.len(), self.num_values);
let mut v: [ByteArray; 1] = [ByteArray::new(); 1];
for item in buffer.iter_mut().take(num_values) {
Expand All @@ -1051,20 +1047,7 @@ impl<T: DataType> Decoder<T> for DeltaByteArrayDecoder<T> {
result.extend_from_slice(suffix);

let data = Bytes::from(result.clone());

match ty {
Type::BYTE_ARRAY => item
.as_mut_any()
.downcast_mut::<ByteArray>()
.unwrap()
.set_data(data),
Type::FIXED_LEN_BYTE_ARRAY => item
.as_mut_any()
.downcast_mut::<FixedLenByteArray>()
.unwrap()
.set_data(data),
_ => unreachable!(),
};
item.set_from_bytes(data);

self.previous_value = result;
self.current_idx += 1;
Expand Down
9 changes: 2 additions & 7 deletions parquet/src/encodings/decoding/byte_stream_split_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use bytes::Bytes;

use crate::basic::{Encoding, Type};
use crate::data_type::private::ParquetValueType;
use crate::data_type::{DataType, FixedLenByteArray, SliceAsBytes};
use crate::data_type::{DataType, SliceAsBytes};
use crate::errors::{ParquetError, Result};

use super::Decoder;
Expand Down Expand Up @@ -234,12 +234,7 @@ impl<T: DataType> Decoder<T> for VariableWidthByteStreamSplitDecoder<T> {
for (i, bi) in buffer.iter_mut().enumerate().take(num_values) {
// Get a view into the data, without also copying the bytes
let data = bytes_with_data.slice(i * type_size..(i + 1) * type_size);
// TODO: perhaps add a `set_from_bytes` method to `DataType` to avoid downcasting
let bi = bi
.as_mut_any()
.downcast_mut::<FixedLenByteArray>()
.expect("Decoding fixed length byte array");
bi.set_data(data);
bi.set_from_bytes(data);
}

Ok(num_values)
Expand Down
Loading