Skip to content

Commit

Permalink
Fix multicolumn parquet predicate pushdown (apache#4046) (apache#4048)
Browse files Browse the repository at this point in the history
* Fix multicolumn parquet predicate pushdown (apache#4046)

* Format
  • Loading branch information
tustvold authored and Dandandan committed Nov 5, 2022
1 parent 497ce73 commit 5f7dc1b
Showing 1 changed file with 48 additions and 7 deletions.
55 changes: 48 additions & 7 deletions datafusion/core/src/physical_plan/file_format/row_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,7 @@ impl DatafusionArrowPredicate {
// on the order they appear in the file
let projection = match candidate.projection.len() {
0 | 1 => vec![],
len => {
let mut projection: Vec<_> = (0..len).collect();
projection.sort_unstable_by_key(|x| candidate.projection[*x]);
projection
}
_ => remap_projection(&candidate.projection),
};

Ok(Self {
Expand Down Expand Up @@ -278,6 +274,32 @@ impl<'a> ExprRewriter for FilterCandidateBuilder<'a> {
}
}

/// Computes the projection required to go from the file's schema order to the projected
/// order expected by this filter
///
/// Effectively this computes the rank of each element in `src`: entry `i` of the
/// returned vector is the position that `src[i]` occupies once `src` is sorted in
/// ascending order.
///
/// e.g. `[5, 9, 0]` -> `[1, 2, 0]`
///
/// An empty `src` yields an empty vector. If `src` contains equal values, every
/// element still receives a distinct rank, but the relative rank of the equal
/// values is whatever order the unstable sort happens to leave them in.
fn remap_projection(src: &[usize]) -> Vec<usize> {
    let len = src.len();

    // Compute the column mapping from projected order to file order
    // i.e. the indices required to sort projected schema into the file schema
    //
    // e.g. projection: [5, 9, 0] -> [2, 0, 1]
    let mut sorted_indexes: Vec<_> = (0..len).collect();
    sorted_indexes.sort_unstable_by_key(|x| src[*x]);

    // Compute the mapping from schema order to projected order
    // i.e. the indices required to sort file schema into the projected schema
    //
    // `sorted_indexes` is a permutation of `0..len`, so the mapping we want is
    // simply its inverse: the element found at sorted position `rank` came from
    // `src[idx]`, hence `projection[idx] = rank`. Writing the inverse directly is
    // linear and avoids a second sort.
    //
    // e.g. sorted_indexes [2, 0, 1] -> [1, 2, 0]
    let mut projection = vec![0; len];
    for (rank, &idx) in sorted_indexes.iter().enumerate() {
        projection[idx] = rank;
    }
    projection
}

/// Calculate the total compressed size of all `Column`s required for
/// predicate `Expr`. This should represent the total amount of file IO
/// required to evaluate the predicate.
Expand Down Expand Up @@ -382,12 +404,13 @@ pub fn build_row_filter(

#[cfg(test)]
mod test {
use super::*;
use crate::physical_plan::file_format::row_filter::FilterCandidateBuilder;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::ScalarValue;
use arrow::datatypes::Field;
use datafusion_expr::{cast, col, lit};
use parquet::arrow::parquet_to_arrow_schema;
use parquet::file::reader::{FileReader, SerializedFileReader};
use rand::prelude::*;

// Assume a column expression for a column not in the table schema is a projected column and ignore it
#[test]
Expand Down Expand Up @@ -471,4 +494,22 @@ mod test {

assert_eq!(candidate.unwrap().expr, expected_candidate_expr);
}

#[test]
fn test_remap_projection() {
    let mut rng = thread_rng();
    for _ in 0..100 {
        // Draw 100 random column indexes, in no particular order
        let projection: Vec<usize> = (0..100).map(|_| rng.gen()).collect();

        // The file stores the columns in ascending index order
        let file_order = {
            let mut sorted = projection.clone();
            sorted.sort_unstable();
            sorted
        };

        // Reading the file-ordered columns through the remapped projection
        // must reproduce the projection we started from
        let remapped: Vec<usize> = remap_projection(&projection)
            .into_iter()
            .map(|idx| file_order[idx])
            .collect();
        assert_eq!(projection, remapped)
    }
}
}

0 comments on commit 5f7dc1b

Please sign in to comment.