Deprecate read_page_locations() and simplify offset index in `ParquetMetaData` (#6095)

* deprecate read_page_locations

* add to_thrift() to OffsetIndexMetaData
etseidl authored Jul 23, 2024
1 parent 81c34ac commit 3bc9987
Showing 13 changed files with 118 additions and 113 deletions.
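
For readers migrating: `index_reader::read_pages_locations`, which returned `Vec<Vec<PageLocation>>`, is deprecated in favor of `index_reader::read_offset_indexes`, which returns one `OffsetIndexMetaData` per column chunk with the page locations behind a `page_locations()` accessor. A minimal before/after sketch, assuming a local Parquet file (the path, function name, and printing are illustrative, not part of the commit):

use std::fs::File;

use parquet::errors::Result;
use parquet::file::page_index::index_reader;
use parquet::file::reader::{FileReader, SerializedFileReader};

fn print_page_counts(path: &str) -> Result<()> {
    let file = File::open(path)?;
    let reader = SerializedFileReader::new(file.try_clone()?)?;

    for rg in reader.metadata().row_groups() {
        // Before (deprecated): one Vec<PageLocation> per column chunk.
        // let locations = index_reader::read_pages_locations(&file, rg.columns())?;

        // After: one OffsetIndexMetaData per column chunk.
        let offset_indexes = index_reader::read_offset_indexes(&file, rg.columns())?;
        for (i, oi) in offset_indexes.iter().enumerate() {
            println!("column {i}: {} data pages", oi.page_locations().len());
        }
    }
    Ok(())
}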
4 changes: 2 additions & 2 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -394,7 +394,7 @@ impl ArrowReaderMetadata {
             let offset_index = metadata
                 .row_groups()
                 .iter()
-                .map(|rg| index_reader::read_pages_locations(reader, rg.columns()))
+                .map(|rg| index_reader::read_offset_indexes(reader, rg.columns()))
                 .collect::<Result<Vec<_>>>()?;

             metadata.set_offset_index(Some(offset_index))
@@ -689,7 +689,7 @@ impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
         // To avoid an `i[rg_idx][self.column_idx]` panic, we need to filter out empty `i[rg_idx]`.
         let page_locations = offset_index
             .filter(|i| !i[rg_idx].is_empty())
-            .map(|i| i[rg_idx][self.column_idx].clone());
+            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
         let total_rows = rg.num_rows() as usize;
         let reader = self.reader.clone();

14 changes: 10 additions & 4 deletions parquet/src/arrow/arrow_reader/statistics.rs
@@ -1349,7 +1349,9 @@ impl<'a> StatisticsConverter<'a> {
         let iter = row_group_indices.into_iter().map(|rg_index| {
             let column_page_index_per_row_group_per_column =
                 &column_page_index[*rg_index][parquet_index];
-            let num_data_pages = &column_offset_index[*rg_index][parquet_index].len();
+            let num_data_pages = &column_offset_index[*rg_index][parquet_index]
+                .page_locations()
+                .len();

             (*num_data_pages, column_page_index_per_row_group_per_column)
         });
@@ -1378,7 +1380,9 @@ impl<'a> StatisticsConverter<'a> {
         let iter = row_group_indices.into_iter().map(|rg_index| {
             let column_page_index_per_row_group_per_column =
                 &column_page_index[*rg_index][parquet_index];
-            let num_data_pages = &column_offset_index[*rg_index][parquet_index].len();
+            let num_data_pages = &column_offset_index[*rg_index][parquet_index]
+                .page_locations()
+                .len();

             (*num_data_pages, column_page_index_per_row_group_per_column)
         });
@@ -1408,7 +1412,9 @@ impl<'a> StatisticsConverter<'a> {
         let iter = row_group_indices.into_iter().map(|rg_index| {
             let column_page_index_per_row_group_per_column =
                 &column_page_index[*rg_index][parquet_index];
-            let num_data_pages = &column_offset_index[*rg_index][parquet_index].len();
+            let num_data_pages = &column_offset_index[*rg_index][parquet_index]
+                .page_locations()
+                .len();

             (*num_data_pages, column_page_index_per_row_group_per_column)
         });
@@ -1450,7 +1456,7 @@ impl<'a> StatisticsConverter<'a> {

         let mut row_count_total = Vec::new();
         for rg_idx in row_group_indices {
-            let page_locations = &column_offset_index[*rg_idx][parquet_index];
+            let page_locations = &column_offset_index[*rg_idx][parquet_index].page_locations();

             let row_count_per_page = page_locations
                 .windows(2)
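
For context on the last hunk above: per-page row counts are computed by differencing consecutive `first_row_index` values from the offset index, with the final page running to the end of the row group. A standalone sketch of that arithmetic (the struct is a pared-down stand-in for `parquet::format::PageLocation`, and the helper name is invented):

struct PageLocation {
    first_row_index: i64,
}

fn rows_per_page(page_locations: &[PageLocation], num_rows: i64) -> Vec<i64> {
    // Each page's row count is the gap to the next page's first row.
    let mut counts: Vec<i64> = page_locations
        .windows(2)
        .map(|w| w[1].first_row_index - w[0].first_row_index)
        .collect();
    // The last page extends to the end of the row group.
    if let Some(last) = page_locations.last() {
        counts.push(num_rows - last.first_row_index);
    }
    counts
}

fn main() {
    let pages: Vec<PageLocation> = [0, 100, 250]
        .into_iter()
        .map(|i| PageLocation { first_row_index: i })
        .collect();
    assert_eq!(rows_per_page(&pages, 300), vec![100, 150, 50]);
}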
14 changes: 7 additions & 7 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -1096,7 +1096,7 @@ mod tests {
     use crate::data_type::AsBytes;
     use crate::file::metadata::ParquetMetaData;
     use crate::file::page_index::index::Index;
-    use crate::file::page_index::index_reader::read_pages_locations;
+    use crate::file::page_index::index_reader::read_offset_indexes;
     use crate::file::properties::{
         BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
     };
@@ -1669,16 +1669,16 @@ mod tests {
             "Expected a dictionary page"
         );

-        let page_locations = read_pages_locations(&file, column).unwrap();
+        let offset_indexes = read_offset_indexes(&file, column).unwrap();

-        let offset_index = page_locations[0].clone();
+        let page_locations = offset_indexes[0].page_locations.clone();

         // We should fall back to PLAIN encoding after the first row, and our max page size is 1 byte,
         // so we expect one dictionary-encoded page and then a data page per row thereafter.
         assert_eq!(
-            offset_index.len(),
+            page_locations.len(),
             10,
-            "Expected 9 pages but got {offset_index:#?}"
+            "Expected 10 pages but got {page_locations:#?}"
         );
     }

@@ -3020,8 +3020,8 @@ mod tests {

         assert_eq!(index.len(), 1);
         assert_eq!(index[0].len(), 2); // 2 columns
-        assert_eq!(index[0][0].len(), 1); // 1 page
-        assert_eq!(index[0][1].len(), 1); // 1 page
+        assert_eq!(index[0][0].page_locations().len(), 1); // 1 page
+        assert_eq!(index[0][1].page_locations().len(), 1); // 1 page
     }

     #[test]
8 changes: 2 additions & 6 deletions parquet/src/arrow/async_reader/metadata.rs
@@ -20,9 +20,7 @@ use crate::errors::{ParquetError, Result};
 use crate::file::footer::{decode_footer, decode_metadata};
 use crate::file::metadata::ParquetMetaData;
 use crate::file::page_index::index::Index;
-use crate::file::page_index::index_reader::{
-    acc_range, decode_column_index, decode_page_locations,
-};
+use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
 use bytes::Bytes;
 use futures::future::BoxFuture;
 use futures::FutureExt;
@@ -179,9 +177,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
                 x.columns()
                     .iter()
                     .map(|c| match c.offset_index_range() {
-                        Some(r) => {
-                            decode_page_locations(&data[r.start - offset..r.end - offset])
-                        }
+                        Some(r) => decode_offset_index(&data[r.start - offset..r.end - offset]),
                         None => Err(general_err!("missing offset index")),
                     })
                     .collect::<Result<Vec<_>>>()
24 changes: 13 additions & 11 deletions parquet/src/arrow/async_reader/mod.rs
@@ -106,9 +106,10 @@ use crate::column::page::{PageIterator, PageReader};
 use crate::errors::{ParquetError, Result};
 use crate::file::footer::{decode_footer, decode_metadata};
 use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use crate::file::page_index::offset_index::OffsetIndexMetaData;
 use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
 use crate::file::FOOTER_SIZE;
-use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, PageLocation};
+use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};

 mod metadata;
 pub use metadata::*;
@@ -489,7 +490,7 @@
         // TODO: calling build_array multiple times is wasteful

         let meta = self.metadata.row_group(row_group_idx);
-        let page_locations = self
+        let offset_index = self
             .metadata
             .offset_index()
             .map(|x| x[row_group_idx].as_slice());
@@ -499,7 +500,7 @@
             // schema: meta.schema_descr_ptr(),
             row_count: meta.num_rows() as usize,
             column_chunks: vec![None; meta.columns().len()],
-            page_locations,
+            offset_index,
         };

         if let Some(filter) = self.filter.as_mut() {
@@ -703,7 +704,7 @@
 /// An in-memory collection of column chunks
 struct InMemoryRowGroup<'a> {
     metadata: &'a RowGroupMetaData,
-    page_locations: Option<&'a [Vec<PageLocation>]>,
+    offset_index: Option<&'a [OffsetIndexMetaData]>,
     column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
     row_count: usize,
 }
@@ -716,7 +717,7 @@ impl<'a> InMemoryRowGroup<'a> {
         projection: &ProjectionMask,
         selection: Option<&RowSelection>,
     ) -> Result<()> {
-        if let Some((selection, page_locations)) = selection.zip(self.page_locations) {
+        if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
             // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
             // `RowSelection`
             let mut page_start_offsets: Vec<Vec<usize>> = vec![];
@@ -734,14 +735,14 @@ impl<'a> InMemoryRowGroup<'a> {
                     // then we need to also fetch a dictionary page.
                     let mut ranges = vec![];
                     let (start, _len) = chunk_meta.byte_range();
-                    match page_locations[idx].first() {
+                    match offset_index[idx].page_locations.first() {
                         Some(first) if first.offset as u64 != start => {
                             ranges.push(start as usize..first.offset as usize);
                         }
                         _ => (),
                     }

-                    ranges.extend(selection.scan_ranges(&page_locations[idx]));
+                    ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
                     page_start_offsets.push(ranges.iter().map(|range| range.start).collect());

                     ranges
@@ -812,7 +813,9 @@ impl<'a> RowGroups for InMemoryRowGroup<'a> {
                     "Invalid column index {i}, column was not fetched"
                 ))),
                 Some(data) => {
-                    let page_locations = self.page_locations.map(|index| index[i].clone());
+                    let page_locations = self
+                        .offset_index
+                        .map(|index| index[i].page_locations.clone());
                     let page_reader: Box<dyn PageReader> = Box::new(SerializedPageReader::new(
                         data.clone(),
                         self.metadata.column(i),
@@ -1529,7 +1532,7 @@ mod tests {
         let metadata = parse_metadata(&data).unwrap();

         let offset_index =
-            index_reader::read_pages_locations(&data, metadata.row_group(0).columns())
+            index_reader::read_offset_indexes(&data, metadata.row_group(0).columns())
                 .expect("reading offset index");

         let row_group_meta = metadata.row_group(0).clone();
@@ -1538,7 +1541,6 @@ mod tests {
             vec![row_group_meta],
             None,
             Some(vec![offset_index.clone()]),
-            None,
         );

         let metadata = Arc::new(metadata);
@@ -1575,7 +1577,7 @@ mod tests {
         };

         let mut skip = true;
-        let mut pages = offset_index[0].iter().peekable();
+        let mut pages = offset_index[0].page_locations.iter().peekable();

         // Setup `RowSelection` so that we can skip every other page, selecting the last page
         let mut selectors = vec![];
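
As background for the `fetch` hunks above: with a `RowSelection` and an offset index available, only the byte ranges of the selected pages are fetched, plus one leading range whenever the first data page does not start at the column chunk's byte offset (the gap holds the dictionary page). A simplified sketch of that range computation under those assumptions (the types are stand-ins and the function is hypothetical, not the crate's API):

use std::ops::Range;

// Pared-down stand-in for parquet::format::PageLocation.
struct PageLocation {
    offset: i64,
    compressed_page_size: i32,
}

// Byte ranges to fetch for one column chunk: an optional leading range for
// the dictionary page, then one range per selected data page.
fn fetch_ranges(
    chunk_start: u64,
    page_locations: &[PageLocation],
    selected: &[usize], // indices of the pages kept by the row selection
) -> Vec<Range<usize>> {
    let mut ranges = Vec::new();
    if let Some(first) = page_locations.first() {
        // A gap before the first data page implies a dictionary page.
        if first.offset as u64 != chunk_start {
            ranges.push(chunk_start as usize..first.offset as usize);
        }
    }
    for &i in selected {
        let p = &page_locations[i];
        ranges.push(p.offset as usize..p.offset as usize + p.compressed_page_size as usize);
    }
    ranges
}

fn main() {
    let pages = [
        PageLocation { offset: 64, compressed_page_size: 100 },
        PageLocation { offset: 164, compressed_page_size: 100 },
    ];
    // The chunk starts at byte 4, so bytes 4..64 must hold a dictionary page.
    assert_eq!(fetch_ranges(4, &pages, &[1]), vec![4..64, 164..264]);
}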
12 changes: 7 additions & 5 deletions parquet/src/bin/parquet-index.rs
@@ -37,6 +37,7 @@
 use clap::Parser;
 use parquet::errors::{ParquetError, Result};
 use parquet::file::page_index::index::{Index, PageIndex};
+use parquet::file::page_index::offset_index::OffsetIndexMetaData;
 use parquet::file::reader::{FileReader, SerializedFileReader};
 use parquet::file::serialized_reader::ReadOptionsBuilder;
 use parquet::format::PageLocation;
@@ -93,7 +94,8 @@ impl Args {
             ))
         })?;

-        let row_counts = compute_row_counts(offset_index, row_group.num_rows());
+        let row_counts =
+            compute_row_counts(offset_index.page_locations.as_slice(), row_group.num_rows());
         match &column_indices[column_idx] {
             Index::NONE => println!("NO INDEX"),
             Index::BOOLEAN(v) => print_index(&v.indexes, offset_index, &row_counts)?,
@@ -131,20 +133,20 @@ fn compute_row_counts(offset_index: &[PageLocation], rows: i64) -> Vec<i64> {
 /// Prints index information for a single column chunk
 fn print_index<T: std::fmt::Display>(
     column_index: &[PageIndex<T>],
-    offset_index: &[PageLocation],
+    offset_index: &OffsetIndexMetaData,
     row_counts: &[i64],
 ) -> Result<()> {
-    if column_index.len() != offset_index.len() {
+    if column_index.len() != offset_index.page_locations.len() {
         return Err(ParquetError::General(format!(
             "Index length mismatch, got {} and {}",
             column_index.len(),
-            offset_index.len()
+            offset_index.page_locations.len()
         )));
     }

     for (idx, ((c, o), row_count)) in column_index
         .iter()
-        .zip(offset_index)
+        .zip(offset_index.page_locations())
         .zip(row_counts)
         .enumerate()
     {
7 changes: 7 additions & 0 deletions parquet/src/file/metadata/memory.rs
@@ -23,6 +23,7 @@ use crate::data_type::private::ParquetValueType;
 use crate::file::metadata::{ColumnChunkMetaData, FileMetaData, KeyValue, RowGroupMetaData};
 use crate::file::page_encoding_stats::PageEncodingStats;
 use crate::file::page_index::index::{Index, NativeIndex, PageIndex};
+use crate::file::page_index::offset_index::OffsetIndexMetaData;
 use crate::file::statistics::{Statistics, ValueStatistics};
 use crate::format::{BoundaryOrder, PageLocation, SortingColumn};
 use std::sync::Arc;
@@ -144,6 +145,12 @@ impl HeapSize for Statistics {
     }
 }

+impl HeapSize for OffsetIndexMetaData {
+    fn heap_size(&self) -> usize {
+        self.page_locations.heap_size() + self.unencoded_byte_array_data_bytes.heap_size()
+    }
+}
+
 impl HeapSize for Index {
     fn heap_size(&self) -> usize {
         match self {
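
The `HeapSize` impl added above lets `ParquetMetaData::memory_size` account for offset indexes stored as `OffsetIndexMetaData`. As a rough model of the accounting pattern (a toy restatement with simplified field types, not the crate's private trait):

trait HeapSize {
    // Heap bytes a value transitively owns, excluding its own inline size.
    fn heap_size(&self) -> usize;
}

impl HeapSize for i64 {
    fn heap_size(&self) -> usize {
        0 // plain data owns no heap memory
    }
}

impl<T: HeapSize> HeapSize for Vec<T> {
    fn heap_size(&self) -> usize {
        // The buffer the Vec owns, plus whatever each element owns.
        self.capacity() * std::mem::size_of::<T>()
            + self.iter().map(HeapSize::heap_size).sum::<usize>()
    }
}

struct OffsetIndexMetaData {
    page_locations: Vec<i64>, // stands in for Vec<PageLocation>
    unencoded_byte_array_data_bytes: Vec<i64>, // Option<Vec<i64>> in the real struct
}

impl HeapSize for OffsetIndexMetaData {
    fn heap_size(&self) -> usize {
        self.page_locations.heap_size() + self.unencoded_byte_array_data_bytes.heap_size()
    }
}

fn main() {
    let oi = OffsetIndexMetaData {
        page_locations: vec![0, 100, 250],
        unencoded_byte_array_data_bytes: vec![1, 2],
    };
    // Five i64s at 8 bytes each: 40 heap bytes in total.
    println!("heap bytes: {}", oi.heap_size());
}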