From 0ca234b034987f41a0e465749afbbccb290decb3 Mon Sep 17 00:00:00 2001 From: Valentin Lorentz Date: Mon, 23 Sep 2024 14:43:27 +0200 Subject: [PATCH 1/2] parquet: Add support for user-provided metadata loaders This allows users to, for example, cache the Page Index so it does not need to be parsed every time we open the file. --- .../examples/advanced_parquet_index.rs | 24 +++++++-- .../datasource/physical_plan/parquet/mod.rs | 4 +- .../physical_plan/parquet/opener.rs | 14 ++--- .../physical_plan/parquet/reader.rs | 51 +++++++++++++++++-- .../physical_plan/parquet/row_group_filter.rs | 4 +- .../core/tests/parquet/custom_reader.rs | 23 +++++++-- 6 files changed, 97 insertions(+), 23 deletions(-) diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs index f6860bb5b87a..ab3e9bb7a304 100644 --- a/datafusion-examples/examples/advanced_parquet_index.rs +++ b/datafusion-examples/examples/advanced_parquet_index.rs @@ -24,13 +24,15 @@ use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::physical_plan::parquet::{ ParquetAccessPlan, ParquetExecBuilder, }; -use datafusion::datasource::physical_plan::{ - parquet::ParquetFileReaderFactory, FileMeta, FileScanConfig, +use datafusion::datasource::physical_plan::parquet::{ + ParquetFileReader, ParquetFileReaderFactory, }; +use datafusion::datasource::physical_plan::{FileMeta, FileScanConfig}; use datafusion::datasource::TableProvider; use datafusion::execution::object_store::ObjectStoreUrl; use datafusion::parquet::arrow::arrow_reader::{ - ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector, + ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, + RowSelection, RowSelector, }; use datafusion::parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; use datafusion::parquet::arrow::ArrowWriter; @@ -552,7 +554,7 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory 
{ file_meta: FileMeta, metadata_size_hint: Option, _metrics: &ExecutionPlanMetricsSet, - ) -> Result> { + ) -> Result> { // for this example we ignore the partition index and metrics // but in a real system you would likely use them to report details on // the performance of the reader. @@ -621,6 +623,20 @@ impl AsyncFileReader for ParquetReaderWithCache { } } +impl ParquetFileReader for ParquetReaderWithCache { + fn upcast(self: Box) -> Box { + Box::new(*self) + } + + fn load_metadata( + &mut self, + options: ArrowReaderOptions, + ) -> BoxFuture<'_, datafusion::parquet::errors::Result> { + // This could be cached too, if CPU time is a concern in addition to storage latency + Box::pin(ArrowReaderMetadata::load_async(self, options)) + } +} + /// Creates a new parquet file at the specified path. /// /// * id: Int32 diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index ce679bfa76c5..ac561307260f 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -60,7 +60,9 @@ use crate::datasource::schema_adapter::{ pub use access_plan::{ParquetAccessPlan, RowGroupAccess}; pub use metrics::ParquetFileMetrics; use opener::ParquetOpener; -pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory}; +pub use reader::{ + DefaultParquetFileReaderFactory, ParquetFileReader, ParquetFileReaderFactory, +}; pub use row_filter::can_expr_be_pushed_down_with_schemas; pub use writer::plan_to_parquet; diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs index 9880c30ddb6b..6597bb61475b 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs @@ -21,7 +21,7 @@ use crate::datasource::file_format::coerce_file_schema_to_view_type; use 
crate::datasource::physical_plan::parquet::page_filter::PagePruningAccessPlanFilter; use crate::datasource::physical_plan::parquet::row_group_filter::RowGroupAccessPlanFilter; use crate::datasource::physical_plan::parquet::{ - row_filter, should_enable_page_index, ParquetAccessPlan, + row_filter, should_enable_page_index, ParquetAccessPlan, ParquetFileReader, }; use crate::datasource::physical_plan::{ FileMeta, FileOpenFuture, FileOpener, ParquetFileMetrics, ParquetFileReaderFactory, @@ -35,7 +35,6 @@ use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use futures::{StreamExt, TryStreamExt}; use log::debug; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; -use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; use std::sync::Arc; @@ -87,7 +86,7 @@ impl FileOpener for ParquetOpener { let file_metrics = ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics); - let mut reader: Box = + let mut reader: Box = self.parquet_file_reader_factory.create_reader( self.partition_index, file_meta, @@ -118,8 +117,7 @@ impl FileOpener for ParquetOpener { Ok(Box::pin(async move { let options = ArrowReaderOptions::new().with_page_index(enable_page_index); - let metadata = - ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?; + let metadata = reader.load_metadata(options.clone()).await?; let mut schema = metadata.schema().clone(); // read with view types if let Some(merged) = coerce_file_schema_to_view_type(&table_schema, &schema) @@ -133,8 +131,10 @@ impl FileOpener for ParquetOpener { let metadata = ArrowReaderMetadata::try_new(metadata.metadata().clone(), options)?; - let mut builder = - ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata); + let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata( + reader.upcast(), + metadata, + ); let file_schema = builder.schema().clone(); diff --git 
a/datafusion/core/src/datasource/physical_plan/parquet/reader.rs b/datafusion/core/src/datasource/physical_plan/parquet/reader.rs index 8a4ba136fc96..390879f35a8d 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/reader.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/reader.rs @@ -23,6 +23,7 @@ use bytes::Bytes; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use futures::future::BoxFuture; use object_store::ObjectStore; +use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; use parquet::file::metadata::ParquetMetaData; use std::fmt::Debug; @@ -57,9 +58,49 @@ pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static { file_meta: FileMeta, metadata_size_hint: Option, metrics: &ExecutionPlanMetricsSet, - ) -> datafusion_common::Result>; + ) -> datafusion_common::Result>; } +/// [`AsyncFileReader`] augmented with a method to customize how file metadata is loaded. +pub trait ParquetFileReader: AsyncFileReader + Send + 'static { + /// Returns an [`AsyncFileReader`] trait object + /// + /// This can usually be implemented as `Box::new(*self)` + fn upcast(self: Box) -> Box; + + /// Parses the file's metadata + /// + /// The default implementation is: + /// + /// ``` + /// Box::pin(ArrowReaderMetadata::load_async(self, options)) + /// ``` + fn load_metadata( + &mut self, + options: ArrowReaderOptions, + ) -> BoxFuture<'_, parquet::errors::Result>; +} + +macro_rules! 
impl_ParquetFileReader { + ($type:ty) => { + impl ParquetFileReader for $type { + fn upcast(self: Box) -> Box { + Box::new(*self) + } + + fn load_metadata( + &mut self, + options: ArrowReaderOptions, + ) -> BoxFuture<'_, parquet::errors::Result> { + Box::pin(ArrowReaderMetadata::load_async(self, options)) + } + } + }; +} + +impl_ParquetFileReader!(ParquetObjectReader); +impl_ParquetFileReader!(DefaultParquetFileReader); + /// Default implementation of [`ParquetFileReaderFactory`] /// /// This implementation: @@ -86,12 +127,12 @@ impl DefaultParquetFileReaderFactory { /// This implementation does not coalesce I/O operations or cache bytes. Such /// optimizations can be done either at the object store level or by providing a /// custom implementation of [`ParquetFileReaderFactory`]. -pub(crate) struct ParquetFileReader { +pub(crate) struct DefaultParquetFileReader { pub file_metrics: ParquetFileMetrics, pub inner: ParquetObjectReader, } -impl AsyncFileReader for ParquetFileReader { +impl AsyncFileReader for DefaultParquetFileReader { fn get_bytes( &mut self, range: Range, @@ -126,7 +167,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory { file_meta: FileMeta, metadata_size_hint: Option, metrics: &ExecutionPlanMetricsSet, - ) -> datafusion_common::Result> { + ) -> datafusion_common::Result> { let file_metrics = ParquetFileMetrics::new( partition_index, file_meta.location().as_ref(), @@ -139,7 +180,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory { inner = inner.with_footer_size_hint(hint) }; - Ok(Box::new(ParquetFileReader { + Ok(Box::new(DefaultParquetFileReader { inner, file_metrics, })) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs index 4cdcb005018e..eb5cb08e35a6 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs +++ 
b/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs @@ -416,7 +416,7 @@ mod tests { use std::sync::Arc; use super::*; - use crate::datasource::physical_plan::parquet::reader::ParquetFileReader; + use crate::datasource::physical_plan::parquet::reader::DefaultParquetFileReader; use crate::physical_plan::metrics::ExecutionPlanMetricsSet; use arrow::datatypes::DataType::Decimal128; @@ -1516,7 +1516,7 @@ mod tests { let metrics = ExecutionPlanMetricsSet::new(); let file_metrics = ParquetFileMetrics::new(0, object_meta.location.as_ref(), &metrics); - let reader = ParquetFileReader { + let reader = DefaultParquetFileReader { inner: ParquetObjectReader::new(Arc::new(in_memory), object_meta), file_metrics: file_metrics.clone(), }; diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs index 7c1e199ceb95..4d8529ec3a1d 100644 --- a/datafusion/core/tests/parquet/custom_reader.rs +++ b/datafusion/core/tests/parquet/custom_reader.rs @@ -27,6 +27,7 @@ use datafusion::assert_batches_sorted_eq; use datafusion::datasource::file_format::parquet::fetch_parquet_metadata; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; +use datafusion::datasource::physical_plan::parquet::ParquetFileReader; use datafusion::datasource::physical_plan::{ FileMeta, FileScanConfig, ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory, }; @@ -41,6 +42,7 @@ use futures::{FutureExt, TryFutureExt}; use object_store::memory::InMemory; use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; +use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::ArrowWriter; use parquet::errors::ParquetError; @@ -115,7 +117,7 @@ impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory { file_meta: FileMeta, metadata_size_hint: Option, metrics: &ExecutionPlanMetricsSet, 
- ) -> Result> { + ) -> Result> { let metadata = file_meta .extensions .as_ref() @@ -132,7 +134,7 @@ impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory { metrics, ); - Ok(Box::new(ParquetFileReader { + Ok(Box::new(CustomParquetFileReader { store: Arc::clone(&self.0), meta: file_meta.object_meta, metrics: parquet_file_metrics, @@ -202,14 +204,14 @@ async fn store_parquet_in_memory( } /// Implements [`AsyncFileReader`] for a parquet file in object storage -struct ParquetFileReader { +struct CustomParquetFileReader { store: Arc, meta: ObjectMeta, metrics: ParquetFileMetrics, metadata_size_hint: Option, } -impl AsyncFileReader for ParquetFileReader { +impl AsyncFileReader for CustomParquetFileReader { fn get_bytes( &mut self, range: Range, @@ -243,3 +245,16 @@ impl AsyncFileReader for ParquetFileReader { }) } } + +impl ParquetFileReader for CustomParquetFileReader { + fn upcast(self: Box) -> Box { + Box::new(*self) + } + + fn load_metadata( + &mut self, + options: ArrowReaderOptions, + ) -> BoxFuture<'_, parquet::errors::Result> { + Box::pin(ArrowReaderMetadata::load_async(self, options)) + } +} From 84ff21d9fd3cb1344b0d1959498cb998ba137479 Mon Sep 17 00:00:00 2001 From: Valentin Lorentz Date: Mon, 23 Sep 2024 19:20:55 +0200 Subject: [PATCH 2/2] Fix accidental doctest --- datafusion/core/src/datasource/physical_plan/parquet/reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/reader.rs b/datafusion/core/src/datasource/physical_plan/parquet/reader.rs index 390879f35a8d..136f39193c13 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/reader.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/reader.rs @@ -72,7 +72,7 @@ pub trait ParquetFileReader: AsyncFileReader + Send + 'static { /// /// The default implementation is: /// - /// ``` + /// ```ignore /// Box::pin(ArrowReaderMetadata::load_async(self, options)) /// ``` fn load_metadata(