
WIP: Leverage cudf conditional nested loop join to implement semi/anti hash join with condition #4345

Closed
wants to merge 13 commits

Conversation

jbrennan333 (Contributor, author):

Closes #4309

This is an experiment to see whether replacing conditional hash joins that currently run on the CPU with conditional nested loop joins that run on the GPU yields an overall performance improvement for TPC-DS queries, or at least improves some of them. Note that the cuDF team is working on adding support for conditional hash joins, which would be a better solution in the long run.

This initial commit only has the conversion for GpuBroadcastHashJoin -> GpuBroadcastNestedLoopJoin. Support for the others (GpuShuffledHashJoin and GpuSortMergeJoin) will be added in later commits.

Signed-off-by: Jim Brennan <jimb@nvidia.com>
@jbrennan333 jbrennan333 self-assigned this Dec 10, 2021
@jbrennan333 jbrennan333 added the performance A performance related task/issue label Dec 10, 2021
joinType match {
  case _: InnerLike =>
  case RightOuter | LeftOuter | LeftSemi | LeftAnti =>
    unSupportNonEqualNonAst()
abellina (Collaborator) commented Dec 10, 2021:

I vote for inlining the unSupport* methods into the case statement, so they are part of the match rather than defined as separate methods.
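
For illustration, a rough sketch of what the inlined form might look like; the message text is invented, and willNotWorkOnGpu is the tagging call used elsewhere in this PR:

joinType match {
  case _: InnerLike =>
  case RightOuter | LeftOuter | LeftSemi | LeftAnti =>
    // inlined body of what unSupportNonEqualNonAst() presumably reports
    willNotWorkOnGpu(s"$joinType join with a non-equality condition is only " +
      "supported when the condition is AST-compatible")
  case _ =>
    willNotWorkOnGpu(s"$joinType is not currently supported")
}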

jbrennan333 (Contributor, author):

For this commit, I was trying to minimize diffs with GpuHashJoin.tagJoin, in the hopes that once I implement the other two joins we can use the common method for all of them again. So I will hold off on making this change for now until I can make it in the common method.

abellina (Collaborator):

Thanks for the pointer @jbrennan333, agree with keeping the diffs minimal with that existing function.

}

if (substituteBroadcastNestedLoopJoin) {
  val joinExec = ShimLoader.getSparkShims.getGpuBroadcastNestedLoopJoinShim(
abellina (Collaborator):

Could just return here, without defining joinExec.

jbrennan333 (Contributor, author):

Good point. I will change this.

jbrennan333 (Contributor, author):

build

abellina (Collaborator) left a review:

Overall LGTM, just minor things. I think we are still adding the config to enable this before we merge the PR, right?


override def tagPlanForGpu(): Unit = {
  GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition)
  //GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition)
Collaborator:

can this be removed?

Collaborator:

same comment for each shim.

jbrennan333 (Contributor, author):

It can. I left it there because my intention is to have GpuHashJoin be common again after I implement the other two hash joins. If we decide we want to do those in a separate PR, I will fix this. Otherwise it will be fixed in a later commit to this PR.

// If there is an AST condition, we need to make sure the build side works for
// GpuBroadcastNestedLoopJoin
if (isAstCondition()) {
  join.joinType match {
abellina (Collaborator):

nit:

Suggested change:
- join.joinType match {
+ joinType match {

There are other uses of join.joinType, but you defined joinType, so I'd make them all consistent.

jbrennan333 (Contributor, author):

Yeah this was another minor change to minimize diffs with GpuHashJoin.tagJoin. I will make them consistent.

// as a filter after the join when possible.
condition.map(c => GpuFilterExec(c.convertToGpu(), joinExec)).getOrElse(joinExec)

val condition = conditionMeta.map(_.convertToGpu())
abellina (Collaborator):

Should convertToGpu happen if and only if this is the correct type of join and isAstCondition holds? In other words, should it only happen when substituteBroadcastNestedLoopJoin is true?

jbrennan333 (Contributor, author):

I think it can be. I will remove the local val and move it to where it's used.

def test_right_broadcast_hash_join_ast_override(data_gen, join_type):
    def do_join(spark):
        left, right = create_ridealong_df(spark, short_gen, data_gen, 500, 50)
        return left.join(broadcast(right), (left.b >= right.r_b), join_type)
Member:

This is not testing a hash join but rather a nested loop join. It's effectively the same test as test_right_broadcast_nested_loop_join_with_ast_condition. The join condition needs an equality check logically AND'd with an inequality check. The test below has the same issue.
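
For illustration, a minimal sketch of the condition shape being asked for, written in the Scala DataFrame API (the test itself is Python); the a/r_a key names are hypothetical, and left, right, and joinType stand in for the test's inputs:

import org.apache.spark.sql.functions.broadcast

// An equality on one key pair AND'd with an inequality on another, so
// Spark still plans a hash join and keeps the inequality as the residual
// condition this PR wants to push into the nested loop join.
val joined = left.join(broadcast(right),
  left("a") === right("r_a") && left("b") >= right("r_b"),
  joinType)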

jbrennan333 (Contributor, author):

I was trying to test the conversion from GpuBroadcastHashJoin to GpuBroadcastNestedLoopJoin. I originally had versions of these with no condition, and they were definitely using GpuBroadcastHashJoin, which I verified with: @validate_execs_in_gpu_plan('GpuBroadcastHashJoinExec'). I'm not sure why I deleted those versions. I will add them back in my next commit so you can see what I have. They were modeled after test_hash_join_ridealong, and use the _hash_join_conf.

Member:

The join condition will need to be on two key columns. One will be testing for equality while the other needs to test for an inequality. Therefore create_ridealong_df is not appropriate for this, since it only creates one key column that is joinable and another that is not (it's a nested type).

Comment on lines 90 to 98
if (isAstCondition()) {
  join.joinType match {
    case LeftOuter | LeftSemi | LeftAnti if gpuBuildSide == GpuBuildLeft =>
      willNotWorkOnGpu(s"build left not supported for conditional ${join.joinType}")
    case RightOuter if gpuBuildSide == GpuBuildRight =>
      willNotWorkOnGpu(s"build right not supported for conditional ${join.joinType}")
    case _ =>
  }
}
Member:

Why is this conditional on having an AST condition? I don't think we support build-side-matches-join-side for any type of join at the moment.

However since we are switching a hash-join to a nested loop join (which doesn't really have a build side), we could get tricky here and swap the build side as we swap the join implementation to allow it to be supported.
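
A sketch of that swap, assuming the GpuBuildLeft/GpuBuildRight values seen in the diff; the surrounding plumbing is omitted:

// A nested loop join has no real build side, so when the hash join's
// chosen side is unsupported for this join type, flip it as part of the
// substitution instead of falling back to the CPU.
val nestedLoopBuildSide = gpuBuildSide match {
  case GpuBuildLeft => GpuBuildRight
  case GpuBuildRight => GpuBuildLeft
}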

jbrennan333 (Contributor, author):

I made this conditional because GpuHashJoin.tagJoin and GpuBroadcastHashJoin.tagJoin did not previously have this check (maybe I am not seeing it?). Since it appeared to be required for GpuBroadcastNestedLoopJoin, I thought I should add it in the substitution case. If I can make it unconditional, I will. Or I can remove it and try doing the switcheroo in convertToGpu.

plan1.collect()

val finalPlan1 = findOperator(plan1.queryExecution.executedPlan,
  _.isInstanceOf[GpuBroadcastNestedLoopJoinExecBase])
Member:

This test should be integrated into the new integration tests using assert_cpu_and_gpu_are_equal_collect_with_capture

jbrennan333 (Contributor, author):

I originally added this before I added the integration tests. I will remove it.

@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', basic_nested_gens + decimal_128_gens, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'LeftSemi', 'LeftAnti'], ids=idfn)
def test_right_broadcast_hash_join_ridealong(data_gen, join_type):
Member:

Isn't this a duplicate of the existing test_broadcast_join_right_table_ridealong test? I'm not sure why this PR would be adding tests for joins without any condition.


_hash_join_conf = {'spark.sql.autoBroadcastJoinThreshold': '160',
                   'spark.sql.join.preferSortMergeJoin': 'false',
                   'spark.sql.shuffle.partitions': '2',
                   'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true',
                   'spark.rapids.sql.replaceConditionalHashJoin.enabled': 'true'
abellina (Collaborator):

I think this needs to be a test parameter (@pytest.mark.parametrize). We don't want to change the behavior of all tests necessarily, but test with it on and off in specific tests. Thoughts?

@@ -562,6 +562,12 @@ object RapidsConf {
    .booleanConf
    .createWithDefault(true)

  val ENABLE_REPLACE_CONDITIONAL_HASHJOIN =
    conf("spark.rapids.sql.replaceConditionalHashJoin.enabled")
abellina (Collaborator):

Indentation is a little off, but then again it's off in other places in this file.

val ENABLE_REPLACE_CONDITIONAL_HASHJOIN =
  conf("spark.rapids.sql.replaceConditionalHashJoin.enabled")
    .doc("Allow replacing conditional hash joins with nested loop joins")
    .booleanConf
abellina (Collaborator):

I think we want to add .internal().
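
A sketch of the config entry with that suggestion applied; the default value shown is an assumption (the experiment would presumably be opt-in), not something taken from the diff:

val ENABLE_REPLACE_CONDITIONAL_HASHJOIN =
  conf("spark.rapids.sql.replaceConditionalHashJoin.enabled")
    .doc("Allow replacing conditional hash joins with nested loop joins")
    .internal() // keep the experimental knob out of the public config docs
    .booleanConf
    .createWithDefault(false) // assumed default: off unless explicitly enabled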

@@ -0,0 +1,44 @@
/*
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
Member:

Suggested change:
- * Copyright (c) 2020-2021, NVIDIA CORPORATION.
+ * Copyright (c) 2021, NVIDIA CORPORATION.

override val isSkewJoin: Boolean)(
    cpuLeftKeys: Seq[Expression],
    cpuRightKeys: Seq[Expression])
  extends GpuShuffledNestedLoopJoinBase(
Member:

This seems to derive trivially; we should just remove the base class and implement it directly.

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.rapids.execution.GpuShuffledNestedLoopJoinBase

case class GpuShuffledNestedLoopJoinExec(
Member:

Needs a scaladoc comment explaining what this is and its parameters.
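
A sketch of the kind of scaladoc being asked for, sitting above the case class declaration shown in the diff; the wording is illustrative, not from the PR, and only the parameters visible in this diff are documented:

/**
 * GPU shuffled nested loop join, used here to run a join whose condition
 * cannot be handled by a GPU hash join.
 *
 * @param isSkewJoin   whether Spark planned this join as a skew join
 * @param cpuLeftKeys  the original CPU left-side join key expressions
 * @param cpuRightKeys the original CPU right-side join key expressions
 */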

Comment on lines 50 to 51
BUILD_DATA_SIZE -> createSizeMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_DATA_SIZE),
BUILD_TIME -> createNanoTimingMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_TIME),
Member:

This is a bit odd for a nested loop join, as we're not building a hash table.


withResource(ConcatAndConsumeAll.getSingleBatchWithVerification(buildIter,
    localBuildOutput)) { builtBatch =>
  GpuColumnVector.incRefCounts(builtBatch)
Member:

This should not be here; it's getting double-closed due to the two withResource calls on builtBatch.


val spillableBuiltBatch = withResource(builtBatch) {
  LazySpillableColumnarBatch(_, spillCallback, "built")
}
Member:

Should not hold builtBatch open once we've built the spillable batch
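
Taking the two comments together, a sketch of the corrected ownership flow, assuming the method shape shown in the diff; LazySpillableColumnarBatch takes its own reference to the batch, so no extra incRefCounts is needed:

// builtBatch is owned by the single withResource and closed when the
// block exits; only the spillable wrapper escapes.
val spillableBuiltBatch =
  withResource(ConcatAndConsumeAll.getSingleBatchWithVerification(
      buildIter, localBuildOutput)) { builtBatch =>
    LazySpillableColumnarBatch(builtBatch, spillCallback, "built")
  }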

@@ -42,16 +43,64 @@ class GpuBroadcastHashJoinMeta(
    join.leftKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
  val rightKeys: Seq[BaseExprMeta[_]] =
    join.rightKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
- val condition: Option[BaseExprMeta[_]] =
+ val conditionMeta: Option[BaseExprMeta[_]] =
Member:

In order to replace this with a nested loop join, we need to fold in the "equality" condition into the "inequality" condition that Spark already separated when choosing a hash join. The nested loop join needs the entire condition which is not covered by the condition parameter in the join node in Spark. This needs a separate condition, like nestedLoopCondition, where we "tack-on" the equality condition to the expression. For example if the hash join is on keys X and Y with a condition of W > Z then the condition will be simply W > Z but the nested loop join needs it to be X == Y AND W > Z. There are some types that could cause the equality itself to preclude being an AST (e.g.: AST cannot compare strings even for equality today).
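
A hypothetical sketch of folding the equality keys back in; nestedLoopCondition is the name suggested above, and the join.* accessors are the ones already used in this meta class:

import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Expression}

// Rebuild X == Y from the hash join's key lists, then AND it with the
// residual condition (W > Z) that Spark separated out for the hash join.
val equalityCondition: Option[Expression] =
  join.leftKeys.zip(join.rightKeys)
    .map { case (l, r) => EqualTo(l, r): Expression }
    .reduceOption(And)
val nestedLoopCondition: Option[Expression] =
  (equalityCondition ++ join.condition).reduceOption(And)
// Caveat from the comment above: the equality itself must also be
// AST-compatible, and AST cannot compare strings even for equality today.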

jbrennan333 (Contributor, author):

Running q72 with this patch on my desktop at a 10GB scale factor was 61x slower than without it: the query time went from 29 seconds to 1789 seconds. So using conditional nested loop joins in place of the existing hash-join-with-post-filter makes q72 dramatically slower.

So this experiment successfully showed that replacing hash-join-with-filter with a conditional nested loop join is not a good idea for q72. Fortunately, it looks like rapidsai/cudf#9917 will make this unnecessary.

@jlowe, @abellina, are there other situations where we think this substitution might be useful?

jlowe (Member) commented Jan 4, 2022:

Given how slow this was for inner joins, it seems very unlikely this will be a good idea even for semi/anti joins that currently fall back to the CPU. We'd have to see a significant performance improvement in the libcudf nested loop join first, so I'm OK with closing this for now.

Labels
performance A performance related task/issue
Development

Successfully merging this pull request may close these issues:

[FEA] Leverage cudf conditional nested loop join to implement semi/anti hash join with condition
3 participants