Skip to content

Commit

Permalink
rewrite predicates before pushing to union inputs
Browse files Browse the repository at this point in the history
  • Loading branch information
korowa committed Feb 8, 2022
1 parent 09c67d5 commit 6077261
Showing 1 changed file with 55 additions and 7 deletions.
62 changes: 55 additions & 7 deletions datafusion/src/optimizer/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

use crate::datasource::datasource::TableProviderFilterPushDown;
use crate::execution::context::ExecutionProps;
use crate::logical_plan::plan::{Aggregate, Filter, Join, Projection};
use crate::logical_plan::plan::{Aggregate, Filter, Join, Projection, Union};
use crate::logical_plan::{
and, replace_col, Column, CrossJoin, JoinType, Limit, LogicalPlan, TableScan,
and, col, replace_col, Column, CrossJoin, JoinType, Limit, LogicalPlan, TableScan,
};
use crate::logical_plan::{DFSchema, Expr};
use crate::optimizer::optimizer::OptimizerRule;
Expand Down Expand Up @@ -346,14 +346,18 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
.fields()
.iter()
.enumerate()
.map(|(i, field)| {
.flat_map(|(i, field)| {
// strip alias, as they should not be part of filters
let expr = match &expr[i] {
Expr::Alias(expr, _) => expr.as_ref().clone(),
expr => expr.clone(),
};

(field.qualified_name(), expr)
// considering both qualified and unqualified names as rewriting option
[
(field.name().clone(), expr.clone()),
(field.qualified_name(), expr),
]
})
.collect::<HashMap<_, _>>();

Expand Down Expand Up @@ -394,8 +398,29 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
// sort is filter-commutable
push_down(&state, plan)
}
LogicalPlan::Union(_) => {
// union all is filter-commutable
LogicalPlan::Union(Union {
inputs: _,
schema,
alias: _,
}) => {
// union changing all qualifiers while building logical plan so we need
// to rewrite filters to push unqualified columns to inputs
let projection = schema
.fields()
.iter()
.map(|field| (field.qualified_name(), col(field.name())))
.collect::<HashMap<_, _>>();

// rewriting predicate expressions using unqualified names as replacements
if !projection.is_empty() {
for (predicate, columns) in state.filters.iter_mut() {
*predicate = rewrite(predicate, &projection)?;

columns.clear();
utils::expr_to_columns(predicate, columns)?;
}
}

push_down(&state, plan)
}
LogicalPlan::Limit(Limit { input, .. }) => {
Expand Down Expand Up @@ -574,7 +599,9 @@ fn rewrite(expr: &Expr, projection: &HashMap<String, Expr>) -> Result<Expr> {
mod tests {
use super::*;
use crate::datasource::TableProvider;
use crate::logical_plan::{lit, sum, DFSchema, Expr, LogicalPlanBuilder, Operator};
use crate::logical_plan::{
lit, sum, union_with_alias, DFSchema, Expr, LogicalPlanBuilder, Operator,
};
use crate::physical_plan::ExecutionPlan;
use crate::test::*;
use crate::{logical_plan::col, prelude::JoinType};
Expand Down Expand Up @@ -901,6 +928,27 @@ mod tests {
Ok(())
}

#[test]
fn union_all_with_alias() -> Result<()> {
let table_scan = test_table_scan()?;
let union =
union_with_alias(table_scan.clone(), table_scan, Some("t".to_string()))?;

let plan = LogicalPlanBuilder::from(union)
.filter(col("t.a").eq(lit(1i64)))?
.build()?;

// filter appears below Union without relation qualifier
let expected = "\
Union\
\n Filter: #a = Int64(1)\
\n TableScan: test projection=None\
\n Filter: #a = Int64(1)\
\n TableScan: test projection=None";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}

/// verifies that filters with the same columns are correctly placed
#[test]
fn filter_2_breaks_limits() -> Result<()> {
Expand Down

0 comments on commit 6077261

Please sign in to comment.