From 6831e0c55b1e6010ba991e4cf94fe0d5bdfd823f Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 11 Nov 2022 08:45:12 -0500
Subject: [PATCH] Remove unnecessary level of indent, old workaround

---
 .../core/tests/parquet/custom_reader.rs      | 446 +++++++++---------
 1 file changed, 216 insertions(+), 230 deletions(-)

diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index ded5fad022db5..205d55f34bdad 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -1,5 +1,3 @@
-// TODO: Temporary workaround for https://github.com/apache/arrow-rs/issues/2372 (#3081)
-#![allow(where_clauses_object_safety)]
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -17,247 +15,235 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#[cfg(test)]
-mod tests {
-    use arrow::array::{ArrayRef, Int64Array, Int8Array, StringArray};
-    use arrow::datatypes::{Field, Schema};
-    use arrow::record_batch::RecordBatch;
-    use bytes::Bytes;
-    use datafusion::assert_batches_sorted_eq;
-    use datafusion::config::ConfigOptions;
-    use datafusion::datasource::file_format::parquet::fetch_parquet_metadata;
-    use datafusion::datasource::listing::PartitionedFile;
-    use datafusion::datasource::object_store::ObjectStoreUrl;
-    use datafusion::physical_plan::file_format::{
-        FileMeta, FileScanConfig, ParquetExec, ParquetFileMetrics,
-        ParquetFileReaderFactory,
-    };
-    use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
-    use datafusion::physical_plan::{collect, Statistics};
-    use datafusion::prelude::SessionContext;
-    use datafusion_common::DataFusionError;
-    use futures::future::BoxFuture;
-    use futures::{FutureExt, TryFutureExt};
-    use object_store::memory::InMemory;
-    use object_store::path::Path;
-    use object_store::{ObjectMeta, ObjectStore};
-    use parquet::arrow::async_reader::AsyncFileReader;
-    use parquet::arrow::ArrowWriter;
-    use parquet::errors::ParquetError;
-    use parquet::file::metadata::ParquetMetaData;
-    use std::io::Cursor;
-    use std::ops::Range;
-    use std::sync::Arc;
-    use std::time::SystemTime;
-
-    const EXPECTED_USER_DEFINED_METADATA: &str = "some-user-defined-metadata";
-
-    #[tokio::test]
-    async fn route_data_access_ops_to_parquet_file_reader_factory() {
-        let c1: ArrayRef =
-            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
-        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
-        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
-
-        let batch = create_batch(vec![
-            ("c1", c1.clone()),
-            ("c2", c2.clone()),
-            ("c3", c3.clone()),
-        ]);
-
-        let file_schema = batch.schema().clone();
-        let (in_memory_object_store, parquet_files_meta) =
-            store_parquet_in_memory(vec![batch]).await;
-        let file_groups = parquet_files_meta
-            .into_iter()
-            .map(|meta| PartitionedFile {
-                object_meta: meta,
-                partition_values: vec![],
-                range: None,
-                extensions: Some(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))),
-            })
-            .collect();
-
-        // prepare the scan
-        let parquet_exec = ParquetExec::new(
-            FileScanConfig {
-                // just any url that doesn't point to in memory object store
-                object_store_url: ObjectStoreUrl::local_filesystem(),
-                file_groups: vec![file_groups],
-                file_schema,
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-                config_options: ConfigOptions::new().into_shareable(),
-            },
-            None,
-            None,
-        )
-        .with_parquet_file_reader_factory(Arc::new(
-            InMemoryParquetFileReaderFactory(Arc::clone(&in_memory_object_store)),
-        ));
-
-        let session_ctx = SessionContext::new();
-
-        let task_ctx = session_ctx.task_ctx();
-        let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap();
+use arrow::array::{ArrayRef, Int64Array, Int8Array, StringArray};
+use arrow::datatypes::{Field, Schema};
+use arrow::record_batch::RecordBatch;
+use bytes::Bytes;
+use datafusion::assert_batches_sorted_eq;
+use datafusion::config::ConfigOptions;
+use datafusion::datasource::file_format::parquet::fetch_parquet_metadata;
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::physical_plan::file_format::{
+    FileMeta, FileScanConfig, ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory,
+};
+use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
+use datafusion::physical_plan::{collect, Statistics};
+use datafusion::prelude::SessionContext;
+use datafusion_common::DataFusionError;
+use futures::future::BoxFuture;
+use futures::{FutureExt, TryFutureExt};
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectMeta, ObjectStore};
+use parquet::arrow::async_reader::AsyncFileReader;
+use parquet::arrow::ArrowWriter;
+use parquet::errors::ParquetError;
+use parquet::file::metadata::ParquetMetaData;
+use std::io::Cursor;
+use std::ops::Range;
+use std::sync::Arc;
+use std::time::SystemTime;
+
+const EXPECTED_USER_DEFINED_METADATA: &str = "some-user-defined-metadata";
+
+#[tokio::test]
+async fn route_data_access_ops_to_parquet_file_reader_factory() {
+    let c1: ArrayRef = Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+    let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+    let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+    let batch = create_batch(vec![
+        ("c1", c1.clone()),
+        ("c2", c2.clone()),
+        ("c3", c3.clone()),
+    ]);
+
+    let file_schema = batch.schema().clone();
+    let (in_memory_object_store, parquet_files_meta) =
+        store_parquet_in_memory(vec![batch]).await;
+    let file_groups = parquet_files_meta
+        .into_iter()
+        .map(|meta| PartitionedFile {
+            object_meta: meta,
+            partition_values: vec![],
+            range: None,
+            extensions: Some(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))),
+        })
+        .collect();
+
+    // prepare the scan
+    let parquet_exec = ParquetExec::new(
+        FileScanConfig {
+            // just any url that doesn't point to in memory object store
+            object_store_url: ObjectStoreUrl::local_filesystem(),
+            file_groups: vec![file_groups],
+            file_schema,
+            statistics: Statistics::default(),
+            projection: None,
+            limit: None,
+            table_partition_cols: vec![],
+            config_options: ConfigOptions::new().into_shareable(),
+        },
+        None,
+        None,
+    )
+    .with_parquet_file_reader_factory(Arc::new(InMemoryParquetFileReaderFactory(
+        Arc::clone(&in_memory_object_store),
+    )));
+
+    let session_ctx = SessionContext::new();
+
+    let task_ctx = session_ctx.task_ctx();
+    let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap();
+
+    let expected = vec![
+        "+-----+----+----+",
+        "| c1  | c2 | c3 |",
+        "+-----+----+----+",
+        "| Foo | 1  | 10 |",
+        "|     | 2  | 20 |",
+        "| bar |    |    |",
+        "+-----+----+----+",
+    ];
+
+    assert_batches_sorted_eq!(expected, &read);
+}
 
