Skip to content

Commit

Permalink
Don't add filters to used columns (apache#7670)
Browse files Browse the repository at this point in the history
Add test

WIP fix

Fix filter after scan

Totally remove filter to column extraction

Fix test

Update tests 1

Update tests 2

Update tests 3
  • Loading branch information
Dandandan authored and Ted-Jiang committed Oct 7, 2023
1 parent 8e1441b commit d804793
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 140 deletions.
9 changes: 5 additions & 4 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1287,15 +1287,16 @@ impl TableProvider for DataFrameTableProvider {
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut expr = LogicalPlanBuilder::from(self.plan.clone());
if let Some(p) = projection {
expr = expr.select(p.iter().copied())?
}

// Add filter when given
let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
if let Some(filter) = filter {
expr = expr.filter(filter)?
}

if let Some(p) = projection {
expr = expr.select(p.iter().copied())?
}

// add a limit if given
if let Some(l) = limit {
expr = expr.limit(0, Some(l))?
Expand Down
27 changes: 14 additions & 13 deletions datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,20 @@ impl TableProvider for ViewTable {
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let plan = if let Some(projection) = projection {
let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
let plan = self.logical_plan().clone();
let mut plan = LogicalPlanBuilder::from(plan);

if let Some(filter) = filter {
plan = plan.filter(filter)?;
}

let mut plan = if let Some(projection) = projection {
// avoiding adding a redundant projection (e.g. SELECT * FROM view)
let current_projection =
(0..self.logical_plan.schema().fields().len()).collect::<Vec<usize>>();
(0..plan.schema().fields().len()).collect::<Vec<usize>>();
if projection == &current_projection {
self.logical_plan().clone()
plan
} else {
let fields: Vec<Expr> = projection
.iter()
Expand All @@ -123,19 +131,11 @@ impl TableProvider for ViewTable {
)
})
.collect();
LogicalPlanBuilder::from(self.logical_plan.clone())
.project(fields)?
.build()?
plan.project(fields)?
}
} else {
self.logical_plan().clone()
plan
};
let mut plan = LogicalPlanBuilder::from(plan);
let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));

if let Some(filter) = filter {
plan = plan.filter(filter)?;
}

if let Some(limit) = limit {
plan = plan.limit(0, Some(limit))?;
Expand Down Expand Up @@ -439,6 +439,7 @@ mod tests {
.select_columns(&["bool_col", "int_col"])?;

let plan = df.explain(false, false)?.collect().await?;

// Filters all the way to Parquet
let formatted = arrow::util::pretty::pretty_format_batches(&plan)
.unwrap()
Expand Down
11 changes: 10 additions & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use crate::expr_rewriter::{
};
use crate::type_coercion::binary::comparison_coercion;
use crate::utils::{columnize_expr, compare_sort_expr};
use crate::{and, binary_expr, DmlStatement, Operator, WriteOp};
use crate::{
and, binary_expr, DmlStatement, Operator, TableProviderFilterPushDown, WriteOp,
};
use crate::{
logical_plan::{
Aggregate, Analyze, CrossJoin, Distinct, EmptyRelation, Explain, Filter, Join,
Expand Down Expand Up @@ -1402,6 +1404,13 @@ impl TableSource for LogicalTableSource {
/// Returns the schema of this table source.
///
/// The returned value is a clone of the `table_schema` field held by `self`.
fn schema(&self) -> SchemaRef {
self.table_schema.clone()
}

fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> Result<Vec<crate::TableProviderFilterPushDown>> {
Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
}
}

/// Create a [`LogicalPlan::Unnest`] plan
Expand Down
Loading

0 comments on commit d804793

Please sign in to comment.