Skip to content

Commit

Permalink
feat: support reading and writingStringView and BinaryView in par…
Browse files Browse the repository at this point in the history
…quet (part 1) (#5618)

* support string and binary view read write parquet

* read to view type using offset_buffer

* add bench codes for view type read write

* using view_buffer instead of offset_buffer for view type parquet reader

* Revert "using view_buffer instead of offset_buffer for view type parquet reader"

This reverts commit d286521.

---------

Co-authored-by: Yijun Zhao <ariesdevil77@gmail.com>
  • Loading branch information
alamb and ariesdevil authored Apr 9, 2024
1 parent 2c4b321 commit 91f0b17
Show file tree
Hide file tree
Showing 14 changed files with 534 additions and 47 deletions.
55 changes: 54 additions & 1 deletion arrow-array/src/array/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,18 @@ impl<T: ByteViewType + ?Sized> From<GenericByteViewArray<T>> for ArrayData {
}
}

impl<'a, Ptr, T> FromIterator<&'a Option<Ptr>> for GenericByteViewArray<T>
where
Ptr: AsRef<T::Native> + 'a,
T: ByteViewType + ?Sized,
{
fn from_iter<I: IntoIterator<Item = &'a Option<Ptr>>>(iter: I) -> Self {
iter.into_iter()
.map(|o| o.as_ref().map(|p| p.as_ref()))
.collect()
}
}

impl<Ptr, T: ByteViewType + ?Sized> FromIterator<Option<Ptr>> for GenericByteViewArray<T>
where
Ptr: AsRef<T::Native>,
Expand All @@ -400,7 +412,23 @@ where
/// ```
pub type BinaryViewArray = GenericByteViewArray<BinaryViewType>;

/// A [`GenericByteViewArray`] that stores uf8 data
impl BinaryViewArray {
/// Convert the [`BinaryViewArray`] to [`StringViewArray`]
/// If items not utf8 data, validate will fail and error returned.
pub fn to_string_view(self) -> Result<StringViewArray, ArrowError> {
StringViewType::validate(self.views(), self.data_buffers())?;
unsafe { Ok(self.to_string_view_unchecked()) }
}

/// Convert the [`BinaryViewArray`] to [`StringViewArray`]
/// # Safety
/// Caller is responsible for ensuring that items in array are utf8 data.
pub unsafe fn to_string_view_unchecked(self) -> StringViewArray {
StringViewArray::new_unchecked(self.views, self.buffers, self.nulls)
}
}

/// A [`GenericByteViewArray`] that stores utf8 data
///
/// # Example
/// ```
Expand All @@ -411,12 +439,37 @@ pub type BinaryViewArray = GenericByteViewArray<BinaryViewType>;
/// ```
pub type StringViewArray = GenericByteViewArray<StringViewType>;

impl StringViewArray {
/// Convert the [`StringViewArray`] to [`BinaryViewArray`]
pub fn to_binary_view(self) -> BinaryViewArray {
unsafe { BinaryViewArray::new_unchecked(self.views, self.buffers, self.nulls) }
}
}

impl From<Vec<&str>> for StringViewArray {
fn from(v: Vec<&str>) -> Self {
Self::from_iter_values(v)
}
}

impl From<Vec<Option<&str>>> for StringViewArray {
fn from(v: Vec<Option<&str>>) -> Self {
v.into_iter().collect()
}
}

impl From<Vec<String>> for StringViewArray {
fn from(v: Vec<String>) -> Self {
Self::from_iter_values(v)
}
}

impl From<Vec<Option<String>>> for StringViewArray {
fn from(v: Vec<Option<String>>) -> Self {
v.into_iter().collect()
}
}

