Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-46037][SQL] When Left Join build Left and codegen is turned off, ShuffledHashJoinExec may result in incorrect results #43938

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,8 @@ trait HashJoin extends JoinCodegenSupport {
UnsafeProjection.create(streamedBoundKeys)

@transient protected[this] lazy val boundCondition = if (condition.isDefined) {
if (joinType == FullOuter && buildSide == BuildLeft) {
// Put join left side before right side. This is to be consistent with
// `ShuffledHashJoinExec.fullOuterJoin`.
if ((joinType == FullOuter || joinType == LeftOuter) && buildSide == BuildLeft) {
// Put join left side before right side. This is to be consistent with ShuffledHashJoinExec.
Copy link
Contributor

@JoshRosen JoshRosen Nov 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether this whole boundCondition block can be simplified to

Predicate.create(condition.get, left.output ++ right.output).eval _

since the left side is always on the left and the right side is always on the right.

(Edit: the proposed simplification in ⬆️ is not correct (the bound condition input schema is not necessarily the join's output schema); see later comments for discussion of an alternative simplification).

Further down in this file, we have

protected lazy val (buildPlan, streamedPlan) = buildSide match {
case BuildLeft => (left, right)
case BuildRight => (right, left)
}

showing that buildPlan and streamPlan are just re-mappings of left and right depending on the build side.


Several years ago, it looks like we used to do something similar to my proposal:

@transient private[this] lazy val boundCondition = if (condition.isDefined) {
newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
} else {
(r: InternalRow) => true
}

but we switched to the current use of buildPlan and streamPlan in a refactoring in #12102 (I'm not fully clear on why).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, It's fragile to keep 2 pieces of code in sync.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I guess it's wrong for inner join + build left?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found this

  protected def createResultProjection(): (InternalRow) => InternalRow = joinType match {
    case LeftExistence(_) =>
      UnsafeProjection.create(output, output)
    case _ =>
      // Always put the stream side on left to simplify implementation
      // both of left and right side could be null
      UnsafeProjection.create(
        output, (streamedPlan.output ++ buildPlan.output).map(_.withNullability(true)))
  }

I think stream side left and build side right is the common case, but we have some special cases for outer joins.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I don't know why outer join needs to be a special case. Can't we always put streaming side left and build side right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HashJoin.scala

override def output: Seq[Attribute] = {
    joinType match {
      case _: InnerLike =>
        left.output ++ right.output
      case LeftOuter =>
        left.output ++ right.output.map(_.withNullability(true))
      case RightOuter =>
        left.output.map(_.withNullability(true)) ++ right.output
      case j: ExistenceJoin =>
        left.output :+ j.exists
      case LeftExistence(_) =>
        left.output
      case x =>
        throw new IllegalArgumentException(s"HashJoin should not take $x as the JoinType")
    }
  }

ShuffledJoin.scala

override def output: Seq[Attribute] = {
    joinType match {
      case _: InnerLike =>
        left.output ++ right.output
      case LeftOuter =>
        left.output ++ right.output.map(_.withNullability(true))
      case RightOuter =>
        left.output.map(_.withNullability(true)) ++ right.output
      case FullOuter =>
        (left.output ++ right.output).map(_.withNullability(true))
      case j: ExistenceJoin =>
        left.output :+ j.exists
      case LeftExistence(_) =>
        left.output
      case x =>
        throw new IllegalArgumentException(
          s"${getClass.getSimpleName} not take $x as the JoinType")
    }
  }

@cloud-fan Output determines that we cannot simply put streaming side left and build side right.
Am I understanding correctly?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea that's why we have a createResultProjection to reorder the columns.

Copy link
Contributor

@JoshRosen JoshRosen Nov 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I think I understand now:

In all of the *Join methods in this file (which doesn't include full outer join), the streamed side is always the left side of the joinRow that is passed to boundCondition: in HashJoin.scala, the input to boundCondition is not necessarily the same as the output of the join operator itself (for that, we have resultProj which comes from createResultProjection which remains in sync with boundCondition (source).

However, boundCondition is inherited in ShuffledHashJoinExec and there we have a buildSideOrFullOuterJoin method (source) and there the resultProjection operates over the same input schema as the join output schema (rather than assuming that the joined row always has the streamed side on the left). In that code, boundCondition is evaluated over an input that matches the join output schema and there the streamed side could be on either side rather than only the left.

I think this inheritance is confusing and hard to reason about.

It seems like HashJoin.scala has an invariant of "streamed side always on left" which gets violated in ShuffleHashJoin.scala's separate implementation of outer joins.

I wonder whether we can address this bug by modifying ShuffledHashJoinExec.buildSideOrFullOuterJoin so that it always unconditionally places the streamed side on the left (the same as HashJoin.scala's default). (Edit: I'm basically agreeing with @cloud-fan's suggestion above).

Predicate.create(condition.get, buildPlan.output ++ streamedPlan.output).eval _
} else {
Predicate.create(condition.get, streamedPlan.output ++ buildPlan.output).eval _
Expand Down
23 changes: 23 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1755,4 +1755,27 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
cached.unpersist()
}
}

test("SPARK-46037: When Left Join build Left, ShuffledHashJoinExec may " +
JoshRosen marked this conversation as resolved.
Show resolved Hide resolved
"result in incorrect results") {
withSQLConf(SQLConf.ENABLE_BUILD_SIDE_OUTER_SHUFFLED_HASH_JOIN_CODEGEN.key -> "false") {
val df1 = sql(
"""
|SELECT /*+ SHUFFLE_HASH(t1) */ *
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the golden file test outer-join.sql, we run it multiple times to test different code paths: broadcast join, shuffle join, sort merge join. code on/off etc., can we move the test there?

|FROM testData t1
|LEFT OUTER JOIN
|testData2 t2
|ON key = a AND concat(value, b) = '12'
|""".stripMargin)
val df2 = sql(
"""
|SELECT /*+ SHUFFLE_MERGE(t1) */ *
|FROM testData t1
|LEFT OUTER JOIN
|testData2 t2
|ON key = a AND concat(value, b) = '12'
|""".stripMargin)
checkAnswer(df1, df2.collect())
}
}
}