Skip to content

Commit

Permalink
step1 add option in ExecutionConfig to enable/disable parquet pruning
Browse files Browse the repository at this point in the history
  • Loading branch information
lvheyang committed Jul 19, 2021
1 parent 002ca5d commit 9a2dae0
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 7 deletions.
22 changes: 21 additions & 1 deletion datafusion/src/datasource/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub struct ParquetTable {
schema: SchemaRef,
statistics: Statistics,
max_concurrency: usize,
enable_pruning: bool,
}

impl ParquetTable {
Expand All @@ -51,13 +52,25 @@ impl ParquetTable {
schema,
statistics: parquet_exec.statistics().to_owned(),
max_concurrency,
enable_pruning: true,
})
}

/// Get the path for the Parquet file(s) represented by this ParquetTable instance
pub fn path(&self) -> &str {
&self.path
}

/// Get parquet pruning option
pub fn get_enable_pruning(&self) -> bool {
self.enable_pruning
}

/// Set parquet pruning option
pub fn with_enable_pruning(mut self, enable_pruning: bool) -> Self {
self.enable_pruning = enable_pruning;
self
}
}

impl TableProvider for ParquetTable {
Expand Down Expand Up @@ -86,7 +99,14 @@ impl TableProvider for ParquetTable {
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let predicate = combine_filters(filters);
// If enable pruning then combine the filters to build the predicate.
// If disable pruning then set the predicate to None, thus readers
// will not prune data based on the statistics.
let predicate = if self.enable_pruning {
combine_filters(filters)
} else {
None
};
Ok(Arc::new(ParquetExec::try_from_path(
&self.path,
projection.clone(),
Expand Down
18 changes: 14 additions & 4 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,11 @@ impl ExecutionContext {
/// Registers a Parquet data source so that it can be referenced from SQL statements
/// executed against this context.
pub fn register_parquet(&mut self, name: &str, filename: &str) -> Result<()> {
let table = ParquetTable::try_new(
filename,
self.state.lock().unwrap().config.concurrency,
)?;
let table = {
let m = self.state.lock().unwrap();
ParquetTable::try_new(filename, m.config.concurrency)?
.with_enable_pruning(m.config.parquet_pruning)
};
self.register_table(name, Arc::new(table))?;
Ok(())
}
Expand Down Expand Up @@ -633,6 +634,8 @@ pub struct ExecutionConfig {
/// Should DataFusion repartition data using the partition keys to execute window functions in
/// parallel using the provided `concurrency` level
pub repartition_windows: bool,
/// Should Datafusion parquet reader using the predicate to prune data
parquet_pruning: bool,
}

impl Default for ExecutionConfig {
Expand Down Expand Up @@ -663,6 +666,7 @@ impl Default for ExecutionConfig {
repartition_joins: true,
repartition_aggregations: true,
repartition_windows: true,
parquet_pruning: true,
}
}
}
Expand Down Expand Up @@ -765,6 +769,12 @@ impl ExecutionConfig {
self.repartition_windows = enabled;
self
}

/// Enables or disables the use of pruning predicate for parquet readers to skip row groups
pub fn with_parquet_pruning(mut self, enabled: bool) -> Self {
self.parquet_pruning = enabled;
self
}
}

/// Holds per-execution properties and data (such as starting timestamps, etc).
Expand Down
4 changes: 2 additions & 2 deletions datafusion/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ fn build_predicate_expression(
// allow partial failure in predicate expression generation
// this can still produce a useful predicate when multiple conditions are joined using AND
Err(_) => {
return Ok(logical_plan::lit(true));
return Ok(unhandled);
}
};
let corrected_op = expr_builder.correct_operator(op);
Expand Down Expand Up @@ -596,7 +596,7 @@ fn build_predicate_expression(
.lt_eq(expr_builder.scalar_expr().clone())
}
// other expressions are not supported
_ => logical_plan::lit(true),
_ => unhandled,
};
Ok(statistics_expr)
}
Expand Down

0 comments on commit 9a2dae0

Please sign in to comment.