#[cfg(test)]
mod tests {
use crate::builder::StringViewBuilder;
Expand Down
36 changes: 36 additions & 0 deletions arrow/src/util/bench_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,42 @@ pub fn create_string_array_with_len<Offset: OffsetSizeTrait>(
.collect()
}

/// Creates a random (but fixed-seeded) array of a given size, null density and length
pub fn create_string_view_array_with_len(
size: usize,
null_density: f32,
str_len: usize,
mixed: bool,
) -> StringViewArray {
let rng = &mut seedable_rng();

let mut lengths = Vec::with_capacity(size);

// if mixed, we creates first half that string length small than 12 bytes and second half large than 12 bytes
if mixed {
for _ in 0..size / 2 {
lengths.push(rng.gen_range(1..12));
}
for _ in size / 2..size {
lengths.push(rng.gen_range(12..=std::cmp::max(30, str_len)));
}
} else {
lengths.resize(size, str_len);
}

lengths
.into_iter()
.map(|len| {
if rng.gen::<f32>() < null_density {
None
} else {
let value: Vec<u8> = rng.sample_iter(&Alphanumeric).take(len).collect();
Some(String::from_utf8(value).unwrap())
}
})
.collect()
}

/// Creates an random (but fixed-seeded) array of a given size and null density
/// consisting of random 4 character alphanumeric strings
pub fn create_string_dict_array<K: ArrowDictionaryKeyType>(
Expand Down
9 changes: 9 additions & 0 deletions arrow/src/util/data_gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,18 @@ pub fn create_random_array(
},
Utf8 => Arc::new(create_string_array::<i32>(size, null_density)),
LargeUtf8 => Arc::new(create_string_array::<i64>(size, null_density)),
Utf8View => Arc::new(create_string_view_array_with_len(
size,
null_density,
4,
false,
)),
Binary => Arc::new(create_binary_array::<i32>(size, null_density)),
LargeBinary => Arc::new(create_binary_array::<i64>(size, null_density)),
FixedSizeBinary(len) => Arc::new(create_fsb_array(size, null_density, *len as usize)),
BinaryView => Arc::new(
create_string_view_array_with_len(size, null_density, 4, false).to_binary_view(),
),
List(_) => create_random_list_array(field, size, null_density, true_density)?,
LargeList(_) => create_random_list_array(field, size, null_density, true_density)?,
Struct(fields) => Arc::new(StructArray::try_from(
Expand Down
99 changes: 98 additions & 1 deletion parquet/benches/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use criterion::{criterion_group, criterion_main, BenchmarkGroup, Criterion};
use num::FromPrimitive;
use num_bigint::BigInt;
use parquet::arrow::array_reader::{
make_byte_array_reader, make_fixed_len_byte_array_reader, ListArrayReader,
make_byte_array_reader, make_byte_view_array_reader, make_fixed_len_byte_array_reader,
ListArrayReader,
};
use parquet::basic::Type;
use parquet::data_type::{ByteArray, FixedLenByteArrayType};
Expand Down Expand Up @@ -502,6 +503,13 @@ fn create_string_byte_array_reader(
make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
}

fn create_string_view_byte_array_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
make_byte_view_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
}

fn create_string_byte_array_dictionary_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
Expand Down Expand Up @@ -993,6 +1001,95 @@ fn add_benches(c: &mut Criterion) {

group.finish();

// string view benchmarks
//==============================

let mut group = c.benchmark_group("arrow_array_reader/StringViewArray");

// string, plain encoded, no NULLs
let plain_string_no_null_data =
build_plain_encoded_string_page_iterator(mandatory_string_column_desc.clone(), 0.0);
group.bench_function("plain encoded, mandatory, no NULLs", |b| {
b.iter(|| {
let array_reader = create_string_view_byte_array_reader(
plain_string_no_null_data.clone(),
mandatory_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

let plain_string_no_null_data =
build_plain_encoded_string_page_iterator(optional_string_column_desc.clone(), 0.0);
group.bench_function("plain encoded, optional, no NULLs", |b| {
b.iter(|| {
let array_reader = create_string_view_byte_array_reader(
plain_string_no_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

// string, plain encoded, half NULLs
let plain_string_half_null_data =
build_plain_encoded_string_page_iterator(optional_string_column_desc.clone(), 0.5);
group.bench_function("plain encoded, optional, half NULLs", |b| {
b.iter(|| {
let array_reader = create_string_view_byte_array_reader(
plain_string_half_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

// string, dictionary encoded, no NULLs
let dictionary_string_no_null_data =
build_dictionary_encoded_string_page_iterator(mandatory_string_column_desc.clone(), 0.0);
group.bench_function("dictionary encoded, mandatory, no NULLs", |b| {
b.iter(|| {
let array_reader = create_string_view_byte_array_reader(
dictionary_string_no_null_data.clone(),
mandatory_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

let dictionary_string_no_null_data =
build_dictionary_encoded_string_page_iterator(optional_string_column_desc.clone(), 0.0);
group.bench_function("dictionary encoded, optional, no NULLs", |b| {
b.iter(|| {
let array_reader = create_string_view_byte_array_reader(
dictionary_string_no_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

// string, dictionary encoded, half NULLs
let dictionary_string_half_null_data =
build_dictionary_encoded_string_page_iterator(optional_string_column_desc.clone(), 0.5);
group.bench_function("dictionary encoded, optional, half NULLs", |b| {
b.iter(|| {
let array_reader = create_string_view_byte_array_reader(
dictionary_string_half_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

group.finish();

// list benchmarks
//==============================

Expand Down
34 changes: 34 additions & 0 deletions parquet/benches/arrow_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,24 @@ fn create_string_bench_batch(
)?)
}

fn create_string_and_binary_view_bench_batch(
size: usize,
null_density: f32,
true_density: f32,
) -> Result<RecordBatch> {
let fields = vec![
Field::new("_1", DataType::Utf8View, true),
Field::new("_2", DataType::BinaryView, true),
];
let schema = Schema::new(fields);
Ok(create_random_batch(
Arc::new(schema),
size,
null_density,
true_density,
)?)
}

fn create_string_dictionary_bench_batch(
size: usize,
null_density: f32,
Expand Down Expand Up @@ -395,6 +413,22 @@ fn bench_primitive_writer(c: &mut Criterion) {
b.iter(|| write_batch_enable_bloom_filter(&batch).unwrap())
});

let batch = create_string_and_binary_view_bench_batch(4096, 0.25, 0.75).unwrap();
group.throughput(Throughput::Bytes(
batch
.columns()
.iter()
.map(|f| f.get_array_memory_size() as u64)
.sum(),
));
group.bench_function("4096 values string", |b| {
b.iter(|| write_batch(&batch).unwrap())
});

group.bench_function("4096 values string with bloom filter", |b| {
b.iter(|| write_batch_enable_bloom_filter(&batch).unwrap())
});

let batch = create_string_dictionary_bench_batch(4096, 0.25, 0.75).unwrap();
group.throughput(Throughput::Bytes(
batch
Expand Down
28 changes: 12 additions & 16 deletions parquet/src/arrow/array_reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;

use arrow_schema::{DataType, Fields, SchemaBuilder};

use crate::arrow::array_reader::byte_array::make_byte_view_array_reader;
use crate::arrow::array_reader::empty_array::make_empty_array_reader;
use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
use crate::arrow::array_reader::{
Expand All @@ -29,9 +30,7 @@ use crate::arrow::array_reader::{
use crate::arrow::schema::{ParquetField, ParquetFieldType};
use crate::arrow::ProjectionMask;
use crate::basic::Type as PhysicalType;
use crate::data_type::{
BoolType, DoubleType, FloatType, Int32Type, Int64Type, Int96Type,
};
use crate::data_type::{BoolType, DoubleType, FloatType, Int32Type, Int64Type, Int96Type};
use crate::errors::{ParquetError, Result};
use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};

Expand All @@ -55,17 +54,13 @@ fn build_reader(
row_groups: &dyn RowGroups,
) -> Result<Option<Box<dyn ArrayReader>>> {
match field.field_type {
ParquetFieldType::Primitive { .. } => {
build_primitive_reader(field, mask, row_groups)
}
ParquetFieldType::Primitive { .. } => build_primitive_reader(field, mask, row_groups),
ParquetFieldType::Group { .. } => match &field.arrow_type {
DataType::Map(_, _) => build_map_reader(field, mask, row_groups),
DataType::Struct(_) => build_struct_reader(field, mask, row_groups),
DataType::List(_) => build_list_reader(field, mask, false, row_groups),
DataType::LargeList(_) => build_list_reader(field, mask, true, row_groups),
DataType::FixedSizeList(_, _) => {
build_fixed_size_list_reader(field, mask, row_groups)
}
DataType::FixedSizeList(_, _) => build_fixed_size_list_reader(field, mask, row_groups),
d => unimplemented!("reading group type {} not implemented", d),
},
}
Expand Down Expand Up @@ -140,9 +135,9 @@ fn build_list_reader(
DataType::List(f) => {
DataType::List(Arc::new(f.as_ref().clone().with_data_type(item_type)))
}
DataType::LargeList(f) => DataType::LargeList(Arc::new(
f.as_ref().clone().with_data_type(item_type),
)),
DataType::LargeList(f) => {
DataType::LargeList(Arc::new(f.as_ref().clone().with_data_type(item_type)))
}
_ => unreachable!(),
};

Expand Down Expand Up @@ -289,6 +284,9 @@ fn build_primitive_reader(
Some(DataType::Dictionary(_, _)) => {
make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
}
Some(DataType::Utf8View | DataType::BinaryView) => {
make_byte_view_array_reader(page_iterator, column_desc, arrow_type)?
}
_ => make_byte_array_reader(page_iterator, column_desc, arrow_type)?,
},
PhysicalType::FIXED_LEN_BYTE_ARRAY => {
Expand Down Expand Up @@ -347,8 +345,7 @@ mod tests {
#[test]
fn test_create_array_reader() {
let file = get_test_file("nulls.snappy.parquet");
let file_reader: Arc<dyn FileReader> =
Arc::new(SerializedFileReader::new(file).unwrap());
let file_reader: Arc<dyn FileReader> = Arc::new(SerializedFileReader::new(file).unwrap());

let file_metadata = file_reader.metadata().file_metadata();
let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]);
Expand All @@ -359,8 +356,7 @@ mod tests {
)
.unwrap();

let array_reader =
build_array_reader(fields.as_ref(), &mask, &file_reader).unwrap();
let array_reader = build_array_reader(fields.as_ref(), &mask, &file_reader).unwrap();

// Create arrow types
let arrow_type = DataType::Struct(Fields::from(vec![Field::new(
Expand Down
Loading

0 comments on commit 91f0b17

Please sign in to comment.