diff --git a/docs/tuning-guide.md b/docs/tuning-guide.md
index 14b0d9d2c26..e6c9da364f7 100644
--- a/docs/tuning-guide.md
+++ b/docs/tuning-guide.md
@@ -310,6 +310,7 @@ Custom Spark SQL Metrics are available which can help identify performance bottl
 | concatTime | concat batch time | Time to concatenate batches. |
 | filterTime | filter time | Time spent applying filters within other operators, such as joins. |
 | gpuDecodeTime | GPU decode time | Time spent on GPU decoding encrypted or compressed data. |
+| gpuOpTime | GPU op time | Time that an operator spends performing computation on the GPU. |
 | joinOutputRows | join output rows | The number of rows produced by a join before any filter expression is applied. |
 | joinTime | join time | Total time for performing a join. |
 | numInputBatches | input columnar batches | Number of columnar batches that the operator received from its child operator(s). |
@@ -320,6 +321,7 @@ Custom Spark SQL Metrics are available which can help identify performance bottl
 | opTime | op time | Time that an operator takes, exclusive of the time for executing or fetching results from child operators. |
 | partitionSize | partition data size | Total size in bytes of output partitions. |
 | peakDevMemory | peak device memory | Peak GPU memory used during execution of an operator. |
+| semaphoreWaitTime | GPU semaphore wait time | Time spent waiting for the GPU semaphore. |
 | sortTime | sort time | Time spent in sort operations in GpuSortExec and GpuTopN. |
 | spillData | bytes spilled from GPU | Total bytes spilled from GPU. |
 | spillDisk | bytes spilled to disk | Total bytes spilled from GPU to disk. |
diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java
index 2864a607aef..aec809936d0 100644
--- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java
+++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
+import scala.Option;
 import scala.collection.Iterator;
 
 import java.util.ArrayList;
 
@@ -49,6 +50,8 @@ public abstract class UnsafeRowToColumnarBatchIterator implements Iterator
       Iterator<UnsafeRow> input,
       Attribute[] schema,
       CoalesceSizeGoal goal,
+      GpuMetric semaphoreWaitTime,
+      GpuMetric gpuOpTime,
       GpuMetric totalTime,
       GpuMetric numInputRows,
       GpuMetric numOutputRows,
@@ -74,6 +79,8 @@ protected UnsafeRowToColumnarBatchIterator(
       rapidsTypes[i] = GpuColumnVector.getNonNestedRapidsType(schema[i].dataType());
       outputTypes[i] = schema[i].dataType();
     }
+    this.semaphoreWaitTime = semaphoreWaitTime;
+    this.gpuOpTime = gpuOpTime;
     this.totalTime = totalTime;
     this.numInputRows = numInputRows;
     this.numOutputRows = numOutputRows;
@@ -100,7 +107,8 @@ public ColumnarBatch next() {
     // buffers. One will be for the byte data and the second will be for the offsets. We will then
     // write the data directly into those buffers using code generation in a child of this class.
     // that implements fillBatch.
-    try (HostMemoryBuffer dataBuffer = HostMemoryBuffer.allocate(dataLength);
+    try (NvtxWithMetrics nvtx = new NvtxWithMetrics("RowToColumnar", NvtxColor.CYAN, totalTime);
+         HostMemoryBuffer dataBuffer = HostMemoryBuffer.allocate(dataLength);
          HostMemoryBuffer offsetsBuffer =
              HostMemoryBuffer.allocate(((long)numRowsEstimate + 1) * BYTES_PER_OFFSET)) {
@@ -135,13 +143,9 @@ public ColumnarBatch next() {
         // Grab the semaphore because we are about to put data onto the GPU.
         TaskContext tc = TaskContext.get();
         if (tc != null) {
-          GpuSemaphore$.MODULE$.acquireIfNecessary(tc);
-        }
-        if (totalTime != null) {
-          buildRange = new NvtxWithMetrics("RowToColumnar", NvtxColor.GREEN, totalTime);
-        } else {
-          buildRange = new NvtxRange("RowToColumnar", NvtxColor.GREEN);
+          GpuSemaphore$.MODULE$.acquireIfNecessary(tc, semaphoreWaitTime);
         }
+        buildRange = NvtxWithMetrics.apply("RowToColumnar: build", NvtxColor.GREEN, Option.apply(gpuOpTime));
         devColumn = hostColumn.copyToDevice();
       }
     }
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
index 847dec8b70f..aaa654e4617 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
@@ -52,6 +52,8 @@ object GpuMetric extends Logging {
   val NUM_PARTITIONS = "numPartitions"
   val TOTAL_TIME = "totalTime"
   val OP_TIME = "opTime"
+  val GPU_OP_TIME = "gpuOpTime"
+  val SEMAPHORE_WAIT_TIME = "semaphoreWaitTime"
   val PEAK_DEVICE_MEMORY = "peakDevMemory"
   val COLLECT_TIME = "collectTime"
   val CONCAT_TIME = "concatTime"
@@ -78,6 +80,8 @@ object GpuMetric extends Logging {
   val DESCRIPTION_NUM_PARTITIONS = "partitions"
   val DESCRIPTION_TOTAL_TIME = "total time"
   val DESCRIPTION_OP_TIME = "op time"
+  val DESCRIPTION_GPU_OP_TIME = "GPU op time"
+  val DESCRIPTION_SEMAPHORE_WAIT_TIME = "GPU semaphore wait time"
   val DESCRIPTION_PEAK_DEVICE_MEMORY = "peak device memory"
   val DESCRIPTION_COLLECT_TIME = "collect batch time"
   val DESCRIPTION_CONCAT_TIME = "concat batch time"
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala
index 7c4e02592e6..ef8817dd10a 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala
@@ -554,7 +554,9 @@ class RowToColumnarIterator(
     totalTime: GpuMetric = NoopMetric,
     numInputRows: GpuMetric = NoopMetric,
     numOutputRows: GpuMetric = NoopMetric,
-    numOutputBatches: GpuMetric = NoopMetric) extends Iterator[ColumnarBatch] with Arm {
+    numOutputBatches: GpuMetric = NoopMetric,
+    semaphoreWaitTime: GpuMetric = NoopMetric,
+    gpuOpTime: GpuMetric = NoopMetric) extends Iterator[ColumnarBatch] with Arm {
 
   private val targetSizeBytes = localGoal.targetSizeBytes
   private var targetRows = 0
@@ -570,7 +572,8 @@ class RowToColumnarIterator(
     buildBatch()
   }
 
-  private def buildBatch(): ColumnarBatch = {
+  private def buildBatch(): ColumnarBatch = withResource(
+      new NvtxWithMetrics("RowToColumnar", NvtxColor.CYAN, totalTime)) { _ =>
 
     // estimate the size of the first batch based on the schema
     if (targetRows == 0) {
@@ -607,9 +610,11 @@ class RowToColumnarIterator(
     // About to place data back on the GPU
     // note that TaskContext.get() can return null during unit testing so we wrap it in an
     // option here
-    Option(TaskContext.get()).foreach(GpuSemaphore.acquireIfNecessary)
+    Option(TaskContext.get())
+      .foreach(ctx => GpuSemaphore.acquireIfNecessary(ctx, semaphoreWaitTime))
 
-    val ret = withResource(new NvtxWithMetrics("RowToColumnar", NvtxColor.GREEN, totalTime)) { _=>
+    val ret = withResource(new NvtxWithMetrics("RowToColumnar", NvtxColor.GREEN,
+      gpuOpTime)) { _ =>
       builders.build(rowCount)
     }
     numInputRows += rowCount
@@ -636,6 +641,8 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging {
   def apply(input: Iterator[UnsafeRow],
       schema: Array[Attribute],
       goal: CoalesceSizeGoal,
+      semaphoreWaitTime: GpuMetric,
+      gpuOpTime: GpuMetric,
       totalTime: GpuMetric,
       numInputRows: GpuMetric,
       numOutputRows: GpuMetric,
@@ -645,6 +652,8 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging {
     ctx.addReferenceObj("iter", input, classOf[Iterator[UnsafeRow]].getName)
     ctx.addReferenceObj("schema", schema, classOf[Array[Attribute]].getName)
     ctx.addReferenceObj("goal", goal, classOf[CoalesceSizeGoal].getName)
+    ctx.addReferenceObj("semaphoreWaitTime", semaphoreWaitTime, classOf[GpuMetric].getName)
+    ctx.addReferenceObj("gpuOpTime", gpuOpTime, classOf[GpuMetric].getName)
     ctx.addReferenceObj("totalTime", totalTime, classOf[GpuMetric].getName)
     ctx.addReferenceObj("numInputRows", numInputRows, classOf[GpuMetric].getName)
     ctx.addReferenceObj("numOutputRows", numOutputRows, classOf[GpuMetric].getName)
@@ -715,7 +724,9 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging {
        |          (com.nvidia.spark.rapids.GpuMetric)references[3],
        |          (com.nvidia.spark.rapids.GpuMetric)references[4],
        |          (com.nvidia.spark.rapids.GpuMetric)references[5],
-       |          (com.nvidia.spark.rapids.GpuMetric)references[6]);
+       |          (com.nvidia.spark.rapids.GpuMetric)references[6],
+       |          (com.nvidia.spark.rapids.GpuMetric)references[7],
+       |          (com.nvidia.spark.rapids.GpuMetric)references[8]);
        |    ${ctx.initMutableStates()}
        |  }
        |
@@ -806,7 +817,9 @@ case class GpuRowToColumnarExec(child: SparkPlan, goal: CoalesceSizeGoal)
   }
 
   override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
-    TOTAL_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_TOTAL_TIME),
+    SEMAPHORE_WAIT_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_SEMAPHORE_WAIT_TIME),
+    GPU_OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_GPU_OP_TIME),
+    TOTAL_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_TOTAL_TIME),
     NUM_INPUT_ROWS -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_ROWS)
   )
 
@@ -817,6 +830,8 @@ case class GpuRowToColumnarExec(child: SparkPlan, goal: CoalesceSizeGoal)
     val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
     val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
     val totalTime = gpuLongMetric(TOTAL_TIME)
+    val gpuOpTime = gpuLongMetric(GPU_OP_TIME)
+    val semaphoreWaitTime = gpuLongMetric(SEMAPHORE_WAIT_TIME)
     val localGoal = goal
     val rowBased = child.execute()
 
@@ -831,13 +846,13 @@ case class GpuRowToColumnarExec(child: SparkPlan, goal: CoalesceSizeGoal)
       val localOutput = output
       rowBased.mapPartitions(rowIter => GeneratedUnsafeRowToCudfRowIterator(
         rowIter.asInstanceOf[Iterator[UnsafeRow]],
-        localOutput.toArray, localGoal, totalTime, numInputRows, numOutputRows,
-        numOutputBatches))
+        localOutput.toArray, localGoal, semaphoreWaitTime, gpuOpTime, totalTime,
+        numInputRows, numOutputRows, numOutputBatches))
     } else {
       val converters = new GpuRowToColumnConverter(localSchema)
       rowBased.mapPartitions(rowIter => new RowToColumnarIterator(rowIter,
         localSchema, localGoal, converters,
-        totalTime, numInputRows, numOutputRows, numOutputBatches))
+        totalTime, numInputRows, numOutputRows, numOutputBatches, semaphoreWaitTime, gpuOpTime))
     }
   }
 }
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala
index 1fb2fa0fe94..35bba17fc8e 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala
@@ -73,7 +73,20 @@ object GpuSemaphore {
    */
   def acquireIfNecessary(context: TaskContext): Unit = {
     if (enabled && context != null) {
-      getInstance.acquireIfNecessary(context)
+      getInstance.acquireIfNecessary(context, None)
+    }
+  }
+
+  /**
+   * Tasks must call this when they begin to use the GPU.
+   * If the task has not already acquired the GPU semaphore then it is acquired,
+   * blocking if necessary.
+   * NOTE: A task completion listener will automatically be installed to ensure
+   * the semaphore is always released by the time the task completes.
+   */
+  def acquireIfNecessary(context: TaskContext, waitMetric: GpuMetric): Unit = {
+    if (enabled && context != null) {
+      getInstance.acquireIfNecessary(context, Some(waitMetric))
     }
   }
 
@@ -98,14 +111,13 @@ object GpuSemaphore {
   }
 }
 
-private final class GpuSemaphore(tasksPerGpu: Int) extends Logging {
+private final class GpuSemaphore(tasksPerGpu: Int) extends Logging with Arm {
   private val semaphore = new Semaphore(tasksPerGpu)
   // Map to track which tasks have acquired the semaphore.
   private val activeTasks = new ConcurrentHashMap[Long, MutableInt]
 
-  def acquireIfNecessary(context: TaskContext): Unit = {
-    val nvtxRange = new NvtxRange("Acquire GPU", NvtxColor.RED)
-    try {
+  def acquireIfNecessary(context: TaskContext, waitMetric: Option[GpuMetric] = None): Unit = {
+    withResource(NvtxWithMetrics.apply("Acquire GPU", NvtxColor.RED, waitMetric)) { _ =>
       val taskAttemptId = context.taskAttemptId()
       val refs = activeTasks.get(taskAttemptId)
       if (refs == null || refs.getValue == 0) {
@@ -120,8 +132,6 @@ private final class GpuSemaphore(tasksPerGpu: Int) extends Logging {
         }
         GpuDeviceManager.initializeFromTask()
       }
-    } finally {
-      nvtxRange.close()
     }
   }
 
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/NvtxWithMetrics.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/NvtxWithMetrics.scala
index 633ce75bd41..54067083cd8 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/NvtxWithMetrics.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/NvtxWithMetrics.scala
@@ -18,6 +18,15 @@ package com.nvidia.spark.rapids
 
 import ai.rapids.cudf.{NvtxColor, NvtxRange}
 
+object NvtxWithMetrics {
+  def apply(name: String, color: NvtxColor, metric: Option[GpuMetric]): NvtxRange = {
+    metric match {
+      case Some(m) => new NvtxWithMetrics(name, color, m)
+      case _ => new NvtxRange(name, color)
+    }
+  }
+}
+
 /**
  * NvtxRange with option to pass one or more nano timing metric(s) that are updated upon close
  * by the amount of time spent in the range
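For context, here is a minimal sketch (not part of the patch; the `TimedGpuWork` helper name is hypothetical) of how the two new metrics divide the work for an operator that adopts this plumbing: time spent blocked on the GPU semaphore is recorded by the new `acquireIfNecessary(TaskContext, GpuMetric)` overload into `semaphoreWaitTime`, while only the GPU computation itself runs inside an `NvtxWithMetrics` range that updates `gpuOpTime`.

```scala
package com.nvidia.spark.rapids

import ai.rapids.cudf.NvtxColor
import org.apache.spark.TaskContext

// Hypothetical helper, illustration only. It uses the APIs touched by this patch:
// GpuSemaphore.acquireIfNecessary(TaskContext, GpuMetric) and NvtxWithMetrics.
object TimedGpuWork extends Arm {
  def apply[T](semaphoreWaitTime: GpuMetric, gpuOpTime: GpuMetric)(work: => T): T = {
    // Blocking on the GPU semaphore is charged to semaphoreWaitTime inside
    // acquireIfNecessary. TaskContext.get() can return null in unit tests,
    // so it is wrapped in an Option, mirroring RowToColumnarIterator.
    Option(TaskContext.get())
      .foreach(tc => GpuSemaphore.acquireIfNecessary(tc, semaphoreWaitTime))
    // Only the GPU computation itself is charged to gpuOpTime.
    withResource(new NvtxWithMetrics("timed GPU work", NvtxColor.GREEN, gpuOpTime)) { _ =>
      work
    }
  }
}
```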