Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate read_page_locations() and simplify offset index in ParquetMetaData #6095

Merged
merged 2 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ impl ArrowReaderMetadata {
let offset_index = metadata
.row_groups()
.iter()
.map(|rg| index_reader::read_pages_locations(reader, rg.columns()))
.map(|rg| index_reader::read_offset_indexes(reader, rg.columns()))
.collect::<Result<Vec<_>>>()?;

metadata.set_offset_index(Some(offset_index))
Expand Down Expand Up @@ -689,7 +689,7 @@ impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
// To avoid `i[rg_idx][self.oolumn_idx`] panic, we need to filter out empty `i[rg_idx]`.
let page_locations = offset_index
.filter(|i| !i[rg_idx].is_empty())
.map(|i| i[rg_idx][self.column_idx].clone());
.map(|i| i[rg_idx][self.column_idx].page_locations.clone());
let total_rows = rg.num_rows() as usize;
let reader = self.reader.clone();

Expand Down
14 changes: 10 additions & 4 deletions parquet/src/arrow/arrow_reader/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1349,7 +1349,9 @@ impl<'a> StatisticsConverter<'a> {
let iter = row_group_indices.into_iter().map(|rg_index| {
let column_page_index_per_row_group_per_column =
&column_page_index[*rg_index][parquet_index];
let num_data_pages = &column_offset_index[*rg_index][parquet_index].len();
let num_data_pages = &column_offset_index[*rg_index][parquet_index]
.page_locations()
.len();

(*num_data_pages, column_page_index_per_row_group_per_column)
});
Expand Down Expand Up @@ -1378,7 +1380,9 @@ impl<'a> StatisticsConverter<'a> {
let iter = row_group_indices.into_iter().map(|rg_index| {
let column_page_index_per_row_group_per_column =
&column_page_index[*rg_index][parquet_index];
let num_data_pages = &column_offset_index[*rg_index][parquet_index].len();
let num_data_pages = &column_offset_index[*rg_index][parquet_index]
.page_locations()
.len();

(*num_data_pages, column_page_index_per_row_group_per_column)
});
Expand Down Expand Up @@ -1408,7 +1412,9 @@ impl<'a> StatisticsConverter<'a> {
let iter = row_group_indices.into_iter().map(|rg_index| {
let column_page_index_per_row_group_per_column =
&column_page_index[*rg_index][parquet_index];
let num_data_pages = &column_offset_index[*rg_index][parquet_index].len();
let num_data_pages = &column_offset_index[*rg_index][parquet_index]
.page_locations()
.len();

(*num_data_pages, column_page_index_per_row_group_per_column)
});
Expand Down Expand Up @@ -1450,7 +1456,7 @@ impl<'a> StatisticsConverter<'a> {

let mut row_count_total = Vec::new();
for rg_idx in row_group_indices {
let page_locations = &column_offset_index[*rg_idx][parquet_index];
let page_locations = &column_offset_index[*rg_idx][parquet_index].page_locations();

let row_count_per_page = page_locations
.windows(2)
Expand Down
14 changes: 7 additions & 7 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1096,7 +1096,7 @@ mod tests {
use crate::data_type::AsBytes;
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::index::Index;
use crate::file::page_index::index_reader::read_pages_locations;
use crate::file::page_index::index_reader::read_offset_indexes;
use crate::file::properties::{
BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
};
Expand Down Expand Up @@ -1669,16 +1669,16 @@ mod tests {
"Expected a dictionary page"
);

let page_locations = read_pages_locations(&file, column).unwrap();
let offset_indexes = read_offset_indexes(&file, column).unwrap();

let offset_index = page_locations[0].clone();
let page_locations = offset_indexes[0].page_locations.clone();

// We should fallback to PLAIN encoding after the first row and our max page size is 1 bytes
// so we expect one dictionary encoded page and then a page per row thereafter.
assert_eq!(
offset_index.len(),
page_locations.len(),
10,
"Expected 9 pages but got {offset_index:#?}"
"Expected 9 pages but got {page_locations:#?}"
);
}

Expand Down Expand Up @@ -3020,8 +3020,8 @@ mod tests {

assert_eq!(index.len(), 1);
assert_eq!(index[0].len(), 2); // 2 columns
assert_eq!(index[0][0].len(), 1); // 1 page
assert_eq!(index[0][1].len(), 1); // 1 page
assert_eq!(index[0][0].page_locations().len(), 1); // 1 page
assert_eq!(index[0][1].page_locations().len(), 1); // 1 page
}

#[test]
Expand Down
8 changes: 2 additions & 6 deletions parquet/src/arrow/async_reader/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ use crate::errors::{ParquetError, Result};
use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::index::Index;
use crate::file::page_index::index_reader::{
acc_range, decode_column_index, decode_page_locations,
};
use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
Expand Down Expand Up @@ -179,9 +177,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
x.columns()
.iter()
.map(|c| match c.offset_index_range() {
Some(r) => {
decode_page_locations(&data[r.start - offset..r.end - offset])
}
Some(r) => decode_offset_index(&data[r.start - offset..r.end - offset]),
None => Err(general_err!("missing offset index")),
})
.collect::<Result<Vec<_>>>()
Expand Down
24 changes: 13 additions & 11 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,10 @@ use crate::column::page::{PageIterator, PageReader};
use crate::errors::{ParquetError, Result};
use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
use crate::file::FOOTER_SIZE;
use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, PageLocation};
use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};

mod metadata;
pub use metadata::*;
Expand Down Expand Up @@ -489,7 +490,7 @@ where
// TODO: calling build_array multiple times is wasteful

let meta = self.metadata.row_group(row_group_idx);
let page_locations = self
let offset_index = self
.metadata
.offset_index()
.map(|x| x[row_group_idx].as_slice());
Expand All @@ -499,7 +500,7 @@ where
// schema: meta.schema_descr_ptr(),
row_count: meta.num_rows() as usize,
column_chunks: vec![None; meta.columns().len()],
page_locations,
offset_index,
};

if let Some(filter) = self.filter.as_mut() {
Expand Down Expand Up @@ -703,7 +704,7 @@ where
/// An in-memory collection of column chunks
struct InMemoryRowGroup<'a> {
metadata: &'a RowGroupMetaData,
page_locations: Option<&'a [Vec<PageLocation>]>,
offset_index: Option<&'a [OffsetIndexMetaData]>,
column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
row_count: usize,
}
Expand All @@ -716,7 +717,7 @@ impl<'a> InMemoryRowGroup<'a> {
projection: &ProjectionMask,
selection: Option<&RowSelection>,
) -> Result<()> {
if let Some((selection, page_locations)) = selection.zip(self.page_locations) {
if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
// If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
// `RowSelection`
let mut page_start_offsets: Vec<Vec<usize>> = vec![];
Expand All @@ -734,14 +735,14 @@ impl<'a> InMemoryRowGroup<'a> {
// then we need to also fetch a dictionary page.
let mut ranges = vec![];
let (start, _len) = chunk_meta.byte_range();
match page_locations[idx].first() {
match offset_index[idx].page_locations.first() {
Some(first) if first.offset as u64 != start => {
ranges.push(start as usize..first.offset as usize);
}
_ => (),
}

ranges.extend(selection.scan_ranges(&page_locations[idx]));
ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
page_start_offsets.push(ranges.iter().map(|range| range.start).collect());

ranges
Expand Down Expand Up @@ -812,7 +813,9 @@ impl<'a> RowGroups for InMemoryRowGroup<'a> {
"Invalid column index {i}, column was not fetched"
))),
Some(data) => {
let page_locations = self.page_locations.map(|index| index[i].clone());
let page_locations = self
.offset_index
.map(|index| index[i].page_locations.clone());
let page_reader: Box<dyn PageReader> = Box::new(SerializedPageReader::new(
data.clone(),
self.metadata.column(i),
Expand Down Expand Up @@ -1529,7 +1532,7 @@ mod tests {
let metadata = parse_metadata(&data).unwrap();

let offset_index =
index_reader::read_pages_locations(&data, metadata.row_group(0).columns())
index_reader::read_offset_indexes(&data, metadata.row_group(0).columns())
.expect("reading offset index");

let row_group_meta = metadata.row_group(0).clone();
Expand All @@ -1538,7 +1541,6 @@ mod tests {
vec![row_group_meta],
None,
Some(vec![offset_index.clone()]),
None,
);

let metadata = Arc::new(metadata);
Expand Down Expand Up @@ -1575,7 +1577,7 @@ mod tests {
};

let mut skip = true;
let mut pages = offset_index[0].iter().peekable();
let mut pages = offset_index[0].page_locations.iter().peekable();

// Setup `RowSelection` so that we can skip every other page, selecting the last page
let mut selectors = vec![];
Expand Down
12 changes: 7 additions & 5 deletions parquet/src/bin/parquet-index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
use clap::Parser;
use parquet::errors::{ParquetError, Result};
use parquet::file::page_index::index::{Index, PageIndex};
use parquet::file::page_index::offset_index::OffsetIndexMetaData;
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::file::serialized_reader::ReadOptionsBuilder;
use parquet::format::PageLocation;
Expand Down Expand Up @@ -93,7 +94,8 @@ impl Args {
))
})?;

let row_counts = compute_row_counts(offset_index, row_group.num_rows());
let row_counts =
compute_row_counts(offset_index.page_locations.as_slice(), row_group.num_rows());
match &column_indices[column_idx] {
Index::NONE => println!("NO INDEX"),
Index::BOOLEAN(v) => print_index(&v.indexes, offset_index, &row_counts)?,
Expand Down Expand Up @@ -131,20 +133,20 @@ fn compute_row_counts(offset_index: &[PageLocation], rows: i64) -> Vec<i64> {
/// Prints index information for a single column chunk
fn print_index<T: std::fmt::Display>(
column_index: &[PageIndex<T>],
offset_index: &[PageLocation],
offset_index: &OffsetIndexMetaData,
row_counts: &[i64],
) -> Result<()> {
if column_index.len() != offset_index.len() {
if column_index.len() != offset_index.page_locations.len() {
return Err(ParquetError::General(format!(
"Index length mismatch, got {} and {}",
column_index.len(),
offset_index.len()
offset_index.page_locations.len()
)));
}

for (idx, ((c, o), row_count)) in column_index
.iter()
.zip(offset_index)
.zip(offset_index.page_locations())
.zip(row_counts)
.enumerate()
{
Expand Down
7 changes: 7 additions & 0 deletions parquet/src/file/metadata/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::data_type::private::ParquetValueType;
use crate::file::metadata::{ColumnChunkMetaData, FileMetaData, KeyValue, RowGroupMetaData};
use crate::file::page_encoding_stats::PageEncodingStats;
use crate::file::page_index::index::{Index, NativeIndex, PageIndex};
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::statistics::{Statistics, ValueStatistics};
use crate::format::{BoundaryOrder, PageLocation, SortingColumn};
use std::sync::Arc;
Expand Down Expand Up @@ -144,6 +145,12 @@ impl HeapSize for Statistics {
}
}

impl HeapSize for OffsetIndexMetaData {
fn heap_size(&self) -> usize {
self.page_locations.heap_size() + self.unencoded_byte_array_data_bytes.heap_size()
}
}

impl HeapSize for Index {
fn heap_size(&self) -> usize {
match self {
Expand Down
Loading
Loading