diff --git a/datafusion/core/src/physical_plan/file_format/row_filter.rs b/datafusion/core/src/physical_plan/file_format/row_filter.rs index 5d953d3ae20f..470dc9fbf4df 100644 --- a/datafusion/core/src/physical_plan/file_format/row_filter.rs +++ b/datafusion/core/src/physical_plan/file_format/row_filter.rs @@ -96,11 +96,7 @@ impl DatafusionArrowPredicate { // on the order they appear in the file let projection = match candidate.projection.len() { 0 | 1 => vec![], - len => { - let mut projection: Vec<_> = (0..len).collect(); - projection.sort_unstable_by_key(|x| candidate.projection[*x]); - projection - } + _ => remap_projection(&candidate.projection), }; Ok(Self { @@ -278,6 +274,32 @@ impl<'a> ExprRewriter for FilterCandidateBuilder<'a> { } } +/// Computes the projection required to go from the file's schema order to the projected +/// order expected by this filter +/// +/// Effectively this computes the rank of each element in `src` +fn remap_projection(src: &[usize]) -> Vec { + let len = src.len(); + + // Compute the column mapping from projected order to file order + // i.e. the indices required to sort projected schema into the file schema + // + // e.g. projection: [5, 9, 0] -> [2, 0, 1] + let mut sorted_indexes: Vec<_> = (0..len).collect(); + sorted_indexes.sort_unstable_by_key(|x| src[*x]); + + // Compute the mapping from schema order to projected order + // i.e. the indices required to sort file schema into the projected schema + // + // Above we computed the order of the projected schema according to the file + // schema, and so we can use this as the comparator + // + // e.g. sorted_indexes [2, 0, 1] -> [1, 2, 0] + let mut projection: Vec<_> = (0..len).collect(); + projection.sort_unstable_by_key(|x| sorted_indexes[*x]); + projection +} + /// Calculate the total compressed size of all `Column's required for /// predicate `Expr`. This should represent the total amount of file IO /// required to evaluate the predicate. @@ -382,12 +404,13 @@ pub fn build_row_filter( #[cfg(test)] mod test { + use super::*; use crate::physical_plan::file_format::row_filter::FilterCandidateBuilder; - use arrow::datatypes::{DataType, Field, Schema}; - use datafusion_common::ScalarValue; + use arrow::datatypes::Field; use datafusion_expr::{cast, col, lit}; use parquet::arrow::parquet_to_arrow_schema; use parquet::file::reader::{FileReader, SerializedFileReader}; + use rand::prelude::*; // Assume a column expression for a column not in the table schema is a projected column and ignore it #[test] @@ -471,4 +494,22 @@ mod test { assert_eq!(candidate.unwrap().expr, expected_candidate_expr); } + + #[test] + fn test_remap_projection() { + let mut rng = thread_rng(); + for _ in 0..100 { + // A random selection of column indexes in arbitrary order + let projection: Vec<_> = (0..100).map(|_| rng.gen()).collect(); + + // File order is the projection sorted + let mut file_order = projection.clone(); + file_order.sort_unstable(); + + let remap = remap_projection(&projection); + // Applying the remapped projection to the file order should yield the original + let remapped: Vec<_> = remap.iter().map(|r| file_order[*r]).collect(); + assert_eq!(projection, remapped) + } + } }