From 4b9021139baba82e80ee1ac6d0becf1e15c69e63 Mon Sep 17 00:00:00 2001 From: Michael Maletich Date: Sun, 5 May 2024 13:15:13 -0500 Subject: [PATCH 1/3] feat: Expose Parquet Schema Adapter Allow users of the Datafusion Parquet physical executor to define how to map parquet schema to the table schema. This can be useful as there can be layers on top of parquet like Delta or Iceberg which may also define the schema and how the schema should evolve. --- .../src/datasource/file_format/parquet.rs | 6 +- .../core/src/datasource/physical_plan/mod.rs | 55 +++--- .../datasource/physical_plan/parquet/mod.rs | 31 ++- .../physical_plan/parquet/schema_adapter.rs | 69 +++++++ datafusion/core/tests/parquet/mod.rs | 1 + .../core/tests/parquet/schema_adapter.rs | 178 ++++++++++++++++++ 6 files changed, 303 insertions(+), 37 deletions(-) create mode 100644 datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs create mode 100644 datafusion/core/tests/parquet/schema_adapter.rs diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index fa379eb5b445..2e56b9a46a7b 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -31,7 +31,8 @@ use crate::arrow::array::{ use crate::arrow::datatypes::{DataType, Fields, Schema, SchemaRef}; use crate::datasource::file_format::file_compression_type::FileCompressionType; use crate::datasource::physical_plan::{ - FileGroupDisplay, FileSinkConfig, ParquetExec, SchemaAdapter, + DefaultSchemaAdapterFactory, FileGroupDisplay, FileSinkConfig, ParquetExec, + SchemaAdapterFactory, }; use crate::datasource::statistics::{create_max_min_accs, get_col_stats}; use crate::error::Result; @@ -474,7 +475,8 @@ async fn fetch_statistics( let mut null_counts = vec![Precision::Exact(0); num_fields]; let mut has_statistics = false; - let schema_adapter = SchemaAdapter::new(table_schema.clone()); + let schema_adapter = + DefaultSchemaAdapterFactory::default().create(table_schema.clone()); let (mut max_values, mut min_values) = create_max_min_accs(&table_schema); diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index c450774572db..607b60bf95be 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -31,7 +31,10 @@ mod statistics; pub(crate) use self::csv::plan_to_csv; pub(crate) use self::json::plan_to_json; #[cfg(feature = "parquet")] -pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory}; +pub use self::parquet::{ + ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory, SchemaAdapter, + SchemaAdapterFactory, SchemaMapper, +}; pub use arrow_file::ArrowExec; pub use avro::AvroExec; @@ -241,39 +244,27 @@ where Ok(()) } -/// A utility which can adapt file-level record batches to a table schema which may have a schema -/// obtained from merging multiple file-level schemas. -/// -/// This is useful for enabling schema evolution in partitioned datasets. -/// -/// This has to be done in two stages. -/// -/// 1. Before reading the file, we have to map projected column indexes from the table schema to -/// the file schema. -/// -/// 2. After reading a record batch we need to map the read columns back to the expected columns -/// indexes and insert null-valued columns wherever the file schema was missing a colum present -/// in the table schema. 
+#[derive(Clone, Debug, Default)]
+pub(crate) struct DefaultSchemaAdapterFactory {}
+
+impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
+    fn create(&self, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+        Box::new(DefaultSchemaAdapter { table_schema })
+    }
+}
+
 #[derive(Clone, Debug)]
-pub(crate) struct SchemaAdapter {
+pub(crate) struct DefaultSchemaAdapter {
     /// Schema for the table
     table_schema: SchemaRef,
 }
 
-impl SchemaAdapter {
-    pub(crate) fn new(table_schema: SchemaRef) -> SchemaAdapter {
-        Self { table_schema }
-    }
-
+impl SchemaAdapter for DefaultSchemaAdapter {
     /// Map a column index in the table schema to a column index in a particular
     /// file schema
     ///
     /// Panics if index is not in range for the table schema
-    pub(crate) fn map_column_index(
-        &self,
-        index: usize,
-        file_schema: &Schema,
-    ) -> Option<usize> {
+    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
         let field = self.table_schema.field(index);
         Some(file_schema.fields.find(field.name())?.0)
     }
@@ -286,10 +277,10 @@ impl SchemaAdapter {
     ///
     /// Returns a [`SchemaMapping`] that can be applied to the output batch
     /// along with an ordered list of columns to project from the file
-    pub fn map_schema(
+    fn map_schema(
         &self,
         file_schema: &Schema,
-    ) -> Result<(SchemaMapping, Vec<usize>)> {
+    ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
         let mut projection = Vec::with_capacity(file_schema.fields().len());
         let mut field_mappings = vec![None; self.table_schema.fields().len()];
 
@@ -315,10 +306,10 @@ impl SchemaAdapter {
         }
         Ok((
-            SchemaMapping {
+            Arc::new(SchemaMapping {
                 table_schema: self.table_schema.clone(),
                 field_mappings,
-            },
+            }),
             projection,
         ))
     }
 }
@@ -334,7 +325,7 @@ pub struct SchemaMapping {
     field_mappings: Vec<Option<usize>>,
 }
 
-impl SchemaMapping {
+impl SchemaMapper for SchemaMapping {
     /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
fn map_batch(&self, batch: RecordBatch) -> Result { let batch_rows = batch.num_rows(); @@ -636,7 +627,7 @@ mod tests { Field::new("c3", DataType::Float64, true), ])); - let adapter = SchemaAdapter::new(table_schema.clone()); + let adapter = DefaultSchemaAdapterFactory::default().create(table_schema.clone()); let file_schema = Schema::new(vec![ Field::new("c1", DataType::Utf8, true), @@ -693,7 +684,7 @@ mod tests { let indices = vec![1, 2, 4]; let schema = SchemaRef::from(table_schema.project(&indices).unwrap()); - let adapter = SchemaAdapter::new(schema); + let adapter = DefaultSchemaAdapterFactory::default().create(schema); let (mapping, projection) = adapter.map_schema(&file_schema).unwrap(); let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]); diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index a48f510adb56..7509d08ad88a 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -27,8 +27,8 @@ use crate::datasource::physical_plan::file_stream::{ FileOpenFuture, FileOpener, FileStream, }; use crate::datasource::physical_plan::{ - parquet::page_filter::PagePruningPredicate, DisplayAs, FileGroupPartitioner, - FileMeta, FileScanConfig, SchemaAdapter, + parquet::page_filter::PagePruningPredicate, DefaultSchemaAdapterFactory, DisplayAs, + FileGroupPartitioner, FileMeta, FileScanConfig, }; use crate::{ config::{ConfigOptions, TableParquetOptions}, @@ -67,9 +67,11 @@ mod metrics; mod page_filter; mod row_filter; mod row_groups; +mod schema_adapter; mod statistics; pub use metrics::ParquetFileMetrics; +pub use schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper}; /// Execution plan for scanning one or more Parquet partitions #[derive(Debug, Clone)] @@ -93,6 +95,8 @@ pub struct ParquetExec { cache: PlanProperties, /// Options for reading Parquet files table_parquet_options: TableParquetOptions, + /// Optional user defined schema adapter + schema_adapter_factory: Option>, } impl ParquetExec { @@ -157,6 +161,7 @@ impl ParquetExec { parquet_file_reader_factory: None, cache, table_parquet_options, + schema_adapter_factory: None, } } @@ -195,6 +200,19 @@ impl ParquetExec { self } + /// Optional schema adapter factory. + /// + /// `SchemaAdapterFactory` allows user to specify how fields from the parquet file get mapped to + /// that of the table schema. The default schema adapter uses arrow's cast library to map + /// the parquet fields to the table schema. + pub fn with_schema_adapter_factory( + mut self, + schema_adapter_factory: Arc, + ) -> Self { + self.schema_adapter_factory = Some(schema_adapter_factory); + self + } + /// If true, any filter [`Expr`]s on the scan will converted to a /// [`RowFilter`](parquet::arrow::arrow_reader::RowFilter) in the /// `ParquetRecordBatchStream`. 
These filters are applied by the
@@ -402,6 +420,11 @@ impl ExecutionPlan for ParquetExec {
             })
         })?;
 
+        let schema_adapter_factory = self
+            .schema_adapter_factory
+            .clone()
+            .unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory::default()));
+
         let opener = ParquetOpener {
             partition_index,
             projection: Arc::from(projection),
@@ -418,6 +441,7 @@ impl ExecutionPlan for ParquetExec {
             reorder_filters: self.reorder_filters(),
             enable_page_index: self.enable_page_index(),
             enable_bloom_filter: self.bloom_filter_on_read(),
+            schema_adapter_factory,
         };
 
         let stream =
@@ -452,6 +476,7 @@ struct ParquetOpener {
     reorder_filters: bool,
     enable_page_index: bool,
     enable_bloom_filter: bool,
+    schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
 }
 
 impl FileOpener for ParquetOpener {
@@ -475,7 +500,7 @@ impl FileOpener for ParquetOpener {
         let batch_size = self.batch_size;
         let projection = self.projection.clone();
         let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?);
-        let schema_adapter = SchemaAdapter::new(projected_schema);
+        let schema_adapter = self.schema_adapter_factory.create(projected_schema);
         let predicate = self.predicate.clone();
         let pruning_predicate = self.pruning_predicate.clone();
         let page_pruning_predicate = self.page_pruning_predicate.clone();
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs b/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs
new file mode 100644
index 000000000000..193e5161a398
--- /dev/null
+++ b/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_array::RecordBatch;
+use arrow_schema::{Schema, SchemaRef};
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// Factory of schema adapters.
+///
+/// Provides means to implement custom schema adaptation.
+pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
+    /// Provides `SchemaAdapter` for the ParquetExec.
+    fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter>;
+}
+
+/// A utility which can adapt file-level record batches to a table schema which may have a schema
+/// obtained from merging multiple file-level schemas.
+///
+/// This is useful for enabling schema evolution in partitioned datasets.
+///
+/// This has to be done in two stages.
+///
+/// 1. Before reading the file, we have to map projected column indexes from the table schema to
+/// the file schema.
+///
+/// 2. After reading a record batch we need to map the read columns back to the expected columns
+/// indexes and insert null-valued columns wherever the file schema was missing a column present
+/// in the table schema.
+pub trait SchemaAdapter: Send + Sync {
+    /// Map a column index in the table schema to a column index in a particular
+    /// file schema
+    ///
+    /// Panics if index is not in range for the table schema
+    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;
+
+    /// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
+    ///
+    /// If the provided `file_schema` contains columns of a different type to the expected
+    /// `table_schema`, the method will attempt to cast the array data from the file schema
+    /// to the table schema where possible.
+    ///
+    /// Returns a [`SchemaMapper`] that can be applied to the output batch
+    /// along with an ordered list of columns to project from the file
+    fn map_schema(
+        &self,
+        file_schema: &Schema,
+    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
+}
+
+/// Transforms a RecordBatch from Parquet to a RecordBatch that meets the table schema.
+pub trait SchemaMapper: Send + Sync {
+    /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
+    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;
+}
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index bb938e3af493..42d8fe0d8521 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -49,6 +49,7 @@ mod page_pruning;
 mod row_group_pruning;
 mod schema;
 mod schema_coercion;
+mod schema_adapter;
 
 #[cfg(test)]
 #[ctor::ctor]
diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs
new file mode 100644
index 000000000000..95ac386480e3
--- /dev/null
+++ b/datafusion/core/tests/parquet/schema_adapter.rs
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +use std::fs; +use std::sync::Arc; + +use arrow::datatypes::{Field, Schema}; +use arrow::record_batch::RecordBatch; +use arrow_array::{Int32Array, StringArray}; +use arrow_schema::{DataType, SchemaRef}; +use object_store::ObjectMeta; +use object_store::path::Path; +use datafusion::assert_batches_sorted_eq; + +use datafusion::datasource::object_store::ObjectStoreUrl; +use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec, SchemaAdapter, SchemaAdapterFactory, SchemaMapper}; +use datafusion::physical_plan::{collect, Statistics}; +use datafusion::prelude::SessionContext; + +use parquet::arrow::ArrowWriter; +use tempfile::TempDir; +use datafusion::datasource::listing::PartitionedFile; + +#[tokio::test] +async fn can_override_schema_adapter() { + // Create several parquet files in same directoty / table with + // same schema but different metadata + let tmp_dir = TempDir::new().unwrap(); + let table_dir = tmp_dir.path().join("parquet_test"); + fs::DirBuilder::new() + .create(table_dir.as_path()) + .unwrap(); + let f1 = Field::new("id", DataType::Int32, true); + + let file_schema = Arc::new(Schema::new(vec![f1.clone()])); + let filename = format!("part.parquet"); + let path = table_dir.as_path().join(filename.clone()); + let file = fs::File::create(path.clone()).unwrap(); + let mut writer = ArrowWriter::try_new(file, file_schema.clone(), None).unwrap(); + + let ids = Arc::new(Int32Array::from(vec![1 as i32])); + let rec_batch = RecordBatch::try_new(file_schema.clone(), vec![ids]).unwrap(); + + writer.write(&rec_batch).unwrap(); + writer.close().unwrap(); + + let location = Path::parse(path.to_str().unwrap()).unwrap(); + let metadata = std::fs::metadata(path.as_path()).expect("Local file metadata"); + let meta = ObjectMeta { + location, + last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(), + size: metadata.len() as usize, + e_tag: None, + version: None, + }; + + let partitioned_file = PartitionedFile { + object_meta: meta, + partition_values: vec![], + range: None, + statistics: None, + extensions: None, + }; + + let f1 = Field::new("id", DataType::Int32, true); + let f2 = Field::new("extra_column", DataType::Utf8, true); + + let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()])); + + // prepare the scan + let parquet_exec = ParquetExec::new( + FileScanConfig { + object_store_url: ObjectStoreUrl::local_filesystem(), + file_groups: vec![vec![partitioned_file]], + statistics: Statistics::new_unknown(&schema), + file_schema: schema, + projection: None, + limit: None, + table_partition_cols: vec![], + output_ordering: vec![], + }, + None, + None, + Default::default(), + ) + .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {})); + + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap(); + + let expected = [ + "+----+--------------+", + "| id | extra_column |", + "+----+--------------+", + "| 1 | foo |", + "+----+--------------+", + ]; + + assert_batches_sorted_eq!(expected, &read); +} + +#[derive(Debug)] +struct TestSchemaAdapterFactory {} + +impl SchemaAdapterFactory for TestSchemaAdapterFactory { + fn create(&self, schema: SchemaRef) -> Box { + Box::new(TestSchemaAdapter { + table_schema: schema, + }) + } +} + +struct TestSchemaAdapter { + /// Schema for the table + table_schema: SchemaRef, +} + +impl SchemaAdapter for TestSchemaAdapter { + fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option { + let field = 
self.table_schema.field(index); + Some(file_schema.fields.find(field.name())?.0) + } + + fn map_schema( + &self, + file_schema: &Schema, + ) -> datafusion_common::Result<(Arc, Vec)> { + let mut projection = Vec::with_capacity(file_schema.fields().len()); + + for (file_idx, file_field) in file_schema.fields.iter().enumerate() { + if let Some(_) = self.table_schema.fields().find(file_field.name()) + { + projection.push(file_idx); + } + } + + Ok(( + Arc::new(TestSchemaMapping {}), + projection, + )) + } +} + +#[derive(Debug)] +struct TestSchemaMapping { +} + +impl SchemaMapper for TestSchemaMapping { + fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result { + let f1 = Field::new("id", DataType::Int32, true); + let f2 = Field::new("extra_column", DataType::Utf8, true); + + let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()])); + + let extra_column = Arc::new(StringArray::from(vec!["foo"])); + let mut new_columns = batch.columns().to_vec(); + new_columns.push(extra_column); + + Ok(RecordBatch::try_new(schema, new_columns).unwrap()) + } +} + + From e0b4293cdcc0b16d5e0e002eeb45e8b58dbf8dc5 Mon Sep 17 00:00:00 2001 From: Michael Maletich Date: Tue, 14 May 2024 22:57:08 -0500 Subject: [PATCH 2/3] Fix building with no-default features --- .../core/src/datasource/physical_plan/mod.rs | 23 ++++++++++----- datafusion/core/tests/parquet/mod.rs | 2 +- .../core/tests/parquet/schema_adapter.rs | 29 +++++++------------ 3 files changed, 28 insertions(+), 26 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 607b60bf95be..6e19961f6028 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -35,6 +35,15 @@ pub use self::parquet::{ ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory, SchemaAdapter, SchemaAdapterFactory, SchemaMapper, }; +#[cfg(feature = "parquet")] +use arrow::{ + array::new_null_array, + compute::{can_cast_types, cast}, + datatypes::Schema, + record_batch::{RecordBatch, RecordBatchOptions}, +}; +#[cfg(feature = "parquet")] +use datafusion_common::plan_err; pub use arrow_file::ArrowExec; pub use avro::AvroExec; @@ -64,13 +73,7 @@ use crate::{ physical_plan::display::{display_orderings, ProjectSchemaDisplay}, }; -use arrow::{ - array::new_null_array, - compute::{can_cast_types, cast}, - datatypes::{DataType, Schema, SchemaRef}, - record_batch::{RecordBatch, RecordBatchOptions}, -}; -use datafusion_common::plan_err; +use arrow::datatypes::{DataType, SchemaRef}; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::PhysicalSortExpr; @@ -244,21 +247,25 @@ where Ok(()) } +#[cfg(feature = "parquet")] #[derive(Clone, Debug, Default)] pub(crate) struct DefaultSchemaAdapterFactory {} +#[cfg(feature = "parquet")] impl SchemaAdapterFactory for DefaultSchemaAdapterFactory { fn create(&self, table_schema: SchemaRef) -> Box { Box::new(DefaultSchemaAdapter { table_schema }) } } +#[cfg(feature = "parquet")] #[derive(Clone, Debug)] pub(crate) struct DefaultSchemaAdapter { /// Schema for the table table_schema: SchemaRef, } +#[cfg(feature = "parquet")] impl SchemaAdapter for DefaultSchemaAdapter { /// Map a column index in the table schema to a column index in a particular /// file schema @@ -317,6 +324,7 @@ impl SchemaAdapter for DefaultSchemaAdapter { /// The SchemaMapping struct holds a mapping from the file schema to the table schema /// and any necessary type conversions that need to be applied. 
+#[cfg(feature = "parquet")] #[derive(Debug)] pub struct SchemaMapping { /// The schema of the table. This is the expected schema after conversion and it should match the schema of the query result. @@ -325,6 +333,7 @@ pub struct SchemaMapping { field_mappings: Vec>, } +#[cfg(feature = "parquet")] impl SchemaMapper for SchemaMapping { /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions. fn map_batch(&self, batch: RecordBatch) -> Result { diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 42d8fe0d8521..fe839bf1bcec 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -48,8 +48,8 @@ mod filter_pushdown; mod page_pruning; mod row_group_pruning; mod schema; -mod schema_coercion; mod schema_adapter; +mod schema_coercion; #[cfg(test)] #[ctor::ctor] diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs index 95ac386480e3..0f7ae6998530 100644 --- a/datafusion/core/tests/parquet/schema_adapter.rs +++ b/datafusion/core/tests/parquet/schema_adapter.rs @@ -22,18 +22,20 @@ use arrow::datatypes::{Field, Schema}; use arrow::record_batch::RecordBatch; use arrow_array::{Int32Array, StringArray}; use arrow_schema::{DataType, SchemaRef}; -use object_store::ObjectMeta; -use object_store::path::Path; use datafusion::assert_batches_sorted_eq; +use object_store::path::Path; +use object_store::ObjectMeta; use datafusion::datasource::object_store::ObjectStoreUrl; -use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec, SchemaAdapter, SchemaAdapterFactory, SchemaMapper}; +use datafusion::datasource::physical_plan::{ + FileScanConfig, ParquetExec, SchemaAdapter, SchemaAdapterFactory, SchemaMapper, +}; use datafusion::physical_plan::{collect, Statistics}; use datafusion::prelude::SessionContext; +use datafusion::datasource::listing::PartitionedFile; use parquet::arrow::ArrowWriter; use tempfile::TempDir; -use datafusion::datasource::listing::PartitionedFile; #[tokio::test] async fn can_override_schema_adapter() { @@ -41,9 +43,7 @@ async fn can_override_schema_adapter() { // same schema but different metadata let tmp_dir = TempDir::new().unwrap(); let table_dir = tmp_dir.path().join("parquet_test"); - fs::DirBuilder::new() - .create(table_dir.as_path()) - .unwrap(); + fs::DirBuilder::new().create(table_dir.as_path()).unwrap(); let f1 = Field::new("id", DataType::Int32, true); let file_schema = Arc::new(Schema::new(vec![f1.clone()])); @@ -97,7 +97,7 @@ async fn can_override_schema_adapter() { None, Default::default(), ) - .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {})); + .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {})); let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); @@ -143,22 +143,17 @@ impl SchemaAdapter for TestSchemaAdapter { let mut projection = Vec::with_capacity(file_schema.fields().len()); for (file_idx, file_field) in file_schema.fields.iter().enumerate() { - if let Some(_) = self.table_schema.fields().find(file_field.name()) - { + if let Some(_) = self.table_schema.fields().find(file_field.name()) { projection.push(file_idx); } } - Ok(( - Arc::new(TestSchemaMapping {}), - projection, - )) + Ok((Arc::new(TestSchemaMapping {}), projection)) } } #[derive(Debug)] -struct TestSchemaMapping { -} +struct TestSchemaMapping {} impl SchemaMapper for TestSchemaMapping { fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result { @@ 
-174,5 +169,3 @@ impl SchemaMapper for TestSchemaMapping { Ok(RecordBatch::try_new(schema, new_columns).unwrap()) } } - - From ae0a6bc7ddf4a474de008f88655d038726bd8a13 Mon Sep 17 00:00:00 2001 From: Michael Maletich Date: Tue, 14 May 2024 23:28:05 -0500 Subject: [PATCH 3/3] clippy --- datafusion/core/tests/parquet/schema_adapter.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs index 0f7ae6998530..10c4e8a4c059 100644 --- a/datafusion/core/tests/parquet/schema_adapter.rs +++ b/datafusion/core/tests/parquet/schema_adapter.rs @@ -47,12 +47,12 @@ async fn can_override_schema_adapter() { let f1 = Field::new("id", DataType::Int32, true); let file_schema = Arc::new(Schema::new(vec![f1.clone()])); - let filename = format!("part.parquet"); + let filename = "part.parquet".to_string(); let path = table_dir.as_path().join(filename.clone()); let file = fs::File::create(path.clone()).unwrap(); let mut writer = ArrowWriter::try_new(file, file_schema.clone(), None).unwrap(); - let ids = Arc::new(Int32Array::from(vec![1 as i32])); + let ids = Arc::new(Int32Array::from(vec![1i32])); let rec_batch = RecordBatch::try_new(file_schema.clone(), vec![ids]).unwrap(); writer.write(&rec_batch).unwrap(); @@ -143,7 +143,7 @@ impl SchemaAdapter for TestSchemaAdapter { let mut projection = Vec::with_capacity(file_schema.fields().len()); for (file_idx, file_field) in file_schema.fields.iter().enumerate() { - if let Some(_) = self.table_schema.fields().find(file_field.name()) { + if self.table_schema.fields().find(file_field.name()).is_some() { projection.push(file_idx); } }
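
For downstream users of this API (for example a Delta or Iceberg layer, as mentioned in the commit message), a custom factory is attached to the scan with `ParquetExec::with_schema_adapter_factory`. The following is a minimal, illustrative sketch of such an integration and is not part of the patches above: the `PassThroughAdapterFactory`, `PassThroughAdapter`, and `PassThroughMapper` names are hypothetical, and the sketch assumes only the traits and re-exports introduced by this patch series.

// Sketch of a downstream SchemaAdapterFactory; names are illustrative only.
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef};
use datafusion::datasource::physical_plan::{
    SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
};

/// Hypothetical factory that forwards batches unchanged; a real table-format
/// layer would apply its own schema-evolution rules instead.
#[derive(Debug)]
struct PassThroughAdapterFactory {}

impl SchemaAdapterFactory for PassThroughAdapterFactory {
    fn create(&self, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
        Box::new(PassThroughAdapter { table_schema })
    }
}

struct PassThroughAdapter {
    table_schema: SchemaRef,
}

impl SchemaAdapter for PassThroughAdapter {
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
        // Same name-based lookup the default adapter uses.
        let field = self.table_schema.field(index);
        Some(file_schema.fields.find(field.name())?.0)
    }

    fn map_schema(
        &self,
        file_schema: &Schema,
    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
        // Project every file column that also exists in the table schema.
        let projection: Vec<usize> = (0..file_schema.fields().len())
            .filter(|idx| {
                self.table_schema
                    .fields()
                    .find(file_schema.field(*idx).name())
                    .is_some()
            })
            .collect();
        Ok((Arc::new(PassThroughMapper {}), projection))
    }
}

#[derive(Debug)]
struct PassThroughMapper {}

impl SchemaMapper for PassThroughMapper {
    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
        // No adaptation performed; a real mapper would cast, reorder, or
        // back-fill columns so the batch matches the table schema.
        Ok(batch)
    }
}

// Wiring it into a scan (the FileScanConfig is built as in the test above):
// let exec = ParquetExec::new(scan_config, None, None, Default::default())
//     .with_schema_adapter_factory(Arc::new(PassThroughAdapterFactory {}));

A real adapter would implement its table format's evolution rules inside `map_schema` and `map_batch`, much as the `TestSchemaAdapter` in the test above does by appending the missing `extra_column` to each batch.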