Encode all join conditions in a single expression field #7612

Merged
merged 6 commits into from
Oct 13, 2023

Conversation

nseekhao
Contributor

@nseekhao nseekhao commented Sep 20, 2023

Which issue does this PR close?

Closes #7611.

Rationale for this change

This PR is a more efficient fix for the incorrect representation of join conditions. Please refer to the related issue for more details.

What changes are included in this PR?

  • Producer: instead of encoding the join on and filter separately in expression and post_join_filter, this PR encodes both in the expression field of the JoinRel.
  • Consumer: adds a check on each part of the conjunctive JoinRel expression. If the sub-expression is a Column Eq Column expression, it is added to the on field; otherwise, it is AND-ed with the filter field.
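The consumer-side split described above can be sketched roughly as follows. This is a minimal illustration on a toy `Expr` enum, not DataFusion's `Expr` or the code in this PR; `split_conjunction`, `as_column_pair`, and `split_join_expr` are hypothetical names chosen for the example.

```rust
// Toy stand-in for an expression tree (an assumption for illustration only).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    Eq(Box<Expr>, Box<Expr>),
    Gt(Box<Expr>, Box<Expr>),
    And(Box<Expr>, Box<Expr>),
}

/// Flatten nested ANDs into a flat list of conjuncts.
fn split_conjunction(expr: &Expr, out: &mut Vec<Expr>) {
    match expr {
        Expr::And(l, r) => {
            split_conjunction(l, out);
            split_conjunction(r, out);
        }
        other => out.push(other.clone()),
    }
}

/// Return the column names if `e` has the shape `Column = Column`.
fn as_column_pair(e: &Expr) -> Option<(String, String)> {
    if let Expr::Eq(l, r) = e {
        if let (Expr::Column(a), Expr::Column(b)) = (l.as_ref(), r.as_ref()) {
            return Some((a.clone(), b.clone()));
        }
    }
    None
}

/// Split one join expression into equi-join key pairs (`on`) and a residual
/// `filter` that is the AND of every other conjunct.
fn split_join_expr(expr: &Expr) -> (Vec<(String, String)>, Option<Expr>) {
    let mut conjuncts = Vec::new();
    split_conjunction(expr, &mut conjuncts);

    let mut on = Vec::new();
    let mut filter: Option<Expr> = None;
    for c in conjuncts {
        if let Some(pair) = as_column_pair(&c) {
            on.push(pair);
        } else {
            filter = Some(match filter {
                Some(f) => Expr::And(Box::new(f), Box::new(c)),
                None => c,
            });
        }
    }
    (on, filter)
}
```

For an input such as `l.a = r.a AND l.b > 5`, this sketch yields one key pair and `l.b > 5` as the residual filter.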

Are these changes tested?

Yes.

No new tests were added. Since the producer now hardcodes post_join_filter as None, the existing tests would fail if the changes were not working correctly.

Are there any user-facing changes?

No

let join_exprs: Vec<(Column, Column, bool)> = predicates
.iter()
.map(|p| match p {
// The predicates can contain both equal and non-equal ops.
Contributor

I'm not sure if this is a good idea, it seems to make things more complex.

Contributor Author

@Dandandan Thank you for pointing this out. I agree that this makes things more complicated, and my apologies for the inaccurate explanation of the issue. I added a comment to correct my description of the issue.

The high-level idea is that the join filter and post_join_filter do not have the same meaning semantically. The former filters input during (or before) the join; the latter filters the output of the join (post-join). Currently, DataFusion has no field in the join relation that represents a post-join predicate (the parser/logical optimizer takes care of creating an appropriate filter relation if necessary). So the producer should only generate plans with None as post_join_filter.

I'll modify the consumer to throw an error for now if there's a post_join_filter. Later, we can wrap the join relation with a filter relation if we want to support post_join_filter, or, if a later version of DataFusion supports post_join_filter directly in the join relation, we can add support in both the producer and the consumer.

Contributor

I looked into the Substrait spec, and it doesn't really say what the semantics of post_join_filter are: https://substrait.io/relations/logical_relations/#join-operation
https://github.com/search?q=repo%3Asubstrait-io%2Fsubstrait%20post_join_filter&type=code

There is a subtle distinction between non-equality filters applied during the join (in the ON clause) and filters applied post-join for non-INNER joins: for non-inner joins, the filters applied during the join don't filter out input rows (they still come out, just with NULL matches).
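A small worked example of that distinction, as a sketch only: a toy nested-loop LEFT join over `(key, value)` tuples, where `left_join_on` and `left_join_then_filter` are hypothetical helpers written for this illustration and have nothing to do with DataFusion's join implementation.

```rust
type Row = (i32, i32); // (join key, value)

/// LEFT join on equal keys, with an extra predicate evaluated *during* the join.
/// A left row with no surviving match is still emitted, padded with `None` (NULL).
fn left_join_on(
    left: &[Row],
    right: &[Row],
    extra: impl Fn(i32, i32) -> bool, // extra(left_value, right_value)
) -> Vec<(Row, Option<Row>)> {
    let mut out = Vec::new();
    for &l in left {
        let mut matched = false;
        for &r in right {
            if l.0 == r.0 && extra(l.1, r.1) {
                out.push((l, Some(r)));
                matched = true;
            }
        }
        if !matched {
            out.push((l, None));
        }
    }
    out
}

/// The same LEFT join, but with the predicate applied to the join *output*.
/// NULL-padded rows cannot satisfy the predicate, so they are dropped.
fn left_join_then_filter(
    left: &[Row],
    right: &[Row],
    extra: impl Fn(i32, i32) -> bool,
) -> Vec<(Row, Option<Row>)> {
    left_join_on(left, right, |_, _| true)
        .into_iter()
        .filter(|(l, r)| matches!(r, Some(r) if extra(l.1, r.1)))
        .collect()
}
```

With `left = [(1, 10), (2, 20)]`, `right = [(1, 5), (2, 50)]`, and the predicate `left_value > right_value`, the first function keeps the row for key 2 with a NULL right side, while the second drops it entirely.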

So the producer should only generate plans with None as post_join_filter.

This makes sense to me

For the consumer, there is already code in DataFusion that breaks up an arbitrary Expr into equality predicates and others. This is how the SQL frontend creates a Join (a single expr):

https://github.com/apache/arrow-datafusion/blob/a514b6752b063a5a3006aa114297520a933339b0/datafusion/sql/src/relation/join.rs#L134-L141

I think we could do the same here in the Substrait consumer, which would be much simpler and would let the normal DataFusion optimization machinery work.

Contributor Author

Thanks @alamb for the pointer! I'll look into this and update the consumer.

@nseekhao nseekhao marked this pull request as draft September 21, 2023 16:48
@nseekhao nseekhao marked this pull request as ready for review September 21, 2023 20:01
Contributor

@alamb alamb left a comment

Thank you @nseekhao and @Dandandan

I think this code could be made simpler and left some suggestions about how to do so.

That being said the only thing I think is needed prior to merge is some tests (so we don't break this code in some future refactoring). I don't think the simplifications are necessary

@@ -331,6 +342,12 @@ pub async fn from_substrait_rel(
}
}
Some(RelType::Join(join)) => {
if join.post_join_filter.is_some() {
Contributor

👍


// create conjunction between `join_on` and `join_filter` to embed all join conditions,
// whether equal or non-equal in a single expression
let join_expr = match &join_on {
Contributor

Perhaps you could use conjunction here and simplify the code https://docs.rs/datafusion/latest/datafusion/optimizer/utils/fn.conjunction.html
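To illustrate the suggestion, here is a rough sketch of how a conjunction-style helper could fold the equi-join keys and the optional join filter into one expression on the producer side. The `Expr` enum and both functions are toy definitions written for this example; the local `conjunction` only mirrors the idea of the linked DataFusion helper and is not its actual code.

```rust
// Toy stand-in for an expression tree (an assumption for illustration only).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Eq(Box<Expr>, Box<Expr>),
    Lt(Box<Expr>, Box<Expr>),
    And(Box<Expr>, Box<Expr>),
}

/// AND together any number of expressions; `None` when there are none.
fn conjunction(exprs: impl IntoIterator<Item = Expr>) -> Option<Expr> {
    exprs
        .into_iter()
        .reduce(|acc, e| Expr::And(Box::new(acc), Box::new(e)))
}

/// Fold the equi-join key pairs and the optional join filter into a single
/// expression, so nothing is left over for a separate post-join field.
fn combine_join_conditions(on: Vec<(Expr, Expr)>, filter: Option<Expr>) -> Option<Expr> {
    let eqs = on
        .into_iter()
        .map(|(l, r)| Expr::Eq(Box::new(l), Box::new(r)));
    // `Option<Expr>` is itself an iterator of zero or one items.
    conjunction(eqs.chain(filter))
}
```

Using a single fold avoids a hand-written match over the "on only", "filter only", and "both" cases.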

@nseekhao
Contributor Author

Thank you @nseekhao and @Dandandan

I think this code could be made simpler and left some suggestions about how to do so.

That being said the only thing I think is needed prior to merge is some tests (so we don't break this code in some future refactoring). I don't think the simplifications are necessary

I don't mind simplifying the code so I'll work on it following your earlier suggestions.

As for the tests, I agree. Since existing cases already test for mixed join conditions (roundtrip_non_equi_join()) and non-equi-only joins (roundtrip_non_equi_inner_join()), are you thinking of a new set of tests that specifically test for the absence of post_join_filter? Or did you have something else in mind? Thanks!

@alamb
Contributor

alamb commented Sep 26, 2023

I don't mind simplifying the code so I'll work on it following your earlier suggestions.

Thank you! BTW I found the relevant code to extract equijoin predicates here: https://docs.rs/datafusion/latest/datafusion/optimizer/extract_equijoin_predicate/struct.ExtractEquijoinPredicate.html

are you thinking of a new set of tests that specifically test for the absence of post_join_filter

Yes I guess I was thinking about that. Mostly I was thinking "if we broke / reverted the code in this PR accidentally, how would we know something wasn't correct"? Without some test coverage the behavior could change and we don't know.

@alamb
Contributor

alamb commented Sep 28, 2023

Marking as draft as I believe this PR is not waiting for any feedback (there are some comments to address). Thanks again for pushing this forward @nseekhao

@nseekhao
Contributor Author

nseekhao commented Oct 6, 2023

@alamb I extracted the predicate splitting out of from_substrait_rel() so the code is easier to read. Unfortunately, I couldn't use the function split_eq_and_noneq_join_predicate() (from your ref) directly, because it puts an equality condition with nulls_equal_nulls = true (IsNotDistinctFrom) into the join filter instead of the join condition.
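For readers less familiar with the two operators, the difference can be sketched with `Option<i64>` standing in for a nullable column value. The function names below are made up for this illustration and assume standard SQL three-valued logic; they are not DataFusion functions.

```rust
/// SQL `a = b`: the result is unknown (`None`) when either side is NULL.
fn sql_eq(a: Option<i64>, b: Option<i64>) -> Option<bool> {
    match (a, b) {
        (Some(x), Some(y)) => Some(x == y),
        _ => None,
    }
}

/// SQL `a IS NOT DISTINCT FROM b`: never unknown; two NULLs compare as equal.
fn is_not_distinct_from(a: Option<i64>, b: Option<i64>) -> bool {
    a == b
}

/// Whether a pair of join keys matches, given the join's null-equality flag.
/// An unknown comparison result does not count as a match.
fn keys_match(a: Option<i64>, b: Option<i64>, null_equals_null: bool) -> bool {
    if null_equals_null {
        is_not_distinct_from(a, b)
    } else {
        sql_eq(a, b).unwrap_or(false)
    }
}
```

Both operators are equality conditions on the join keys and differ only in how a NULL/NULL pair is treated, which is why a consumer may want to keep either form in the join condition and record the distinction in a flag.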

As for the test, I added a test function to be used when a join has a non-equi condition, to ensure that the join filter does not mistakenly get embedded into a Substrait plan as a post_join_filter. LMK if this is what you had in mind, otherwise we can discuss further changes. Thanks for the review!

@nseekhao nseekhao marked this pull request as ready for review October 6, 2023 21:52
.collect::<Result<Vec<_>>>()?;
let (left_cols, right_cols, null_eq_nulls): (Vec<_>, Vec<_>, Vec<_>) =
itertools::multiunzip(join_exprs);
// The join expression can contain both equal and non-equal ops.
Contributor

Given that the ExtractEquijoinPredicate optimizer pass already splits up join predicates into equijoin predicates and "other" predicates, I wonder if we could simply create the LogicalPlan::Join using join.expression (and let the subsequent optimizer pass sort it out)?

Something like

left.join(
  right.build()?,
  join_type,
  (vec![], vec![]),
  on, // <-- use the filter directly here, let optimizer pass extract the equijoin columns
  nulls_equal_nulls,
)?

It makes me realize when looking at the API for LogicalPlanBuilder::join that the API is super confusing. It would be nice to improve that API to make it clear that a join can just take a single Expr and DataFusion will sort out figuring out the join columns, etc.

Contributor

It turns out this is exactly what DataFrame::join_on does -- I have filed a ticket with a way to make this clearer: #7766 (comment)

Contributor

@alamb alamb left a comment

Thank you @nseekhao -- I think we could merge this PR as is. I still think it could perhaps be made even simpler (by avoiding the join predicate analysis). Let me know what you think.

@@ -615,6 +621,91 @@ async fn extension_logical_plan() -> Result<()> {
Ok(())
}

fn check_post_join_filters(rel: &Rel) -> Result<()> {
// search for target_rel and field value in proto
match &rel.rel_type {
Contributor

Looks like it might be helpful (eventually) to define TreeNode for Rel to implement walking efficiently.
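The kind of walk such a check performs can be sketched as a plain recursive function. The `Rel` enum here is a toy plan tree invented for the example, not the Substrait protobuf `Rel` type, and `check_no_post_join_filter` is a hypothetical name rather than the test helper added in this PR.

```rust
// Toy plan nodes (an assumption for illustration only).
#[derive(Debug)]
enum Rel {
    Read,
    Filter { input: Box<Rel> },
    Join {
        left: Box<Rel>,
        right: Box<Rel>,
        post_join_filter: Option<String>,
    },
}

/// Walk the plan tree and fail on the first join that carries a post_join_filter.
fn check_no_post_join_filter(rel: &Rel) -> Result<(), String> {
    match rel {
        Rel::Read => Ok(()),
        Rel::Filter { input } => check_no_post_join_filter(input),
        Rel::Join { left, right, post_join_filter } => {
            if let Some(f) = post_join_filter {
                return Err(format!("unexpected post_join_filter: {}", f));
            }
            // Joins can be nested on either side, so check both inputs.
            check_no_post_join_filter(left)?;
            check_no_post_join_filter(right)
        }
    }
}
```

A generic tree-visitor abstraction would replace the per-variant recursion with a single traversal, which is the motivation behind the TreeNode remark.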

Contributor

@alamb alamb left a comment

I think this PR is better than what is on master, so I plan to merge it in once CI passes. I will propose a follow on PR that uses the newly added LogicalPlanBuilder::join_on to simplify it subsequently

@alamb
Contributor

alamb commented Oct 13, 2023

I have merged up from main and plan to merge this PR once CI passes

@alamb
Contributor

alamb commented Oct 13, 2023

I will propose a follow on PR that uses the newly added LogicalPlanBuilder::join_on to simplify it subsequently

I tried this and it turns out there is some subtlety involved with a = a vs a IS DISTINCT FROM that I was not able to solve when trying to simplify. PR is here: #7819

@alamb alamb merged commit f5a6d01 into apache:main Oct 13, 2023
22 checks passed
@alamb
Contributor

alamb commented Oct 13, 2023

Thanks again @nseekhao

Successfully merging this pull request may close these issues.

Substrait: Combine join on and filter expressions in a single Substrait JoinRel's field
3 participants