diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index fa379eb5b445..2e56b9a46a7b 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -31,7 +31,8 @@ use crate::arrow::array::{
 use crate::arrow::datatypes::{DataType, Fields, Schema, SchemaRef};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::physical_plan::{
-    FileGroupDisplay, FileSinkConfig, ParquetExec, SchemaAdapter,
+    DefaultSchemaAdapterFactory, FileGroupDisplay, FileSinkConfig, ParquetExec,
+    SchemaAdapterFactory,
 };
 use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
 use crate::error::Result;
@@ -474,7 +475,8 @@ async fn fetch_statistics(
     let mut null_counts = vec![Precision::Exact(0); num_fields];
     let mut has_statistics = false;
 
-    let schema_adapter = SchemaAdapter::new(table_schema.clone());
+    let schema_adapter =
+        DefaultSchemaAdapterFactory::default().create(table_schema.clone());
 
     let (mut max_values, mut min_values) = create_max_min_accs(&table_schema);
 
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index c450774572db..6e19961f6028 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -31,7 +31,19 @@ mod statistics;
 pub(crate) use self::csv::plan_to_csv;
 pub(crate) use self::json::plan_to_json;
 #[cfg(feature = "parquet")]
-pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};
+pub use self::parquet::{
+    ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory, SchemaAdapter,
+    SchemaAdapterFactory, SchemaMapper,
+};
+#[cfg(feature = "parquet")]
+use arrow::{
+    array::new_null_array,
+    compute::{can_cast_types, cast},
+    datatypes::Schema,
+    record_batch::{RecordBatch, RecordBatchOptions},
+};
+#[cfg(feature = "parquet")]
+use datafusion_common::plan_err;
 
 pub use arrow_file::ArrowExec;
 pub use avro::AvroExec;
@@ -61,13 +73,7 @@ use crate::{
     physical_plan::display::{display_orderings, ProjectSchemaDisplay},
 };
 
-use arrow::{
-    array::new_null_array,
-    compute::{can_cast_types, cast},
-    datatypes::{DataType, Schema, SchemaRef},
-    record_batch::{RecordBatch, RecordBatchOptions},
-};
-use datafusion_common::plan_err;
+use arrow::datatypes::{DataType, SchemaRef};
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::PhysicalSortExpr;
 
@@ -241,39 +247,31 @@ where
     Ok(())
 }
 
-/// A utility which can adapt file-level record batches to a table schema which may have a schema
-/// obtained from merging multiple file-level schemas.
-///
-/// This is useful for enabling schema evolution in partitioned datasets.
-///
-/// This has to be done in two stages.
-///
-/// 1. Before reading the file, we have to map projected column indexes from the table schema to
-///    the file schema.
-///
-/// 2. After reading a record batch we need to map the read columns back to the expected columns
-///    indexes and insert null-valued columns wherever the file schema was missing a colum present
-///    in the table schema.
+#[cfg(feature = "parquet")] +#[derive(Clone, Debug, Default)] +pub(crate) struct DefaultSchemaAdapterFactory {} + +#[cfg(feature = "parquet")] +impl SchemaAdapterFactory for DefaultSchemaAdapterFactory { + fn create(&self, table_schema: SchemaRef) -> Box { + Box::new(DefaultSchemaAdapter { table_schema }) + } +} + +#[cfg(feature = "parquet")] #[derive(Clone, Debug)] -pub(crate) struct SchemaAdapter { +pub(crate) struct DefaultSchemaAdapter { /// Schema for the table table_schema: SchemaRef, } -impl SchemaAdapter { - pub(crate) fn new(table_schema: SchemaRef) -> SchemaAdapter { - Self { table_schema } - } - +#[cfg(feature = "parquet")] +impl SchemaAdapter for DefaultSchemaAdapter { /// Map a column index in the table schema to a column index in a particular /// file schema /// /// Panics if index is not in range for the table schema - pub(crate) fn map_column_index( - &self, - index: usize, - file_schema: &Schema, - ) -> Option { + fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option { let field = self.table_schema.field(index); Some(file_schema.fields.find(field.name())?.0) } @@ -286,10 +284,10 @@ impl SchemaAdapter { /// /// Returns a [`SchemaMapping`] that can be applied to the output batch /// along with an ordered list of columns to project from the file - pub fn map_schema( + fn map_schema( &self, file_schema: &Schema, - ) -> Result<(SchemaMapping, Vec)> { + ) -> Result<(Arc, Vec)> { let mut projection = Vec::with_capacity(file_schema.fields().len()); let mut field_mappings = vec![None; self.table_schema.fields().len()]; @@ -315,10 +313,10 @@ impl SchemaAdapter { } Ok(( - SchemaMapping { + Arc::new(SchemaMapping { table_schema: self.table_schema.clone(), field_mappings, - }, + }), projection, )) } @@ -326,6 +324,7 @@ impl SchemaAdapter { /// The SchemaMapping struct holds a mapping from the file schema to the table schema /// and any necessary type conversions that need to be applied. +#[cfg(feature = "parquet")] #[derive(Debug)] pub struct SchemaMapping { /// The schema of the table. This is the expected schema after conversion and it should match the schema of the query result. @@ -334,7 +333,8 @@ pub struct SchemaMapping { field_mappings: Vec>, } -impl SchemaMapping { +#[cfg(feature = "parquet")] +impl SchemaMapper for SchemaMapping { /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions. 
     fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
         let batch_rows = batch.num_rows();
@@ -636,7 +636,7 @@ mod tests {
             Field::new("c3", DataType::Float64, true),
         ]));
 
-        let adapter = SchemaAdapter::new(table_schema.clone());
+        let adapter = DefaultSchemaAdapterFactory::default().create(table_schema.clone());
 
         let file_schema = Schema::new(vec![
             Field::new("c1", DataType::Utf8, true),
@@ -693,7 +693,7 @@ mod tests {
         let indices = vec![1, 2, 4];
         let schema = SchemaRef::from(table_schema.project(&indices).unwrap());
-        let adapter = SchemaAdapter::new(schema);
+        let adapter = DefaultSchemaAdapterFactory::default().create(schema);
         let (mapping, projection) = adapter.map_schema(&file_schema).unwrap();
 
         let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index a48f510adb56..7509d08ad88a 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -27,8 +27,8 @@ use crate::datasource::physical_plan::file_stream::{
     FileOpenFuture, FileOpener, FileStream,
 };
 use crate::datasource::physical_plan::{
-    parquet::page_filter::PagePruningPredicate, DisplayAs, FileGroupPartitioner,
-    FileMeta, FileScanConfig, SchemaAdapter,
+    parquet::page_filter::PagePruningPredicate, DefaultSchemaAdapterFactory, DisplayAs,
+    FileGroupPartitioner, FileMeta, FileScanConfig,
 };
 use crate::{
     config::{ConfigOptions, TableParquetOptions},
@@ -67,9 +67,11 @@ mod metrics;
 mod page_filter;
 mod row_filter;
 mod row_groups;
+mod schema_adapter;
 mod statistics;
 
 pub use metrics::ParquetFileMetrics;
+pub use schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
 
 /// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
@@ -93,6 +95,8 @@ pub struct ParquetExec {
     cache: PlanProperties,
     /// Options for reading Parquet files
     table_parquet_options: TableParquetOptions,
+    /// Optional user-defined schema adapter factory
+    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
 }
 
 impl ParquetExec {
@@ -157,6 +161,7 @@ impl ParquetExec {
             parquet_file_reader_factory: None,
             cache,
             table_parquet_options,
+            schema_adapter_factory: None,
         }
     }
 
@@ -195,6 +200,19 @@ impl ParquetExec {
         self
     }
 
+    /// Optional schema adapter factory.
+    ///
+    /// [`SchemaAdapterFactory`] allows users to specify how fields from the
+    /// parquet file are mapped to those of the table schema. The default
+    /// schema adapter uses arrow's cast library to map the parquet fields
+    /// to the table schema.
+    pub fn with_schema_adapter_factory(
+        mut self,
+        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Self {
+        self.schema_adapter_factory = Some(schema_adapter_factory);
+        self
+    }
+
     /// If true, any filter [`Expr`]s on the scan will converted to a
     /// [`RowFilter`](parquet::arrow::arrow_reader::RowFilter) in the
     /// `ParquetRecordBatchStream`. These filters are applied by the
@@ -402,6 +420,11 @@ impl ExecutionPlan for ParquetExec {
             })
         })?;
 
+        let schema_adapter_factory = self
+            .schema_adapter_factory
+            .clone()
+            .unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory::default()));
+
         let opener = ParquetOpener {
             partition_index,
             projection: Arc::from(projection),
@@ -418,6 +441,7 @@ impl ExecutionPlan for ParquetExec {
             reorder_filters: self.reorder_filters(),
             enable_page_index: self.enable_page_index(),
             enable_bloom_filter: self.bloom_filter_on_read(),
+            schema_adapter_factory,
         };
 
         let stream =
@@ -452,6 +476,7 @@ struct ParquetOpener {
     reorder_filters: bool,
     enable_page_index: bool,
     enable_bloom_filter: bool,
+    schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
}
 
 impl FileOpener for ParquetOpener {
@@ -475,7 +500,7 @@ impl FileOpener for ParquetOpener {
         let batch_size = self.batch_size;
         let projection = self.projection.clone();
         let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?);
-        let schema_adapter = SchemaAdapter::new(projected_schema);
+        let schema_adapter = self.schema_adapter_factory.create(projected_schema);
         let predicate = self.predicate.clone();
         let pruning_predicate = self.pruning_predicate.clone();
         let page_pruning_predicate = self.page_pruning_predicate.clone();
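For orientation: the body of `ParquetOpener::open` is elided from the hunk above, but it is where the created adapter gets consumed. A rough sketch of that two-stage flow follows, with `builder` standing in for the `ParquetRecordBatchStreamBuilder` local of the real method and `map` being `futures::StreamExt::map`; this is a sketch, not the verbatim body:

```rust
// Sketch only -- not the verbatim body of ParquetOpener::open.
let file_schema = builder.schema().clone();

// Stage 1: before reading, line the projected table columns up with the
// columns that actually exist in this file.
let (schema_mapping, adapted_projections) =
    schema_adapter.map_schema(&file_schema)?;
let mask = ProjectionMask::roots(builder.parquet_schema(), adapted_projections);
let stream = builder.with_projection(mask).build()?;

// Stage 2: after reading, cast / null-pad every decoded batch back to the
// table schema via the SchemaMapper produced in stage 1.
let adapted = stream.map(move |batch| {
    let batch = batch.map_err(DataFusionError::from)?;
    schema_mapping.map_batch(batch)
});
```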
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs b/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs
new file mode 100644
index 000000000000..193e5161a398
--- /dev/null
+++ b/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_array::RecordBatch;
+use arrow_schema::{Schema, SchemaRef};
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// Factory for creating [`SchemaAdapter`]s.
+///
+/// This is the extension point for supplying custom schema adaptation logic.
+pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
+    /// Provides a `SchemaAdapter` for the `ParquetExec`.
+    fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter>;
+}
+
+/// A utility which can adapt file-level record batches to a table schema which may have a schema
+/// obtained from merging multiple file-level schemas.
+///
+/// This is useful for enabling schema evolution in partitioned datasets.
+///
+/// This has to be done in two stages.
+///
+/// 1. Before reading the file, we have to map projected column indexes from the table schema to
+///    the file schema.
+///
+/// 2. After reading a record batch we need to map the read columns back to the expected column
+///    indexes and insert null-valued columns wherever the file schema was missing a column present
+///    in the table schema.
+pub trait SchemaAdapter: Send + Sync {
+    /// Map a column index in the table schema to a column index in a particular
+    /// file schema
+    ///
+    /// Panics if index is not in range for the table schema
+    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;
+
+    /// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
+    ///
+    /// If the provided `file_schema` contains columns of a different type from the expected
+    /// `table_schema`, the method will attempt to cast the array data from the file schema
+    /// to the table schema where possible.
+    ///
+    /// Returns a [`SchemaMapper`] that can be applied to the output batch
+    /// along with an ordered list of columns to project from the file
+    fn map_schema(
+        &self,
+        file_schema: &Schema,
+    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
+}
+
+/// Transforms a `RecordBatch` read from Parquet into a `RecordBatch` that matches the table schema.
+pub trait SchemaMapper: Send + Sync {
+    /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
+    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;
+}
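The diff never shows the body of `SchemaMapping::map_batch` (it is unchanged aside from moving behind the `SchemaMapper` trait), but the imports kept in `mod.rs` above (`new_null_array`, `can_cast_types`, `cast`, `RecordBatchOptions`) point at its cast-or-null-fill shape. The following is a sketch under that assumption; `map_batch_sketch` is an illustrative name, not the real method:

```rust
use arrow::array::new_null_array;
use arrow::compute::cast;
use arrow::record_batch::{RecordBatch, RecordBatchOptions};
use datafusion_common::Result;

impl SchemaMapping {
    fn map_batch_sketch(&self, batch: RecordBatch) -> Result<RecordBatch> {
        let batch_rows = batch.num_rows();
        let batch_cols = batch.columns().to_vec();

        let cols = self
            .table_schema
            .fields()
            .iter()
            .zip(&self.field_mappings)
            .map(|(table_field, file_idx)| match file_idx {
                // The file has this column (possibly as another type): cast it.
                Some(idx) => cast(&batch_cols[*idx], table_field.data_type())
                    .map_err(Into::into),
                // The file lacks this column entirely: fill it with nulls.
                None => Ok(new_null_array(table_field.data_type(), batch_rows)),
            })
            .collect::<Result<Vec<_>>>()?;

        // Preserve the row count even when zero columns are projected.
        let options = RecordBatchOptions::new().with_row_count(Some(batch_rows));
        Ok(RecordBatch::try_new_with_options(
            self.table_schema.clone(),
            cols,
            &options,
        )?)
    }
}
```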
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index bb938e3af493..fe839bf1bcec 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -48,6 +48,7 @@ mod filter_pushdown;
 mod page_pruning;
 mod row_group_pruning;
 mod schema;
+mod schema_adapter;
 mod schema_coercion;
 
 #[cfg(test)]
diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs
new file mode 100644
index 000000000000..10c4e8a4c059
--- /dev/null
+++ b/datafusion/core/tests/parquet/schema_adapter.rs
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fs;
+use std::sync::Arc;
+
+use arrow::datatypes::{Field, Schema};
+use arrow::record_batch::RecordBatch;
+use arrow_array::{Int32Array, StringArray};
+use arrow_schema::{DataType, SchemaRef};
+use datafusion::assert_batches_sorted_eq;
+use object_store::path::Path;
+use object_store::ObjectMeta;
+
+use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::datasource::physical_plan::{
+    FileScanConfig, ParquetExec, SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
+};
+use datafusion::physical_plan::{collect, Statistics};
+use datafusion::prelude::SessionContext;
+
+use datafusion::datasource::listing::PartitionedFile;
+use parquet::arrow::ArrowWriter;
+use tempfile::TempDir;
+
+#[tokio::test]
+async fn can_override_schema_adapter() {
+    // Write a parquet file in a table directory whose schema (just `id`)
+    // is narrower than the table schema used for the scan below
+    // (`id` plus `extra_column`)
+    let tmp_dir = TempDir::new().unwrap();
+    let table_dir = tmp_dir.path().join("parquet_test");
+    fs::DirBuilder::new().create(table_dir.as_path()).unwrap();
+    let f1 = Field::new("id", DataType::Int32, true);
+
+    let file_schema = Arc::new(Schema::new(vec![f1.clone()]));
+    let filename = "part.parquet".to_string();
+    let path = table_dir.as_path().join(filename.clone());
+    let file = fs::File::create(path.clone()).unwrap();
+    let mut writer = ArrowWriter::try_new(file, file_schema.clone(), None).unwrap();
+
+    let ids = Arc::new(Int32Array::from(vec![1i32]));
+    let rec_batch = RecordBatch::try_new(file_schema.clone(), vec![ids]).unwrap();
+
+    writer.write(&rec_batch).unwrap();
+    writer.close().unwrap();
+
+    let location = Path::parse(path.to_str().unwrap()).unwrap();
+    let metadata = std::fs::metadata(path.as_path()).expect("Local file metadata");
+    let meta = ObjectMeta {
+        location,
+        last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
+        size: metadata.len() as usize,
+        e_tag: None,
+        version: None,
+    };
+
+    let partitioned_file = PartitionedFile {
+        object_meta: meta,
+        partition_values: vec![],
+        range: None,
+        statistics: None,
+        extensions: None,
+    };
+
+    let f1 = Field::new("id", DataType::Int32, true);
+    let f2 = Field::new("extra_column", DataType::Utf8, true);
+
+    let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
+
+    // prepare the scan
+    let parquet_exec = ParquetExec::new(
+        FileScanConfig {
+            object_store_url: ObjectStoreUrl::local_filesystem(),
+            file_groups: vec![vec![partitioned_file]],
+            statistics: Statistics::new_unknown(&schema),
+            file_schema: schema,
+            projection: None,
+            limit: None,
+            table_partition_cols: vec![],
+            output_ordering: vec![],
+        },
+        None,
+        None,
+        Default::default(),
+    )
+    .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {}));
+
+    let session_ctx = SessionContext::new();
+    let task_ctx = session_ctx.task_ctx();
+    let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap();
+
+    let expected = [
+        "+----+--------------+",
+        "| id | extra_column |",
+        "+----+--------------+",
+        "| 1  | foo          |",
+        "+----+--------------+",
+    ];
+
+    assert_batches_sorted_eq!(expected, &read);
+}
+
+#[derive(Debug)]
+struct TestSchemaAdapterFactory {}
+
+impl SchemaAdapterFactory for TestSchemaAdapterFactory {
+    fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+        Box::new(TestSchemaAdapter {
+            table_schema: schema,
+        })
+    }
+}
+
+struct TestSchemaAdapter {
+    /// Schema for the table
+    table_schema: SchemaRef,
+}
+impl SchemaAdapter for TestSchemaAdapter {
+    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+        let field = self.table_schema.field(index);
+        Some(file_schema.fields.find(field.name())?.0)
+    }
+
+    fn map_schema(
+        &self,
+        file_schema: &Schema,
+    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+        let mut projection = Vec::with_capacity(file_schema.fields().len());
+
+        for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
+            if self.table_schema.fields().find(file_field.name()).is_some() {
+                projection.push(file_idx);
+            }
+        }
+
+        Ok((Arc::new(TestSchemaMapping {}), projection))
+    }
+}
+
+#[derive(Debug)]
+struct TestSchemaMapping {}
+
+impl SchemaMapper for TestSchemaMapping {
+    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
+        let f1 = Field::new("id", DataType::Int32, true);
+        let f2 = Field::new("extra_column", DataType::Utf8, true);
+
+        let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
+
+        // Synthesize the column missing from the file instead of null-filling
+        // it, which is what the default adapter would do.
+        let extra_column = Arc::new(StringArray::from(vec!["foo"]));
+        let mut new_columns = batch.columns().to_vec();
+        new_columns.push(extra_column);
+
+        Ok(RecordBatch::try_new(schema, new_columns).unwrap())
+    }
+}
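One design note on the test: `with_schema_adapter_factory` is strictly opt-in. Dropping that single call would exercise the `unwrap_or_else` fallback added in `execute()` above, and the default cast-based adapter would then presumably null-fill the missing `extra_column` instead of synthesizing `"foo"`. A hypothetical variant, with `scan_config` standing in for the `FileScanConfig` built inline in the test:

```rust
// Same scan as above, but relying on DefaultSchemaAdapterFactory.
// Expected (hypothetical) output: `extra_column` comes back as NULL:
//
// +----+--------------+
// | id | extra_column |
// +----+--------------+
// | 1  |              |
// +----+--------------+
let parquet_exec = ParquetExec::new(scan_config, None, None, Default::default());
```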