diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs index cd45d2abdbcd..b7fac6fe7c05 100644 --- a/parquet/src/arrow/async_reader/metadata.rs +++ b/parquet/src/arrow/async_reader/metadata.rs @@ -52,6 +52,7 @@ impl MetadataLoader { /// Create a new [`MetadataLoader`] by reading the footer information /// /// See [`fetch_parquet_metadata`] for the meaning of the individual parameters + #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")] pub async fn load(mut fetch: F, file_size: usize, prefetch: Option) -> Result { if file_size < FOOTER_SIZE { return Err(ParquetError::EOF(format!( @@ -108,6 +109,7 @@ impl MetadataLoader { } /// Create a new [`MetadataLoader`] from an existing [`ParquetMetaData`] + #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")] pub fn new(fetch: F, metadata: ParquetMetaData) -> Self { Self { fetch, @@ -120,6 +122,7 @@ impl MetadataLoader { /// /// * `column_index`: if true will load column index /// * `offset_index`: if true will load offset index + #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")] pub async fn load_page_index(&mut self, column_index: bool, offset_index: bool) -> Result<()> { if !column_index && !offset_index { return Ok(()); @@ -226,6 +229,7 @@ where /// in the first request, instead of 8, and only issue further requests /// if additional bytes are needed. Providing a `prefetch` hint can therefore /// significantly reduce the number of `fetch` requests, and consequently latency +#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")] pub async fn fetch_parquet_metadata( fetch: F, file_size: usize, @@ -236,10 +240,14 @@ where Fut: Future> + Send, { let fetch = MetadataFetchFn(fetch); - let loader = MetadataLoader::load(fetch, file_size, prefetch).await?; - Ok(loader.finish()) + ParquetMetaDataReader::new() + .with_prefetch_hint(prefetch) + .load_and_finish(fetch, file_size) + .await } +// these tests are all replicated in parquet::file::metadata::reader +#[allow(deprecated)] #[cfg(test)] mod tests { use super::*; diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 89e4d6adb552..5e8bdbc02eb1 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -212,6 +212,8 @@ impl ArrowReaderMetadata { input: &mut T, options: ArrowReaderOptions, ) -> Result { + // TODO: this is all rather awkward. It would be nice if AsyncFileReader::get_metadata + // took an argument to fetch the page indexes. let mut metadata = input.get_metadata().await?; if options.page_index @@ -219,9 +221,9 @@ impl ArrowReaderMetadata { && metadata.offset_index().is_none() { let m = Arc::try_unwrap(metadata).unwrap_or_else(|e| e.as_ref().clone()); - let mut loader = MetadataLoader::new(input, m); - loader.load_page_index(true, true).await?; - metadata = Arc::new(loader.finish()) + let mut reader = ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true); + reader.load_page_index(input).await?; + metadata = Arc::new(reader.finish()?) } Self::try_new(metadata, options) } diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index 77c00e91a3aa..e6b47856ebe8 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -24,9 +24,9 @@ use futures::{FutureExt, TryFutureExt}; use object_store::{ObjectMeta, ObjectStore}; -use crate::arrow::async_reader::{AsyncFileReader, MetadataLoader}; +use crate::arrow::async_reader::AsyncFileReader; use crate::errors::Result; -use crate::file::metadata::ParquetMetaData; +use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; /// Reads Parquet files in object storage using [`ObjectStore`]. /// @@ -124,15 +124,14 @@ impl AsyncFileReader for ParquetObjectReader { fn get_metadata(&mut self) -> BoxFuture<'_, Result>> { Box::pin(async move { - let preload_column_index = self.preload_column_index; - let preload_offset_index = self.preload_offset_index; let file_size = self.meta.size; - let prefetch = self.metadata_size_hint; - let mut loader = MetadataLoader::load(self, file_size, prefetch).await?; - loader - .load_page_index(preload_column_index, preload_offset_index) + let metadata = ParquetMetaDataReader::new() + .with_column_indexes(self.preload_column_index) + .with_offset_indexes(self.preload_offset_index) + .with_prefetch_hint(self.metadata_size_hint) + .load_and_finish(self, file_size) .await?; - Ok(Arc::new(loader.finish())) + Ok(Arc::new(metadata)) }) } } diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index 9e00c6860434..3fd2bd76f6b8 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -329,13 +329,18 @@ impl ParquetMetaDataReader { return Ok(()); } - self.load_page_index(fetch, remainder).await + self.load_page_index_with_remainder(fetch, remainder).await } /// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already /// been obtained. See [`Self::new_with_metadata()`]. #[cfg(feature = "async")] - pub async fn load_page_index( + pub async fn load_page_index(&mut self, fetch: F) -> Result<()> { + self.load_page_index_with_remainder(fetch, None).await + } + + #[cfg(feature = "async")] + async fn load_page_index_with_remainder( &mut self, mut fetch: F, remainder: Option<(usize, Bytes)>, @@ -836,7 +841,7 @@ mod async_tests { struct MetadataFetchFn(F); - impl MetadataFetch for MetadataFetchFn + impl<'a, F, Fut> MetadataFetch for &'a mut MetadataFetchFn where F: FnMut(Range) -> Fut + Send, Fut: Future> + Send, @@ -865,14 +870,14 @@ mod async_tests { let expected = expected.file_metadata().schema(); let fetch_count = AtomicUsize::new(0); - let mut fetch = |range| { + let fetch = |range| { fetch_count.fetch_add(1, Ordering::SeqCst); futures::future::ready(read_range(&mut file, range)) }; - let input = MetadataFetchFn(&mut fetch); + let mut f = MetadataFetchFn(fetch); let actual = ParquetMetaDataReader::new() - .load_and_finish(input, len) + .load_and_finish(&mut f, len) .await .unwrap(); assert_eq!(actual.file_metadata().schema(), expected); @@ -880,10 +885,9 @@ mod async_tests { // Metadata hint too small - below footer size fetch_count.store(0, Ordering::SeqCst); - let input = MetadataFetchFn(&mut fetch); let actual = ParquetMetaDataReader::new() .with_prefetch_hint(Some(7)) - .load_and_finish(input, len) + .load_and_finish(&mut f, len) .await .unwrap(); assert_eq!(actual.file_metadata().schema(), expected); @@ -891,10 +895,9 @@ mod async_tests { // Metadata hint too small fetch_count.store(0, Ordering::SeqCst); - let input = MetadataFetchFn(&mut fetch); let actual = ParquetMetaDataReader::new() .with_prefetch_hint(Some(10)) - .load_and_finish(input, len) + .load_and_finish(&mut f, len) .await .unwrap(); assert_eq!(actual.file_metadata().schema(), expected); @@ -902,10 +905,9 @@ mod async_tests { // Metadata hint too large fetch_count.store(0, Ordering::SeqCst); - let input = MetadataFetchFn(&mut fetch); let actual = ParquetMetaDataReader::new() .with_prefetch_hint(Some(500)) - .load_and_finish(input, len) + .load_and_finish(&mut f, len) .await .unwrap(); assert_eq!(actual.file_metadata().schema(), expected); @@ -913,26 +915,23 @@ mod async_tests { // Metadata hint exactly correct fetch_count.store(0, Ordering::SeqCst); - let input = MetadataFetchFn(&mut fetch); let actual = ParquetMetaDataReader::new() .with_prefetch_hint(Some(428)) - .load_and_finish(input, len) + .load_and_finish(&mut f, len) .await .unwrap(); assert_eq!(actual.file_metadata().schema(), expected); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); - let input = MetadataFetchFn(&mut fetch); let err = ParquetMetaDataReader::new() - .load_and_finish(input, 4) + .load_and_finish(&mut f, 4) .await .unwrap_err() .to_string(); assert_eq!(err, "EOF: file size of 4 is less than footer"); - let input = MetadataFetchFn(&mut fetch); let err = ParquetMetaDataReader::new() - .load_and_finish(input, 20) + .load_and_finish(&mut f, 20) .await .unwrap_err() .to_string(); @@ -949,42 +948,39 @@ mod async_tests { futures::future::ready(read_range(&mut file, range)) }; - let f = MetadataFetchFn(&mut fetch); + let mut f = MetadataFetchFn(&mut fetch); let mut loader = ParquetMetaDataReader::new().with_page_indexes(true); - loader.try_load(f, len).await.unwrap(); + loader.try_load(&mut f, len).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 3); let metadata = loader.finish().unwrap(); assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); // Prefetch just footer exactly fetch_count.store(0, Ordering::SeqCst); - let f = MetadataFetchFn(&mut fetch); let mut loader = ParquetMetaDataReader::new() .with_page_indexes(true) .with_prefetch_hint(Some(1729)); - loader.try_load(f, len).await.unwrap(); + loader.try_load(&mut f, len).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 2); let metadata = loader.finish().unwrap(); assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); // Prefetch more than footer but not enough fetch_count.store(0, Ordering::SeqCst); - let f = MetadataFetchFn(&mut fetch); let mut loader = ParquetMetaDataReader::new() .with_page_indexes(true) .with_prefetch_hint(Some(130649)); - loader.try_load(f, len).await.unwrap(); + loader.try_load(&mut f, len).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 2); let metadata = loader.finish().unwrap(); assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); // Prefetch exactly enough fetch_count.store(0, Ordering::SeqCst); - let f = MetadataFetchFn(&mut fetch); let metadata = ParquetMetaDataReader::new() .with_page_indexes(true) .with_prefetch_hint(Some(130650)) - .load_and_finish(f, len) + .load_and_finish(&mut f, len) .await .unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs index db78606e42ea..44328c635fed 100644 --- a/parquet/src/file/metadata/writer.rs +++ b/parquet/src/file/metadata/writer.rs @@ -516,7 +516,7 @@ mod tests { /// Temporary function so we can test loading metadata with page indexes /// while we haven't fully figured out how to load it cleanly async fn load_metadata_from_bytes(file_size: usize, data: Bytes) -> ParquetMetaData { - use crate::arrow::async_reader::{MetadataFetch, MetadataLoader}; + use crate::arrow::async_reader::MetadataFetch; use crate::errors::Result as ParquetResult; use futures::future::BoxFuture; use futures::FutureExt; @@ -569,13 +569,11 @@ mod tests { Box::new(AsyncBytes::new(data)), file_size - metadata_length..file_size, ); - let metadata = MetadataLoader::load(&mut reader, file_size, None) + ParquetMetaDataReader::new() + .with_page_indexes(true) + .load_and_finish(&mut reader, file_size) .await - .unwrap(); - let loaded_metadata = metadata.finish(); - let mut metadata = MetadataLoader::new(&mut reader, loaded_metadata); - metadata.load_page_index(true, true).await.unwrap(); - metadata.finish() + .unwrap() } fn check_columns_are_equivalent(left: &ColumnChunkMetaData, right: &ColumnChunkMetaData) { diff --git a/parquet/tests/arrow_reader/bad_data.rs b/parquet/tests/arrow_reader/bad_data.rs index a73864070d9f..e2975c17c8b9 100644 --- a/parquet/tests/arrow_reader/bad_data.rs +++ b/parquet/tests/arrow_reader/bad_data.rs @@ -140,20 +140,23 @@ fn read_file(name: &str) -> Result { #[tokio::test] async fn bad_metadata_err() { use bytes::Bytes; - use parquet::arrow::async_reader::MetadataLoader; + use parquet::file::metadata::ParquetMetaDataReader; let metadata_buffer = Bytes::from_static(include_bytes!("bad_raw_metadata.bin")); let metadata_length = metadata_buffer.len(); let mut reader = std::io::Cursor::new(&metadata_buffer); - let mut loader = MetadataLoader::load(&mut reader, metadata_length, None) - .await - .unwrap(); - loader.load_page_index(false, false).await.unwrap(); - loader.load_page_index(false, true).await.unwrap(); + let mut loader = ParquetMetaDataReader::new(); + loader.try_load(&mut reader, metadata_length).await.unwrap(); + loader = loader.with_page_indexes(false); + loader.load_page_index(&mut reader).await.unwrap(); - let err = loader.load_page_index(true, false).await.unwrap_err(); + loader = loader.with_offset_indexes(true); + loader.load_page_index(&mut reader).await.unwrap(); + + loader = loader.with_column_indexes(true); + let err = loader.load_page_index(&mut reader).await.unwrap_err(); assert_eq!( err.to_string(),