Skip to content

Commit

Permalink
Deprecate MetadataLoader (#6474)
Browse files (browse the repository at this point in the history)
* deprecate MetadataLoader

* change signature of the load functions

* fix up fetch_parquet_metadata

* can now use self.meta.size directly

* revert changes to load API

* revert change to test code
  • Loading branch information
etseidl authored Sep 30, 2024
1 parent f0e39cc commit 3293a8c
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 54 deletions.
12 changes: 10 additions & 2 deletions parquet/src/arrow/async_reader/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
/// Create a new [`MetadataLoader`] by reading the footer information
///
/// See [`fetch_parquet_metadata`] for the meaning of the individual parameters
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
pub async fn load(mut fetch: F, file_size: usize, prefetch: Option<usize>) -> Result<Self> {
if file_size < FOOTER_SIZE {
return Err(ParquetError::EOF(format!(
Expand Down Expand Up @@ -108,6 +109,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
}

/// Create a new [`MetadataLoader`] from an existing [`ParquetMetaData`]
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
pub fn new(fetch: F, metadata: ParquetMetaData) -> Self {
Self {
fetch,
Expand All @@ -120,6 +122,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
///
/// * `column_index`: if true will load column index
/// * `offset_index`: if true will load offset index
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
pub async fn load_page_index(&mut self, column_index: bool, offset_index: bool) -> Result<()> {
if !column_index && !offset_index {
return Ok(());
Expand Down Expand Up @@ -226,6 +229,7 @@ where
/// in the first request, instead of 8, and only issue further requests
/// if additional bytes are needed. Providing a `prefetch` hint can therefore
/// significantly reduce the number of `fetch` requests, and consequently latency
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
pub async fn fetch_parquet_metadata<F, Fut>(
fetch: F,
file_size: usize,
Expand All @@ -236,10 +240,14 @@ where
Fut: Future<Output = Result<Bytes>> + Send,
{
let fetch = MetadataFetchFn(fetch);
let loader = MetadataLoader::load(fetch, file_size, prefetch).await?;
Ok(loader.finish())
ParquetMetaDataReader::new()
.with_prefetch_hint(prefetch)
.load_and_finish(fetch, file_size)
.await
}

// these tests are all replicated in parquet::file::metadata::reader
#[allow(deprecated)]
#[cfg(test)]
mod tests {
use super::*;
Expand Down
8 changes: 5 additions & 3 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,16 +212,18 @@ impl ArrowReaderMetadata {
input: &mut T,
options: ArrowReaderOptions,
) -> Result<Self> {
// TODO: this is all rather awkward. It would be nice if AsyncFileReader::get_metadata
// took an argument to fetch the page indexes.
let mut metadata = input.get_metadata().await?;

if options.page_index
&& metadata.column_index().is_none()
&& metadata.offset_index().is_none()
{
let m = Arc::try_unwrap(metadata).unwrap_or_else(|e| e.as_ref().clone());
let mut loader = MetadataLoader::new(input, m);
loader.load_page_index(true, true).await?;
metadata = Arc::new(loader.finish())
let mut reader = ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true);
reader.load_page_index(input).await?;
metadata = Arc::new(reader.finish()?)
}
Self::try_new(metadata, options)
}
Expand Down
17 changes: 8 additions & 9 deletions parquet/src/arrow/async_reader/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ use futures::{FutureExt, TryFutureExt};

use object_store::{ObjectMeta, ObjectStore};

use crate::arrow::async_reader::{AsyncFileReader, MetadataLoader};
use crate::arrow::async_reader::AsyncFileReader;
use crate::errors::Result;
use crate::file::metadata::ParquetMetaData;
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};

/// Reads Parquet files in object storage using [`ObjectStore`].
///
Expand Down Expand Up @@ -124,15 +124,14 @@ impl AsyncFileReader for ParquetObjectReader {

fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
let preload_column_index = self.preload_column_index;
let preload_offset_index = self.preload_offset_index;
let file_size = self.meta.size;
let prefetch = self.metadata_size_hint;
let mut loader = MetadataLoader::load(self, file_size, prefetch).await?;
loader
.load_page_index(preload_column_index, preload_offset_index)
let metadata = ParquetMetaDataReader::new()
.with_column_indexes(self.preload_column_index)
.with_offset_indexes(self.preload_offset_index)
.with_prefetch_hint(self.metadata_size_hint)
.load_and_finish(self, file_size)
.await?;
Ok(Arc::new(loader.finish()))
Ok(Arc::new(metadata))
})
}
}
Expand Down
48 changes: 22 additions & 26 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,13 +329,18 @@ impl ParquetMetaDataReader {
return Ok(());
}

self.load_page_index(fetch, remainder).await
self.load_page_index_with_remainder(fetch, remainder).await
}

/// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already
/// been obtained. See [`Self::new_with_metadata()`].
#[cfg(feature = "async")]
pub async fn load_page_index<F: MetadataFetch>(
pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
self.load_page_index_with_remainder(fetch, None).await
}

#[cfg(feature = "async")]
async fn load_page_index_with_remainder<F: MetadataFetch>(
&mut self,
mut fetch: F,
remainder: Option<(usize, Bytes)>,
Expand Down Expand Up @@ -836,7 +841,7 @@ mod async_tests {

struct MetadataFetchFn<F>(F);

impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
impl<'a, F, Fut> MetadataFetch for &'a mut MetadataFetchFn<F>
where
F: FnMut(Range<usize>) -> Fut + Send,
Fut: Future<Output = Result<Bytes>> + Send,
Expand Down Expand Up @@ -865,74 +870,68 @@ mod async_tests {
let expected = expected.file_metadata().schema();
let fetch_count = AtomicUsize::new(0);

let mut fetch = |range| {
let fetch = |range| {
fetch_count.fetch_add(1, Ordering::SeqCst);
futures::future::ready(read_range(&mut file, range))
};

let input = MetadataFetchFn(&mut fetch);
let mut f = MetadataFetchFn(fetch);
let actual = ParquetMetaDataReader::new()
.load_and_finish(input, len)
.load_and_finish(&mut f, len)
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

// Metadata hint too small - below footer size
fetch_count.store(0, Ordering::SeqCst);
let input = MetadataFetchFn(&mut fetch);
let actual = ParquetMetaDataReader::new()
.with_prefetch_hint(Some(7))
.load_and_finish(input, len)
.load_and_finish(&mut f, len)
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

// Metadata hint too small
fetch_count.store(0, Ordering::SeqCst);
let input = MetadataFetchFn(&mut fetch);
let actual = ParquetMetaDataReader::new()
.with_prefetch_hint(Some(10))
.load_and_finish(input, len)
.load_and_finish(&mut f, len)
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

// Metadata hint too large
fetch_count.store(0, Ordering::SeqCst);
let input = MetadataFetchFn(&mut fetch);
let actual = ParquetMetaDataReader::new()
.with_prefetch_hint(Some(500))
.load_and_finish(input, len)
.load_and_finish(&mut f, len)
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

// Metadata hint exactly correct
fetch_count.store(0, Ordering::SeqCst);
let input = MetadataFetchFn(&mut fetch);
let actual = ParquetMetaDataReader::new()
.with_prefetch_hint(Some(428))
.load_and_finish(input, len)
.load_and_finish(&mut f, len)
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

let input = MetadataFetchFn(&mut fetch);
let err = ParquetMetaDataReader::new()
.load_and_finish(input, 4)
.load_and_finish(&mut f, 4)
.await
.unwrap_err()
.to_string();
assert_eq!(err, "EOF: file size of 4 is less than footer");

let input = MetadataFetchFn(&mut fetch);
let err = ParquetMetaDataReader::new()
.load_and_finish(input, 20)
.load_and_finish(&mut f, 20)
.await
.unwrap_err()
.to_string();
Expand All @@ -949,42 +948,39 @@ mod async_tests {
futures::future::ready(read_range(&mut file, range))
};

let f = MetadataFetchFn(&mut fetch);
let mut f = MetadataFetchFn(&mut fetch);
let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
loader.try_load(f, len).await.unwrap();
loader.try_load(&mut f, len).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
let metadata = loader.finish().unwrap();
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch just footer exactly
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let mut loader = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_prefetch_hint(Some(1729));
loader.try_load(f, len).await.unwrap();
loader.try_load(&mut f, len).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
let metadata = loader.finish().unwrap();
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch more than footer but not enough
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let mut loader = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_prefetch_hint(Some(130649));
loader.try_load(f, len).await.unwrap();
loader.try_load(&mut f, len).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
let metadata = loader.finish().unwrap();
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch exactly enough
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let metadata = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_prefetch_hint(Some(130650))
.load_and_finish(f, len)
.load_and_finish(&mut f, len)
.await
.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
Expand Down
12 changes: 5 additions & 7 deletions parquet/src/file/metadata/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ mod tests {
/// Temporary function so we can test loading metadata with page indexes
/// while we haven't fully figured out how to load it cleanly
async fn load_metadata_from_bytes(file_size: usize, data: Bytes) -> ParquetMetaData {
use crate::arrow::async_reader::{MetadataFetch, MetadataLoader};
use crate::arrow::async_reader::MetadataFetch;
use crate::errors::Result as ParquetResult;
use futures::future::BoxFuture;
use futures::FutureExt;
Expand Down Expand Up @@ -569,13 +569,11 @@ mod tests {
Box::new(AsyncBytes::new(data)),
file_size - metadata_length..file_size,
);
let metadata = MetadataLoader::load(&mut reader, file_size, None)
ParquetMetaDataReader::new()
.with_page_indexes(true)
.load_and_finish(&mut reader, file_size)
.await
.unwrap();
let loaded_metadata = metadata.finish();
let mut metadata = MetadataLoader::new(&mut reader, loaded_metadata);
metadata.load_page_index(true, true).await.unwrap();
metadata.finish()
.unwrap()
}

fn check_columns_are_equivalent(left: &ColumnChunkMetaData, right: &ColumnChunkMetaData) {
Expand Down
17 changes: 10 additions & 7 deletions parquet/tests/arrow_reader/bad_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,20 +140,23 @@ fn read_file(name: &str) -> Result<usize, ParquetError> {
#[tokio::test]
async fn bad_metadata_err() {
use bytes::Bytes;
use parquet::arrow::async_reader::MetadataLoader;
use parquet::file::metadata::ParquetMetaDataReader;

let metadata_buffer = Bytes::from_static(include_bytes!("bad_raw_metadata.bin"));

let metadata_length = metadata_buffer.len();

let mut reader = std::io::Cursor::new(&metadata_buffer);
let mut loader = MetadataLoader::load(&mut reader, metadata_length, None)
.await
.unwrap();
loader.load_page_index(false, false).await.unwrap();
loader.load_page_index(false, true).await.unwrap();
let mut loader = ParquetMetaDataReader::new();
loader.try_load(&mut reader, metadata_length).await.unwrap();
loader = loader.with_page_indexes(false);
loader.load_page_index(&mut reader).await.unwrap();

let err = loader.load_page_index(true, false).await.unwrap_err();
loader = loader.with_offset_indexes(true);
loader.load_page_index(&mut reader).await.unwrap();

loader = loader.with_column_indexes(true);
let err = loader.load_page_index(&mut reader).await.unwrap_err();

assert_eq!(
err.to_string(),
Expand Down

0 comments on commit 3293a8c

Please sign in to comment.