From 5843f452473d6f884265a791eea7b1e6a5200980 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Thu, 26 Sep 2024 09:07:16 -0700 Subject: [PATCH 1/6] deprecate MetadataLoader --- parquet/src/arrow/async_reader/metadata.rs | 12 ++++++++++-- parquet/src/arrow/async_reader/mod.rs | 8 +++++--- parquet/src/arrow/async_reader/store.rs | 17 ++++++++--------- parquet/src/file/metadata/reader.rs | 9 +++++++-- parquet/src/file/metadata/writer.rs | 12 +++++------- parquet/tests/arrow_reader/bad_data.rs | 17 ++++++++++------- 6 files changed, 45 insertions(+), 30 deletions(-) diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs index cd45d2abdbcd..b7fac6fe7c05 100644 --- a/parquet/src/arrow/async_reader/metadata.rs +++ b/parquet/src/arrow/async_reader/metadata.rs @@ -52,6 +52,7 @@ impl MetadataLoader { /// Create a new [`MetadataLoader`] by reading the footer information /// /// See [`fetch_parquet_metadata`] for the meaning of the individual parameters + #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")] pub async fn load(mut fetch: F, file_size: usize, prefetch: Option) -> Result { if file_size < FOOTER_SIZE { return Err(ParquetError::EOF(format!( @@ -108,6 +109,7 @@ impl MetadataLoader { } /// Create a new [`MetadataLoader`] from an existing [`ParquetMetaData`] + #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")] pub fn new(fetch: F, metadata: ParquetMetaData) -> Self { Self { fetch, @@ -120,6 +122,7 @@ impl MetadataLoader { /// /// * `column_index`: if true will load column index /// * `offset_index`: if true will load offset index + #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")] pub async fn load_page_index(&mut self, column_index: bool, offset_index: bool) -> Result<()> { if !column_index && !offset_index { return Ok(()); @@ -226,6 +229,7 @@ where /// in the first request, instead of 8, and only issue further requests /// if additional bytes are needed. Providing a `prefetch` hint can therefore /// significantly reduce the number of `fetch` requests, and consequently latency +#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")] pub async fn fetch_parquet_metadata( fetch: F, file_size: usize, @@ -236,10 +240,14 @@ where Fut: Future> + Send, { let fetch = MetadataFetchFn(fetch); - let loader = MetadataLoader::load(fetch, file_size, prefetch).await?; - Ok(loader.finish()) + ParquetMetaDataReader::new() + .with_prefetch_hint(prefetch) + .load_and_finish(fetch, file_size) + .await } +// these tests are all replicated in parquet::file::metadata::reader +#[allow(deprecated)] #[cfg(test)] mod tests { use super::*; diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 89e4d6adb552..5e8bdbc02eb1 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -212,6 +212,8 @@ impl ArrowReaderMetadata { input: &mut T, options: ArrowReaderOptions, ) -> Result { + // TODO: this is all rather awkward. It would be nice if AsyncFileReader::get_metadata + // took an argument to fetch the page indexes. let mut metadata = input.get_metadata().await?; if options.page_index @@ -219,9 +221,9 @@ impl ArrowReaderMetadata { && metadata.offset_index().is_none() { let m = Arc::try_unwrap(metadata).unwrap_or_else(|e| e.as_ref().clone()); - let mut loader = MetadataLoader::new(input, m); - loader.load_page_index(true, true).await?; - metadata = Arc::new(loader.finish()) + let mut reader = ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true); + reader.load_page_index(input).await?; + metadata = Arc::new(reader.finish()?) } Self::try_new(metadata, options) } diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index 77c00e91a3aa..e6b47856ebe8 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -24,9 +24,9 @@ use futures::{FutureExt, TryFutureExt}; use object_store::{ObjectMeta, ObjectStore}; -use crate::arrow::async_reader::{AsyncFileReader, MetadataLoader}; +use crate::arrow::async_reader::AsyncFileReader; use crate::errors::Result; -use crate::file::metadata::ParquetMetaData; +use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; /// Reads Parquet files in object storage using [`ObjectStore`]. /// @@ -124,15 +124,14 @@ impl AsyncFileReader for ParquetObjectReader { fn get_metadata(&mut self) -> BoxFuture<'_, Result>> { Box::pin(async move { - let preload_column_index = self.preload_column_index; - let preload_offset_index = self.preload_offset_index; let file_size = self.meta.size; - let prefetch = self.metadata_size_hint; - let mut loader = MetadataLoader::load(self, file_size, prefetch).await?; - loader - .load_page_index(preload_column_index, preload_offset_index) + let metadata = ParquetMetaDataReader::new() + .with_column_indexes(self.preload_column_index) + .with_offset_indexes(self.preload_offset_index) + .with_prefetch_hint(self.metadata_size_hint) + .load_and_finish(self, file_size) .await?; - Ok(Arc::new(loader.finish())) + Ok(Arc::new(metadata)) }) } } diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index 9e00c6860434..88f53d02f3f4 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -329,13 +329,18 @@ impl ParquetMetaDataReader { return Ok(()); } - self.load_page_index(fetch, remainder).await + self.load_page_index_with_remainder(fetch, remainder).await } /// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already /// been obtained. See [`Self::new_with_metadata()`]. #[cfg(feature = "async")] - pub async fn load_page_index( + pub async fn load_page_index(&mut self, fetch: F) -> Result<()> { + self.load_page_index_with_remainder(fetch, None).await + } + + #[cfg(feature = "async")] + async fn load_page_index_with_remainder( &mut self, mut fetch: F, remainder: Option<(usize, Bytes)>, diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs index db78606e42ea..44328c635fed 100644 --- a/parquet/src/file/metadata/writer.rs +++ b/parquet/src/file/metadata/writer.rs @@ -516,7 +516,7 @@ mod tests { /// Temporary function so we can test loading metadata with page indexes /// while we haven't fully figured out how to load it cleanly async fn load_metadata_from_bytes(file_size: usize, data: Bytes) -> ParquetMetaData { - use crate::arrow::async_reader::{MetadataFetch, MetadataLoader}; + use crate::arrow::async_reader::MetadataFetch; use crate::errors::Result as ParquetResult; use futures::future::BoxFuture; use futures::FutureExt; @@ -569,13 +569,11 @@ mod tests { Box::new(AsyncBytes::new(data)), file_size - metadata_length..file_size, ); - let metadata = MetadataLoader::load(&mut reader, file_size, None) + ParquetMetaDataReader::new() + .with_page_indexes(true) + .load_and_finish(&mut reader, file_size) .await - .unwrap(); - let loaded_metadata = metadata.finish(); - let mut metadata = MetadataLoader::new(&mut reader, loaded_metadata); - metadata.load_page_index(true, true).await.unwrap(); - metadata.finish() + .unwrap() } fn check_columns_are_equivalent(left: &ColumnChunkMetaData, right: &ColumnChunkMetaData) { diff --git a/parquet/tests/arrow_reader/bad_data.rs b/parquet/tests/arrow_reader/bad_data.rs index a73864070d9f..e2975c17c8b9 100644 --- a/parquet/tests/arrow_reader/bad_data.rs +++ b/parquet/tests/arrow_reader/bad_data.rs @@ -140,20 +140,23 @@ fn read_file(name: &str) -> Result { #[tokio::test] async fn bad_metadata_err() { use bytes::Bytes; - use parquet::arrow::async_reader::MetadataLoader; + use parquet::file::metadata::ParquetMetaDataReader; let metadata_buffer = Bytes::from_static(include_bytes!("bad_raw_metadata.bin")); let metadata_length = metadata_buffer.len(); let mut reader = std::io::Cursor::new(&metadata_buffer); - let mut loader = MetadataLoader::load(&mut reader, metadata_length, None) - .await - .unwrap(); - loader.load_page_index(false, false).await.unwrap(); - loader.load_page_index(false, true).await.unwrap(); + let mut loader = ParquetMetaDataReader::new(); + loader.try_load(&mut reader, metadata_length).await.unwrap(); + loader = loader.with_page_indexes(false); + loader.load_page_index(&mut reader).await.unwrap(); - let err = loader.load_page_index(true, false).await.unwrap_err(); + loader = loader.with_offset_indexes(true); + loader.load_page_index(&mut reader).await.unwrap(); + + loader = loader.with_column_indexes(true); + let err = loader.load_page_index(&mut reader).await.unwrap_err(); assert_eq!( err.to_string(), From cb1498a7b53e6521879ded8ed215e1e88be997d5 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Fri, 27 Sep 2024 10:08:29 -0700 Subject: [PATCH 2/6] change signature of the load functions --- parquet/src/file/metadata/reader.rs | 82 +++++++++++++++-------------- parquet/src/file/metadata/writer.rs | 2 +- 2 files changed, 43 insertions(+), 41 deletions(-) diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index 88f53d02f3f4..bd9f27319e2c 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -299,11 +299,14 @@ impl ParquetMetaDataReader { /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches /// performed by this function. #[cfg(feature = "async")] - pub async fn load_and_finish( + pub async fn load_and_finish( mut self, - fetch: F, + fetch: &mut F, file_size: usize, - ) -> Result { + ) -> Result + where + for<'a> &'a mut F: MetadataFetch, + { self.try_load(fetch, file_size).await?; self.finish() } @@ -314,13 +317,12 @@ impl ParquetMetaDataReader { /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches /// performed by this function. #[cfg(feature = "async")] - pub async fn try_load( - &mut self, - mut fetch: F, - file_size: usize, - ) -> Result<()> { + pub async fn try_load(&mut self, fetch: &mut F, file_size: usize) -> Result<()> + where + for<'a> &'a mut F: MetadataFetch, + { let (metadata, remainder) = - Self::load_metadata(&mut fetch, file_size, self.get_prefetch_size()).await?; + Self::load_metadata(fetch, file_size, self.get_prefetch_size()).await?; self.metadata = Some(metadata); @@ -335,16 +337,22 @@ impl ParquetMetaDataReader { /// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already /// been obtained. See [`Self::new_with_metadata()`]. #[cfg(feature = "async")] - pub async fn load_page_index(&mut self, fetch: F) -> Result<()> { + pub async fn load_page_index(&mut self, fetch: &mut F) -> Result<()> + where + for<'a> &'a mut F: MetadataFetch, + { self.load_page_index_with_remainder(fetch, None).await } #[cfg(feature = "async")] - async fn load_page_index_with_remainder( + async fn load_page_index_with_remainder( &mut self, - mut fetch: F, + mut fetch: &mut F, remainder: Option<(usize, Bytes)>, - ) -> Result<()> { + ) -> Result<()> + where + for<'a> &'a mut F: MetadataFetch, + { if self.metadata.is_none() { return Err(general_err!("Footer metadata is not present")); } @@ -499,11 +507,14 @@ impl ParquetMetaDataReader { } #[cfg(feature = "async")] - async fn load_metadata( - fetch: &mut F, + async fn load_metadata( + mut fetch: &mut F, file_size: usize, prefetch: usize, - ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> { + ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> + where + for<'a> &'a mut F: MetadataFetch, + { if file_size < FOOTER_SIZE { return Err(eof_err!("file size of {} is less than footer", file_size)); } @@ -841,7 +852,7 @@ mod async_tests { struct MetadataFetchFn(F); - impl MetadataFetch for MetadataFetchFn + impl<'a, F, Fut> MetadataFetch for &'a mut MetadataFetchFn where F: FnMut(Range) -> Fut + Send, Fut: Future> + Send, @@ -870,14 +881,14 @@ mod async_tests { let expected = expected.file_metadata().schema(); let fetch_count = AtomicUsize::new(0); - let mut fetch = |range| { + let fetch = |range| { fetch_count.fetch_add(1, Ordering::SeqCst); futures::future::ready(read_range(&mut file, range)) }; - let input = MetadataFetchFn(&mut fetch); + let mut f = MetadataFetchFn(fetch); let actual = ParquetMetaDataReader::new() - .load_and_finish(input, len) + .load_and_finish(&mut f, len) .await .unwrap(); assert_eq!(actual.file_metadata().schema(), expected); @@ -885,10 +896,9 @@ mod async_tests { // Metadata hint too small - below footer size fetch_count.store(0, Ordering::SeqCst); - let input = MetadataFetchFn(&mut fetch); let actual = ParquetMetaDataReader::new() .with_prefetch_hint(Some(7)) - .load_and_finish(input, len) + .load_and_finish(&mut f, len) .await .unwrap(); assert_eq!(actual.file_metadata().schema(), expected); @@ -896,10 +906,9 @@ mod async_tests { // Metadata hint too small fetch_count.store(0, Ordering::SeqCst); - let input = MetadataFetchFn(&mut fetch); let actual = ParquetMetaDataReader::new() .with_prefetch_hint(Some(10)) - .load_and_finish(input, len) + .load_and_finish(&mut f, len) .await .unwrap(); assert_eq!(actual.file_metadata().schema(), expected); @@ -907,10 +916,9 @@ mod async_tests { // Metadata hint too large fetch_count.store(0, Ordering::SeqCst); - let input = MetadataFetchFn(&mut fetch); let actual = ParquetMetaDataReader::new() .with_prefetch_hint(Some(500)) - .load_and_finish(input, len) + .load_and_finish(&mut f, len) .await .unwrap(); assert_eq!(actual.file_metadata().schema(), expected); @@ -918,26 +926,23 @@ mod async_tests { // Metadata hint exactly correct fetch_count.store(0, Ordering::SeqCst); - let input = MetadataFetchFn(&mut fetch); let actual = ParquetMetaDataReader::new() .with_prefetch_hint(Some(428)) - .load_and_finish(input, len) + .load_and_finish(&mut f, len) .await .unwrap(); assert_eq!(actual.file_metadata().schema(), expected); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); - let input = MetadataFetchFn(&mut fetch); let err = ParquetMetaDataReader::new() - .load_and_finish(input, 4) + .load_and_finish(&mut f, 4) .await .unwrap_err() .to_string(); assert_eq!(err, "EOF: file size of 4 is less than footer"); - let input = MetadataFetchFn(&mut fetch); let err = ParquetMetaDataReader::new() - .load_and_finish(input, 20) + .load_and_finish(&mut f, 20) .await .unwrap_err() .to_string(); @@ -954,42 +959,39 @@ mod async_tests { futures::future::ready(read_range(&mut file, range)) }; - let f = MetadataFetchFn(&mut fetch); + let mut f = MetadataFetchFn(&mut fetch); let mut loader = ParquetMetaDataReader::new().with_page_indexes(true); - loader.try_load(f, len).await.unwrap(); + loader.try_load(&mut f, len).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 3); let metadata = loader.finish().unwrap(); assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); // Prefetch just footer exactly fetch_count.store(0, Ordering::SeqCst); - let f = MetadataFetchFn(&mut fetch); let mut loader = ParquetMetaDataReader::new() .with_page_indexes(true) .with_prefetch_hint(Some(1729)); - loader.try_load(f, len).await.unwrap(); + loader.try_load(&mut f, len).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 2); let metadata = loader.finish().unwrap(); assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); // Prefetch more than footer but not enough fetch_count.store(0, Ordering::SeqCst); - let f = MetadataFetchFn(&mut fetch); let mut loader = ParquetMetaDataReader::new() .with_page_indexes(true) .with_prefetch_hint(Some(130649)); - loader.try_load(f, len).await.unwrap(); + loader.try_load(&mut f, len).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 2); let metadata = loader.finish().unwrap(); assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); // Prefetch exactly enough fetch_count.store(0, Ordering::SeqCst); - let f = MetadataFetchFn(&mut fetch); let metadata = ParquetMetaDataReader::new() .with_page_indexes(true) .with_prefetch_hint(Some(130650)) - .load_and_finish(f, len) + .load_and_finish(&mut f, len) .await .unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs index 44328c635fed..0bd6c8a6d173 100644 --- a/parquet/src/file/metadata/writer.rs +++ b/parquet/src/file/metadata/writer.rs @@ -552,7 +552,7 @@ mod tests { } } - impl MetadataFetch for &mut MaskedBytes { + impl<'a> MetadataFetch for &'a mut MaskedBytes { fn fetch(&mut self, range: Range) -> BoxFuture<'_, ParquetResult> { let inner_range = self.inner_range.clone(); println!("inner_range: {:?}", inner_range); From 5f077231da322606483676adc876149028d12e39 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Fri, 27 Sep 2024 10:18:03 -0700 Subject: [PATCH 3/6] fix up fetch_parquet_metadata --- parquet/src/arrow/async_reader/metadata.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs index b7fac6fe7c05..b54e463248ec 100644 --- a/parquet/src/arrow/async_reader/metadata.rs +++ b/parquet/src/arrow/async_reader/metadata.rs @@ -213,6 +213,16 @@ where } } +impl<'a, F, Fut> MetadataFetch for &'a mut MetadataFetchFn +where + F: FnMut(Range) -> Fut + Send, + Fut: Future> + Send, +{ + fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { + async move { self.0(range).await }.boxed() + } +} + /// Fetches parquet metadata /// /// Parameters: @@ -239,10 +249,10 @@ where F: FnMut(Range) -> Fut + Send, Fut: Future> + Send, { - let fetch = MetadataFetchFn(fetch); + let mut fetch = MetadataFetchFn(fetch); ParquetMetaDataReader::new() .with_prefetch_hint(prefetch) - .load_and_finish(fetch, file_size) + .load_and_finish(&mut fetch, file_size) .await } From 7e669cc4bd7af7bdb029d58b2b2c6aa8db63833f Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Fri, 27 Sep 2024 10:20:35 -0700 Subject: [PATCH 4/6] can now use self.meta.size directly --- parquet/src/arrow/async_reader/store.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index e6b47856ebe8..f98a2fa309bc 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -124,12 +124,11 @@ impl AsyncFileReader for ParquetObjectReader { fn get_metadata(&mut self) -> BoxFuture<'_, Result>> { Box::pin(async move { - let file_size = self.meta.size; let metadata = ParquetMetaDataReader::new() .with_column_indexes(self.preload_column_index) .with_offset_indexes(self.preload_offset_index) .with_prefetch_hint(self.metadata_size_hint) - .load_and_finish(self, file_size) + .load_and_finish(self, self.meta.size) .await?; Ok(Arc::new(metadata)) }) From 8068a31e3536730d8606f82a4151d19d817f0869 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Sat, 28 Sep 2024 12:04:16 -0700 Subject: [PATCH 5/6] revert changes to load API --- parquet/src/arrow/async_reader/metadata.rs | 14 +------ parquet/src/arrow/async_reader/store.rs | 3 +- parquet/src/file/metadata/reader.rs | 43 ++++++++-------------- 3 files changed, 20 insertions(+), 40 deletions(-) diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs index b54e463248ec..b7fac6fe7c05 100644 --- a/parquet/src/arrow/async_reader/metadata.rs +++ b/parquet/src/arrow/async_reader/metadata.rs @@ -213,16 +213,6 @@ where } } -impl<'a, F, Fut> MetadataFetch for &'a mut MetadataFetchFn -where - F: FnMut(Range) -> Fut + Send, - Fut: Future> + Send, -{ - fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { - async move { self.0(range).await }.boxed() - } -} - /// Fetches parquet metadata /// /// Parameters: @@ -249,10 +239,10 @@ where F: FnMut(Range) -> Fut + Send, Fut: Future> + Send, { - let mut fetch = MetadataFetchFn(fetch); + let fetch = MetadataFetchFn(fetch); ParquetMetaDataReader::new() .with_prefetch_hint(prefetch) - .load_and_finish(&mut fetch, file_size) + .load_and_finish(fetch, file_size) .await } diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index f98a2fa309bc..e6b47856ebe8 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -124,11 +124,12 @@ impl AsyncFileReader for ParquetObjectReader { fn get_metadata(&mut self) -> BoxFuture<'_, Result>> { Box::pin(async move { + let file_size = self.meta.size; let metadata = ParquetMetaDataReader::new() .with_column_indexes(self.preload_column_index) .with_offset_indexes(self.preload_offset_index) .with_prefetch_hint(self.metadata_size_hint) - .load_and_finish(self, self.meta.size) + .load_and_finish(self, file_size) .await?; Ok(Arc::new(metadata)) }) diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index bd9f27319e2c..3fd2bd76f6b8 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -299,14 +299,11 @@ impl ParquetMetaDataReader { /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches /// performed by this function. #[cfg(feature = "async")] - pub async fn load_and_finish( + pub async fn load_and_finish( mut self, - fetch: &mut F, + fetch: F, file_size: usize, - ) -> Result - where - for<'a> &'a mut F: MetadataFetch, - { + ) -> Result { self.try_load(fetch, file_size).await?; self.finish() } @@ -317,12 +314,13 @@ impl ParquetMetaDataReader { /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches /// performed by this function. #[cfg(feature = "async")] - pub async fn try_load(&mut self, fetch: &mut F, file_size: usize) -> Result<()> - where - for<'a> &'a mut F: MetadataFetch, - { + pub async fn try_load( + &mut self, + mut fetch: F, + file_size: usize, + ) -> Result<()> { let (metadata, remainder) = - Self::load_metadata(fetch, file_size, self.get_prefetch_size()).await?; + Self::load_metadata(&mut fetch, file_size, self.get_prefetch_size()).await?; self.metadata = Some(metadata); @@ -337,22 +335,16 @@ impl ParquetMetaDataReader { /// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already /// been obtained. See [`Self::new_with_metadata()`]. #[cfg(feature = "async")] - pub async fn load_page_index(&mut self, fetch: &mut F) -> Result<()> - where - for<'a> &'a mut F: MetadataFetch, - { + pub async fn load_page_index(&mut self, fetch: F) -> Result<()> { self.load_page_index_with_remainder(fetch, None).await } #[cfg(feature = "async")] - async fn load_page_index_with_remainder( + async fn load_page_index_with_remainder( &mut self, - mut fetch: &mut F, + mut fetch: F, remainder: Option<(usize, Bytes)>, - ) -> Result<()> - where - for<'a> &'a mut F: MetadataFetch, - { + ) -> Result<()> { if self.metadata.is_none() { return Err(general_err!("Footer metadata is not present")); } @@ -507,14 +499,11 @@ impl ParquetMetaDataReader { } #[cfg(feature = "async")] - async fn load_metadata( - mut fetch: &mut F, + async fn load_metadata( + fetch: &mut F, file_size: usize, prefetch: usize, - ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> - where - for<'a> &'a mut F: MetadataFetch, - { + ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> { if file_size < FOOTER_SIZE { return Err(eof_err!("file size of {} is less than footer", file_size)); } From 388cf5a29f40013b71ba5168b4e81cc66fe30feb Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Sat, 28 Sep 2024 12:18:19 -0700 Subject: [PATCH 6/6] revert change to test code --- parquet/src/file/metadata/writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs index 0bd6c8a6d173..44328c635fed 100644 --- a/parquet/src/file/metadata/writer.rs +++ b/parquet/src/file/metadata/writer.rs @@ -552,7 +552,7 @@ mod tests { } } - impl<'a> MetadataFetch for &'a mut MaskedBytes { + impl MetadataFetch for &mut MaskedBytes { fn fetch(&mut self, range: Range) -> BoxFuture<'_, ParquetResult> { let inner_range = self.inner_range.clone(); println!("inner_range: {:?}", inner_range);