-        let expected = vec![
-            "+-----+----+----+",
-            "| c1  | c2 | c3 |",
-            "+-----+----+----+",
-            "| Foo | 1  | 10 |",
-            "|     | 2  | 20 |",
-            "| bar |    |    |",
-            "+-----+----+----+",
-        ];
+#[derive(Debug)]
+struct InMemoryParquetFileReaderFactory(Arc<dyn ObjectStore>);
 
-        assert_batches_sorted_eq!(expected, &read);
+impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory {
+    fn create_reader(
+        &self,
+        partition_index: usize,
+        file_meta: FileMeta,
+        metadata_size_hint: Option<usize>,
+        metrics: &ExecutionPlanMetricsSet,
+    ) -> Result<Box<dyn AsyncFileReader + Send>, DataFusionError> {
+        let metadata = file_meta
+            .extensions
+            .as_ref()
+            .expect("has user defined metadata");
+        let metadata = metadata
+            .downcast_ref::<String>()
+            .expect("has string metadata");
+
+        assert_eq!(EXPECTED_USER_DEFINED_METADATA, &metadata[..]);
+
+        let parquet_file_metrics = ParquetFileMetrics::new(
+            partition_index,
+            file_meta.location().as_ref(),
+            metrics,
+        );
+
+        Ok(Box::new(ParquetFileReader {
+            store: Arc::clone(&self.0),
+            meta: file_meta.object_meta,
+            metrics: parquet_file_metrics,
+            metadata_size_hint,
+        }))
     }
+}
 
