From 1d110bc7831dd873b2d34e37cc8b991795768e19 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 19 May 2021 14:34:51 -0400 Subject: [PATCH 1/5] Move row group pruning logic into pruning.rs --- datafusion/src/physical_optimizer/mod.rs | 1 + datafusion/src/physical_optimizer/pruning.rs | 752 +++++++++++++++++++ datafusion/src/physical_plan/parquet.rs | 751 +----------------- 3 files changed, 770 insertions(+), 734 deletions(-) create mode 100644 datafusion/src/physical_optimizer/pruning.rs diff --git a/datafusion/src/physical_optimizer/mod.rs b/datafusion/src/physical_optimizer/mod.rs index eca63db9f3de..8e79fe932874 100644 --- a/datafusion/src/physical_optimizer/mod.rs +++ b/datafusion/src/physical_optimizer/mod.rs @@ -21,4 +21,5 @@ pub mod coalesce_batches; pub mod merge_exec; pub mod optimizer; +pub mod pruning; pub mod repartition; diff --git a/datafusion/src/physical_optimizer/pruning.rs b/datafusion/src/physical_optimizer/pruning.rs new file mode 100644 index 000000000000..1690cfbe67f4 --- /dev/null +++ b/datafusion/src/physical_optimizer/pruning.rs @@ -0,0 +1,752 @@ +//! This module contains code to rule out row groups / partitions / +//! etc based on statistics prior in order to skip evaluating entire +//! swaths of rows. +//! +//! This code is currently specific to Parquet, but soon (TM), via +//! https://github.com/apache/arrow-datafusion/issues/363 it will +//! be genericized. + +use std::{collections::HashSet, sync::Arc}; + +use arrow::{ + array::{ + make_array, new_null_array, ArrayData, ArrayRef, BooleanArray, + BooleanBufferBuilder, + }, + buffer::MutableBuffer, + datatypes::{DataType, Field, Schema}, + record_batch::RecordBatch, +}; + +use crate::{ + error::{DataFusionError, Result}, + execution::context::ExecutionContextState, + logical_plan::{Expr, Operator}, + optimizer::utils, + physical_plan::{planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr}, +}; + +use parquet::file::{ + metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics, +}; + +#[derive(Debug, Clone)] +/// Predicate builder used for generating of predicate functions, used to filter row group metadata +pub struct RowGroupPredicateBuilder { + parquet_schema: Schema, + predicate_expr: Arc, + stat_column_req: Vec<(String, StatisticsType, Field)>, +} + +impl RowGroupPredicateBuilder { + /// Try to create a new instance of PredicateExpressionBuilder. + /// This will translate the filter expression into a statistics predicate expression + /// (for example (column / 2) = 4 becomes (column_min / 2) <= 4 && 4 <= (column_max / 2)), + /// then convert it to a DataFusion PhysicalExpression and cache it for later use by build_row_group_predicate. + pub fn try_new(expr: &Expr, parquet_schema: Schema) -> Result { + // build predicate expression once + let mut stat_column_req = Vec::<(String, StatisticsType, Field)>::new(); + let logical_predicate_expr = + build_predicate_expression(expr, &parquet_schema, &mut stat_column_req)?; + // println!( + // "RowGroupPredicateBuilder::try_new, logical_predicate_expr: {:?}", + // logical_predicate_expr + // ); + // build physical predicate expression + let stat_fields = stat_column_req + .iter() + .map(|(_, _, f)| f.clone()) + .collect::>(); + let stat_schema = Schema::new(stat_fields); + let execution_context_state = ExecutionContextState::new(); + let predicate_expr = DefaultPhysicalPlanner::default().create_physical_expr( + &logical_predicate_expr, + &stat_schema, + &execution_context_state, + )?; + // println!( + // "RowGroupPredicateBuilder::try_new, predicate_expr: {:?}", + // predicate_expr + // ); + Ok(Self { + parquet_schema, + predicate_expr, + stat_column_req, + }) + } + + /// Generate a predicate function used to filter row group metadata. + /// This function takes a list of all row groups as parameter, + /// so that DataFusion's physical expressions can be re-used by + /// generating a RecordBatch, containing statistics arrays, + /// on which the physical predicate expression is executed to generate a row group filter array. + /// The generated filter array is then used in the returned closure to filter row groups. + pub fn build_row_group_predicate( + &self, + row_group_metadata: &[RowGroupMetaData], + ) -> Box bool> { + // build statistics record batch + let predicate_result = build_statistics_record_batch( + row_group_metadata, + &self.parquet_schema, + &self.stat_column_req, + ) + .and_then(|statistics_batch| { + // execute predicate expression + self.predicate_expr.evaluate(&statistics_batch) + }) + .and_then(|v| match v { + ColumnarValue::Array(array) => Ok(array), + ColumnarValue::Scalar(_) => Err(DataFusionError::Plan( + "predicate expression didn't return an array".to_string(), + )), + }); + + let predicate_array = match predicate_result { + Ok(array) => array, + // row group filter array could not be built + // return a closure which will not filter out any row groups + _ => return Box::new(|_r, _i| true), + }; + + let predicate_array = predicate_array.as_any().downcast_ref::(); + match predicate_array { + // return row group predicate function + Some(array) => { + // when the result of the predicate expression for a row group is null / undefined, + // e.g. due to missing statistics, this row group can't be filtered out, + // so replace with true + let predicate_values = + array.iter().map(|x| x.unwrap_or(true)).collect::>(); + Box::new(move |_, i| predicate_values[i]) + } + // predicate result is not a BooleanArray + // return a closure which will not filter out any row groups + _ => Box::new(|_r, _i| true), + } + } +} + +/// Build a RecordBatch from a list of RowGroupMetadata structs, +/// creating arrays, one for each statistics column, +/// as requested in the stat_column_req parameter. +fn build_statistics_record_batch( + row_groups: &[RowGroupMetaData], + parquet_schema: &Schema, + stat_column_req: &[(String, StatisticsType, Field)], +) -> Result { + let mut fields = Vec::::new(); + let mut arrays = Vec::::new(); + for (column_name, statistics_type, stat_field) in stat_column_req { + if let Some((column_index, _)) = parquet_schema.column_with_name(column_name) { + let statistics = row_groups + .iter() + .map(|g| g.column(column_index).statistics()) + .collect::>(); + let array = build_statistics_array( + &statistics, + *statistics_type, + stat_field.data_type(), + ); + fields.push(stat_field.clone()); + arrays.push(array); + } + } + let schema = Arc::new(Schema::new(fields)); + RecordBatch::try_new(schema, arrays) + .map_err(|err| DataFusionError::Plan(err.to_string())) +} + +struct StatisticsExpressionBuilder<'a> { + column_name: String, + column_expr: &'a Expr, + scalar_expr: &'a Expr, + parquet_field: &'a Field, + stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>, + reverse_operator: bool, +} + +impl<'a> StatisticsExpressionBuilder<'a> { + fn try_new( + left: &'a Expr, + right: &'a Expr, + parquet_schema: &'a Schema, + stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>, + ) -> Result { + // find column name; input could be a more complicated expression + let mut left_columns = HashSet::::new(); + utils::expr_to_column_names(left, &mut left_columns)?; + let mut right_columns = HashSet::::new(); + utils::expr_to_column_names(right, &mut right_columns)?; + let (column_expr, scalar_expr, column_names, reverse_operator) = + match (left_columns.len(), right_columns.len()) { + (1, 0) => (left, right, left_columns, false), + (0, 1) => (right, left, right_columns, true), + _ => { + // if more than one column used in expression - not supported + return Err(DataFusionError::Plan( + "Multi-column expressions are not currently supported" + .to_string(), + )); + } + }; + let column_name = column_names.iter().next().unwrap().clone(); + let field = match parquet_schema.column_with_name(&column_name) { + Some((_, f)) => f, + _ => { + // field not found in parquet schema + return Err(DataFusionError::Plan( + "Field not found in parquet schema".to_string(), + )); + } + }; + + Ok(Self { + column_name, + column_expr, + scalar_expr, + parquet_field: field, + stat_column_req, + reverse_operator, + }) + } + + fn correct_operator(&self, op: Operator) -> Operator { + if !self.reverse_operator { + return op; + } + + match op { + Operator::Lt => Operator::Gt, + Operator::Gt => Operator::Lt, + Operator::LtEq => Operator::GtEq, + Operator::GtEq => Operator::LtEq, + _ => op, + } + } + + // fn column_expr(&self) -> &Expr { + // self.column_expr + // } + + fn scalar_expr(&self) -> &Expr { + self.scalar_expr + } + + // fn column_name(&self) -> &String { + // &self.column_name + // } + + fn is_stat_column_missing(&self, statistics_type: StatisticsType) -> bool { + self.stat_column_req + .iter() + .filter(|(c, t, _f)| c == &self.column_name && t == &statistics_type) + .count() + == 0 + } + + fn stat_column_expr( + &mut self, + stat_type: StatisticsType, + suffix: &str, + ) -> Result { + let stat_column_name = format!("{}_{}", self.column_name, suffix); + let stat_field = Field::new( + stat_column_name.as_str(), + self.parquet_field.data_type().clone(), + self.parquet_field.is_nullable(), + ); + if self.is_stat_column_missing(stat_type) { + // only add statistics column if not previously added + self.stat_column_req + .push((self.column_name.clone(), stat_type, stat_field)); + } + rewrite_column_expr( + self.column_expr, + self.column_name.as_str(), + stat_column_name.as_str(), + ) + } + + fn min_column_expr(&mut self) -> Result { + self.stat_column_expr(StatisticsType::Min, "min") + } + + fn max_column_expr(&mut self) -> Result { + self.stat_column_expr(StatisticsType::Max, "max") + } +} + +/// replaces a column with an old name with a new name in an expression +fn rewrite_column_expr( + expr: &Expr, + column_old_name: &str, + column_new_name: &str, +) -> Result { + let expressions = utils::expr_sub_expressions(&expr)?; + let expressions = expressions + .iter() + .map(|e| rewrite_column_expr(e, column_old_name, column_new_name)) + .collect::>>()?; + + if let Expr::Column(name) = expr { + if name == column_old_name { + return Ok(Expr::Column(column_new_name.to_string())); + } + } + utils::rewrite_expression(&expr, &expressions) +} + +/// Translate logical filter expression into parquet statistics predicate expression +fn build_predicate_expression( + expr: &Expr, + parquet_schema: &Schema, + stat_column_req: &mut Vec<(String, StatisticsType, Field)>, +) -> Result { + use crate::logical_plan; + // predicate expression can only be a binary expression + let (left, op, right) = match expr { + Expr::BinaryExpr { left, op, right } => (left, *op, right), + _ => { + // unsupported expression - replace with TRUE + // this can still be useful when multiple conditions are joined using AND + // such as: column > 10 AND TRUE + return Ok(logical_plan::lit(true)); + } + }; + + if op == Operator::And || op == Operator::Or { + let left_expr = + build_predicate_expression(left, parquet_schema, stat_column_req)?; + let right_expr = + build_predicate_expression(right, parquet_schema, stat_column_req)?; + return Ok(logical_plan::binary_expr(left_expr, op, right_expr)); + } + + let expr_builder = StatisticsExpressionBuilder::try_new( + left, + right, + parquet_schema, + stat_column_req, + ); + let mut expr_builder = match expr_builder { + Ok(builder) => builder, + // allow partial failure in predicate expression generation + // this can still produce a useful predicate when multiple conditions are joined using AND + Err(_) => { + return Ok(logical_plan::lit(true)); + } + }; + let corrected_op = expr_builder.correct_operator(op); + let statistics_expr = match corrected_op { + Operator::Eq => { + // column = literal => (min, max) = literal => min <= literal && literal <= max + // (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2) + let min_column_expr = expr_builder.min_column_expr()?; + let max_column_expr = expr_builder.max_column_expr()?; + min_column_expr + .lt_eq(expr_builder.scalar_expr().clone()) + .and(expr_builder.scalar_expr().clone().lt_eq(max_column_expr)) + } + Operator::Gt => { + // column > literal => (min, max) > literal => max > literal + expr_builder + .max_column_expr()? + .gt(expr_builder.scalar_expr().clone()) + } + Operator::GtEq => { + // column >= literal => (min, max) >= literal => max >= literal + expr_builder + .max_column_expr()? + .gt_eq(expr_builder.scalar_expr().clone()) + } + Operator::Lt => { + // column < literal => (min, max) < literal => min < literal + expr_builder + .min_column_expr()? + .lt(expr_builder.scalar_expr().clone()) + } + Operator::LtEq => { + // column <= literal => (min, max) <= literal => min <= literal + expr_builder + .min_column_expr()? + .lt_eq(expr_builder.scalar_expr().clone()) + } + // other expressions are not supported + _ => logical_plan::lit(true), + }; + Ok(statistics_expr) +} + +#[derive(Debug, Copy, Clone, PartialEq)] +enum StatisticsType { + Min, + Max, +} + +fn build_statistics_array( + statistics: &[Option<&ParquetStatistics>], + statistics_type: StatisticsType, + data_type: &DataType, +) -> ArrayRef { + let statistics_count = statistics.len(); + let first_group_stats = statistics.iter().find(|s| s.is_some()); + let first_group_stats = if let Some(Some(statistics)) = first_group_stats { + // found first row group with statistics defined + statistics + } else { + // no row group has statistics defined + return new_null_array(data_type, statistics_count); + }; + + let (data_size, arrow_type) = match first_group_stats { + ParquetStatistics::Int32(_) => (std::mem::size_of::(), DataType::Int32), + ParquetStatistics::Int64(_) => (std::mem::size_of::(), DataType::Int64), + ParquetStatistics::Float(_) => (std::mem::size_of::(), DataType::Float32), + ParquetStatistics::Double(_) => (std::mem::size_of::(), DataType::Float64), + ParquetStatistics::ByteArray(_) if data_type == &DataType::Utf8 => { + (0, DataType::Utf8) + } + _ => { + // type of statistics not supported + return new_null_array(data_type, statistics_count); + } + }; + + let statistics = statistics.iter().map(|s| { + s.filter(|s| s.has_min_max_set()) + .map(|s| match statistics_type { + StatisticsType::Min => s.min_bytes(), + StatisticsType::Max => s.max_bytes(), + }) + }); + + if arrow_type == DataType::Utf8 { + let data_size = statistics + .clone() + .map(|x| x.map(|b| b.len()).unwrap_or(0)) + .sum(); + let mut builder = + arrow::array::StringBuilder::with_capacity(statistics_count, data_size); + let string_statistics = + statistics.map(|x| x.and_then(|bytes| std::str::from_utf8(bytes).ok())); + for maybe_string in string_statistics { + match maybe_string { + Some(string_value) => builder.append_value(string_value).unwrap(), + None => builder.append_null().unwrap(), + }; + } + return Arc::new(builder.finish()); + } + + let mut data_buffer = MutableBuffer::new(statistics_count * data_size); + let mut bitmap_builder = BooleanBufferBuilder::new(statistics_count); + let mut null_count = 0; + for s in statistics { + if let Some(stat_data) = s { + bitmap_builder.append(true); + data_buffer.extend_from_slice(stat_data); + } else { + bitmap_builder.append(false); + data_buffer.resize(data_buffer.len() + data_size, 0); + null_count += 1; + } + } + + let mut builder = ArrayData::builder(arrow_type) + .len(statistics_count) + .add_buffer(data_buffer.into()); + if null_count > 0 { + builder = builder.null_bit_buffer(bitmap_builder.finish()); + } + let array_data = builder.build(); + let statistics_array = make_array(array_data); + if statistics_array.data_type() == data_type { + return statistics_array; + } + // cast statistics array to required data type + arrow::compute::cast(&statistics_array, data_type) + .unwrap_or_else(|_| new_null_array(data_type, statistics_count)) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::physical_optimizer::pruning::StatisticsType; + use arrow::{ + array::{Int32Array, StringArray}, + datatypes::DataType, + }; + use parquet::file::statistics::Statistics as ParquetStatistics; + + #[test] + fn build_statistics_array_int32() { + // build row group metadata array + let s1 = ParquetStatistics::int32(None, Some(10), None, 0, false); + let s2 = ParquetStatistics::int32(Some(2), Some(20), None, 0, false); + let s3 = ParquetStatistics::int32(Some(3), Some(30), None, 0, false); + let statistics = vec![Some(&s1), Some(&s2), Some(&s3)]; + + let statistics_array = + build_statistics_array(&statistics, StatisticsType::Min, &DataType::Int32); + let int32_array = statistics_array + .as_any() + .downcast_ref::() + .unwrap(); + let int32_vec = int32_array.into_iter().collect::>(); + assert_eq!(int32_vec, vec![None, Some(2), Some(3)]); + + let statistics_array = + build_statistics_array(&statistics, StatisticsType::Max, &DataType::Int32); + let int32_array = statistics_array + .as_any() + .downcast_ref::() + .unwrap(); + let int32_vec = int32_array.into_iter().collect::>(); + // here the first max value is None and not the Some(10) value which was actually set + // because the min value is None + assert_eq!(int32_vec, vec![None, Some(20), Some(30)]); + } + + #[test] + fn build_statistics_array_utf8() { + // build row group metadata array + let s1 = ParquetStatistics::byte_array(None, Some("10".into()), None, 0, false); + let s2 = ParquetStatistics::byte_array( + Some("2".into()), + Some("20".into()), + None, + 0, + false, + ); + let s3 = ParquetStatistics::byte_array( + Some("3".into()), + Some("30".into()), + None, + 0, + false, + ); + let statistics = vec![Some(&s1), Some(&s2), Some(&s3)]; + + let statistics_array = + build_statistics_array(&statistics, StatisticsType::Min, &DataType::Utf8); + let string_array = statistics_array + .as_any() + .downcast_ref::() + .unwrap(); + let string_vec = string_array.into_iter().collect::>(); + assert_eq!(string_vec, vec![None, Some("2"), Some("3")]); + + let statistics_array = + build_statistics_array(&statistics, StatisticsType::Max, &DataType::Utf8); + let string_array = statistics_array + .as_any() + .downcast_ref::() + .unwrap(); + let string_vec = string_array.into_iter().collect::>(); + // here the first max value is None and not the Some("10") value which was actually set + // because the min value is None + assert_eq!(string_vec, vec![None, Some("20"), Some("30")]); + } + + #[test] + fn build_statistics_array_empty_stats() { + let data_type = DataType::Int32; + let statistics = vec![]; + let statistics_array = + build_statistics_array(&statistics, StatisticsType::Min, &data_type); + assert_eq!(statistics_array.len(), 0); + + let statistics = vec![None, None]; + let statistics_array = + build_statistics_array(&statistics, StatisticsType::Min, &data_type); + assert_eq!(statistics_array.len(), statistics.len()); + assert_eq!(statistics_array.data_type(), &data_type); + for i in 0..statistics_array.len() { + assert_eq!(statistics_array.is_null(i), true); + assert_eq!(statistics_array.is_valid(i), false); + } + } + + #[test] + fn build_statistics_array_unsupported_type() { + // boolean is not currently a supported type for statistics + let s1 = ParquetStatistics::boolean(Some(false), Some(true), None, 0, false); + let s2 = ParquetStatistics::boolean(Some(false), Some(true), None, 0, false); + let statistics = vec![Some(&s1), Some(&s2)]; + let data_type = DataType::Boolean; + let statistics_array = + build_statistics_array(&statistics, StatisticsType::Min, &data_type); + assert_eq!(statistics_array.len(), statistics.len()); + assert_eq!(statistics_array.data_type(), &data_type); + for i in 0..statistics_array.len() { + assert_eq!(statistics_array.is_null(i), true); + assert_eq!(statistics_array.is_valid(i), false); + } + } + + #[test] + fn row_group_predicate_eq() -> Result<()> { + use crate::logical_plan::{col, lit}; + let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); + let expected_expr = "#c1_min LtEq Int32(1) And Int32(1) LtEq #c1_max"; + + // test column on the left + let expr = col("c1").eq(lit(1)); + let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; + assert_eq!(format!("{:?}", predicate_expr), expected_expr); + + // test column on the right + let expr = lit(1).eq(col("c1")); + let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; + assert_eq!(format!("{:?}", predicate_expr), expected_expr); + + Ok(()) + } + + #[test] + fn row_group_predicate_gt() -> Result<()> { + use crate::logical_plan::{col, lit}; + let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); + let expected_expr = "#c1_max Gt Int32(1)"; + + // test column on the left + let expr = col("c1").gt(lit(1)); + let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; + assert_eq!(format!("{:?}", predicate_expr), expected_expr); + + // test column on the right + let expr = lit(1).lt(col("c1")); + let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; + assert_eq!(format!("{:?}", predicate_expr), expected_expr); + + Ok(()) + } + + #[test] + fn row_group_predicate_gt_eq() -> Result<()> { + use crate::logical_plan::{col, lit}; + let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); + let expected_expr = "#c1_max GtEq Int32(1)"; + + // test column on the left + let expr = col("c1").gt_eq(lit(1)); + let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; + assert_eq!(format!("{:?}", predicate_expr), expected_expr); + // test column on the right + let expr = lit(1).lt_eq(col("c1")); + let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; + assert_eq!(format!("{:?}", predicate_expr), expected_expr); + + Ok(()) + } + + #[test] + fn row_group_predicate_lt() -> Result<()> { + use crate::logical_plan::{col, lit}; + let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); + let expected_expr = "#c1_min Lt Int32(1)"; + + // test column on the left + let expr = col("c1").lt(lit(1)); + let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; + assert_eq!(format!("{:?}", predicate_expr), expected_expr); + + // test column on the right + let expr = lit(1).gt(col("c1")); + let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; + assert_eq!(format!("{:?}", predicate_expr), expected_expr); + + Ok(()) + } + + #[test] + fn row_group_predicate_lt_eq() -> Result<()> { + use crate::logical_plan::{col, lit}; + let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); + let expected_expr = "#c1_min LtEq Int32(1)"; + + // test column on the left + let expr = col("c1").lt_eq(lit(1)); + let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; + assert_eq!(format!("{:?}", predicate_expr), expected_expr); + // test column on the right + let expr = lit(1).gt_eq(col("c1")); + let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; + assert_eq!(format!("{:?}", predicate_expr), expected_expr); + + Ok(()) + } + + #[test] + fn row_group_predicate_and() -> Result<()> { + use crate::logical_plan::{col, lit}; + let schema = Schema::new(vec![ + Field::new("c1", DataType::Int32, false), + Field::new("c2", DataType::Int32, false), + Field::new("c3", DataType::Int32, false), + ]); + // test AND operator joining supported c1 < 1 expression and unsupported c2 > c3 expression + let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3"))); + let expected_expr = "#c1_min Lt Int32(1) And Boolean(true)"; + let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; + assert_eq!(format!("{:?}", predicate_expr), expected_expr); + + Ok(()) + } + + #[test] + fn row_group_predicate_or() -> Result<()> { + use crate::logical_plan::{col, lit}; + let schema = Schema::new(vec![ + Field::new("c1", DataType::Int32, false), + Field::new("c2", DataType::Int32, false), + ]); + // test OR operator joining supported c1 < 1 expression and unsupported c2 % 2 expression + let expr = col("c1").lt(lit(1)).or(col("c2").modulus(lit(2))); + let expected_expr = "#c1_min Lt Int32(1) Or Boolean(true)"; + let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; + assert_eq!(format!("{:?}", predicate_expr), expected_expr); + + Ok(()) + } + + #[test] + fn row_group_predicate_stat_column_req() -> Result<()> { + use crate::logical_plan::{col, lit}; + let schema = Schema::new(vec![ + Field::new("c1", DataType::Int32, false), + Field::new("c2", DataType::Int32, false), + ]); + let mut stat_column_req = vec![]; + // c1 < 1 and (c2 = 2 or c2 = 3) + let expr = col("c1") + .lt(lit(1)) + .and(col("c2").eq(lit(2)).or(col("c2").eq(lit(3)))); + let expected_expr = "#c1_min Lt Int32(1) And #c2_min LtEq Int32(2) And Int32(2) LtEq #c2_max Or #c2_min LtEq Int32(3) And Int32(3) LtEq #c2_max"; + let predicate_expr = + build_predicate_expression(&expr, &schema, &mut stat_column_req)?; + assert_eq!(format!("{:?}", predicate_expr), expected_expr); + // c1 < 1 should add c1_min + let c1_min_field = Field::new("c1_min", DataType::Int32, false); + assert_eq!( + stat_column_req[0], + ("c1".to_owned(), StatisticsType::Min, c1_min_field) + ); + // c2 = 2 should add c2_min and c2_max + let c2_min_field = Field::new("c2_min", DataType::Int32, false); + assert_eq!( + stat_column_req[1], + ("c2".to_owned(), StatisticsType::Min, c2_min_field) + ); + let c2_max_field = Field::new("c2_max", DataType::Int32, false); + assert_eq!( + stat_column_req[2], + ("c2".to_owned(), StatisticsType::Max, c2_max_field) + ); + // c2 = 3 shouldn't add any new statistics fields + assert_eq!(stat_column_req.len(), 3); + + Ok(()) + } +} diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs index dd5e77bc21eb..0c18aab434b8 100644 --- a/datafusion/src/physical_plan/parquet.rs +++ b/datafusion/src/physical_plan/parquet.rs @@ -17,38 +17,28 @@ //! Execution plan for reading Parquet files +use std::any::Any; use std::fmt; use std::fs::File; use std::sync::Arc; use std::task::{Context, Poll}; -use std::{any::Any, collections::HashSet}; -use super::{ - planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr, RecordBatchStream, - SendableRecordBatchStream, -}; -use crate::physical_plan::{common, DisplayFormatType, ExecutionPlan, Partitioning}; use crate::{ error::{DataFusionError, Result}, - execution::context::ExecutionContextState, - logical_plan::{Expr, Operator}, - optimizer::utils, + logical_plan::Expr, + physical_optimizer::pruning::RowGroupPredicateBuilder, + physical_plan::{ + common, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, + SendableRecordBatchStream, + }, }; -use arrow::record_batch::RecordBatch; + use arrow::{ - array::new_null_array, + datatypes::{Schema, SchemaRef}, error::{ArrowError, Result as ArrowResult}, + record_batch::RecordBatch, }; -use arrow::{ - array::{make_array, ArrayData, ArrayRef, BooleanArray, BooleanBufferBuilder}, - buffer::MutableBuffer, - datatypes::{DataType, Field, Schema, SchemaRef}, -}; -use parquet::file::{ - metadata::RowGroupMetaData, - reader::{FileReader, SerializedFileReader}, - statistics::Statistics as ParquetStatistics, -}; +use parquet::file::reader::{FileReader, SerializedFileReader}; use fmt::Debug; use parquet::arrow::{ArrowReader, ParquetFileArrowReader}; @@ -358,445 +348,6 @@ impl ParquetPartition { } } -#[derive(Debug, Clone)] -/// Predicate builder used for generating of predicate functions, used to filter row group metadata -pub struct RowGroupPredicateBuilder { - parquet_schema: Schema, - predicate_expr: Arc, - stat_column_req: Vec<(String, StatisticsType, Field)>, -} - -impl RowGroupPredicateBuilder { - /// Try to create a new instance of PredicateExpressionBuilder. - /// This will translate the filter expression into a statistics predicate expression - /// (for example (column / 2) = 4 becomes (column_min / 2) <= 4 && 4 <= (column_max / 2)), - /// then convert it to a DataFusion PhysicalExpression and cache it for later use by build_row_group_predicate. - pub fn try_new(expr: &Expr, parquet_schema: Schema) -> Result { - // build predicate expression once - let mut stat_column_req = Vec::<(String, StatisticsType, Field)>::new(); - let logical_predicate_expr = - build_predicate_expression(expr, &parquet_schema, &mut stat_column_req)?; - // println!( - // "RowGroupPredicateBuilder::try_new, logical_predicate_expr: {:?}", - // logical_predicate_expr - // ); - // build physical predicate expression - let stat_fields = stat_column_req - .iter() - .map(|(_, _, f)| f.clone()) - .collect::>(); - let stat_schema = Schema::new(stat_fields); - let execution_context_state = ExecutionContextState::new(); - let predicate_expr = DefaultPhysicalPlanner::default().create_physical_expr( - &logical_predicate_expr, - &stat_schema, - &execution_context_state, - )?; - // println!( - // "RowGroupPredicateBuilder::try_new, predicate_expr: {:?}", - // predicate_expr - // ); - Ok(Self { - parquet_schema, - predicate_expr, - stat_column_req, - }) - } - - /// Generate a predicate function used to filter row group metadata. - /// This function takes a list of all row groups as parameter, - /// so that DataFusion's physical expressions can be re-used by - /// generating a RecordBatch, containing statistics arrays, - /// on which the physical predicate expression is executed to generate a row group filter array. - /// The generated filter array is then used in the returned closure to filter row groups. - pub fn build_row_group_predicate( - &self, - row_group_metadata: &[RowGroupMetaData], - ) -> Box bool> { - // build statistics record batch - let predicate_result = build_statistics_record_batch( - row_group_metadata, - &self.parquet_schema, - &self.stat_column_req, - ) - .and_then(|statistics_batch| { - // execute predicate expression - self.predicate_expr.evaluate(&statistics_batch) - }) - .and_then(|v| match v { - ColumnarValue::Array(array) => Ok(array), - ColumnarValue::Scalar(_) => Err(DataFusionError::Plan( - "predicate expression didn't return an array".to_string(), - )), - }); - - let predicate_array = match predicate_result { - Ok(array) => array, - // row group filter array could not be built - // return a closure which will not filter out any row groups - _ => return Box::new(|_r, _i| true), - }; - - let predicate_array = predicate_array.as_any().downcast_ref::(); - match predicate_array { - // return row group predicate function - Some(array) => { - // when the result of the predicate expression for a row group is null / undefined, - // e.g. due to missing statistics, this row group can't be filtered out, - // so replace with true - let predicate_values = - array.iter().map(|x| x.unwrap_or(true)).collect::>(); - Box::new(move |_, i| predicate_values[i]) - } - // predicate result is not a BooleanArray - // return a closure which will not filter out any row groups - _ => Box::new(|_r, _i| true), - } - } -} - -/// Build a RecordBatch from a list of RowGroupMetadata structs, -/// creating arrays, one for each statistics column, -/// as requested in the stat_column_req parameter. -fn build_statistics_record_batch( - row_groups: &[RowGroupMetaData], - parquet_schema: &Schema, - stat_column_req: &[(String, StatisticsType, Field)], -) -> Result { - let mut fields = Vec::::new(); - let mut arrays = Vec::::new(); - for (column_name, statistics_type, stat_field) in stat_column_req { - if let Some((column_index, _)) = parquet_schema.column_with_name(column_name) { - let statistics = row_groups - .iter() - .map(|g| g.column(column_index).statistics()) - .collect::>(); - let array = build_statistics_array( - &statistics, - *statistics_type, - stat_field.data_type(), - ); - fields.push(stat_field.clone()); - arrays.push(array); - } - } - let schema = Arc::new(Schema::new(fields)); - RecordBatch::try_new(schema, arrays) - .map_err(|err| DataFusionError::Plan(err.to_string())) -} - -struct StatisticsExpressionBuilder<'a> { - column_name: String, - column_expr: &'a Expr, - scalar_expr: &'a Expr, - parquet_field: &'a Field, - stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>, - reverse_operator: bool, -} - -impl<'a> StatisticsExpressionBuilder<'a> { - fn try_new( - left: &'a Expr, - right: &'a Expr, - parquet_schema: &'a Schema, - stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>, - ) -> Result { - // find column name; input could be a more complicated expression - let mut left_columns = HashSet::::new(); - utils::expr_to_column_names(left, &mut left_columns)?; - let mut right_columns = HashSet::::new(); - utils::expr_to_column_names(right, &mut right_columns)?; - let (column_expr, scalar_expr, column_names, reverse_operator) = - match (left_columns.len(), right_columns.len()) { - (1, 0) => (left, right, left_columns, false), - (0, 1) => (right, left, right_columns, true), - _ => { - // if more than one column used in expression - not supported - return Err(DataFusionError::Plan( - "Multi-column expressions are not currently supported" - .to_string(), - )); - } - }; - let column_name = column_names.iter().next().unwrap().clone(); - let field = match parquet_schema.column_with_name(&column_name) { - Some((_, f)) => f, - _ => { - // field not found in parquet schema - return Err(DataFusionError::Plan( - "Field not found in parquet schema".to_string(), - )); - } - }; - - Ok(Self { - column_name, - column_expr, - scalar_expr, - parquet_field: field, - stat_column_req, - reverse_operator, - }) - } - - fn correct_operator(&self, op: Operator) -> Operator { - if !self.reverse_operator { - return op; - } - - match op { - Operator::Lt => Operator::Gt, - Operator::Gt => Operator::Lt, - Operator::LtEq => Operator::GtEq, - Operator::GtEq => Operator::LtEq, - _ => op, - } - } - - // fn column_expr(&self) -> &Expr { - // self.column_expr - // } - - fn scalar_expr(&self) -> &Expr { - self.scalar_expr - } - - // fn column_name(&self) -> &String { - // &self.column_name - // } - - fn is_stat_column_missing(&self, statistics_type: StatisticsType) -> bool { - self.stat_column_req - .iter() - .filter(|(c, t, _f)| c == &self.column_name && t == &statistics_type) - .count() - == 0 - } - - fn stat_column_expr( - &mut self, - stat_type: StatisticsType, - suffix: &str, - ) -> Result { - let stat_column_name = format!("{}_{}", self.column_name, suffix); - let stat_field = Field::new( - stat_column_name.as_str(), - self.parquet_field.data_type().clone(), - self.parquet_field.is_nullable(), - ); - if self.is_stat_column_missing(stat_type) { - // only add statistics column if not previously added - self.stat_column_req - .push((self.column_name.clone(), stat_type, stat_field)); - } - rewrite_column_expr( - self.column_expr, - self.column_name.as_str(), - stat_column_name.as_str(), - ) - } - - fn min_column_expr(&mut self) -> Result { - self.stat_column_expr(StatisticsType::Min, "min") - } - - fn max_column_expr(&mut self) -> Result { - self.stat_column_expr(StatisticsType::Max, "max") - } -} - -/// replaces a column with an old name with a new name in an expression -fn rewrite_column_expr( - expr: &Expr, - column_old_name: &str, - column_new_name: &str, -) -> Result { - let expressions = utils::expr_sub_expressions(&expr)?; - let expressions = expressions - .iter() - .map(|e| rewrite_column_expr(e, column_old_name, column_new_name)) - .collect::>>()?; - - if let Expr::Column(name) = expr { - if name == column_old_name { - return Ok(Expr::Column(column_new_name.to_string())); - } - } - utils::rewrite_expression(&expr, &expressions) -} - -/// Translate logical filter expression into parquet statistics predicate expression -fn build_predicate_expression( - expr: &Expr, - parquet_schema: &Schema, - stat_column_req: &mut Vec<(String, StatisticsType, Field)>, -) -> Result { - use crate::logical_plan; - // predicate expression can only be a binary expression - let (left, op, right) = match expr { - Expr::BinaryExpr { left, op, right } => (left, *op, right), - _ => { - // unsupported expression - replace with TRUE - // this can still be useful when multiple conditions are joined using AND - // such as: column > 10 AND TRUE - return Ok(logical_plan::lit(true)); - } - }; - - if op == Operator::And || op == Operator::Or { - let left_expr = - build_predicate_expression(left, parquet_schema, stat_column_req)?; - let right_expr = - build_predicate_expression(right, parquet_schema, stat_column_req)?; - return Ok(logical_plan::binary_expr(left_expr, op, right_expr)); - } - - let expr_builder = StatisticsExpressionBuilder::try_new( - left, - right, - parquet_schema, - stat_column_req, - ); - let mut expr_builder = match expr_builder { - Ok(builder) => builder, - // allow partial failure in predicate expression generation - // this can still produce a useful predicate when multiple conditions are joined using AND - Err(_) => { - return Ok(logical_plan::lit(true)); - } - }; - let corrected_op = expr_builder.correct_operator(op); - let statistics_expr = match corrected_op { - Operator::Eq => { - // column = literal => (min, max) = literal => min <= literal && literal <= max - // (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2) - let min_column_expr = expr_builder.min_column_expr()?; - let max_column_expr = expr_builder.max_column_expr()?; - min_column_expr - .lt_eq(expr_builder.scalar_expr().clone()) - .and(expr_builder.scalar_expr().clone().lt_eq(max_column_expr)) - } - Operator::Gt => { - // column > literal => (min, max) > literal => max > literal - expr_builder - .max_column_expr()? - .gt(expr_builder.scalar_expr().clone()) - } - Operator::GtEq => { - // column >= literal => (min, max) >= literal => max >= literal - expr_builder - .max_column_expr()? - .gt_eq(expr_builder.scalar_expr().clone()) - } - Operator::Lt => { - // column < literal => (min, max) < literal => min < literal - expr_builder - .min_column_expr()? - .lt(expr_builder.scalar_expr().clone()) - } - Operator::LtEq => { - // column <= literal => (min, max) <= literal => min <= literal - expr_builder - .min_column_expr()? - .lt_eq(expr_builder.scalar_expr().clone()) - } - // other expressions are not supported - _ => logical_plan::lit(true), - }; - Ok(statistics_expr) -} - -#[derive(Debug, Copy, Clone, PartialEq)] -enum StatisticsType { - Min, - Max, -} - -fn build_statistics_array( - statistics: &[Option<&ParquetStatistics>], - statistics_type: StatisticsType, - data_type: &DataType, -) -> ArrayRef { - let statistics_count = statistics.len(); - let first_group_stats = statistics.iter().find(|s| s.is_some()); - let first_group_stats = if let Some(Some(statistics)) = first_group_stats { - // found first row group with statistics defined - statistics - } else { - // no row group has statistics defined - return new_null_array(data_type, statistics_count); - }; - - let (data_size, arrow_type) = match first_group_stats { - ParquetStatistics::Int32(_) => (std::mem::size_of::(), DataType::Int32), - ParquetStatistics::Int64(_) => (std::mem::size_of::(), DataType::Int64), - ParquetStatistics::Float(_) => (std::mem::size_of::(), DataType::Float32), - ParquetStatistics::Double(_) => (std::mem::size_of::(), DataType::Float64), - ParquetStatistics::ByteArray(_) if data_type == &DataType::Utf8 => { - (0, DataType::Utf8) - } - _ => { - // type of statistics not supported - return new_null_array(data_type, statistics_count); - } - }; - - let statistics = statistics.iter().map(|s| { - s.filter(|s| s.has_min_max_set()) - .map(|s| match statistics_type { - StatisticsType::Min => s.min_bytes(), - StatisticsType::Max => s.max_bytes(), - }) - }); - - if arrow_type == DataType::Utf8 { - let data_size = statistics - .clone() - .map(|x| x.map(|b| b.len()).unwrap_or(0)) - .sum(); - let mut builder = - arrow::array::StringBuilder::with_capacity(statistics_count, data_size); - let string_statistics = - statistics.map(|x| x.and_then(|bytes| std::str::from_utf8(bytes).ok())); - for maybe_string in string_statistics { - match maybe_string { - Some(string_value) => builder.append_value(string_value).unwrap(), - None => builder.append_null().unwrap(), - }; - } - return Arc::new(builder.finish()); - } - - let mut data_buffer = MutableBuffer::new(statistics_count * data_size); - let mut bitmap_builder = BooleanBufferBuilder::new(statistics_count); - let mut null_count = 0; - for s in statistics { - if let Some(stat_data) = s { - bitmap_builder.append(true); - data_buffer.extend_from_slice(stat_data); - } else { - bitmap_builder.append(false); - data_buffer.resize(data_buffer.len() + data_size, 0); - null_count += 1; - } - } - - let mut builder = ArrayData::builder(arrow_type) - .len(statistics_count) - .add_buffer(data_buffer.into()); - if null_count > 0 { - builder = builder.null_bit_buffer(bitmap_builder.finish()); - } - let array_data = builder.build(); - let statistics_array = make_array(array_data); - if statistics_array.data_type() == data_type { - return statistics_array; - } - // cast statistics array to required data type - arrow::compute::cast(&statistics_array, data_type) - .unwrap_or_else(|_| new_null_array(data_type, statistics_count)) -} - #[async_trait] impl ExecutionPlan for ParquetExec { /// Return a reference to Any that can be used for downcasting @@ -992,10 +543,13 @@ impl RecordBatchStream for ParquetStream { #[cfg(test)] mod tests { use super::*; - use arrow::array::{Int32Array, StringArray}; + use arrow::datatypes::{DataType, Field}; use futures::StreamExt; - use parquet::basic::Type as PhysicalType; - use parquet::schema::types::SchemaDescPtr; + use parquet::{ + basic::Type as PhysicalType, + file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics}, + schema::types::SchemaDescPtr, + }; #[test] fn test_split_files() { @@ -1070,277 +624,6 @@ mod tests { Ok(()) } - #[test] - fn build_statistics_array_int32() { - // build row group metadata array - let s1 = ParquetStatistics::int32(None, Some(10), None, 0, false); - let s2 = ParquetStatistics::int32(Some(2), Some(20), None, 0, false); - let s3 = ParquetStatistics::int32(Some(3), Some(30), None, 0, false); - let statistics = vec![Some(&s1), Some(&s2), Some(&s3)]; - - let statistics_array = - build_statistics_array(&statistics, StatisticsType::Min, &DataType::Int32); - let int32_array = statistics_array - .as_any() - .downcast_ref::() - .unwrap(); - let int32_vec = int32_array.into_iter().collect::>(); - assert_eq!(int32_vec, vec![None, Some(2), Some(3)]); - - let statistics_array = - build_statistics_array(&statistics, StatisticsType::Max, &DataType::Int32); - let int32_array = statistics_array - .as_any() - .downcast_ref::() - .unwrap(); - let int32_vec = int32_array.into_iter().collect::>(); - // here the first max value is None and not the Some(10) value which was actually set - // because the min value is None - assert_eq!(int32_vec, vec![None, Some(20), Some(30)]); - } - - #[test] - fn build_statistics_array_utf8() { - // build row group metadata array - let s1 = ParquetStatistics::byte_array(None, Some("10".into()), None, 0, false); - let s2 = ParquetStatistics::byte_array( - Some("2".into()), - Some("20".into()), - None, - 0, - false, - ); - let s3 = ParquetStatistics::byte_array( - Some("3".into()), - Some("30".into()), - None, - 0, - false, - ); - let statistics = vec![Some(&s1), Some(&s2), Some(&s3)]; - - let statistics_array = - build_statistics_array(&statistics, StatisticsType::Min, &DataType::Utf8); - let string_array = statistics_array - .as_any() - .downcast_ref::() - .unwrap(); - let string_vec = string_array.into_iter().collect::>(); - assert_eq!(string_vec, vec![None, Some("2"), Some("3")]); - - let statistics_array = - build_statistics_array(&statistics, StatisticsType::Max, &DataType::Utf8); - let string_array = statistics_array - .as_any() - .downcast_ref::() - .unwrap(); - let string_vec = string_array.into_iter().collect::>(); - // here the first max value is None and not the Some("10") value which was actually set - // because the min value is None - assert_eq!(string_vec, vec![None, Some("20"), Some("30")]); - } - - #[test] - fn build_statistics_array_empty_stats() { - let data_type = DataType::Int32; - let statistics = vec![]; - let statistics_array = - build_statistics_array(&statistics, StatisticsType::Min, &data_type); - assert_eq!(statistics_array.len(), 0); - - let statistics = vec![None, None]; - let statistics_array = - build_statistics_array(&statistics, StatisticsType::Min, &data_type); - assert_eq!(statistics_array.len(), statistics.len()); - assert_eq!(statistics_array.data_type(), &data_type); - for i in 0..statistics_array.len() { - assert_eq!(statistics_array.is_null(i), true); - assert_eq!(statistics_array.is_valid(i), false); - } - } - - #[test] - fn build_statistics_array_unsupported_type() { - // boolean is not currently a supported type for statistics - let s1 = ParquetStatistics::boolean(Some(false), Some(true), None, 0, false); - let s2 = ParquetStatistics::boolean(Some(false), Some(true), None, 0, false); - let statistics = vec![Some(&s1), Some(&s2)]; - let data_type = DataType::Boolean; - let statistics_array = - build_statistics_array(&statistics, StatisticsType::Min, &data_type); - assert_eq!(statistics_array.len(), statistics.len()); - assert_eq!(statistics_array.data_type(), &data_type); - for i in 0..statistics_array.len() { - assert_eq!(statistics_array.is_null(i), true); - assert_eq!(statistics_array.is_valid(i), false); - } - } - - #[test] - fn row_group_predicate_eq() -> Result<()> { - use crate::logical_plan::{col, lit}; - let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); - let expected_expr = "#c1_min LtEq Int32(1) And Int32(1) LtEq #c1_max"; - - // test column on the left - let expr = col("c1").eq(lit(1)); - let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; - assert_eq!(format!("{:?}", predicate_expr), expected_expr); - - // test column on the right - let expr = lit(1).eq(col("c1")); - let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; - assert_eq!(format!("{:?}", predicate_expr), expected_expr); - - Ok(()) - } - - #[test] - fn row_group_predicate_gt() -> Result<()> { - use crate::logical_plan::{col, lit}; - let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); - let expected_expr = "#c1_max Gt Int32(1)"; - - // test column on the left - let expr = col("c1").gt(lit(1)); - let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; - assert_eq!(format!("{:?}", predicate_expr), expected_expr); - - // test column on the right - let expr = lit(1).lt(col("c1")); - let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; - assert_eq!(format!("{:?}", predicate_expr), expected_expr); - - Ok(()) - } - - #[test] - fn row_group_predicate_gt_eq() -> Result<()> { - use crate::logical_plan::{col, lit}; - let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); - let expected_expr = "#c1_max GtEq Int32(1)"; - - // test column on the left - let expr = col("c1").gt_eq(lit(1)); - let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; - assert_eq!(format!("{:?}", predicate_expr), expected_expr); - // test column on the right - let expr = lit(1).lt_eq(col("c1")); - let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; - assert_eq!(format!("{:?}", predicate_expr), expected_expr); - - Ok(()) - } - - #[test] - fn row_group_predicate_lt() -> Result<()> { - use crate::logical_plan::{col, lit}; - let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); - let expected_expr = "#c1_min Lt Int32(1)"; - - // test column on the left - let expr = col("c1").lt(lit(1)); - let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; - assert_eq!(format!("{:?}", predicate_expr), expected_expr); - - // test column on the right - let expr = lit(1).gt(col("c1")); - let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; - assert_eq!(format!("{:?}", predicate_expr), expected_expr); - - Ok(()) - } - - #[test] - fn row_group_predicate_lt_eq() -> Result<()> { - use crate::logical_plan::{col, lit}; - let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); - let expected_expr = "#c1_min LtEq Int32(1)"; - - // test column on the left - let expr = col("c1").lt_eq(lit(1)); - let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; - assert_eq!(format!("{:?}", predicate_expr), expected_expr); - // test column on the right - let expr = lit(1).gt_eq(col("c1")); - let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; - assert_eq!(format!("{:?}", predicate_expr), expected_expr); - - Ok(()) - } - - #[test] - fn row_group_predicate_and() -> Result<()> { - use crate::logical_plan::{col, lit}; - let schema = Schema::new(vec![ - Field::new("c1", DataType::Int32, false), - Field::new("c2", DataType::Int32, false), - Field::new("c3", DataType::Int32, false), - ]); - // test AND operator joining supported c1 < 1 expression and unsupported c2 > c3 expression - let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3"))); - let expected_expr = "#c1_min Lt Int32(1) And Boolean(true)"; - let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; - assert_eq!(format!("{:?}", predicate_expr), expected_expr); - - Ok(()) - } - - #[test] - fn row_group_predicate_or() -> Result<()> { - use crate::logical_plan::{col, lit}; - let schema = Schema::new(vec![ - Field::new("c1", DataType::Int32, false), - Field::new("c2", DataType::Int32, false), - ]); - // test OR operator joining supported c1 < 1 expression and unsupported c2 % 2 expression - let expr = col("c1").lt(lit(1)).or(col("c2").modulus(lit(2))); - let expected_expr = "#c1_min Lt Int32(1) Or Boolean(true)"; - let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; - assert_eq!(format!("{:?}", predicate_expr), expected_expr); - - Ok(()) - } - - #[test] - fn row_group_predicate_stat_column_req() -> Result<()> { - use crate::logical_plan::{col, lit}; - let schema = Schema::new(vec![ - Field::new("c1", DataType::Int32, false), - Field::new("c2", DataType::Int32, false), - ]); - let mut stat_column_req = vec![]; - // c1 < 1 and (c2 = 2 or c2 = 3) - let expr = col("c1") - .lt(lit(1)) - .and(col("c2").eq(lit(2)).or(col("c2").eq(lit(3)))); - let expected_expr = "#c1_min Lt Int32(1) And #c2_min LtEq Int32(2) And Int32(2) LtEq #c2_max Or #c2_min LtEq Int32(3) And Int32(3) LtEq #c2_max"; - let predicate_expr = - build_predicate_expression(&expr, &schema, &mut stat_column_req)?; - assert_eq!(format!("{:?}", predicate_expr), expected_expr); - // c1 < 1 should add c1_min - let c1_min_field = Field::new("c1_min", DataType::Int32, false); - assert_eq!( - stat_column_req[0], - ("c1".to_owned(), StatisticsType::Min, c1_min_field) - ); - // c2 = 2 should add c2_min and c2_max - let c2_min_field = Field::new("c2_min", DataType::Int32, false); - assert_eq!( - stat_column_req[1], - ("c2".to_owned(), StatisticsType::Min, c2_min_field) - ); - let c2_max_field = Field::new("c2_max", DataType::Int32, false); - assert_eq!( - stat_column_req[2], - ("c2".to_owned(), StatisticsType::Max, c2_max_field) - ); - // c2 = 3 shouldn't add any new statistics fields - assert_eq!(stat_column_req.len(), 3); - - Ok(()) - } - #[test] fn row_group_predicate_builder_simple_expr() -> Result<()> { use crate::logical_plan::{col, lit}; From 53a77a30ef509f5372105591c3d6af909ab6b792 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 19 May 2021 14:43:23 -0400 Subject: [PATCH 2/5] Rename RowGroup Predicates --> Pruning Predicates, touch up comments --- datafusion/src/physical_optimizer/pruning.rs | 98 ++++++++++---------- datafusion/src/physical_plan/parquet.rs | 32 +++---- 2 files changed, 65 insertions(+), 65 deletions(-) diff --git a/datafusion/src/physical_optimizer/pruning.rs b/datafusion/src/physical_optimizer/pruning.rs index 1690cfbe67f4..b40d7ffe6891 100644 --- a/datafusion/src/physical_optimizer/pruning.rs +++ b/datafusion/src/physical_optimizer/pruning.rs @@ -18,6 +18,10 @@ use arrow::{ record_batch::RecordBatch, }; +use parquet::file::{ + metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics, +}; + use crate::{ error::{DataFusionError, Result}, execution::context::ExecutionContextState, @@ -26,30 +30,28 @@ use crate::{ physical_plan::{planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr}, }; -use parquet::file::{ - metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics, -}; - #[derive(Debug, Clone)] -/// Predicate builder used for generating of predicate functions, used to filter row group metadata -pub struct RowGroupPredicateBuilder { - parquet_schema: Schema, +/// Builder used for generating predicate functions that can be used +/// to prune data based on statistics (e.g. parquet row group metadata) +pub struct PruningPredicateBuilder { + schema: Schema, predicate_expr: Arc, stat_column_req: Vec<(String, StatisticsType, Field)>, } -impl RowGroupPredicateBuilder { - /// Try to create a new instance of PredicateExpressionBuilder. +impl PruningPredicateBuilder { + /// Try to create a new instance of [`PruningPredicateBuilder`] + /// /// This will translate the filter expression into a statistics predicate expression - /// (for example (column / 2) = 4 becomes (column_min / 2) <= 4 && 4 <= (column_max / 2)), - /// then convert it to a DataFusion PhysicalExpression and cache it for later use by build_row_group_predicate. - pub fn try_new(expr: &Expr, parquet_schema: Schema) -> Result { + /// + /// For example, `(column / 2) = 4` becomes `(column_min / 2) <= 4 && 4 <= (column_max / 2))` + pub fn try_new(expr: &Expr, schema: Schema) -> Result { // build predicate expression once let mut stat_column_req = Vec::<(String, StatisticsType, Field)>::new(); let logical_predicate_expr = - build_predicate_expression(expr, &parquet_schema, &mut stat_column_req)?; + build_predicate_expression(expr, &schema, &mut stat_column_req)?; // println!( - // "RowGroupPredicateBuilder::try_new, logical_predicate_expr: {:?}", + // "PruningPredicateBuilder::try_new, logical_predicate_expr: {:?}", // logical_predicate_expr // ); // build physical predicate expression @@ -65,30 +67,35 @@ impl RowGroupPredicateBuilder { &execution_context_state, )?; // println!( - // "RowGroupPredicateBuilder::try_new, predicate_expr: {:?}", + // "PruningPredicateBuilder::try_new, predicate_expr: {:?}", // predicate_expr // ); Ok(Self { - parquet_schema, + schema, predicate_expr, stat_column_req, }) } - /// Generate a predicate function used to filter row group metadata. - /// This function takes a list of all row groups as parameter, - /// so that DataFusion's physical expressions can be re-used by - /// generating a RecordBatch, containing statistics arrays, - /// on which the physical predicate expression is executed to generate a row group filter array. - /// The generated filter array is then used in the returned closure to filter row groups. - pub fn build_row_group_predicate( + /// Generate a predicate function used to filter based on + /// statistics + /// + /// This function takes a slice of statistics as parameter, so + /// that DataFusion's physical expressions can be executed once + /// against a single RecordBatch, containing statistics arrays, on + /// which the physical predicate expression is executed to + /// generate a row group filter array. + /// + /// The generated filter function is then used in the returned + /// closure to filter row groups. NOTE this is parquet specific at the moment + pub fn build_pruning_predicate( &self, row_group_metadata: &[RowGroupMetaData], ) -> Box bool> { // build statistics record batch let predicate_result = build_statistics_record_batch( row_group_metadata, - &self.parquet_schema, + &self.schema, &self.stat_column_req, ) .and_then(|statistics_batch| { @@ -127,18 +134,18 @@ impl RowGroupPredicateBuilder { } } -/// Build a RecordBatch from a list of RowGroupMetadata structs, -/// creating arrays, one for each statistics column, -/// as requested in the stat_column_req parameter. +/// Build a RecordBatch from a list of statistics (currently parquet +/// [`RowGroupMetadata`] structs), creating arrays, one for each +/// statistics column, as requested in the stat_column_req parameter. fn build_statistics_record_batch( row_groups: &[RowGroupMetaData], - parquet_schema: &Schema, + schema: &Schema, stat_column_req: &[(String, StatisticsType, Field)], ) -> Result { let mut fields = Vec::::new(); let mut arrays = Vec::::new(); for (column_name, statistics_type, stat_field) in stat_column_req { - if let Some((column_index, _)) = parquet_schema.column_with_name(column_name) { + if let Some((column_index, _)) = schema.column_with_name(column_name) { let statistics = row_groups .iter() .map(|g| g.column(column_index).statistics()) @@ -161,7 +168,7 @@ struct StatisticsExpressionBuilder<'a> { column_name: String, column_expr: &'a Expr, scalar_expr: &'a Expr, - parquet_field: &'a Field, + field: &'a Field, stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>, reverse_operator: bool, } @@ -170,7 +177,7 @@ impl<'a> StatisticsExpressionBuilder<'a> { fn try_new( left: &'a Expr, right: &'a Expr, - parquet_schema: &'a Schema, + schema: &'a Schema, stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>, ) -> Result { // find column name; input could be a more complicated expression @@ -191,12 +198,11 @@ impl<'a> StatisticsExpressionBuilder<'a> { } }; let column_name = column_names.iter().next().unwrap().clone(); - let field = match parquet_schema.column_with_name(&column_name) { + let field = match schema.column_with_name(&column_name) { Some((_, f)) => f, _ => { - // field not found in parquet schema return Err(DataFusionError::Plan( - "Field not found in parquet schema".to_string(), + "Field not found in schema".to_string(), )); } }; @@ -205,7 +211,7 @@ impl<'a> StatisticsExpressionBuilder<'a> { column_name, column_expr, scalar_expr, - parquet_field: field, + field, stat_column_req, reverse_operator, }) @@ -253,8 +259,8 @@ impl<'a> StatisticsExpressionBuilder<'a> { let stat_column_name = format!("{}_{}", self.column_name, suffix); let stat_field = Field::new( stat_column_name.as_str(), - self.parquet_field.data_type().clone(), - self.parquet_field.is_nullable(), + self.field.data_type().clone(), + self.field.is_nullable(), ); if self.is_stat_column_missing(stat_type) { // only add statistics column if not previously added @@ -297,10 +303,10 @@ fn rewrite_column_expr( utils::rewrite_expression(&expr, &expressions) } -/// Translate logical filter expression into parquet statistics predicate expression +/// Translate logical filter expression into statistics predicate expression fn build_predicate_expression( expr: &Expr, - parquet_schema: &Schema, + schema: &Schema, stat_column_req: &mut Vec<(String, StatisticsType, Field)>, ) -> Result { use crate::logical_plan; @@ -316,19 +322,13 @@ fn build_predicate_expression( }; if op == Operator::And || op == Operator::Or { - let left_expr = - build_predicate_expression(left, parquet_schema, stat_column_req)?; - let right_expr = - build_predicate_expression(right, parquet_schema, stat_column_req)?; + let left_expr = build_predicate_expression(left, schema, stat_column_req)?; + let right_expr = build_predicate_expression(right, schema, stat_column_req)?; return Ok(logical_plan::binary_expr(left_expr, op, right_expr)); } - let expr_builder = StatisticsExpressionBuilder::try_new( - left, - right, - parquet_schema, - stat_column_req, - ); + let expr_builder = + StatisticsExpressionBuilder::try_new(left, right, schema, stat_column_req); let mut expr_builder = match expr_builder { Ok(builder) => builder, // allow partial failure in predicate expression generation diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs index 0c18aab434b8..66b1253db3d4 100644 --- a/datafusion/src/physical_plan/parquet.rs +++ b/datafusion/src/physical_plan/parquet.rs @@ -26,7 +26,7 @@ use std::task::{Context, Poll}; use crate::{ error::{DataFusionError, Result}, logical_plan::Expr, - physical_optimizer::pruning::RowGroupPredicateBuilder, + physical_optimizer::pruning::PruningPredicateBuilder, physical_plan::{ common, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, @@ -66,7 +66,7 @@ pub struct ParquetExec { /// Statistics for the data set (sum of statistics for all partitions) statistics: Statistics, /// Optional predicate builder - predicate_builder: Option, + predicate_builder: Option, /// Optional limit of the number of rows limit: Option, } @@ -219,7 +219,7 @@ impl ParquetExec { } let schema = schemas[0].clone(); let predicate_builder = predicate.and_then(|predicate_expr| { - RowGroupPredicateBuilder::try_new(&predicate_expr, schema.clone()).ok() + PruningPredicateBuilder::try_new(&predicate_expr, schema.clone()).ok() }); Ok(Self::new( @@ -237,7 +237,7 @@ impl ParquetExec { partitions: Vec, schema: Schema, projection: Option>, - predicate_builder: Option, + predicate_builder: Option, batch_size: usize, limit: Option, ) -> Self { @@ -457,7 +457,7 @@ fn send_result( fn read_files( filenames: &[String], projection: &[usize], - predicate_builder: &Option, + predicate_builder: &Option, batch_size: usize, response_tx: Sender>, limit: Option, @@ -468,7 +468,7 @@ fn read_files( let mut file_reader = SerializedFileReader::new(file)?; if let Some(predicate_builder) = predicate_builder { let row_group_predicate = predicate_builder - .build_row_group_predicate(file_reader.metadata().row_groups()); + .build_pruning_predicate(file_reader.metadata().row_groups()); file_reader.filter_row_groups(&row_group_predicate); } let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader)); @@ -630,7 +630,7 @@ mod tests { // int > 1 => c1_max > 1 let expr = col("c1").gt(lit(15)); let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); - let predicate_builder = RowGroupPredicateBuilder::try_new(&expr, schema)?; + let predicate_builder = PruningPredicateBuilder::try_new(&expr, schema)?; let schema_descr = get_test_schema_descr(vec![("c1", PhysicalType::INT32)]); let rgm1 = get_row_group_meta_data( @@ -643,7 +643,7 @@ mod tests { ); let row_group_metadata = vec![rgm1, rgm2]; let row_group_predicate = - predicate_builder.build_row_group_predicate(&row_group_metadata); + predicate_builder.build_pruning_predicate(&row_group_metadata); let row_group_filter = row_group_metadata .iter() .enumerate() @@ -660,7 +660,7 @@ mod tests { // int > 1 => c1_max > 1 let expr = col("c1").gt(lit(15)); let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); - let predicate_builder = RowGroupPredicateBuilder::try_new(&expr, schema)?; + let predicate_builder = PruningPredicateBuilder::try_new(&expr, schema)?; let schema_descr = get_test_schema_descr(vec![("c1", PhysicalType::INT32)]); let rgm1 = get_row_group_meta_data( @@ -673,7 +673,7 @@ mod tests { ); let row_group_metadata = vec![rgm1, rgm2]; let row_group_predicate = - predicate_builder.build_row_group_predicate(&row_group_metadata); + predicate_builder.build_pruning_predicate(&row_group_metadata); let row_group_filter = row_group_metadata .iter() .enumerate() @@ -696,7 +696,7 @@ mod tests { Field::new("c1", DataType::Int32, false), Field::new("c2", DataType::Int32, false), ]); - let predicate_builder = RowGroupPredicateBuilder::try_new(&expr, schema.clone())?; + let predicate_builder = PruningPredicateBuilder::try_new(&expr, schema.clone())?; let schema_descr = get_test_schema_descr(vec![ ("c1", PhysicalType::INT32), @@ -718,7 +718,7 @@ mod tests { ); let row_group_metadata = vec![rgm1, rgm2]; let row_group_predicate = - predicate_builder.build_row_group_predicate(&row_group_metadata); + predicate_builder.build_pruning_predicate(&row_group_metadata); let row_group_filter = row_group_metadata .iter() .enumerate() @@ -731,9 +731,9 @@ mod tests { // if conditions in predicate are joined with OR and an unsupported expression is used // this bypasses the entire predicate expression and no row groups are filtered out let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2))); - let predicate_builder = RowGroupPredicateBuilder::try_new(&expr, schema)?; + let predicate_builder = PruningPredicateBuilder::try_new(&expr, schema)?; let row_group_predicate = - predicate_builder.build_row_group_predicate(&row_group_metadata); + predicate_builder.build_pruning_predicate(&row_group_metadata); let row_group_filter = row_group_metadata .iter() .enumerate() @@ -755,7 +755,7 @@ mod tests { Field::new("c1", DataType::Int32, false), Field::new("c2", DataType::Boolean, false), ]); - let predicate_builder = RowGroupPredicateBuilder::try_new(&expr, schema)?; + let predicate_builder = PruningPredicateBuilder::try_new(&expr, schema)?; let schema_descr = get_test_schema_descr(vec![ ("c1", PhysicalType::INT32), @@ -777,7 +777,7 @@ mod tests { ); let row_group_metadata = vec![rgm1, rgm2]; let row_group_predicate = - predicate_builder.build_row_group_predicate(&row_group_metadata); + predicate_builder.build_pruning_predicate(&row_group_metadata); let row_group_filter = row_group_metadata .iter() .enumerate() From cc98907e0862112f062feb80433095d433ee3d16 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 19 May 2021 14:53:26 -0400 Subject: [PATCH 3/5] add rat --- datafusion/src/physical_optimizer/pruning.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/datafusion/src/physical_optimizer/pruning.rs b/datafusion/src/physical_optimizer/pruning.rs index b40d7ffe6891..c91cc020ee9d 100644 --- a/datafusion/src/physical_optimizer/pruning.rs +++ b/datafusion/src/physical_optimizer/pruning.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + //! This module contains code to rule out row groups / partitions / //! etc based on statistics prior in order to skip evaluating entire //! swaths of rows. From 688dde41b748801183d3484c6116442c22bdf591 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 20 May 2021 08:51:37 -0400 Subject: [PATCH 4/5] Remove commented out println! and code --- datafusion/src/physical_optimizer/pruning.rs | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/datafusion/src/physical_optimizer/pruning.rs b/datafusion/src/physical_optimizer/pruning.rs index c91cc020ee9d..fd08f225e0e0 100644 --- a/datafusion/src/physical_optimizer/pruning.rs +++ b/datafusion/src/physical_optimizer/pruning.rs @@ -67,11 +67,6 @@ impl PruningPredicateBuilder { let mut stat_column_req = Vec::<(String, StatisticsType, Field)>::new(); let logical_predicate_expr = build_predicate_expression(expr, &schema, &mut stat_column_req)?; - // println!( - // "PruningPredicateBuilder::try_new, logical_predicate_expr: {:?}", - // logical_predicate_expr - // ); - // build physical predicate expression let stat_fields = stat_column_req .iter() .map(|(_, _, f)| f.clone()) @@ -83,10 +78,6 @@ impl PruningPredicateBuilder { &stat_schema, &execution_context_state, )?; - // println!( - // "PruningPredicateBuilder::try_new, predicate_expr: {:?}", - // predicate_expr - // ); Ok(Self { schema, predicate_expr, @@ -248,18 +239,10 @@ impl<'a> StatisticsExpressionBuilder<'a> { } } - // fn column_expr(&self) -> &Expr { - // self.column_expr - // } - fn scalar_expr(&self) -> &Expr { self.scalar_expr } - // fn column_name(&self) -> &String { - // &self.column_name - // } - fn is_stat_column_missing(&self, statistics_type: StatisticsType) -> bool { self.stat_column_req .iter() From 2c6a23b58f6097dc17092413f4a384713e03973d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 20 May 2021 08:53:56 -0400 Subject: [PATCH 5/5] use .any instead of filter/count --- datafusion/src/physical_optimizer/pruning.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/datafusion/src/physical_optimizer/pruning.rs b/datafusion/src/physical_optimizer/pruning.rs index fd08f225e0e0..a13ca56630bc 100644 --- a/datafusion/src/physical_optimizer/pruning.rs +++ b/datafusion/src/physical_optimizer/pruning.rs @@ -244,11 +244,10 @@ impl<'a> StatisticsExpressionBuilder<'a> { } fn is_stat_column_missing(&self, statistics_type: StatisticsType) -> bool { - self.stat_column_req + !self + .stat_column_req .iter() - .filter(|(c, t, _f)| c == &self.column_name && t == &statistics_type) - .count() - == 0 + .any(|(c, t, _f)| c == &self.column_name && t == &statistics_type) } fn stat_column_expr(