Take semaphore after first stream batch is materialized (broadcast) (#6709)

* Fix semaphore acquisition before stream side IO in broadcast join

Signed-off-by: Alessandro Bellina <abellina@nvidia.com>

* Break up CloseableBufferedIterator into its own class

* Fix typo

Signed-off-by: Alessandro Bellina <abellina@nvidia.com>
abellina authored Oct 6, 2022
1 parent 9b6b62a commit fb2e576
Showing 4 changed files with 100 additions and 33 deletions.
File: com/nvidia/spark/rapids/CloseableBufferedIterator.scala (new file)
@@ -0,0 +1,45 @@
/*
 * Copyright (c) 2022, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.nvidia.spark.rapids

import org.apache.spark.TaskContext

/**
 * Helper iterator that wraps a BufferedIterator of AutoCloseable subclasses.
 * This iterator also implements AutoCloseable, so it can be closed in case
 * of exceptions.
 *
 * @param wrapped the buffered iterator
 * @tparam T an AutoCloseable subclass
 */
class CloseableBufferedIterator[T <: AutoCloseable](wrapped: BufferedIterator[T])
  extends BufferedIterator[T] with AutoCloseable {
  // register against task completion to close any leaked buffered items
  Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => close()))

  private[this] var isClosed = false
  override def head: T = wrapped.head
  override def headOption: Option[T] = wrapped.headOption
  override def next: T = wrapped.next
  override def hasNext: Boolean = wrapped.hasNext
  override def close(): Unit = {
    if (!isClosed) {
      headOption.foreach(_.close())
      isClosed = true
    }
  }
}
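For reference, a minimal usage sketch of the class above (not part of the commit; DemoBuffer is a hypothetical stand-in for a real AutoCloseable such as a ColumnarBatch, and outside a Spark task TaskContext.get() returns null, so the completion-listener registration is a no-op): head buffers the first element without consuming it, and close() releases only that still-buffered element.

// Hypothetical stand-in resource, for illustration only.
class DemoBuffer(val id: Int) extends AutoCloseable {
  override def close(): Unit = println(s"DemoBuffer $id closed")
}

object CloseableBufferedIteratorSketch {
  def main(args: Array[String]): Unit = {
    val iter = new CloseableBufferedIterator(
      Iterator(new DemoBuffer(1), new DemoBuffer(2)).buffered)
    try {
      if (iter.hasNext) {
        val first = iter.head // buffers element 1 without consuming it
        println(s"peeked at DemoBuffer ${first.id}")
      }
    } finally {
      // Closes only the still-buffered head (DemoBuffer 1); elements already
      // handed out via next() are the consumer's responsibility.
      iter.close()
    }
  }
}

Note that the isClosed flag makes close() idempotent, which matters because both an explicit close and the task-completion listener may invoke it.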
File: com/nvidia/spark/rapids/GpuBroadcastHashJoinExec.scala
@@ -16,8 +16,11 @@

package com.nvidia.spark.rapids

+import ai.rapids.cudf.{NvtxColor, NvtxRange}
import com.nvidia.spark.rapids.shims.ShimBinaryExecNode

+import org.apache.spark.TaskContext
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
@@ -28,6 +31,7 @@ import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelationBroadcastMode}
import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExec, GpuBroadcastHelper, GpuHashJoin, JoinTypeChecks}
+import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

class GpuBroadcastHashJoinMeta(
@@ -149,6 +153,43 @@ case class GpuBroadcastHashJoinExec(
    throw new IllegalStateException(
      "GpuBroadcastHashJoin does not support row-based processing")

+  /**
+   * Gets the ColumnarBatch for the build side and the stream iterator by
+   * acquiring the GPU semaphore only after the first stream batch has been
+   * streamed to the GPU.
+   *
+   * `broadcastRelation` represents the broadcast build-side table on the host. This
+   * function peeks at the stream side, after wrapping it in a closeable buffered
+   * iterator, to cause the stream side to produce its first batch. This delays
+   * acquiring the semaphore until the stream side has performed all the steps
+   * needed (including IO) to produce that first batch. Once the first stream
+   * batch is produced, the build side is materialized to the GPU (while holding
+   * the semaphore).
+   *
+   * TODO: This could try to trigger the broadcast materialization on the host
+   *   before getting started on the stream side (e.g. call `broadcastRelation.value`).
+   */
+  private def getBroadcastBuiltBatchAndStreamIter(
+      broadcastRelation: Broadcast[Any],
+      buildSchema: StructType,
+      streamIter: Iterator[ColumnarBatch],
+      coalesceMetricsMap: Map[String, GpuMetric]): (ColumnarBatch, Iterator[ColumnarBatch]) = {
+    val semWait = coalesceMetricsMap(GpuMetric.SEMAPHORE_WAIT_TIME)
+
+    val bufferedStreamIter = new CloseableBufferedIterator(streamIter.buffered)
+    closeOnExcept(bufferedStreamIter) { _ =>
+      withResource(new NvtxRange("first stream batch", NvtxColor.RED)) { _ =>
+        if (bufferedStreamIter.hasNext) {
+          bufferedStreamIter.head
+        } else {
+          GpuSemaphore.acquireIfNecessary(TaskContext.get(), semWait)
+        }
+      }
+
+      val buildBatch =
+        GpuBroadcastHelper.getBroadcastBatch(broadcastRelation, buildSchema)
+      (buildBatch, bufferedStreamIter)
+    }
+  }

  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
    val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
    val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
@@ -166,10 +207,14 @@ case class GpuBroadcastHashJoinExec(
    val rdd = streamedPlan.executeColumnar()
    val buildSchema = buildPlan.schema
    rdd.mapPartitions { it =>
-      val stIt = new CollectTimeIterator("broadcast join stream", it, streamTime)
-      withResource(
-        GpuBroadcastHelper.getBroadcastBatch(broadcastRelation, buildSchema)) { builtBatch =>
-        doJoin(builtBatch, stIt, targetSize, spillCallback,
+      val (builtBatch, streamIter) =
+        getBroadcastBuiltBatchAndStreamIter(
+          broadcastRelation,
+          buildSchema,
+          new CollectTimeIterator("broadcast join stream", it, streamTime),
+          allMetrics)
+      withResource(builtBatch) { _ =>
+        doJoin(builtBatch, streamIter, targetSize, spillCallback,
          numOutputRows, joinOutputRows, numOutputBatches, opTime, joinTime)
      }
    }
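The reordering in this file is the core of the change: peek at the stream side first, so host-side IO for the first batch happens before the task holds the GPU semaphore. Below is a simplified, self-contained sketch of that ordering, using java.util.concurrent.Semaphore as a stand-in for GpuSemaphore and strings as stand-ins for batches (all names are illustrative, not from the codebase).

import java.util.concurrent.Semaphore

object SemaphoreOrderingSketch {
  // Stand-in for GpuSemaphore: caps how many tasks hold the GPU at once.
  private val gpuGate = new Semaphore(2)

  private def simulatedHostIo(n: Int): String = {
    Thread.sleep(10) // pretend this is host-side shuffle/scan IO
    s"batch-$n"
  }

  def main(args: Array[String]): Unit = {
    // Iterator.tabulate is lazy, so the IO runs only when elements are pulled.
    val stream = Iterator.tabulate(3)(simulatedHostIo).buffered

    // 1) Materialize the first stream batch while NOT holding the gate, so
    //    slow host IO cannot stall other tasks' GPU work.
    if (stream.hasNext) stream.head

    // 2) Only now take the gate, materialize the build side, and join.
    gpuGate.acquire()
    try {
      stream.foreach(b => println(s"joining $b on the GPU"))
    } finally {
      gpuGate.release()
    }
  }
}

As in the real code, the gate is still acquired when the stream is empty, because the build side must be materialized on the GPU either way.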
File: com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala
@@ -190,32 +190,6 @@ case class GpuShuffledHashJoinExec(
}

object GpuShuffledHashJoinExec extends Arm {
-  /**
-   * Helper iterator that wraps a BufferedIterator of AutoCloseable subclasses.
-   * This iterator also implements AutoCloseable, so it can be closed in case
-   * of exceptions.
-   *
-   * @param wrapped the buffered iterator
-   * @tparam T an AutoCloseable subclass
-   */
-  class CloseableBufferedIterator[T <: AutoCloseable](wrapped: BufferedIterator[T])
-    extends BufferedIterator[T] with AutoCloseable {
-    // register against task completion to close any leaked buffered items
-    Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => close()))
-
-    private[this] var isClosed = false
-    override def head: T = wrapped.head
-    override def headOption: Option[T] = wrapped.headOption
-    override def next: T = wrapped.next
-    override def hasNext: Boolean = wrapped.hasNext
-    override def close(): Unit = {
-      if (!isClosed) {
-        headOption.foreach(_.close())
-        isClosed = true
-      }
-    }
-  }

  /**
   * Gets a `ColumnarBatch` and stream Iterator[ColumnarBatch] pair by acquiring
   * the GPU semaphore optimally in the scenario where the build side is relatively
File: org/apache/spark/sql/rapids/execution/GpuBroadcastHelper.scala
@@ -16,14 +16,15 @@

package org.apache.spark.sql.rapids.execution

-import com.nvidia.spark.rapids.GpuColumnVector
+import ai.rapids.cudf.{NvtxColor, NvtxRange}
+import com.nvidia.spark.rapids.{Arm, GpuColumnVector}
import com.nvidia.spark.rapids.shims.SparkShimImpl

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

-object GpuBroadcastHelper {
+object GpuBroadcastHelper extends Arm {
  /**
   * Given a broadcast relation get a ColumnarBatch that can be used on the GPU.
   *
@@ -41,7 +42,9 @@ object GpuBroadcastHelper {
      broadcastSchema: StructType): ColumnarBatch = {
    broadcastRelation.value match {
      case broadcastBatch: SerializeConcatHostBuffersDeserializeBatch =>
-        broadcastBatch.batch.getColumnarBatch()
+        withResource(new NvtxRange("getBroadcastBatch", NvtxColor.YELLOW)) { _ =>
+          broadcastBatch.batch.getColumnarBatch()
+        }
      case v if SparkShimImpl.isEmptyRelation(v) =>
        GpuColumnVector.emptyBatch(broadcastSchema)
      case t =>
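The withResource(new NvtxRange(...)) idiom added above brackets the broadcast deserialization with an NVTX range so the work shows up in Nsight Systems timelines, and guarantees the range is closed even if the body throws. A minimal sketch of the same idiom outside this codebase (assuming only the cuDF Java API is on the classpath; nvtx and the sample body are illustrative, not part of the commit):

import ai.rapids.cudf.{NvtxColor, NvtxRange}

object NvtxSketch {
  // Push an NVTX range, run the body, and always pop the range, mirroring
  // what Arm.withResource does for any AutoCloseable resource.
  def nvtx[A](name: String, color: NvtxColor)(body: => A): A = {
    val range = new NvtxRange(name, color)
    try body finally range.close()
  }

  def main(args: Array[String]): Unit = {
    val result = nvtx("getBroadcastBatch", NvtxColor.YELLOW) {
      // ... deserialization / host-to-device copy would go here ...
      42
    }
    println(result)
  }
}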
