Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GPU device watermark metrics #11457

Merged
merged 4 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ private final class GpuSemaphore() extends Logging {
def completeTask(context: TaskContext): Unit = {
val taskAttemptId = context.taskAttemptId()
GpuTaskMetrics.get.updateRetry(taskAttemptId)
GpuTaskMetrics.get.updateMaxMemory(taskAttemptId)
zpuller marked this conversation as resolved.
Show resolved Hide resolved
val refs = tasks.remove(taskAttemptId)
if (refs == null) {
throw new IllegalStateException(s"Completion of unknown task $taskAttemptId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,35 @@ class NanoSecondAccumulator extends AccumulatorV2[jl.Long, NanoTime] {
override def value: NanoTime = NanoTime(_sum)
}

class WatermarkAccumulator extends AccumulatorV2[jl.Long, Long] {
zpuller marked this conversation as resolved.
Show resolved Hide resolved
private var _value = 0L
override def isZero: Boolean = _value == 0

override def copy(): WatermarkAccumulator = {
val newAcc = new WatermarkAccumulator
newAcc._value = this._value
newAcc
}

override def reset(): Unit = {
_value = 0
}

override def add(v: jl.Long): Unit = {
_value += v
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Although I understand this function will be called once on task completion, I wonder if it would be more straightforward to understand if this were _value = _value.max(v). Is HighWatermarkAccumulator not just a max accumulator?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I don't have a strong opinion about this. I think, yes, it is a max accumulator. Do others have thoughts on this?

Copy link
Collaborator

@jihoonson jihoonson Sep 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  /**
   * Takes the inputs and accumulates.
   */
  def add(v: IN): Unit

The above snippet shows the doc of AccumulatorV2.add(). Based on that, it seems reasonable to compute a max in this function rather than a sum.

I think my comment is a nitpick though, and do not want to block this PR unnecessarily as long as it is clear in the code that this function must be called only once on task completion.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My only 2c for keeping it as is that we can probably consolidate such metrics into a single generic implementation potentially.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with the sentiment that this is not meant to be an additive metric. I guess I am more leaning towards add setting the value, and throwing if it already had set it. E.g. we don't have a case where we want increment, and we don't think max makes sense within this scope (only at merge time). I'd throw to be sure we don't shoot ourselves in the foot.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes the API is not ideal, but it is what we have to work with. Also if you think of it as a collection of measurements, then add is appending a new measurement instead of adding a number to a single value. The fact that we reduce it to a single value to represent the collection is separate.

But either way I am not a fan of throwing an exception if the value is already set. This makes assumptions about how the metric is going to be used by Spark that might not hold true in all cases. It has a reset API after all. If you really want to do it, then go ahead, but we need to do testing on all of our supported platforms to make sure it is correct.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point @revans2 on the exception being a bad idea.

I am back at what does add mean here. I think it means what @jihoonson suggested, it's a max, that's how you add it because the input is not meant to be additive, it is meant to be the max, always.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I'm going to leave it as is for now for expediency given that I have the approvals and I'm still not totally sure I want to change it, but definitely open to it in a follow up PR. I will be continuing work on related metrics so I'll have my eye on it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But either way I am not a fan of throwing an exception if the value is already set. This makes assumptions about how the metric is going to be used by Spark that might not hold true in all cases. It has a reset API after all. If you really want to do it, then go ahead, but we need to do testing on all of our supported platforms to make sure it is correct.

Throwing an exception sounds reasonable to me because we will be able to catch such cases if Spark ever uses it in a different way than we expect. If we are not 100% sure whether Spark does or not in all cases, do we need the testing suggested above anyway? If add() is ever called more than once, computing a sum instead of a max in it will likely mess up the metric.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is why I approved the max version. Not the sum version. These metrics can get to be kind of complicated because they are produced on the worker and then sent back to the driver. On the driver they are written out ot the log, but then also something has to put them into the UI. I don't think that they use the accumulator for that, and that is where my questions are. I just have not looked at it enough. A max of a max is not a big deal, in reporting what we want to see if it does happen to be called multiple times. But doing a sum of a max might cause issues. They would likely be minor issues but all the same. If you want to learn what the code is doing then lets read it and do some experiments. If we want to ship it to production, then lets be defensive with the code that we ship.

}

override def merge(other: AccumulatorV2[jl.Long, Long]): Unit = other match {
case wa: WatermarkAccumulator =>
_value = _value.max(wa._value)
case _ =>
throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}

override def value: Long = _value
}

class GpuTaskMetrics extends Serializable {
private val semWaitTimeNs = new NanoSecondAccumulator
private val retryCount = new LongAccumulator
Expand All @@ -91,6 +120,8 @@ class GpuTaskMetrics extends Serializable {
private val readSpillFromHostTimeNs = new NanoSecondAccumulator
private val readSpillFromDiskTimeNs = new NanoSecondAccumulator

private val maxDeviceMemoryBytes = new WatermarkAccumulator

private val metrics = Map[String, AccumulatorV2[_, _]](
"gpuSemaphoreWait" -> semWaitTimeNs,
"gpuRetryCount" -> retryCount,
Expand All @@ -100,7 +131,8 @@ class GpuTaskMetrics extends Serializable {
"gpuSpillToHostTime" -> spillToHostTimeNs,
"gpuSpillToDiskTime" -> spillToDiskTimeNs,
"gpuReadSpillFromHostTime" -> readSpillFromHostTimeNs,
"gpuReadSpillFromDiskTime" -> readSpillFromDiskTimeNs
"gpuReadSpillFromDiskTime" -> readSpillFromDiskTimeNs,
"gpuMaxDeviceMemoryBytes" -> maxDeviceMemoryBytes
)

def register(sc: SparkContext): Unit = {
Expand Down Expand Up @@ -178,6 +210,13 @@ class GpuTaskMetrics extends Serializable {
retryComputationTime.add(compNs)
}
}

def updateMaxMemory(taskAttemptId: Long): Unit = {
val maxMem = RmmSpark.getAndResetGpuMaxMemoryAllocated(taskAttemptId)
if (maxMem > 0) {
maxDeviceMemoryBytes.add(maxMem)
zpuller marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

/**
Expand Down
Loading