From 9a2dae0c83e935e68eb7e54c567e5051b6ab885b Mon Sep 17 00:00:00 2001 From: lvheyang Date: Mon, 19 Jul 2021 00:14:15 +0800 Subject: [PATCH] step1 add option in ExecutionConfig to enable/disable parquet pruning --- datafusion/src/datasource/parquet.rs | 22 +++++++++++++++++++- datafusion/src/execution/context.rs | 18 ++++++++++++---- datafusion/src/physical_optimizer/pruning.rs | 4 ++-- 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs index e53fbbdefd2f..28f79a6ae8dd 100644 --- a/datafusion/src/datasource/parquet.rs +++ b/datafusion/src/datasource/parquet.rs @@ -38,6 +38,7 @@ pub struct ParquetTable { schema: SchemaRef, statistics: Statistics, max_concurrency: usize, + enable_pruning: bool, } impl ParquetTable { @@ -51,6 +52,7 @@ impl ParquetTable { schema, statistics: parquet_exec.statistics().to_owned(), max_concurrency, + enable_pruning: true, }) } @@ -58,6 +60,17 @@ impl ParquetTable { pub fn path(&self) -> &str { &self.path } + + /// Get parquet pruning option + pub fn get_enable_pruning(&self) -> bool { + self.enable_pruning + } + + /// Set parquet pruning option + pub fn with_enable_pruning(mut self, enable_pruning: bool) -> Self { + self.enable_pruning = enable_pruning; + self + } } impl TableProvider for ParquetTable { @@ -86,7 +99,14 @@ impl TableProvider for ParquetTable { filters: &[Expr], limit: Option, ) -> Result> { - let predicate = combine_filters(filters); + // If enable pruning then combine the filters to build the predicate. + // If disable pruning then set the predicate to None, thus readers + // will not prune data based on the statistics. + let predicate = if self.enable_pruning { + combine_filters(filters) + } else { + None + }; Ok(Arc::new(ParquetExec::try_from_path( &self.path, projection.clone(), diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index d2dcec5f47d7..fe03b0e560f1 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -322,10 +322,11 @@ impl ExecutionContext { /// Registers a Parquet data source so that it can be referenced from SQL statements /// executed against this context. pub fn register_parquet(&mut self, name: &str, filename: &str) -> Result<()> { - let table = ParquetTable::try_new( - filename, - self.state.lock().unwrap().config.concurrency, - )?; + let table = { + let m = self.state.lock().unwrap(); + ParquetTable::try_new(filename, m.config.concurrency)? + .with_enable_pruning(m.config.parquet_pruning) + }; self.register_table(name, Arc::new(table))?; Ok(()) } @@ -633,6 +634,8 @@ pub struct ExecutionConfig { /// Should DataFusion repartition data using the partition keys to execute window functions in /// parallel using the provided `concurrency` level pub repartition_windows: bool, + /// Should Datafusion parquet reader using the predicate to prune data + parquet_pruning: bool, } impl Default for ExecutionConfig { @@ -663,6 +666,7 @@ impl Default for ExecutionConfig { repartition_joins: true, repartition_aggregations: true, repartition_windows: true, + parquet_pruning: true, } } } @@ -765,6 +769,12 @@ impl ExecutionConfig { self.repartition_windows = enabled; self } + + /// Enables or disables the use of pruning predicate for parquet readers to skip row groups + pub fn with_parquet_pruning(mut self, enabled: bool) -> Self { + self.parquet_pruning = enabled; + self + } } /// Holds per-execution properties and data (such as starting timestamps, etc). diff --git a/datafusion/src/physical_optimizer/pruning.rs b/datafusion/src/physical_optimizer/pruning.rs index 5585c4d08140..36253815414a 100644 --- a/datafusion/src/physical_optimizer/pruning.rs +++ b/datafusion/src/physical_optimizer/pruning.rs @@ -547,7 +547,7 @@ fn build_predicate_expression( // allow partial failure in predicate expression generation // this can still produce a useful predicate when multiple conditions are joined using AND Err(_) => { - return Ok(logical_plan::lit(true)); + return Ok(unhandled); } }; let corrected_op = expr_builder.correct_operator(op); @@ -596,7 +596,7 @@ fn build_predicate_expression( .lt_eq(expr_builder.scalar_expr().clone()) } // other expressions are not supported - _ => logical_plan::lit(true), + _ => unhandled, }; Ok(statistics_expr) }