feat: Expose Parquet Schema Adapter #2

Merged
merged 1 commit on May 15, 2024
6 changes: 4 additions & 2 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -31,7 +31,8 @@ use crate::arrow::array::{
use crate::arrow::datatypes::{DataType, Fields, Schema, SchemaRef};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::physical_plan::{
FileGroupDisplay, FileSinkConfig, ParquetExec, SchemaAdapter,
DefaultSchemaAdapterFactory, FileGroupDisplay, FileSinkConfig, ParquetExec,
SchemaAdapterFactory,
};
use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
use crate::error::Result;
@@ -474,7 +475,8 @@ async fn fetch_statistics(
let mut null_counts = vec![Precision::Exact(0); num_fields];
let mut has_statistics = false;

let schema_adapter = SchemaAdapter::new(table_schema.clone());
let schema_adapter =
DefaultSchemaAdapterFactory::default().create(table_schema.clone());

let (mut max_values, mut min_values) = create_max_min_accs(&table_schema);

78 changes: 39 additions & 39 deletions datafusion/core/src/datasource/physical_plan/mod.rs
@@ -30,7 +30,19 @@ pub mod parquet;
pub(crate) use self::csv::plan_to_csv;
pub(crate) use self::json::plan_to_json;
#[cfg(feature = "parquet")]
pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};
pub use self::parquet::{
ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory, SchemaAdapter,
SchemaAdapterFactory, SchemaMapper,
};
#[cfg(feature = "parquet")]
use arrow::{
array::new_null_array,
compute::{can_cast_types, cast},
datatypes::Schema,
record_batch::{RecordBatch, RecordBatchOptions},
};
#[cfg(feature = "parquet")]
use datafusion_common::plan_err;

pub use arrow_file::ArrowExec;
pub use avro::AvroExec;
@@ -60,13 +72,7 @@ use crate::{
physical_plan::display::{display_orderings, ProjectSchemaDisplay},
};

use arrow::{
array::new_null_array,
compute::{can_cast_types, cast},
datatypes::{DataType, Schema, SchemaRef},
record_batch::{RecordBatch, RecordBatchOptions},
};
use datafusion_common::plan_err;
use arrow::datatypes::{DataType, SchemaRef};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalSortExpr;

@@ -240,39 +246,31 @@
Ok(())
}

/// A utility which can adapt file-level record batches to a table schema which may have a schema
/// obtained from merging multiple file-level schemas.
///
/// This is useful for enabling schema evolution in partitioned datasets.
///
/// This has to be done in two stages.
///
/// 1. Before reading the file, we have to map projected column indexes from the table schema to
/// the file schema.
///
/// 2. After reading a record batch we need to map the read columns back to the expected column
/// indexes and insert null-valued columns wherever the file schema was missing a column present
/// in the table schema.
#[cfg(feature = "parquet")]
#[derive(Clone, Debug, Default)]
pub(crate) struct DefaultSchemaAdapterFactory {}

#[cfg(feature = "parquet")]
impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
fn create(&self, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
Box::new(DefaultSchemaAdapter { table_schema })
}
}

#[cfg(feature = "parquet")]
#[derive(Clone, Debug)]
pub(crate) struct SchemaAdapter {
pub(crate) struct DefaultSchemaAdapter {
/// Schema for the table
table_schema: SchemaRef,
}

impl SchemaAdapter {
pub(crate) fn new(table_schema: SchemaRef) -> SchemaAdapter {
Self { table_schema }
}

#[cfg(feature = "parquet")]
impl SchemaAdapter for DefaultSchemaAdapter {
/// Map a column index in the table schema to a column index in a particular
/// file schema
///
/// Panics if index is not in range for the table schema
pub(crate) fn map_column_index(
&self,
index: usize,
file_schema: &Schema,
) -> Option<usize> {
fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
let field = self.table_schema.field(index);
Some(file_schema.fields.find(field.name())?.0)
}
@@ -285,10 +283,10 @@ impl SchemaAdapter {
///
/// Returns a [`SchemaMapping`] that can be applied to the output batch
/// along with an ordered list of columns to project from the file
pub fn map_schema(
fn map_schema(
&self,
file_schema: &Schema,
) -> Result<(SchemaMapping, Vec<usize>)> {
) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
let mut projection = Vec::with_capacity(file_schema.fields().len());
let mut field_mappings = vec![None; self.table_schema.fields().len()];

@@ -314,15 +312,16 @@ impl SchemaAdapter {
}

Ok((
SchemaMapping {
Arc::new(SchemaMapping {
table_schema: self.table_schema.clone(),
field_mappings,
},
}),
projection,
))
}
}

#[cfg(feature = "parquet")]
/// The SchemaMapping struct holds a mapping from the file schema to the table schema
/// and any necessary type conversions that need to be applied.
#[derive(Debug)]
@@ -333,7 +332,8 @@ pub struct SchemaMapping {
field_mappings: Vec<Option<usize>>,
}

impl SchemaMapping {
#[cfg(feature = "parquet")]
impl SchemaMapper for SchemaMapping {
/// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
let batch_rows = batch.num_rows();
@@ -606,7 +606,7 @@ mod tests {
Field::new("c3", DataType::Float64, true),
]));

let adapter = SchemaAdapter::new(table_schema.clone());
let adapter = DefaultSchemaAdapterFactory::default().create(table_schema.clone());

let file_schema = Schema::new(vec![
Field::new("c1", DataType::Utf8, true),
@@ -663,7 +663,7 @@

let indices = vec![1, 2, 4];
let schema = SchemaRef::from(table_schema.project(&indices).unwrap());
let adapter = SchemaAdapter::new(schema);
let adapter = DefaultSchemaAdapterFactory::default().create(schema);
let (mapping, projection) = adapter.map_schema(&file_schema).unwrap();

let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
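For reference, here is a minimal sketch of the two-stage flow the default adapter implements, modeled on the test code in this file. The schemas and batch contents are invented for illustration, and the snippet assumes crate-internal access to DefaultSchemaAdapterFactory and the SchemaAdapterFactory trait (as in this module's tests):

use std::sync::Arc;
use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn adapt_example() -> datafusion_common::Result<()> {
    // Table schema has one column ("extra") that the file is missing.
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, true),
        Field::new("extra", DataType::Utf8, true),
    ]));
    let file_schema = Schema::new(vec![Field::new("id", DataType::Int32, true)]);

    // Stage 1: before reading the file, map table columns to file columns.
    let adapter = DefaultSchemaAdapterFactory::default().create(table_schema.clone());
    let (mapper, projection) = adapter.map_schema(&file_schema)?;
    assert_eq!(projection, vec![0]); // only "id" is read from the file

    // Stage 2: after reading a batch, adapt it; "extra" is filled with nulls.
    let batch = RecordBatch::try_new(
        Arc::new(file_schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;
    let mapped = mapper.map_batch(batch)?;
    assert_eq!(mapped.num_columns(), 2);
    Ok(())
}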
31 changes: 28 additions & 3 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -27,8 +27,8 @@ use crate::datasource::physical_plan::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
use crate::datasource::physical_plan::{
parquet::page_filter::PagePruningPredicate, DisplayAs, FileGroupPartitioner,
FileMeta, FileScanConfig, SchemaAdapter,
parquet::page_filter::PagePruningPredicate, DefaultSchemaAdapterFactory, DisplayAs,
FileGroupPartitioner, FileMeta, FileScanConfig,
};
use crate::{
config::{ConfigOptions, TableParquetOptions},
@@ -67,9 +67,11 @@ mod metrics;
mod page_filter;
mod row_filter;
mod row_groups;
mod schema_adapter;
mod statistics;

pub use metrics::ParquetFileMetrics;
pub use schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};

/// Execution plan for scanning one or more Parquet partitions
#[derive(Debug, Clone)]
@@ -92,6 +94,8 @@ pub struct ParquetExec {
cache: PlanProperties,
/// Parquet Options
parquet_options: TableParquetOptions,
/// Optional user defined schema adapter
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}

impl ParquetExec {
@@ -156,6 +160,7 @@ impl ParquetExec {
parquet_file_reader_factory: None,
cache,
parquet_options,
schema_adapter_factory: None,
}
}

@@ -189,6 +194,19 @@ impl ParquetExec {
self
}

/// Optional schema adapter factory.
///
/// A `SchemaAdapterFactory` allows the user to specify how fields from the parquet file are mapped to
/// fields of the table schema. The default schema adapter uses arrow's cast library to map
/// the parquet fields to the table schema.
pub fn with_schema_adapter_factory(
mut self,
schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
) -> Self {
self.schema_adapter_factory = Some(schema_adapter_factory);
self
}

/// If true, any filter [`Expr`]s on the scan will be converted to a
/// [`RowFilter`](parquet::arrow::arrow_reader::RowFilter) in the
/// `ParquetRecordBatchStream`. These filters are applied by the
@@ -386,6 +404,11 @@ impl ExecutionPlan for ParquetExec {
})
})?;

let schema_adapter_factory = self
.schema_adapter_factory
.clone()
.unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory::default()));

let opener = ParquetOpener {
partition_index,
projection: Arc::from(projection),
@@ -402,6 +425,7 @@
reorder_filters: self.reorder_filters(),
enable_page_index: self.enable_page_index(),
enable_bloom_filter: self.enable_bloom_filter(),
schema_adapter_factory,
};

let stream =
@@ -436,6 +460,7 @@ struct ParquetOpener {
reorder_filters: bool,
enable_page_index: bool,
enable_bloom_filter: bool,
schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
}

impl FileOpener for ParquetOpener {
@@ -459,7 +484,7 @@ impl FileOpener for ParquetOpener {
let batch_size = self.batch_size;
let projection = self.projection.clone();
let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?);
let schema_adapter = SchemaAdapter::new(projected_schema);
let schema_adapter = self.schema_adapter_factory.create(projected_schema);
let predicate = self.predicate.clone();
let pruning_predicate = self.pruning_predicate.clone();
let page_pruning_predicate = self.page_pruning_predicate.clone();
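To illustrate the new builder hook from a consumer's point of view, here is a hedged sketch. The import paths assume the re-exports added in this PR, and the factory passed in would be any user-defined implementation of SchemaAdapterFactory (see the trait definitions in the new schema_adapter.rs below):

use std::sync::Arc;
use datafusion::datasource::physical_plan::{ParquetExec, SchemaAdapterFactory};

/// Attach a custom schema adapter factory to an already-built Parquet scan.
/// If this is never called, execution falls back to the crate's default adapter.
fn with_custom_adapter(
    exec: ParquetExec,
    factory: Arc<dyn SchemaAdapterFactory>,
) -> ParquetExec {
    exec.with_schema_adapter_factory(factory)
}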
69 changes: 69 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs
@@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef};
use std::fmt::Debug;
use std::sync::Arc;

/// Factory of schema adapters.
///
/// Provides means to implement custom schema adaptation.
pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
/// Provides `SchemaAdapter` for the ParquetExec.
fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter>;
}

/// A utility which can adapt file-level record batches to a table schema which may have a schema
/// obtained from merging multiple file-level schemas.
///
/// This is useful for enabling schema evolution in partitioned datasets.
///
/// This has to be done in two stages.
///
/// 1. Before reading the file, we have to map projected column indexes from the table schema to
/// the file schema.
///
/// 2. After reading a record batch we need to map the read columns back to the expected column
/// indexes and insert null-valued columns wherever the file schema was missing a column present
/// in the table schema.
pub trait SchemaAdapter: Send + Sync {
/// Map a column index in the table schema to a column index in a particular
/// file schema
///
/// Panics if index is not in range for the table schema
fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;

/// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
///
/// If the provided `file_schema` contains columns of a different type to the expected
/// `table_schema`, the method will attempt to cast the array data from the file schema
/// to the table schema where possible.
///
/// Returns a [`SchemaMapper`] that can be applied to the output batch
/// along with an ordered list of columns to project from the file
fn map_schema(
&self,
file_schema: &Schema,
) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
}

/// Transforms a RecordBatch from Parquet to a RecordBatch that meets the table schema.
pub trait SchemaMapper: Send + Sync {
/// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;
}
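
As a rough illustration of how the three traits compose, below is a sketch of a hypothetical custom adapter that matches columns by name but refuses to cast mismatched types. All names (ExactTypeAdapterFactory and friends) are invented, the import paths assume the re-exports added in this PR, and edge cases the default adapter handles (such as a batch projecting zero file columns) are glossed over:

use std::sync::Arc;

use arrow::array::new_null_array;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion::datasource::physical_plan::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
use datafusion_common::plan_err;

/// Hypothetical factory: adapts by column name and requires identical types.
#[derive(Debug, Default)]
struct ExactTypeAdapterFactory;

impl SchemaAdapterFactory for ExactTypeAdapterFactory {
    fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter> {
        Box::new(ExactTypeAdapter { table_schema: schema })
    }
}

struct ExactTypeAdapter {
    table_schema: SchemaRef,
}

impl SchemaAdapter for ExactTypeAdapter {
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
        let field = self.table_schema.field(index);
        Some(file_schema.fields.find(field.name())?.0)
    }

    fn map_schema(
        &self,
        file_schema: &Schema,
    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
        let mut projection = Vec::with_capacity(file_schema.fields().len());
        // For each table column, remember which projected file column feeds it (if any).
        let mut field_mappings = vec![None; self.table_schema.fields().len()];
        for (table_idx, table_field) in self.table_schema.fields().iter().enumerate() {
            if let Some((file_idx, file_field)) = file_schema.fields.find(table_field.name()) {
                if file_field.data_type() != table_field.data_type() {
                    return plan_err!(
                        "Column {} is {} in the file but {} in the table",
                        table_field.name(),
                        file_field.data_type(),
                        table_field.data_type()
                    );
                }
                field_mappings[table_idx] = Some(projection.len());
                projection.push(file_idx);
            }
        }
        Ok((
            Arc::new(ExactTypeMapper {
                table_schema: self.table_schema.clone(),
                field_mappings,
            }),
            projection,
        ))
    }
}

struct ExactTypeMapper {
    table_schema: SchemaRef,
    field_mappings: Vec<Option<usize>>,
}

impl SchemaMapper for ExactTypeMapper {
    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
        let columns = self
            .table_schema
            .fields()
            .iter()
            .zip(&self.field_mappings)
            .map(|(field, mapping)| match mapping {
                // Column present in the file: pass the projected column through unchanged.
                Some(idx) => batch.column(*idx).clone(),
                // Column missing from the file: fill it with nulls.
                None => new_null_array(field.data_type(), batch.num_rows()),
            })
            .collect();
        Ok(RecordBatch::try_new(self.table_schema.clone(), columns)?)
    }
}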