Add ParquetMetaDataBuilder #6466

Merged
merged 7 commits into from
Oct 1, 2024
17 changes: 10 additions & 7 deletions parquet/src/arrow/async_reader/mod.rs
@@ -1556,13 +1556,16 @@ mod tests {
index_reader::read_offset_indexes(&data, metadata.row_group(0).columns())
.expect("reading offset index");

- let row_group_meta = metadata.row_group(0).clone();
- let metadata = ParquetMetaData::new_with_page_index(
- metadata.file_metadata().clone(),
- vec![row_group_meta],
- None,
- Some(vec![offset_index.clone()]),
- );
+ let mut metadata_builder = metadata.into_builder();
+ let mut row_groups = metadata_builder.take_row_groups();
+ row_groups.truncate(1);
+ let row_group_meta = row_groups.pop().unwrap();
+
+ let metadata = metadata_builder
+ .add_row_group(row_group_meta)
+ .set_column_index(None)
+ .set_offset_index(Some(vec![offset_index.clone()]))
+ .build();

let metadata = Arc::new(metadata);

173 changes: 159 additions & 14 deletions parquet/src/file/metadata/mod.rs
@@ -156,13 +156,16 @@ pub type ParquetOffsetIndex = Vec<Vec<OffsetIndexMetaData>>;
/// defined by [`parquet.thrift`].
///
/// # Overview
/// The fields of this structure are:
/// * [`FileMetaData`]: Information about the overall file (such as the schema) (See [`Self::file_metadata`])
/// * [`RowGroupMetaData`]: Information about each Row Group (see [`Self::row_groups`])
/// * [`ParquetColumnIndex`] and [`ParquetOffsetIndex`]: Optional "Page Index" structures (see [`Self::column_index`] and [`Self::offset_index`])
///
/// This structure is read by the various readers in this crate or can be read
/// directly from a file using the [`ParquetMetaDataReader`] struct.
///
/// See the [`ParquetMetaDataBuilder`] to create and modify this structure.
///
/// [`parquet.thrift`]: https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift
#[derive(Debug, Clone, PartialEq)]
pub struct ParquetMetaData {
@@ -190,18 +193,23 @@ impl ParquetMetaData {

/// Creates Parquet metadata from file metadata, a list of row
/// group metadata, and the column index structures.
#[deprecated(note = "Use ParquetMetaDataBuilder")]
pub fn new_with_page_index(
file_metadata: FileMetaData,
row_groups: Vec<RowGroupMetaData>,
column_index: Option<ParquetColumnIndex>,
offset_index: Option<ParquetOffsetIndex>,
) -> Self {
- ParquetMetaData {
- file_metadata,
- row_groups,
- column_index,
- offset_index,
- }
+ ParquetMetaDataBuilder::new(file_metadata)
+ .set_row_groups(row_groups)
+ .set_column_index(column_index)
+ .set_offset_index(offset_index)
+ .build()
}

/// Convert this ParquetMetaData into a [`ParquetMetaDataBuilder`]
pub fn into_builder(self) -> ParquetMetaDataBuilder {
self.into()
}

/// Returns file metadata as reference.
@@ -290,6 +298,127 @@ impl ParquetMetaData {
}
}

/// A builder for creating / manipulating [`ParquetMetaData`]
///
/// # Example creating a new [`ParquetMetaData`]
///
///```no_run
/// # use parquet::file::metadata::{FileMetaData, ParquetMetaData, ParquetMetaDataBuilder, RowGroupMetaData, RowGroupMetaDataBuilder};
/// # fn get_file_metadata() -> FileMetaData { unimplemented!(); }
/// // Create a new builder given the file metadata
/// let file_metadata = get_file_metadata();
/// // Create a row group
/// let row_group = RowGroupMetaData::builder(file_metadata.schema_descr_ptr())
/// .set_num_rows(100)
/// // ... (A real row group needs more than just the number of rows)
/// .build()
/// .unwrap();
/// // Create the final metadata
/// let metadata: ParquetMetaData = ParquetMetaDataBuilder::new(file_metadata)
/// .add_row_group(row_group)
/// .build();
/// ```
///
/// # Example modifying an existing [`ParquetMetaData`]
/// ```no_run
/// # use parquet::file::metadata::ParquetMetaData;
/// # fn load_metadata() -> ParquetMetaData { unimplemented!(); }
/// // Modify the metadata so only the last RowGroup remains
/// let metadata: ParquetMetaData = load_metadata();
/// let mut builder = metadata.into_builder();
///
/// // Take existing row groups to modify
/// let mut row_groups = builder.take_row_groups();
/// let last_row_group = row_groups.pop().unwrap();
///
/// let metadata = builder
/// .add_row_group(last_row_group)
/// .build();
/// ```
pub struct ParquetMetaDataBuilder(ParquetMetaData);

impl ParquetMetaDataBuilder {
/// Create a new builder from a file metadata, with no row groups
pub fn new(file_meta_data: FileMetaData) -> Self {
Self(ParquetMetaData::new(file_meta_data, vec![]))
}

/// Create a new builder from an existing ParquetMetaData
pub fn new_from_metadata(metadata: ParquetMetaData) -> Self {
Self(metadata)
}

/// Adds a row group to the metadata
pub fn add_row_group(mut self, row_group: RowGroupMetaData) -> Self {
self.0.row_groups.push(row_group);
self
}

/// Sets all the row groups to the specified list
pub fn set_row_groups(mut self, row_groups: Vec<RowGroupMetaData>) -> Self {
self.0.row_groups = row_groups;
self
}

/// Takes ownership of the row groups in this builder, and clears the list
/// of row groups.
///
/// This can be used for more efficient creation of a new ParquetMetaData
/// from an existing one.
pub fn take_row_groups(&mut self) -> Vec<RowGroupMetaData> {
std::mem::take(&mut self.0.row_groups)
}

/// Return a reference to the current row groups
pub fn row_groups(&self) -> &[RowGroupMetaData] {
&self.0.row_groups
}

/// Sets the column index
pub fn set_column_index(mut self, column_index: Option<ParquetColumnIndex>) -> Self {
self.0.column_index = column_index;
self
}

/// Returns the current column index from the builder, replacing it with `None`
pub fn take_column_index(&mut self) -> Option<ParquetColumnIndex> {
std::mem::take(&mut self.0.column_index)
}

/// Return a reference to the current column index, if any
pub fn column_index(&self) -> Option<&ParquetColumnIndex> {
self.0.column_index.as_ref()
}

/// Sets the offset index
pub fn set_offset_index(mut self, offset_index: Option<ParquetOffsetIndex>) -> Self {
self.0.offset_index = offset_index;
self
}

/// Returns the current offset index from the builder, replacing it with `None`
pub fn take_offset_index(&mut self) -> Option<ParquetOffsetIndex> {
std::mem::take(&mut self.0.offset_index)
}

/// Return a reference to the current offset index, if any
pub fn offset_index(&self) -> Option<&ParquetOffsetIndex> {
self.0.offset_index.as_ref()
}

/// Creates a new ParquetMetaData from the builder
pub fn build(self) -> ParquetMetaData {
let Self(metadata) = self;
metadata
}
}

impl From<ParquetMetaData> for ParquetMetaDataBuilder {
fn from(meta_data: ParquetMetaData) -> Self {
Self(meta_data)
}
}

pub type KeyValue = crate::format::KeyValue;

/// Reference counted pointer for [`FileMetaData`].
@@ -566,12 +695,27 @@ impl RowGroupMetaDataBuilder {
self
}

/// Takes ownership of the column metadata in this builder, and clears
/// the list of columns.
///
/// This can be used for more efficient creation of a new RowGroupMetaData
/// from an existing one.
pub fn take_columns(&mut self) -> Vec<ColumnChunkMetaData> {
std::mem::take(&mut self.0.columns)
}

/// Sets column metadata for this row group.
pub fn set_column_metadata(mut self, value: Vec<ColumnChunkMetaData>) -> Self {
self.0.columns = value;
self
}

/// Adds a column metadata to this row group
pub fn add_column_metadata(mut self, value: ColumnChunkMetaData) -> Self {
self.0.columns.push(value);
self
}

/// Sets ordinal for this row group.
pub fn set_ordinal(mut self, value: i16) -> Self {
self.0.ordinal = Some(value);
@@ -1672,7 +1816,9 @@ mod tests {
.unwrap();
let row_group_meta_with_stats = vec![row_group_meta_with_stats];

- let parquet_meta = ParquetMetaData::new(file_metadata.clone(), row_group_meta_with_stats);
+ let parquet_meta = ParquetMetaDataBuilder::new(file_metadata.clone())
+ .set_row_groups(row_group_meta_with_stats)
+ .build();
let base_expected_size = 2312;

assert_eq!(parquet_meta.memory_size(), base_expected_size);
@@ -1692,14 +1838,13 @@ mod tests {
offset_index.append_unencoded_byte_array_data_bytes(Some(10));
let offset_index = offset_index.build_to_thrift();

- let parquet_meta = ParquetMetaData::new_with_page_index(
- file_metadata,
- row_group_meta,
- Some(vec![vec![Index::BOOLEAN(native_index)]]),
- Some(vec![vec![
+ let parquet_meta = ParquetMetaDataBuilder::new(file_metadata)
+ .set_row_groups(row_group_meta)
+ .set_column_index(Some(vec![vec![Index::BOOLEAN(native_index)]]))
+ .set_offset_index(Some(vec![vec![
OffsetIndexMetaData::try_new(offset_index).unwrap()
- ]]),
- );
+ ]]))
+ .build();

let bigger_expected_size = 2816;
// more set fields means more memory usage
43 changes: 17 additions & 26 deletions parquet/src/file/serialized_reader.rs
@@ -191,11 +191,13 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
/// Creates file reader from a Parquet file with read options.
/// Returns error if Parquet file does not exist or is corrupt.
pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
- let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
+ let mut metadata_builder = ParquetMetaDataReader::new()
+ .parse_and_finish(&chunk_reader)?
+ .into_builder();
let mut predicates = options.predicates;
- let row_groups = metadata.row_groups().to_vec();
- let mut filtered_row_groups = Vec::<RowGroupMetaData>::new();
- for (i, rg_meta) in row_groups.into_iter().enumerate() {
+
+ // Filter row groups based on the predicates
Contributor Author

I think the cleanup of this code (which modifies the ParquetMetaData) is the best example of why this API makes sense: it makes one fewer copy and, I think, is quite a bit clearer.
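
To make the copy-saving concrete, here is a minimal, self-contained sketch. It does not use the parquet crate: `RowGroup`, `Metadata`, `MetadataBuilder`, and `filter_row_groups` are hypothetical stand-ins that only mimic the shape of `take_row_groups` and `add_row_group` from this PR. The idea it illustrates is that `std::mem::take` moves the vector out of the builder, so filtering never has to clone a row group.

```rust
/// Hypothetical stand-in for `RowGroupMetaData`. It is deliberately not
/// `Clone`, so the compiler guarantees that no copy is made below.
#[derive(Debug, PartialEq)]
pub struct RowGroup {
    pub num_rows: i64,
}

/// Hypothetical stand-in for `ParquetMetaData`.
#[derive(Debug, Default, PartialEq)]
pub struct Metadata {
    pub row_groups: Vec<RowGroup>,
}

/// Hypothetical stand-in for `ParquetMetaDataBuilder`: a newtype wrapper
/// around the value being built, as in the PR.
pub struct MetadataBuilder(Metadata);

impl MetadataBuilder {
    pub fn new_from_metadata(metadata: Metadata) -> Self {
        Self(metadata)
    }

    /// Moves the row groups out and leaves an empty list behind.
    pub fn take_row_groups(&mut self) -> Vec<RowGroup> {
        std::mem::take(&mut self.0.row_groups)
    }

    pub fn add_row_group(mut self, row_group: RowGroup) -> Self {
        self.0.row_groups.push(row_group);
        self
    }

    pub fn build(self) -> Metadata {
        self.0
    }
}

/// Keeps only the row groups accepted by `predicate`, moving each kept
/// row group back into the builder instead of cloning it.
pub fn filter_row_groups(
    metadata: Metadata,
    mut predicate: impl FnMut(&RowGroup, usize) -> bool,
) -> Metadata {
    let mut builder = MetadataBuilder::new_from_metadata(metadata);
    for (i, rg) in builder.take_row_groups().into_iter().enumerate() {
        if predicate(&rg, i) {
            builder = builder.add_row_group(rg);
        }
    }
    builder.build()
}
```

Calling `filter_row_groups(metadata, |rg, _| rg.num_rows < 100)` returns a `Metadata` holding only the small row groups, in their original order. The removed code path instead called `to_vec()` on the row groups, which clones every entry before filtering.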

+ for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
let mut keep = true;
for predicate in &mut predicates {
if !predicate(&rg_meta, i) {
@@ -204,41 +206,30 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
}
}
if keep {
- filtered_row_groups.push(rg_meta);
+ metadata_builder = metadata_builder.add_row_group(rg_meta);
}
}

if options.enable_page_index {
let mut columns_indexes = vec![];
let mut offset_indexes = vec![];

- for rg in &mut filtered_row_groups {
+ for rg in metadata_builder.row_groups().iter() {
Contributor

I think we can build the metadata here (with the filtered row groups), pass it into ParquetMetaDataReader and then load the page indexes into the metadata. Let me give that a try.

Contributor

@etseidl, Sep 27, 2024

        if options.enable_page_index {
            let mut reader = ParquetMetaDataReader::new_with_metadata(metadata_builder.build())
                .with_page_indexes(options.enable_page_index);
            reader.read_page_indexes(&chunk_reader)?;
            metadata_builder = ParquetMetaDataBuilder::new_from_metadata(reader.finish()?);
        }

I forgot to do this in #6450.

Contributor Author

I don't quite follow why this is needed. What scenario does it help? (I can write a test to cover it.)

Contributor

I mean replace

        if options.enable_page_index {
            let mut columns_indexes = vec![];
            let mut offset_indexes = vec![];

            for rg in metadata_builder.row_groups().iter() {
                let column_index = index_reader::read_columns_indexes(&chunk_reader, rg.columns())?;
                let offset_index = index_reader::read_offset_indexes(&chunk_reader, rg.columns())?;
                columns_indexes.push(column_index);
                offset_indexes.push(offset_index);
            }
            metadata_builder = metadata_builder
                .set_column_index(Some(columns_indexes))
                .set_offset_index(Some(offset_indexes));
        }

with the above code snippet from my earlier comment. This should be a bit more efficient since read_page_indexes will fetch the necessary bytes from the file in a single read, rather than 2 reads per row group.
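
One way to picture why a single read can replace two reads per row group: if the byte ranges of all the index structures are known up front, a reader can fetch the one span that covers them and slice it locally. The sketch below illustrates only that idea. `covering_range` is a hypothetical helper, and whether `read_page_indexes` is implemented exactly this way is an assumption rather than something this thread confirms.

```rust
use std::ops::Range;

/// Returns the smallest byte range that contains every non-empty input
/// range, or `None` when there is nothing to read.
///
/// Hypothetical helper for illustration; not part of the parquet crate.
pub fn covering_range(ranges: &[Range<u64>]) -> Option<Range<u64>> {
    ranges
        .iter()
        // Skip empty ranges, e.g. a column chunk that has no index.
        .filter(|r| r.start < r.end)
        .fold(None, |acc: Option<Range<u64>>, r| match acc {
            None => Some(r.clone()),
            Some(a) => Some(a.start.min(r.start)..a.end.max(r.end)),
        })
}
```

The trade-off is that a covering read may also fetch bytes that lie between the individual ranges, so it pays off mainly when the index structures sit close together in the file.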

Contributor Author

I implemented something slightly different:

Since there is already a ParquetMetaDataReader created at the beginning of the function, I made the change to simply read it when needed.

One thing that might be different: the current code appears to read the column index/page index only for row groups that passed the "predicates", whereas ParquetMetaDataReader reads the index for all the row groups.

That being said, no tests fail, so I am not sure whether it is a real problem.

Contributor

@etseidl, Oct 1, 2024

Hmm, this worries me a bit, since the column and offset indexes will have more row groups represented than are in the ParquetMetaData. The split path from before would only read the page indexes for the remaining row groups.

We could prune the page indexes at the same time we're pruning the row groups.
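
A minimal sketch of that pruning idea, assuming the page index holds one entry per row group in the same order as the row groups. `prune_with_index` is a hypothetical helper written against plain vectors rather than the parquet crate's types, and it is not the change that was eventually made.

```rust
/// Filters `row_groups` with `keep` and drops the matching entries of
/// `index` in the same pass, so both stay aligned position by position.
///
/// Assumes `index`, when present, has exactly one entry per row group.
/// Hypothetical helper for illustration only.
pub fn prune_with_index<R, I>(
    row_groups: Vec<R>,
    index: Option<Vec<I>>,
    mut keep: impl FnMut(&R, usize) -> bool,
) -> (Vec<R>, Option<Vec<I>>) {
    // Evaluate the predicate once per row group and remember the result.
    let mask: Vec<bool> = row_groups
        .iter()
        .enumerate()
        .map(|(i, rg)| keep(rg, i))
        .collect();

    // Move the surviving row groups into the output without cloning.
    let kept_row_groups = row_groups
        .into_iter()
        .zip(mask.iter())
        .filter_map(|(rg, &k)| k.then_some(rg))
        .collect();

    // Apply the same mask to the index entries, if there are any.
    let kept_index = index.map(|entries| {
        entries
            .into_iter()
            .zip(mask.iter())
            .filter_map(|(entry, &k)| k.then_some(entry))
            .collect()
    });

    (kept_row_groups, kept_index)
}
```

With this shape, the metadata and its indexes always describe the same set of row groups, which is the mismatch raised above.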

Contributor Author

> We could prune the page indexes at the same time we're pruning the row groups.

Yeah, that probably mirrors the intent most closely. How about I back out c0432e6, and we address improving this code in a follow-on PR (along with tests)?

Contributor Author

Filed #6491 and reverted c0432e6

let column_index = index_reader::read_columns_indexes(&chunk_reader, rg.columns())?;
let offset_index = index_reader::read_offset_indexes(&chunk_reader, rg.columns())?;
columns_indexes.push(column_index);
offset_indexes.push(offset_index);
}

- Ok(Self {
- chunk_reader: Arc::new(chunk_reader),
- metadata: Arc::new(ParquetMetaData::new_with_page_index(
- metadata.file_metadata().clone(),
- filtered_row_groups,
- Some(columns_indexes),
- Some(offset_indexes),
- )),
- props: Arc::new(options.props),
- })
- } else {
- Ok(Self {
- chunk_reader: Arc::new(chunk_reader),
- metadata: Arc::new(ParquetMetaData::new(
- metadata.file_metadata().clone(),
- filtered_row_groups,
- )),
- props: Arc::new(options.props),
- })
+ metadata_builder = metadata_builder
+ .set_column_index(Some(columns_indexes))
+ .set_offset_index(Some(offset_indexes));
}
+
+ Ok(Self {
+ chunk_reader: Arc::new(chunk_reader),
+ metadata: Arc::new(metadata_builder.build()),
+ props: Arc::new(options.props),
+ })
}
}
