Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-46037][SQL] When Left Join build Left and codegen is turned off, ShuffledHashJoinExec may result in incorrect results #43938

Closed
wants to merge 2 commits into from

Conversation

mcdull-zhang
Copy link
Contributor

@mcdull-zhang mcdull-zhang commented Nov 21, 2023

What changes were proposed in this pull request?

When Left Join build Left and codegen is turned off, ShuffledHashJoinExec may have incorrect results.

The cause of the problem is: the left side (buildPlan) of LeftJoinBuildLeft is placed in the first half of joinRow

Why are the changes needed?

Make the query results correct.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

unit test.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the SQL label Nov 21, 2023
@mcdull-zhang
Copy link
Contributor Author

ping @cloud-fan

// Put join left side before right side. This is to be consistent with
// `ShuffledHashJoinExec.fullOuterJoin`.
if ((joinType == FullOuter || joinType == LeftOuter) && buildSide == BuildLeft) {
// Put join left side before right side. This is to be consistent with ShuffledHashJoinExec.
Copy link
Contributor

@JoshRosen JoshRosen Nov 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether this whole boundCondition block can be simplified to

Predicate.create(condition.get, left.output ++ right.output).eval _

since the left side is always on the left and the right side is always on the right.

(Edit: the proposed simplification in ⬆️ is not correct (the bound condition input schema is not necessarily the join's output schema); see later comments for discussion of an alternative simplification).

Further down in this file, we have

protected lazy val (buildPlan, streamedPlan) = buildSide match {
case BuildLeft => (left, right)
case BuildRight => (right, left)
}

showing that buildPlan and streamPlan are just re-mappings of left and right depending on the build side.


Several years ago, it looks like we used to do something similar to my proposal:

@transient private[this] lazy val boundCondition = if (condition.isDefined) {
newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
} else {
(r: InternalRow) => true
}

but we switched to the current use of buildPlan and streamPlan in a refactoring in #12102 (I'm not fully clear on why).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, It's fragile to keep 2 pieces of code in sync.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I guess it's wrong for inner join + build left?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found this

  protected def createResultProjection(): (InternalRow) => InternalRow = joinType match {
    case LeftExistence(_) =>
      UnsafeProjection.create(output, output)
    case _ =>
      // Always put the stream side on left to simplify implementation
      // both of left and right side could be null
      UnsafeProjection.create(
        output, (streamedPlan.output ++ buildPlan.output).map(_.withNullability(true)))
  }

I think stream side left and build side right is the common case, but we have some special cases for outer joins.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I don't know why outer join needs to be a special case. Can't we always put streaming side left and build side right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HashJoin.scala

override def output: Seq[Attribute] = {
    joinType match {
      case _: InnerLike =>
        left.output ++ right.output
      case LeftOuter =>
        left.output ++ right.output.map(_.withNullability(true))
      case RightOuter =>
        left.output.map(_.withNullability(true)) ++ right.output
      case j: ExistenceJoin =>
        left.output :+ j.exists
      case LeftExistence(_) =>
        left.output
      case x =>
        throw new IllegalArgumentException(s"HashJoin should not take $x as the JoinType")
    }
  }

ShuffledJoin.scala

override def output: Seq[Attribute] = {
    joinType match {
      case _: InnerLike =>
        left.output ++ right.output
      case LeftOuter =>
        left.output ++ right.output.map(_.withNullability(true))
      case RightOuter =>
        left.output.map(_.withNullability(true)) ++ right.output
      case FullOuter =>
        (left.output ++ right.output).map(_.withNullability(true))
      case j: ExistenceJoin =>
        left.output :+ j.exists
      case LeftExistence(_) =>
        left.output
      case x =>
        throw new IllegalArgumentException(
          s"${getClass.getSimpleName} not take $x as the JoinType")
    }
  }

@cloud-fan Output determines that we cannot simply put streaming side left and build side right.
Am I understanding correctly?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea that's why we have a createResultProjection to reorder the columns.

Copy link
Contributor

@JoshRosen JoshRosen Nov 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I think I understand now:

In all of the *Join methods in this file (which doesn't include full outer join), the streamed side is always the left side of the joinRow that is passed to boundCondition: in HashJoin.scala, the input to boundCondition is not necessarily the same as the output of the join operator itself (for that, we have resultProj which comes from createResultProjection which remains in sync with boundCondition (source).

However, boundCondition is inherited in ShuffledHashJoinExec and there we have a buildSideOrFullOuterJoin method (source) and there the resultProjection operates over the same input schema as the join output schema (rather than assuming that the joined row always has the streamed side on the left). In that code, boundCondition is evaluated over an input that matches the join output schema and there the streamed side could be on either side rather than only the left.

I think this inheritance is confusing and hard to reason about.

It seems like HashJoin.scala has an invariant of "streamed side always on left" which gets violated in ShuffleHashJoin.scala's separate implementation of outer joins.

I wonder whether we can address this bug by modifying ShuffledHashJoinExec.buildSideOrFullOuterJoin so that it always unconditionally places the streamed side on the left (the same as HashJoin.scala's default). (Edit: I'm basically agreeing with @cloud-fan's suggestion above).

withSQLConf(SQLConf.ENABLE_BUILD_SIDE_OUTER_SHUFFLED_HASH_JOIN_CODEGEN.key -> "false") {
val df1 = sql(
"""
|SELECT /*+ SHUFFLE_HASH(t1) */ *
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the golden file test outer-join.sql, we run it multiple times to test different code paths: broadcast join, shuffle join, sort merge join. code on/off etc., can we move the test there?

Copy link

github-actions bot commented Mar 3, 2024

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Mar 3, 2024
@github-actions github-actions bot closed this Mar 4, 2024
cloud-fan added a commit that referenced this pull request Aug 29, 2024
…without codegen

### What changes were proposed in this pull request?

This is a re-submitting of #43938 to fix a join correctness bug caused by #41398 . Credits go to mcdull-zhang

### Why are the changes needed?

correctness fix

### Does this PR introduce _any_ user-facing change?

Yes, the query result will be corrected.

### How was this patch tested?

new test

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #47905 from cloud-fan/join.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan added a commit that referenced this pull request Aug 29, 2024
…without codegen

This is a re-submitting of #43938 to fix a join correctness bug caused by #41398 . Credits go to mcdull-zhang

correctness fix

Yes, the query result will be corrected.

new test

no

Closes #47905 from cloud-fan/join.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit af5e0a2)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
IvanK-db pushed a commit to IvanK-db/spark that referenced this pull request Sep 20, 2024
…without codegen

### What changes were proposed in this pull request?

This is a re-submitting of apache#43938 to fix a join correctness bug caused by apache#41398 . Credits go to mcdull-zhang

### Why are the changes needed?

correctness fix

### Does this PR introduce _any_ user-facing change?

Yes, the query result will be corrected.

### How was this patch tested?

new test

### Was this patch authored or co-authored using generative AI tooling?

no

Closes apache#47905 from cloud-fan/join.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants