fix: Don't load Parquet nested metadata (#18183)
coastalwhite authored Aug 14, 2024
1 parent 9b4bd7f commit f92f147
Showing 6 changed files with 36 additions and 67 deletions.
9 changes: 5 additions & 4 deletions crates/polars-io/src/parquet/read/mmap.rs
@@ -1,12 +1,13 @@
+use arrow::array::Array;
 use arrow::datatypes::Field;
 #[cfg(feature = "async")]
 use bytes::Bytes;
 #[cfg(feature = "async")]
 use polars_core::datatypes::PlHashMap;
 use polars_error::PolarsResult;
 use polars_parquet::read::{
-    column_iter_to_arrays, get_field_columns, ArrayIter, BasicDecompressor, ColumnChunkMetaData,
-    Filter, PageReader,
+    column_iter_to_arrays, get_field_columns, BasicDecompressor, ColumnChunkMetaData, Filter,
+    PageReader,
 };
 use polars_utils::mmap::{MemReader, MemSlice};

@@ -62,11 +63,11 @@ fn _mmap_single_column<'a>(

 // similar to arrow2 serializer, except this accepts a slice instead of a vec.
 // this allows us to memory map
-pub(super) fn to_deserializer<'a>(
+pub(super) fn to_deserializer(
     columns: Vec<(&ColumnChunkMetaData, MemSlice)>,
     field: Field,
     filter: Option<Filter>,
-) -> PolarsResult<ArrayIter<'a>> {
+) -> PolarsResult<Box<dyn Array>> {
     let (columns, types): (Vec<_>, Vec<_>) = columns
         .into_iter()
         .map(|(column_meta, chunk)| {
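The hunk above is the core API shift of this commit: `to_deserializer` no longer returns a lazy, lifetime-bound iterator of array chunks but one fully decoded array. A minimal, self-contained sketch of the before/after shape, using toy stand-ins for the crate's `Array` trait and `PolarsResult` alias (none of this is the real polars-parquet code):

```rust
// Toy stand-ins so the sketch compiles on its own; the real types live in
// the arrow/polars crates.
type PolarsResult<T> = Result<T, String>;

trait Array: Send + Sync {
    fn len(&self) -> usize;
}

struct Int32Array(Vec<i32>);
impl Array for Int32Array {
    fn len(&self) -> usize {
        self.0.len()
    }
}

// Before: the removed `ArrayIter` alias, a boxed iterator of array chunks
// that forced a lifetime parameter through every caller.
type ArrayIter<'a> = Box<dyn Iterator<Item = PolarsResult<Box<dyn Array>>> + Send + Sync + 'a>;

fn to_deserializer_old<'a>() -> PolarsResult<ArrayIter<'a>> {
    let array: Box<dyn Array> = Box::new(Int32Array(vec![1, 2, 3]));
    // The single decoded array still had to be wrapped in a one-shot iterator.
    Ok(Box::new(std::iter::once(Ok(array))))
}

// After: the column is decoded eagerly into one array, so both the iterator
// wrapper and the `'a` lifetime disappear from the signature.
fn to_deserializer_new() -> PolarsResult<Box<dyn Array>> {
    Ok(Box::new(Int32Array(vec![1, 2, 3])))
}

fn main() -> PolarsResult<()> {
    let old: Vec<_> = to_deserializer_old()?.collect();
    assert_eq!(old.len(), 1); // the old iterator only ever yielded one chunk here
    assert_eq!(to_deserializer_new()?.len(), 3);
    Ok(())
}
```

The same signature change ripples through `column_iter_to_arrays`, the row-group `to_deserializer`, and `read_columns_many` in the files below.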
57 changes: 19 additions & 38 deletions crates/polars-io/src/parquet/read/read_impl.rs
@@ -2,13 +2,14 @@ use std::borrow::Cow;
 use std::collections::VecDeque;
 use std::ops::{Deref, Range};

-use arrow::array::new_empty_array;
 use arrow::bitmap::{Bitmap, MutableBitmap};
 use arrow::datatypes::ArrowSchemaRef;
 use polars_core::prelude::*;
 use polars_core::utils::{accumulate_dataframes_vertical, split_df};
 use polars_core::POOL;
-use polars_parquet::read::{self, ArrayIter, FileMetaData, Filter, PhysicalType, RowGroupMetaData};
+use polars_parquet::parquet::error::ParquetResult;
+use polars_parquet::parquet::statistics::Statistics;
+use polars_parquet::read::{self, FileMetaData, Filter, PhysicalType, RowGroupMetaData};
 use polars_utils::mmap::MemSlice;
 use polars_utils::vec::inplace_zip_filtermap;
 use rayon::prelude::*;
@@ -70,13 +71,25 @@ fn column_idx_to_series(
     }

     let columns = mmap_columns(store, md.columns(), &field.name);
-    let iter = mmap::to_deserializer(columns, field.clone(), filter)?;
-
-    let mut series = array_iter_to_series(iter, field, None)?;
+    let stats = columns
+        .iter()
+        .map(|(col_md, _)| col_md.statistics().transpose())
+        .collect::<ParquetResult<Vec<Option<Statistics>>>>();
+    let array = mmap::to_deserializer(columns, field.clone(), filter)?;
+    let mut series = Series::try_from((field, array))?;
+
+    // We cannot really handle nested metadata at the moment. Just skip it.
+    use ArrowDataType as AD;
+    match field.data_type() {
+        AD::List(_) | AD::LargeList(_) | AD::Struct(_) | AD::FixedSizeList(_, _) => {
+            return Ok(series)
+        },
+        _ => {},
+    }

     // See if we can find some statistics for this series. If we cannot find anything just return
     // the series as is.
-    let Some(Ok(stats)) = md.columns()[column_i].statistics() else {
+    let Ok(Some(stats)) = stats.map(|mut s| s.pop().flatten()) else {
         return Ok(series);
     };

@@ -118,38 +131,6 @@ fn column_idx_to_series(
     Ok(series)
 }

-pub(super) fn array_iter_to_series(
-    iter: ArrayIter,
-    field: &ArrowField,
-    num_rows: Option<usize>,
-) -> PolarsResult<Series> {
-    let mut total_count = 0;
-    let chunks = match num_rows {
-        None => iter.collect::<PolarsResult<Vec<_>>>()?,
-        Some(n) => {
-            let mut out = Vec::with_capacity(2);
-
-            for arr in iter {
-                let arr = arr?;
-                let len = arr.len();
-                out.push(arr);
-
-                total_count += len;
-                if total_count >= n {
-                    break;
-                }
-            }
-            out
-        },
-    };
-    if chunks.is_empty() {
-        let arr = new_empty_array(field.data_type.clone());
-        Series::try_from((field, arr))
-    } else {
-        Series::try_from((field, chunks))
-    }
-}
-
 #[allow(clippy::too_many_arguments)]
 fn rg_to_dfs(
     store: &mmap::ColumnStore,
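The early `return Ok(series)` in the hunk above is the actual fix: for nested arrow types there is no meaningful flat statistics object to decode, so the function now bails out before touching statistics at all, and for flat types it reuses the statistics of the already-selected column chunks instead of re-indexing the row-group metadata. A self-contained sketch of that type gate, with a pared-down stand-in for arrow's `ArrowDataType` enum (only the variants the patch matches on are reproduced; this is not the real arrow definition):

```rust
// Pared-down stand-in for arrow's data-type enum; nested variants hold
// simplified payloads compared to the real crate.
enum ArrowDataType {
    Int32,
    Utf8,
    List(Box<ArrowDataType>),
    LargeList(Box<ArrowDataType>),
    FixedSizeList(Box<ArrowDataType>, usize),
    Struct(Vec<ArrowDataType>),
}

/// Mirrors the patch's match: nested types bail out before any statistics
/// are decoded, flat types fall through to the statistics lookup.
fn skip_statistics(data_type: &ArrowDataType) -> bool {
    use ArrowDataType as AD;
    matches!(
        data_type,
        AD::List(_) | AD::LargeList(_) | AD::Struct(_) | AD::FixedSizeList(_, _)
    )
}

fn main() {
    use ArrowDataType as AD;
    assert!(!skip_statistics(&AD::Int32));
    assert!(!skip_statistics(&AD::Utf8));
    assert!(skip_statistics(&AD::List(Box::new(AD::Int32))));
    assert!(skip_statistics(&AD::LargeList(Box::new(AD::Int32))));
    assert!(skip_statistics(&AD::FixedSizeList(Box::new(AD::Utf8), 4)));
    assert!(skip_statistics(&AD::Struct(vec![AD::Utf8])));
}
```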
7 changes: 3 additions & 4 deletions crates/polars-parquet/src/arrow/read/deserialize/mod.rs
@@ -190,13 +190,12 @@ pub fn n_columns(data_type: &ArrowDataType) -> usize {
 /// For nested types, `columns` must be composed by all parquet columns with associated types `types`.
 ///
 /// The arrays are guaranteed to be at most of size `chunk_size` and data type `field.data_type`.
-pub fn column_iter_to_arrays<'a>(
+pub fn column_iter_to_arrays(
     columns: Vec<BasicDecompressor>,
     types: Vec<&PrimitiveType>,
     field: Field,
     filter: Option<Filter>,
-) -> PolarsResult<ArrayIter<'a>> {
+) -> PolarsResult<Box<dyn Array>> {
     let (_, array) = columns_to_iter_recursive(columns, types, field, vec![], filter)?;
-
-    Ok(Box::new(std::iter::once(Ok(array))))
+    Ok(array)
 }
4 changes: 0 additions & 4 deletions crates/polars-parquet/src/arrow/read/mod.rs
@@ -10,7 +10,6 @@ pub mod statistics;

 use std::io::{Read, Seek};

-use arrow::array::Array;
 use arrow::types::{i256, NativeType};
 pub use deserialize::{
     column_iter_to_arrays, create_list, create_map, get_page_iterator, init_nested, n_columns,
@@ -45,9 +44,6 @@ pub use crate::parquet::{
     FallibleStreamingIterator,
 };

-/// Type def for a sharable, boxed dyn [`Iterator`] of arrays
-pub type ArrayIter<'a> = Box<dyn Iterator<Item = PolarsResult<Box<dyn Array>>> + Send + Sync + 'a>;
-
 /// Reads parquets' metadata synchronously.
 pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> PolarsResult<FileMetaData> {
     Ok(_read_metadata(reader)?)
25 changes: 8 additions & 17 deletions crates/polars-parquet/src/arrow/read/row_group.rs
@@ -6,7 +6,7 @@ use arrow::record_batch::RecordBatchT;
 use polars_error::PolarsResult;
 use polars_utils::mmap::MemReader;

-use super::{ArrayIter, RowGroupMetaData};
+use super::RowGroupMetaData;
 use crate::arrow::read::column_iter_to_arrays;
 use crate::arrow::read::deserialize::Filter;
 use crate::parquet::metadata::ColumnChunkMetaData;
@@ -23,7 +23,7 @@ use crate::parquet::read::{BasicDecompressor, PageReader};
 pub struct RowGroupDeserializer {
     num_rows: usize,
     remaining_rows: usize,
-    column_chunks: Vec<ArrayIter<'static>>,
+    column_chunks: Vec<Box<dyn Array>>,
 }

 impl RowGroupDeserializer {
@@ -32,11 +32,7 @@ impl RowGroupDeserializer {
     /// # Panic
     /// This function panics iff any of the `column_chunks`
     /// do not return an array with an equal length.
-    pub fn new(
-        column_chunks: Vec<ArrayIter<'static>>,
-        num_rows: usize,
-        limit: Option<usize>,
-    ) -> Self {
+    pub fn new(column_chunks: Vec<Box<dyn Array>>, num_rows: usize, limit: Option<usize>) -> Self {
         Self {
             num_rows,
             remaining_rows: limit.unwrap_or(usize::MAX).min(num_rows),
@@ -57,12 +53,7 @@ impl Iterator for RowGroupDeserializer {
         if self.remaining_rows == 0 {
             return None;
         }
-        let chunk = self
-            .column_chunks
-            .iter_mut()
-            .map(|iter| iter.next().unwrap())
-            .collect::<PolarsResult<Vec<_>>>()
-            .and_then(RecordBatchT::try_new);
+        let chunk = RecordBatchT::try_new(std::mem::take(&mut self.column_chunks));
         self.remaining_rows = self.remaining_rows.saturating_sub(
             chunk
                 .as_ref()
@@ -134,11 +125,11 @@ where

 /// Converts a vector of columns associated with the parquet field whose name is [`Field`]
 /// to an iterator of [`Array`], [`ArrayIter`] of chunk size `chunk_size`.
-pub fn to_deserializer<'a>(
+pub fn to_deserializer(
     columns: Vec<(&ColumnChunkMetaData, Vec<u8>)>,
     field: Field,
     filter: Option<Filter>,
-) -> PolarsResult<ArrayIter<'a>> {
+) -> PolarsResult<Box<dyn Array>> {
     let (columns, types): (Vec<_>, Vec<_>) = columns
         .into_iter()
         .map(|(column_meta, chunk)| {
@@ -169,12 +160,12 @@ pub fn to_deserializer<'a>(
 /// This operation is single-threaded. For readers with stronger invariants
 /// (e.g. implement [`Clone`]) you can use [`read_columns`] to read multiple columns at once
 /// and convert them to [`ArrayIter`] via [`to_deserializer`].
-pub fn read_columns_many<'a, R: Read + Seek>(
+pub fn read_columns_many<R: Read + Seek>(
     reader: &mut R,
     row_group: &RowGroupMetaData,
     fields: Vec<Field>,
     filter: Option<Filter>,
-) -> PolarsResult<Vec<ArrayIter<'a>>> {
+) -> PolarsResult<Vec<Box<dyn Array>>> {
     // reads all the necessary columns for all fields from the row group
     // This operation is IO-bounded `O(C)` where C is the number of columns in the row group
     let field_columns = fields
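With a column now being a single `Box<dyn Array>` instead of an iterator of chunks, `RowGroupDeserializer::next` becomes one-shot: it moves the columns out with `std::mem::take`, so the row counter reaches zero after the first batch and a second call yields nothing. A toy model of that behaviour, with `Vec<i32>` standing in for the real boxed arrays:

```rust
// Toy model of the one-shot row-group iterator; Vec<i32> stands in for the
// real Box<dyn Array> columns, and errors are ignored for brevity.
struct RowGroupDeserializer {
    remaining_rows: usize,
    column_chunks: Vec<Vec<i32>>,
}

impl Iterator for RowGroupDeserializer {
    type Item = Vec<Vec<i32>>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.remaining_rows == 0 {
            return None;
        }
        // Hand the columns over wholesale; `take` leaves an empty Vec behind,
        // so there is nothing left for a second batch.
        let chunk = std::mem::take(&mut self.column_chunks);
        let chunk_rows = chunk.first().map_or(0, |col| col.len());
        self.remaining_rows = self.remaining_rows.saturating_sub(chunk_rows);
        Some(chunk)
    }
}

fn main() {
    let mut de = RowGroupDeserializer {
        remaining_rows: 3,
        column_chunks: vec![vec![1, 2, 3], vec![4, 5, 6]],
    };
    assert_eq!(de.next(), Some(vec![vec![1, 2, 3], vec![4, 5, 6]]));
    assert_eq!(de.next(), None); // remaining_rows hit zero after one batch
}
```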
1 change: 1 addition & 0 deletions crates/polars-parquet/src/arrow/read/statistics/mod.rs
@@ -329,6 +329,7 @@ fn push(
         .as_mut_any()
         .downcast_mut::<struct_::DynMutableStructArray>()
         .unwrap();
+
     return min
         .inner
         .iter_mut()
