Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ExistenceJoin Iterator using an auxiliary left semijoin #4796

Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
a080f9a
wip
gerashegalov Feb 9, 2022
cf135d7
Merge remote-tracking branch 'origin/branch-22.04' into gerashegalov/…
gerashegalov Feb 9, 2022
4ad4616
Merge remote-tracking branch 'origin/branch-22.04' into gerashegalov/…
gerashegalov Feb 15, 2022
deba9a1
wip
gerashegalov Feb 16, 2022
664ba4e
wip
gerashegalov Feb 16, 2022
a5da440
Use verboseStringWithSuffix(1000) for TreeNode match
gerashegalov Feb 16, 2022
b55a3e1
Use Scala Int.MinValue
gerashegalov Feb 16, 2022
6293dc1
config.md update for spark.rapids.sql.join.existence.enabled
gerashegalov Feb 16, 2022
8c9597b
Merge remote-tracking branch 'origin/branch-22.04' into gerashegalov/…
gerashegalov Feb 16, 2022
2517f73
undo buildall changes
gerashegalov Feb 16, 2022
193dbac
restore whitespace
gerashegalov Feb 16, 2022
5ed19db
existence join test with rhs duplicates
gerashegalov Feb 17, 2022
1c332ec
semijoin-based implementation
gerashegalov Feb 18, 2022
a02992e
undo cosmetic change
gerashegalov Feb 18, 2022
7e6bfcd
fix tagForGpu in GpuHashJoin
gerashegalov Feb 18, 2022
e333159
test updates
gerashegalov Feb 19, 2022
89c9a0e
draft
gerashegalov Feb 22, 2022
f4fe704
Merge remote-tracking branch 'origin/branch-22.04' into gerashegalov/…
gerashegalov Feb 23, 2022
465a957
undo gatherer changes
gerashegalov Feb 23, 2022
1a44866
mem leaks fixed
gerashegalov Feb 24, 2022
baea237
lhs dupes, conditional join
gerashegalov Feb 24, 2022
29704bf
mixedLeftSemiJoinGatherMap
gerashegalov Feb 25, 2022
b19c98f
mnemonic test ids
gerashegalov Feb 25, 2022
2948a66
refactoring
gerashegalov Feb 25, 2022
6d55b6a
comment fix
gerashegalov Feb 25, 2022
a700f33
wip
gerashegalov Feb 28, 2022
67d8616
broadcast hash join test
gerashegalov Mar 2, 2022
e4c0a40
undo import py
gerashegalov Mar 2, 2022
3a5a173
undo explain
gerashegalov Mar 2, 2022
2f88244
undo explain
gerashegalov Mar 2, 2022
5393301
fixe bhj test id
gerashegalov Mar 2, 2022
02b037c
Update comment in join_test.py
gerashegalov Mar 2, 2022
9f2ed60
review comments
gerashegalov Mar 3, 2022
df4ad01
Merge remote-tracking branch 'origin/branch-22.04' into pr/gerashegal…
gerashegalov Mar 3, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ Name | Description | Default Value
<a name="sql.incompatibleDateFormats.enabled"></a>spark.rapids.sql.incompatibleDateFormats.enabled|When parsing strings as dates and timestamps in functions like unix_timestamp, some formats are fully supported on the GPU and some are unsupported and will fall back to the CPU. Some formats behave differently on the GPU than the CPU. Spark on the CPU interprets date formats with unsupported trailing characters as nulls, while Spark on the GPU will parse the date with invalid trailing characters. More detail can be found at [parsing strings as dates or timestamps](compatibility.md#parsing-strings-as-dates-or-timestamps).|false
<a name="sql.incompatibleOps.enabled"></a>spark.rapids.sql.incompatibleOps.enabled|For operations that work, but are not 100% compatible with the Spark equivalent set if they should be enabled by default or disabled by default.|false
<a name="sql.join.cross.enabled"></a>spark.rapids.sql.join.cross.enabled|When set to true cross joins are enabled on the GPU|true
<a name="sql.join.existence.enabled"></a>spark.rapids.sql.join.existence.enabled|When set to true existence joins are enabled on the GPU|true
<a name="sql.join.fullOuter.enabled"></a>spark.rapids.sql.join.fullOuter.enabled|When set to true full outer joins are enabled on the GPU|true
<a name="sql.join.inner.enabled"></a>spark.rapids.sql.join.inner.enabled|When set to true inner joins are enabled on the GPU|true
<a name="sql.join.leftAnti.enabled"></a>spark.rapids.sql.join.leftAnti.enabled|When set to true left anti joins are enabled on the GPU|true
Expand Down
47 changes: 33 additions & 14 deletions integration_tests/src/main/python/join_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -802,27 +802,46 @@ def do_join(spark):
# If the condition is something like an AND, it makes the result a subset of a SemiJoin, and
# the optimizer won't use ExistenceJoin.
@ignore_order(local=True)
@pytest.mark.parametrize(
"allowFallback", [
pytest.param('true',
marks=pytest.mark.allow_non_gpu('SortMergeJoinExec')),
pytest.param('false',
marks=pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/589"))
], ids=idfn
)
def test_existence_join(allowFallback, spark_tmp_table_factory):
@pytest.mark.parametrize('numComplementsToExists', [0, 1, 2])
@pytest.mark.parametrize('aqeEnabled', [
pytest.param(False),
pytest.param(True, marks=pytest.mark.allow_non_gpu('ShuffleExchangeExec'))
revans2 marked this conversation as resolved.
Show resolved Hide resolved
])
def test_existence_join(numComplementsToExists, aqeEnabled, spark_tmp_table_factory):
leftTable = spark_tmp_table_factory.get()
rightTable = spark_tmp_table_factory.get()
def do_join(spark):
# create non-overlapping ranges to have a mix of exists=true and exists=false
spark.createDataFrame([v] for v in range(2, 10)).createOrReplaceTempView(leftTable)
spark.createDataFrame([v] for v in range(0, 8)).createOrReplaceTempView(rightTable)
# duplicate every row in the rhs to verify it does not affect
# the the number of the output rows, which should be equal to the number of the
gerashegalov marked this conversation as resolved.
Show resolved Hide resolved
# left-hand side rows
lhs_upper_bound = 10
lhs_data = list((f"left_{v}", v * 10, v * 100) for v in range(2, lhs_upper_bound))
lhs_data.append(('left_null', None, None))
df_left = spark.createDataFrame(lhs_data)
df_left.createOrReplaceTempView(leftTable)

rhs_data = list((f"right_{v}", v * 10, v * 100) for v in range(0, 8))
rhs_data.append(('right_null', None, None))
rhs_data_with_dupes=[]
for dupe in rhs_data:
rhs_data_with_dupes.extend([dupe, dupe])

df_right = spark.createDataFrame(rhs_data_with_dupes)
df_right.createOrReplaceTempView(rightTable)

assert df_right.count() == df_left.count() * 2
assert df_right.distinct().count() == df_left.count()
revans2 marked this conversation as resolved.
Show resolved Hide resolved

res = spark.sql((
"select * "
"from {} as l "
"where l._1 < 0 "
" OR l._1 in (select * from {} as r)"
f"where l._2 >= {10 * (lhs_upper_bound - numComplementsToExists)}"
" or exists (select * from {} as r where r._2 = l._2 AND r._3 = l._3)"
).format(leftTable, rightTable))
return res
assert_cpu_and_gpu_are_equal_collect_with_capture(do_join, r".+Join ExistenceJoin\(exists#[0-9]+\).+")

assert_cpu_and_gpu_are_equal_collect_with_capture(do_join, r"ExistenceJoin\(exists#[0-9]+\)",
conf={
"spark.sql.adaptive.enabled": aqeEnabled,
jlowe marked this conversation as resolved.
Show resolved Hide resolved
})
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ package com.nvidia.spark.rapids

import scala.collection.mutable

import ai.rapids.cudf.{GatherMap, NvtxColor, OutOfBoundsPolicy}
import ai.rapids.cudf.{ColumnVector, GatherMap, NvtxColor, OutOfBoundsPolicy, Scalar, Table}

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.{InnerLike, JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, InnerLike, JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
Expand Down Expand Up @@ -298,14 +299,20 @@ abstract class SplittableJoinIterator(
None
}

val lazyLeftMap = LazySpillableGatherMap(leftMap, spillCallback, "left_map")
// TODO refactor lazyLeftMap out of the way, lazy workaround
revans2 marked this conversation as resolved.
Show resolved Hide resolved
lazy val lazyLeftMap = LazySpillableGatherMap(leftMap, spillCallback, "left_map")
val gatherer = rightMap match {
case None =>
// When there isn't a `rightMap` we are in either LeftSemi or LeftAnti joins.
jlowe marked this conversation as resolved.
Show resolved Hide resolved
// In these cases, the map and the table are both the left side, and everything in the map
// is a match on the left table, so we don't want to check for bounds.
rightData.close()
JoinGatherer(lazyLeftMap, leftData, OutOfBoundsPolicy.DONT_CHECK)
joinType match {
case ExistenceJoin(_) =>
createExistenceJoinGatherer(leftData, lazyLeftMap)
case _ =>
JoinGatherer(lazyLeftMap, leftData, OutOfBoundsPolicy.DONT_CHECK)
}
case Some(right) =>
// Inner joins -- manifest the intersection of both left and right sides. The gather maps
// contain the number of rows that must be manifested, and every index
Expand Down Expand Up @@ -344,4 +351,38 @@ abstract class SplittableJoinIterator(
maps.foreach(_.close())
}
}

private def createExistenceJoinGatherer(
lhs: LazySpillableColumnarBatch,
existenceGatherMap: LazySpillableGatherMap
): JoinGatherer = {
// cuDF executes left semijoin, the gatherer is constructed with a new
// gather to gather every row from lhs
//
// we build a new rhs with a the "exists" Boolean column that has as many rows
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels off. with a the "exists" I think the a is a typo

// as the input from from a false-Scalar, then scatter true-Scalar using the original
jlowe marked this conversation as resolved.
Show resolved Hide resolved
revans2 marked this conversation as resolved.
Show resolved Hide resolved
// semijoin lhs-GatherMap labeling rows that have at least one match in the original
// rhs
//
val rhsExistsCB = withResource(Scalar.fromBool(false)) { falseScalar =>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is way too deeply nested for me. Could we try to break it up some? The falseScalar is only used to create falseCV. It might also be nice to create a method for Table.scatter that takes the columnView and a single Scalar as input, and does all of the wrapping/unwrapping. to make this code that much more readable.

withResource(Scalar.fromBool(true)) { trueScalar =>
jlowe marked this conversation as resolved.
Show resolved Hide resolved
withResource(ColumnVector.fromScalar(falseScalar, lhs.numRows)) { falseCV =>
jlowe marked this conversation as resolved.
Show resolved Hide resolved
withResource(new Table(falseCV)) { existsNothingTable =>
// auto close original gather map
withResource(existenceGatherMap) { egMap =>
jlowe marked this conversation as resolved.
Show resolved Hide resolved
val numExists = egMap.getRowCount.toInt
withResource(egMap.toColumnView(0, numExists)) { existsView =>
withResource(
Table.scatter(Array(trueScalar), existsView, existsNothingTable, false)
) { existsTable =>
withResource(
GpuColumnVector.from(existsTable, Array[DataType](BooleanType))
) { existsBatch =>
LazySpillableColumnarBatch.apply(existsBatch, spillCallback, "right_data")
revans2 marked this conversation as resolved.
Show resolved Hide resolved
}}}}}}}}

val lazyRightMap: LazySpillableGatherMap = LazySpillableGatherMap.identity(lhs.numRows)
jlowe marked this conversation as resolved.
Show resolved Hide resolved
JoinGatherer(lazyRightMap, lhs, lazyRightMap, rhsExistsCB,
OutOfBoundsPolicy.DONT_CHECK, OutOfBoundsPolicy.DONT_CHECK)
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2021-2022, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -336,6 +336,17 @@ object LazySpillableGatherMap {

def rightCross(leftCount: Int, rightCount: Int): LazySpillableGatherMap =
new RightCrossGatherMap(leftCount, rightCount)

def identity(numRows: Long): LazySpillableGatherMap = new LazySpillableGatherMap {
override val getRowCount = numRows
override def toColumnView(startRow: Long, numRows: Int): ColumnView = {
withResource(GpuScalar.from(startRow, LongType)) { startScalar =>
ai.rapids.cudf.ColumnVector.sequence(startScalar, numRows)
}
}
override def close(): Unit = {}
override def allowSpilling(): Unit = {}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,9 @@ object ExecutionPlanCaptureCallback {
case p if p.expressions.exists(containsExpression(_, className, regexMap)) =>
true
case p: SparkPlan =>
val sparkPlanStringForRegex = p.verboseStringWithSuffix(1000)
regexMap.getOrElseUpdate(className, className.r)
.findFirstIn(p.simpleStringWithNodeId())
.findFirstIn(sparkPlanStringForRegex)
.nonEmpty
}.nonEmpty
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,11 @@ object RapidsConf {
.booleanConf
.createWithDefault(true)

val ENABLE_EXISTENCE_JOIN = conf("spark.rapids.sql.join.existence.enabled")
.doc("When set to true existence joins are enabled on the GPU")
.booleanConf
.createWithDefault(true)

val ENABLE_PROJECT_AST = conf("spark.rapids.sql.projectAstEnabled")
.doc("Enable project operations to use cudf AST expressions when possible.")
.internal()
Expand Down Expand Up @@ -1603,6 +1608,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val areLeftAntiJoinsEnabled: Boolean = get(ENABLE_LEFT_ANTI_JOIN)

lazy val areExistenceJoinsEnabled: Boolean = get(ENABLE_EXISTENCE_JOIN)

lazy val isCastDecimalToFloatEnabled: Boolean = get(ENABLE_CAST_DECIMAL_TO_FLOAT)

lazy val isCastFloatToDecimalEnabled: Boolean = get(ENABLE_CAST_FLOAT_TO_DECIMAL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ object JoinTypeChecks {
case LeftAnti if !conf.areLeftAntiJoinsEnabled =>
meta.willNotWorkOnGpu("left anti joins have been disabled. To enable set " +
s"${RapidsConf.ENABLE_LEFT_ANTI_JOIN.key} to true")
case ExistenceJoin(_) if !conf.areExistenceJoinsEnabled =>
meta.willNotWorkOnGpu("existence joins have been disabled. To enable set " +
s"${RapidsConf.ENABLE_EXISTENCE_JOIN.key} to true")
case _ => // not disabled
}
}
Expand Down Expand Up @@ -107,7 +110,7 @@ object GpuHashJoin extends Arm {
JoinTypeChecks.tagForGpu(joinType, meta)
joinType match {
case _: InnerLike =>
case RightOuter | LeftOuter | LeftSemi | LeftAnti =>
case RightOuter | LeftOuter | LeftSemi | LeftAnti | ExistenceJoin(_) =>
conditionMeta.foreach(meta.requireAstForGpuOn)
case FullOuter =>
conditionMeta.foreach(meta.requireAstForGpuOn)
Expand Down Expand Up @@ -411,13 +414,17 @@ class HashJoinIterator(
rightData: LazySpillableColumnarBatch): Option[JoinGatherer] = {
withResource(new NvtxWithMetrics("hash join gather map", NvtxColor.ORANGE, joinTime)) { _ =>
val maps = joinType match {
case LeftOuter => leftKeys.leftJoinGatherMaps(rightKeys, compareNullsEqual)
case LeftOuter =>
leftKeys.leftJoinGatherMaps(rightKeys, compareNullsEqual)
case RightOuter =>
// Reverse the output of the join, because we expect the right gather map to
// always be on the right
rightKeys.leftJoinGatherMaps(leftKeys, compareNullsEqual).reverse
case _: InnerLike => leftKeys.innerJoinGatherMaps(rightKeys, compareNullsEqual)
case LeftSemi => Array(leftKeys.leftSemiJoinGatherMap(rightKeys, compareNullsEqual))
case LeftSemi | ExistenceJoin(_) =>
// existence join is implemented as a LeftSemi join with GatherMap converted
// to the Boolean column "exists"
Array(leftKeys.leftSemiJoinGatherMap(rightKeys, compareNullsEqual))
case LeftAnti => Array(leftKeys.leftAntiJoinGatherMap(rightKeys, compareNullsEqual))
case FullOuter => leftKeys.fullJoinGatherMaps(rightKeys, compareNullsEqual)
case _ =>
Expand Down