-    #[derive(Debug)]
-    struct InMemoryParquetFileReaderFactory(Arc<dyn ObjectStore>);
-
-    impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory {
-        fn create_reader(
-            &self,
-            partition_index: usize,
-            file_meta: FileMeta,
-            metadata_size_hint: Option<usize>,
-            metrics: &ExecutionPlanMetricsSet,
-        ) -> Result<Box<dyn AsyncFileReader + Send>, DataFusionError> {
-            let metadata = file_meta
-                .extensions
-                .as_ref()
-                .expect("has user defined metadata");
-            let metadata = metadata
-                .downcast_ref::<String>()
-                .expect("has string metadata");
-
-            assert_eq!(EXPECTED_USER_DEFINED_METADATA, &metadata[..]);
-
-            let parquet_file_metrics = ParquetFileMetrics::new(
-                partition_index,
-                file_meta.location().as_ref(),
-                metrics,
-            );
-
-            Ok(Box::new(ParquetFileReader {
-                store: Arc::clone(&self.0),
-                meta: file_meta.object_meta,
-                metrics: parquet_file_metrics,
-                metadata_size_hint,
-            }))
-        }
-    }
+fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {
+    columns.into_iter().fold(
+        RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
+        |batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()),
+    )
+}
 
-    fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {
-        columns.into_iter().fold(
-            RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
-            |batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()),
-        )
-    }
+fn add_to_batch(batch: &RecordBatch, field_name: &str, array: ArrayRef) -> RecordBatch {
+    let mut fields = batch.schema().fields().clone();
+    fields.push(Field::new(field_name, array.data_type().clone(), true));
+    let schema = Arc::new(Schema::new(fields));
 
-    fn add_to_batch(
-        batch: &RecordBatch,
-        field_name: &str,
-        array: ArrayRef,
-    ) -> RecordBatch {
-        let mut fields = batch.schema().fields().clone();
-        fields.push(Field::new(field_name, array.data_type().clone(), true));
-        let schema = Arc::new(Schema::new(fields));
+    let mut columns = batch.columns().to_vec();
+    columns.push(array);
+    RecordBatch::try_new(schema, columns).expect("error; creating record batch")
+}
 
-        let mut columns = batch.columns().to_vec();
-        columns.push(array);
-        RecordBatch::try_new(schema, columns).expect("error; creating record batch")
+async fn store_parquet_in_memory(
+    batches: Vec<RecordBatch>,
+) -> (Arc<dyn ObjectStore>, Vec<ObjectMeta>) {
+    let in_memory = InMemory::new();
+
+    let parquet_batches: Vec<(ObjectMeta, Bytes)> = batches
+        .into_iter()
+        .enumerate()
+        .map(|(offset, batch)| {
+            let mut buf = Vec::<u8>::with_capacity(32 * 1024);
+            let mut output = Cursor::new(&mut buf);
+
+            let mut writer = ArrowWriter::try_new(&mut output, batch.schema(), None)
+                .expect("creating writer");
+
+            writer.write(&batch).expect("Writing batch");
+            writer.close().unwrap();
+
+            let meta = ObjectMeta {
+                location: Path::parse(format!("file-{offset}.parquet"))
+                    .expect("creating path"),
+                last_modified: chrono::DateTime::from(SystemTime::now()),
+                size: buf.len(),
+            };
+
+            (meta, Bytes::from(buf))
+        })
+        .collect();
+
+    let mut objects = Vec::with_capacity(parquet_batches.len());
+    for (meta, bytes) in parquet_batches {
+        in_memory
+            .put(&meta.location, bytes)
+            .await
+            .expect("put parquet file into in memory object store");
+        objects.push(meta);
     }
 
-    async fn store_parquet_in_memory(
-        batches: Vec<RecordBatch>,
-    ) -> (Arc<dyn ObjectStore>, Vec<ObjectMeta>) {
-        let in_memory = InMemory::new();
-
-        let parquet_batches: Vec<(ObjectMeta, Bytes)> = batches
-            .into_iter()
-            .enumerate()
-            .map(|(offset, batch)| {
-                let mut buf = Vec::<u8>::with_capacity(32 * 1024);
-                let mut output = Cursor::new(&mut buf);
-
-                let mut writer = ArrowWriter::try_new(&mut output, batch.schema(), None)
-                    .expect("creating writer");
-
-                writer.write(&batch).expect("Writing batch");
-                writer.close().unwrap();
+    (Arc::new(in_memory), objects)
+}
 
-                let meta = ObjectMeta {
-                    location: Path::parse(format!("file-{offset}.parquet"))
-                        .expect("creating path"),
-                    last_modified: chrono::DateTime::from(SystemTime::now()),
-                    size: buf.len(),
-                };
+/// Implements [`AsyncFileReader`] for a parquet file in object storage
+struct ParquetFileReader {
+    store: Arc<dyn ObjectStore>,
+    meta: ObjectMeta,
+    metrics: ParquetFileMetrics,
+    metadata_size_hint: Option<usize>,
+}
 
-                (meta, Bytes::from(buf))
-            })
-            .collect();
+impl AsyncFileReader for ParquetFileReader {
+    fn get_bytes(
+        &mut self,
+        range: Range<usize>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+        self.metrics.bytes_scanned.add(range.end - range.start);
 
-        let mut objects = Vec::with_capacity(parquet_batches.len());
-        for (meta, bytes) in parquet_batches {
-            in_memory
-                .put(&meta.location, bytes)
-                .await
-                .expect("put parquet file into in memory object store");
-            objects.push(meta);
-        }
+        self.store
+            .get_range(&self.meta.location, range)
+            .map_err(|e| {
+                ParquetError::General(format!("AsyncChunkReader::get_bytes error: {}", e))
+            })
+            .boxed()
+    }
 
-        (Arc::new(in_memory), objects)
-    }
-
-    /// Implements [`AsyncFileReader`] for a parquet file in object storage
-    struct ParquetFileReader {
-        store: Arc<dyn ObjectStore>,
-        meta: ObjectMeta,
-        metrics: ParquetFileMetrics,
-        metadata_size_hint: Option<usize>,
-    }
-
-    impl AsyncFileReader for ParquetFileReader {
-        fn get_bytes(
-            &mut self,
-            range: Range<usize>,
-        ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
-            self.metrics.bytes_scanned.add(range.end - range.start);
-
-            self.store
-                .get_range(&self.meta.location, range)
-                .map_err(|e| {
-                    ParquetError::General(format!(
-                        "AsyncChunkReader::get_bytes error: {}",
-                        e
-                    ))
-                })
-                .boxed()
-        }
-
-        fn get_metadata(
-            &mut self,
-        ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
-            Box::pin(async move {
-                let metadata = fetch_parquet_metadata(
-                    self.store.as_ref(),
-                    &self.meta,
-                    self.metadata_size_hint,
-                )
-                .await
-                .map_err(|e| {
-                    ParquetError::General(format!(
-                        "AsyncChunkReader::get_metadata error: {}",
-                        e
-                    ))
-                })?;
-                Ok(Arc::new(metadata))
-            })
-        }
+    fn get_metadata(
+        &mut self,
+    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        Box::pin(async move {
+            let metadata = fetch_parquet_metadata(
+                self.store.as_ref(),
+                &self.meta,
+                self.metadata_size_hint,
+            )
+            .await
+            .map_err(|e| {
+                ParquetError::General(format!(
+                    "AsyncChunkReader::get_metadata error: {}",
+                    e
+                ))
+            })?;
+            Ok(Arc::new(metadata))
        })
     }
 }