Optimize semaphore acquisition in GpuShuffledHashJoinExec #4588
Conversation
Signed-off-by: Alessandro Bellina <abellina@nvidia.com>
So, just to be clear, this optimization can never work with UCX, right?
Great point. I missed a case: we need to ignore the config when UCX is turned on and compression is enabled. If we don't, the fallback path currently doesn't handle compressed batches from the shuffle.
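For illustration, the gate could look roughly like this (a hypothetical sketch; the config key names are placeholders, not the plugin's real ones):

```scala
// Hypothetical sketch of the gate described above: disable the optimization
// when the UCX shuffle transport is on and shuffle compression is enabled,
// since the fallback path cannot handle compressed batches.
// The key names below are placeholders, not the plugin's actual configs.
def optimizationEnabled(conf: Map[String, String]): Boolean = {
  val requested  = conf.getOrElse("shuffledHashJoin.optimizeShuffle", "true").toBoolean
  val ucxEnabled = conf.getOrElse("shuffle.ucx.enabled", "false").toBoolean
  val compressed = conf.getOrElse("shuffle.compression.codec", "none") != "none"
  requested && !(ucxEnabled && compressed)
}
```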
/**
 * Removes `GpuCoalesceBatches(GpuShuffleCoalesceExec(build side))` for the build side
 * for the shuffled hash join. The coalesce logic has been moved to the
 * `GpuShuffleCoalesceExec` class, and is handled differently to prevent holding onto the
This is fine for now, but I really would prefer to have us just build the plan the right way from the beginning. Perhaps we can have a flag in GpuExec that says "for this input I know how to handle raw shuffle data", so that the rules that insert the host-side coalesce can deal with it there.
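As a rough illustration of that suggestion (every name here is hypothetical, not the plugin's API), the rule that inserts the host-side coalesce could consult such a flag:

```scala
// Hypothetical sketch: a node advertises which inputs it can consume as raw
// (serialized) shuffle data, and the coalesce-insertion rule skips those inputs.
sealed trait PlanNode {
  def children: Seq[PlanNode]
  // Default: no input understands raw shuffle output.
  def handlesRawShuffleInput(childIndex: Int): Boolean = false
}
final case class ShuffleNode() extends PlanNode { def children: Seq[PlanNode] = Nil }
final case class HostCoalesce(child: PlanNode) extends PlanNode {
  def children: Seq[PlanNode] = Seq(child)
}
final case class JoinNode(build: PlanNode, stream: PlanNode) extends PlanNode {
  def children: Seq[PlanNode] = Seq(build, stream)
  // This join consumes the serialized build-side shuffle (child 0) directly.
  override def handlesRawShuffleInput(childIndex: Int): Boolean = childIndex == 0
}

// A sketch of the rule: wrap shuffle children in a host-side coalesce unless
// the parent declares it can handle the raw data itself. (A real rule would
// recurse over every node type, not just the join.)
def insertHostCoalesce(node: PlanNode): PlanNode = node match {
  case join @ JoinNode(build, stream) =>
    val kids = Seq(insertHostCoalesce(build), insertHostCoalesce(stream)).zipWithIndex.map {
      case (c: ShuffleNode, i) if !join.handlesRawShuffleInput(i) => HostCoalesce(c)
      case (c, _) => c
    }
    JoinNode(kids(0), kids(1))
  case other => other
}
```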
I added a commit that I think addresses many of the points. Enough has changed that I think this needs another pass. One comment from Jason wasn't addressed for sure:
We need something to hold on to that first stream batch. One (admittedly ugly) option is to attach the closeable iterator to the task completion logic. I think we discussed doing this, but I want to confirm that's the approach, or whether there's something better.
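For concreteness, a minimal sketch of that option using Spark's `TaskContext` (the helper name is ours, not the plugin's):

```scala
import org.apache.spark.TaskContext

// Sketch of the "attach to task completion" option: tie an AutoCloseable
// (e.g. the iterator holding the first stream batch) to the Spark task's
// lifecycle so it is released even if the task fails before the join runs.
def closeOnTaskCompletion(resource: AutoCloseable): Unit = {
  // TaskContext.get() returns null off the executor; guard for safety.
  Option(TaskContext.get()).foreach { tc =>
    tc.addTaskCompletionListener[Unit](_ => resource.close())
  }
}
```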
build
Looking into the test failures.
build
@@ -448,9 +458,84 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
     batches.append(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_BATCHING_PRIORITY,
       spillCallback))

   protected def popAll(): Array[ColumnarBatch] = {
     closeOnExcept(batches.map(_.getColumnarBatch())) { wip =>
nit: does this need to be a safeMap?
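For reference, the point of `safeMap` is that if producing one batch throws, the batches already materialized get closed instead of leaking, whereas a plain `map` would leak them. A minimal sketch of the idea (not the plugin's actual implementation):

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch of the safeMap idea: map over a sequence producing AutoCloseables,
// and close everything already produced if a later element throws.
def safeMapSketch[A, B <: AutoCloseable](xs: Seq[A])(f: A => B): Seq[B] = {
  val out = ArrayBuffer.empty[B]
  try {
    xs.foreach(x => out += f(x))
    out.toSeq
  } catch {
    case t: Throwable =>
      // Best-effort cleanup; keep the original failure as the primary error.
      out.foreach { b =>
        try b.close() catch { case s: Throwable => t.addSuppressed(s) }
      }
      throw t
  }
}
```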
@revans2 I replied to your comment just now in the original request, but I'm adding a link here so we can continue if there's more I need to look into:
build
Ok, I have a bug with …
build
Closes #4539.
This PR adds an optimization for the shuffled hash join where the GPU semaphore is not acquired while the build side is fetched to the host: we attempt to keep the build side on the host until we load the first batch from the stream side.
Loading that first stream batch is what acquires the semaphore; only then do we bring the build batch to the GPU.
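In sketch form, the ordering looks like this (stand-in types and helpers; the real code works with the shuffle's serialized host buffers and the plugin's `GpuSemaphore`):

```scala
// Hypothetical sketch of the ordering this PR sets up. HostBuild, DeviceBatch,
// and the helpers are stand-ins; GpuSemaphore here is a stub for the real one.
object DeferredAcquireSketch {
  final case class HostBuild(rows: Long)
  final case class DeviceBatch(rows: Long)

  object GpuSemaphore { def acquireIfNecessary(): Unit = () }

  // Step 1: host-only work, no semaphore held -- fetch and concatenate the
  // serialized build-side shuffle buffers on the host.
  def fetchBuildSideToHost(parts: Seq[Long]): HostBuild = HostBuild(parts.sum)

  // Step 2: materializing the first stream batch on the GPU is what
  // acquires the semaphore.
  def loadFirstStreamBatch(rows: Long): DeviceBatch = {
    GpuSemaphore.acquireIfNecessary()
    DeviceBatch(rows)
  }

  // Step 3: the semaphore is already held, so now move the build side over.
  def buildToDevice(hb: HostBuild): DeviceBatch = DeviceBatch(hb.rows)

  def main(args: Array[String]): Unit = {
    val hostBuild   = fetchBuildSideToHost(Seq(10L, 20L))
    val firstStream = loadFirstStreamBatch(5L)
    val buildBatch  = buildToDevice(hostBuild)
    println(s"build=$buildBatch firstStream=$firstStream")
  }
}
```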
This code falls back to the old behavior when batches are not serialized (i.e. our input is not a shuffle) and when the stream side is empty.
It is in draft mode because I need to run more testing with NDS. I don't think the case where the build batch has to be iterated multiple times has been hit in my runs so far (other than in unit tests), so I need to figure out a way to trigger it.