Skip to content

Commit

Permalink
Rewrite Filter Predicate (#192)
Browse files Browse the repository at this point in the history
  • Loading branch information
berkaysynnada authored and mustafasrepo committed Dec 21, 2023
1 parent 33860ec commit 497be96
Show file tree
Hide file tree
Showing 9 changed files with 3,573 additions and 2,287 deletions.
2 changes: 1 addition & 1 deletion SYNNADA-CONTRIBUTIONS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ c250200fdd8f10e65b52ca1ab7f41bb04992d10b
13c5774511ad2259907d3eebcd8897e1ec5aeed5
768927edfd06936416952031c776de97704b4986
89834f047a92ddbf892633046663bcba88da05d5
998140f5b53283adfa89da532a2c97cbfcccd88b
998140f5b53283adfa89da532a2c97cbfcccd88b
24 changes: 17 additions & 7 deletions datafusion/core/src/physical_optimizer/join_pipeline_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@
use std::sync::Arc;

use crate::physical_optimizer::utils::{is_hash_join, is_nested_loop_join, is_sort};
use crate::physical_plan::joins::utils::is_filter_expr_prunable;
use crate::physical_plan::joins::{
HashJoinExec, NestedLoopJoinExec, SlidingHashJoinExec, SlidingNestedLoopJoinExec,
SortMergeJoinExec, StreamJoinPartitionMode,
};
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use datafusion_physical_plan::joins::utils::swap_join_type;

use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
Expand All @@ -21,10 +19,14 @@ use datafusion_physical_expr::utils::{
ordering_satisfy_requirement_concrete,
};
use datafusion_physical_expr::PhysicalSortRequirement;

use datafusion_physical_plan::joins::prunability::{
is_filter_expr_prunable, separate_columns_of_filter_expression,
};
use datafusion_physical_plan::joins::utils::swap_join_type;
use datafusion_physical_plan::joins::{
swap_sliding_hash_join, swap_sliding_nested_loop_join, swap_sort_merge_join,
};

use itertools::{iproduct, izip, Itertools};

/// This object is used within the JoinSelection rule to track the closest
Expand Down Expand Up @@ -328,7 +330,11 @@ fn check_hash_join_convertable(
hash_join: &HashJoinExec,
config_options: &ConfigOptions,
) -> Result<Option<Vec<Arc<dyn ExecutionPlan>>>> {
let filter = hash_join.filter();
// To perform the prunability analysis correctly, the columns from the left table
// and the columns from the right table must be on different sides of the join.
let filter = hash_join
.filter()
.map(|filter| separate_columns_of_filter_expression(filter.clone()));
let (on_left, on_right): (Vec<_>, Vec<_>) = hash_join.on.iter().cloned().unzip();
let left_order = hash_join.left().output_ordering();
let right_order = hash_join.right().output_ordering();
Expand All @@ -343,7 +349,7 @@ fn check_hash_join_convertable(
) {
(true, true, Some(filter), Some(left_order), Some(right_order)) => {
let (left_prunable, right_prunable) = is_filter_expr_prunable(
filter,
&filter,
Some(left_order[0].clone()),
Some(right_order[0].clone()),
|| hash_join.left().equivalence_properties(),
Expand Down Expand Up @@ -441,7 +447,11 @@ fn check_nested_loop_join_convertable(
nested_loop_join: &NestedLoopJoinExec,
_config_options: &ConfigOptions,
) -> Result<Option<Vec<Arc<dyn ExecutionPlan>>>> {
let filter = nested_loop_join.filter();
// To perform the prunability analysis correctly, the columns from the left table
// and the columns from the right table must be on different sides of the join.
let filter = nested_loop_join
.filter()
.map(|filter| separate_columns_of_filter_expression(filter.clone()));
let left_order = nested_loop_join.left().output_ordering();
let right_order = nested_loop_join.right().output_ordering();
let is_left_streaming = is_plan_streaming(nested_loop_join.left());
Expand All @@ -455,7 +465,7 @@ fn check_nested_loop_join_convertable(
) {
(true, true, Some(filter), Some(left_order), Some(right_order)) => {
let (left_prunable, right_prunable) = is_filter_expr_prunable(
filter,
&filter,
Some(left_order[0].clone()),
Some(right_order[0].clone()),
|| nested_loop_join.left().equivalence_properties(),
Expand Down
5 changes: 4 additions & 1 deletion datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::physical_plan::ExecutionPlan;
use datafusion_common::internal_err;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{DataFusionError, JoinType};
use datafusion_physical_plan::joins::prunability::separate_columns_of_filter_expression;
use datafusion_physical_plan::joins::utils::{
swap_join_filter, swap_join_type, swap_reverting_projection,
};
Expand Down Expand Up @@ -388,7 +389,9 @@ fn hash_join_convert_symmetric_subrule(
hash_join.left().clone(),
hash_join.right().clone(),
hash_join.on().to_vec(),
hash_join.filter().cloned(),
hash_join
.filter()
.map(|filter| separate_columns_of_filter_expression(filter.clone())),
hash_join.join_type(),
hash_join.null_equals_null(),
mode,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/pipeline_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ use std::sync::Arc;
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::joins::utils::is_filter_expr_prunable;
use crate::physical_plan::joins::SymmetricHashJoinExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};

use datafusion_common::config::OptimizerOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::{plan_err, DataFusionError};
use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported};
use datafusion_physical_plan::joins::prunability::is_filter_expr_prunable;

/// The PipelineChecker rule rejects non-runnable query plans that use
/// pipeline-breaking operators on infinite input(s).
Expand Down
5 changes: 4 additions & 1 deletion datafusion/physical-plan/src/joins/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ pub use sliding_nested_loop_join::{
};
pub use sort_merge_join::{swap_sort_merge_join, SortMergeJoinExec};
pub use symmetric_hash_join::SymmetricHashJoinExec;

pub mod prunability;
pub mod utils;

mod cross_join;
mod hash_join;
mod nested_loop_join;
Expand All @@ -35,7 +39,6 @@ mod sliding_window_join_utils;
mod sort_merge_join;
mod stream_join_utils;
mod symmetric_hash_join;
pub mod utils;

#[cfg(test)]
pub mod test_utils;
Expand Down
Loading

0 comments on commit 497be96

Please sign in to comment.