From b6a9fd7f62f313408548c35d76a19e9ddde8252b Mon Sep 17 00:00:00 2001
From: Zach Puller
Date: Wed, 4 Sep 2024 09:17:56 -0700
Subject: [PATCH 1/4] add max memory watermark metric

Signed-off-by: Zach Puller
---
 .../nvidia/spark/rapids/GpuSemaphore.scala    |  1 +
 .../spark/sql/rapids/GpuTaskMetrics.scala     | 41 ++++++++++++++++++-
 2 files changed, 41 insertions(+), 1 deletion(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala
index ff02ab09647..a9566542eef 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala
@@ -381,6 +381,7 @@ private final class GpuSemaphore() extends Logging {
   def completeTask(context: TaskContext): Unit = {
     val taskAttemptId = context.taskAttemptId()
     GpuTaskMetrics.get.updateRetry(taskAttemptId)
+    GpuTaskMetrics.get.updateMaxMemory(taskAttemptId)
     val refs = tasks.remove(taskAttemptId)
     if (refs == null) {
       throw new IllegalStateException(s"Completion of unknown task $taskAttemptId")
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala
index c89e26f0a24..0bd9289f987 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala
@@ -78,6 +78,35 @@ class NanoSecondAccumulator extends AccumulatorV2[jl.Long, NanoTime] {
   override def value: NanoTime = NanoTime(_sum)
 }
 
+class WatermarkAccumulator extends AccumulatorV2[jl.Long, Long] {
+  private var _value = 0L
+  override def isZero: Boolean = _value == 0
+
+  override def copy(): WatermarkAccumulator = {
+    val newAcc = new WatermarkAccumulator
+    newAcc._value = this._value
+    newAcc
+  }
+
+  override def reset(): Unit = {
+    _value = 0
+  }
+
+  override def add(v: jl.Long): Unit = {
+    _value += v
+  }
+
+  override def merge(other: AccumulatorV2[jl.Long, Long]): Unit = other match {
+    case wa: WatermarkAccumulator =>
+      _value = _value.max(wa._value)
+    case _ =>
+      throw new UnsupportedOperationException(
+        s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
+  }
+
+  override def value: Long = _value
+}
+
 class GpuTaskMetrics extends Serializable {
   private val semWaitTimeNs = new NanoSecondAccumulator
   private val retryCount = new LongAccumulator
@@ -91,6 +120,8 @@ class GpuTaskMetrics extends Serializable {
   private val readSpillFromHostTimeNs = new NanoSecondAccumulator
   private val readSpillFromDiskTimeNs = new NanoSecondAccumulator
 
+  private val maxDeviceMemoryBytes = new WatermarkAccumulator
+
   private val metrics = Map[String, AccumulatorV2[_, _]](
     "gpuSemaphoreWait" -> semWaitTimeNs,
     "gpuRetryCount" -> retryCount,
@@ -100,7 +131,8 @@ class GpuTaskMetrics extends Serializable {
     "gpuSpillToHostTime" -> spillToHostTimeNs,
     "gpuSpillToDiskTime" -> spillToDiskTimeNs,
     "gpuReadSpillFromHostTime" -> readSpillFromHostTimeNs,
-    "gpuReadSpillFromDiskTime" -> readSpillFromDiskTimeNs
+    "gpuReadSpillFromDiskTime" -> readSpillFromDiskTimeNs,
+    "gpuMaxDeviceMemoryBytes" -> maxDeviceMemoryBytes
   )
 
   def register(sc: SparkContext): Unit = {
@@ -178,6 +210,13 @@ class GpuTaskMetrics extends Serializable {
       retryComputationTime.add(compNs)
     }
  }
+
+  def updateMaxMemory(taskAttemptId: Long): Unit = {
+    val maxMem = RmmSpark.getAndResetGpuMaxMemoryAllocated(taskAttemptId)
+    if (maxMem > 0) {
+      maxDeviceMemoryBytes.add(maxMem)
+    }
+  }
 }
 
 /**
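The accumulator introduced above intentionally departs from Spark's usual additive semantics: each task reports its peak once through `add`, and `merge` keeps the maximum across tasks rather than a sum. The sketch below is illustrative only, not part of the patch; it assumes spark-core on the classpath and the `WatermarkAccumulator` class exactly as defined above, and the demo object and byte values are invented for the example.

```scala
// Illustrative sketch: the high-watermark semantics of WatermarkAccumulator.
// Assumes spark-core on the classpath (org.apache.spark.util.AccumulatorV2)
// and the WatermarkAccumulator class from the patch above.
object WatermarkSemanticsDemo {
  def main(args: Array[String]): Unit = {
    // Two task-side accumulators, each recording its task's peak exactly once.
    val task1 = new WatermarkAccumulator
    val task2 = new WatermarkAccumulator
    task1.add(512L * 1024 * 1024)  // task 1 peaked at 512 MiB
    task2.add(2048L * 1024 * 1024) // task 2 peaked at 2 GiB

    // Driver-side merge keeps the max, so the final value is a watermark,
    // not the 2.5 GiB total a LongAccumulator would report.
    val driver = new WatermarkAccumulator
    driver.merge(task1)
    driver.merge(task2)
    assert(driver.value == 2048L * 1024 * 1024)
  }
}
```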
From ebb0803523c7acbfc4bc6facd47df708264df40c Mon Sep 17 00:00:00 2001
From: Zach Puller
Date: Tue, 24 Sep 2024 10:32:07 -0700
Subject: [PATCH 2/4] pr comments

Signed-off-by: Zach Puller
---
 .../scala/com/nvidia/spark/rapids/GpuSemaphore.scala |  2 +-
 .../org/apache/spark/sql/rapids/GpuTaskMetrics.scala | 12 ++++++------
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala
index a9566542eef..fab30853596 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala
@@ -381,7 +381,7 @@ private final class GpuSemaphore() extends Logging {
   def completeTask(context: TaskContext): Unit = {
     val taskAttemptId = context.taskAttemptId()
     GpuTaskMetrics.get.updateRetry(taskAttemptId)
-    GpuTaskMetrics.get.updateMaxMemory(taskAttemptId)
+    GpuTaskMetrics.get.updateMaxGpuMemory(taskAttemptId)
     val refs = tasks.remove(taskAttemptId)
     if (refs == null) {
       throw new IllegalStateException(s"Completion of unknown task $taskAttemptId")
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala
index 0bd9289f987..f5f29cfff97 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala
@@ -78,12 +78,12 @@ class NanoSecondAccumulator extends AccumulatorV2[jl.Long, NanoTime] {
   override def value: NanoTime = NanoTime(_sum)
 }
 
-class WatermarkAccumulator extends AccumulatorV2[jl.Long, Long] {
+class HighWatermarkAccumulator extends AccumulatorV2[jl.Long, Long] {
   private var _value = 0L
   override def isZero: Boolean = _value == 0
 
-  override def copy(): WatermarkAccumulator = {
-    val newAcc = new WatermarkAccumulator
+  override def copy(): HighWatermarkAccumulator = {
+    val newAcc = new HighWatermarkAccumulator
     newAcc._value = this._value
     newAcc
   }
@@ -97,7 +97,7 @@
   }
 
   override def merge(other: AccumulatorV2[jl.Long, Long]): Unit = other match {
-    case wa: WatermarkAccumulator =>
+    case wa: HighWatermarkAccumulator =>
       _value = _value.max(wa._value)
     case _ =>
       throw new UnsupportedOperationException(
@@ -120,7 +120,7 @@ class GpuTaskMetrics extends Serializable {
   private val readSpillFromHostTimeNs = new NanoSecondAccumulator
   private val readSpillFromDiskTimeNs = new NanoSecondAccumulator
 
-  private val maxDeviceMemoryBytes = new WatermarkAccumulator
+  private val maxDeviceMemoryBytes = new HighWatermarkAccumulator
 
   private val metrics = Map[String, AccumulatorV2[_, _]](
     "gpuSemaphoreWait" -> semWaitTimeNs,
@@ -211,7 +211,7 @@ class GpuTaskMetrics extends Serializable {
     }
   }
 
-  def updateMaxMemory(taskAttemptId: Long): Unit = {
+  def updateMaxGpuMemory(taskAttemptId: Long): Unit = {
     val maxMem = RmmSpark.getAndResetGpuMaxMemoryAllocated(taskAttemptId)
     if (maxMem > 0) {
       maxDeviceMemoryBytes.add(maxMem)
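Once registered, `gpuMaxDeviceMemoryBytes` travels with the task like any other named accumulator, so it can be observed from the driver without touching plugin internals. Below is a minimal sketch of one way to do that, assuming the plugin has registered the accumulator under that name; the listener class name and output format are invented for the example, while `SparkListener` and `taskInfo.accumulables` are standard Spark APIs.

```scala
// Illustrative sketch: observing the new metric from a driver-side listener.
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

class GpuPeakMemoryListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // Named accumulators surface per task in taskInfo.accumulables.
    taskEnd.taskInfo.accumulables
      .filter(_.name.contains("gpuMaxDeviceMemoryBytes"))
      .foreach { acc =>
        println(s"task ${taskEnd.taskInfo.taskId} peak GPU memory: " +
          s"${acc.update.getOrElse(0L)} bytes")
      }
  }
}

// Usage: spark.sparkContext.addSparkListener(new GpuPeakMemoryListener)
```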
From 4e81016f20820fc75106e6dc7e1a202abbb98e29 Mon Sep 17 00:00:00 2001
From: Zach Puller
Date: Tue, 24 Sep 2024 10:41:36 -0700
Subject: [PATCH 3/4] add comment about the accumulator usage

Signed-off-by: Zach Puller
---
 .../scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala
index f5f29cfff97..5c7b3f692cb 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala
@@ -214,6 +214,11 @@ class GpuTaskMetrics extends Serializable {
   def updateMaxGpuMemory(taskAttemptId: Long): Unit = {
     val maxMem = RmmSpark.getAndResetGpuMaxMemoryAllocated(taskAttemptId)
     if (maxMem > 0) {
+      // This metric tracks the max amount of memory that is allocated on the gpu during
+      // the lifespan of a task. However, this update function only gets called once on task
+      // completion, whereas the actual logic tracking of the max value during memory allocations
+      // lives in the JNI. Therefore we can stick the convention here of calling the add method
+      // instead of adding a dedicated max method to the accumulator.
       maxDeviceMemoryBytes.add(maxMem)
     }
   }

From 2a27b4c71c6028049920b49fef7af3d2dbd777fd Mon Sep 17 00:00:00 2001
From: Zach Puller
Date: Tue, 24 Sep 2024 10:42:42 -0700
Subject: [PATCH 4/4] formatting

Signed-off-by: Zach Puller
---
 .../scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala
index 5c7b3f692cb..ce6f321bb93 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala
@@ -216,8 +216,8 @@ class GpuTaskMetrics extends Serializable {
     if (maxMem > 0) {
       // This metric tracks the max amount of memory that is allocated on the gpu during
       // the lifespan of a task. However, this update function only gets called once on task
-      // completion, whereas the actual logic tracking of the max value during memory allocations
-      // lives in the JNI. Therefore we can stick the convention here of calling the add method
+      // completion, whereas the actual tracking of the max value during memory allocations
+      // lives in the JNI. Therefore, we can stick to the convention here of calling the add method
       // instead of adding a dedicated max method to the accumulator.
       maxDeviceMemoryBytes.add(maxMem)
     }
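The comment added and polished in the last two patches leans on an invariant worth spelling out: `add` sums into the task-local value, and only `merge` applies max semantics, so the watermark is correct only because the JNI layer reduces each task's allocations to a single peak before `updateMaxGpuMemory` calls `add` exactly once. Below is a sketch of what would go wrong otherwise; it assumes the `HighWatermarkAccumulator` from the patches above, and the demo object and values are invented for the example.

```scala
// Illustrative sketch: why the add-exactly-once convention is load-bearing.
object AddConventionDemo {
  def main(args: Array[String]): Unit = {
    val acc = new HighWatermarkAccumulator
    acc.add(100L) // correct: the single per-task peak reported by the JNI layer

    // A second add() on the same task-side instance sums rather than maxes:
    acc.add(50L)
    println(acc.value) // prints 150, overstating the true peak of 100
  }
}
```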