Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add semaphoreWaitTime and gpuOpTime for GpuRowToColumnarExec #2823

Merged
merged 8 commits into from
Jun 28, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public abstract class UnsafeRowToColumnarBatchIterator implements Iterator<Colum
protected final long dataLength;
protected final DType[] rapidsTypes;
protected final DataType[] outputTypes;
protected final GpuMetric semaphoreWaitTime;
protected final GpuMetric gpuOpTime;
protected final GpuMetric totalTime;
protected final GpuMetric numInputRows;
protected final GpuMetric numOutputRows;
Expand All @@ -58,6 +60,8 @@ protected UnsafeRowToColumnarBatchIterator(
Iterator<UnsafeRow> input,
Attribute[] schema,
CoalesceSizeGoal goal,
GpuMetric semaphoreWaitTime,
GpuMetric gpuOpTime,
GpuMetric totalTime,
GpuMetric numInputRows,
GpuMetric numOutputRows,
Expand All @@ -74,6 +78,8 @@ protected UnsafeRowToColumnarBatchIterator(
rapidsTypes[i] = GpuColumnVector.getNonNestedRapidsType(schema[i].dataType());
outputTypes[i] = schema[i].dataType();
}
this.semaphoreWaitTime = semaphoreWaitTime;
this.gpuOpTime = gpuOpTime;
this.totalTime = totalTime;
this.numInputRows = numInputRows;
this.numOutputRows = numOutputRows;
Expand All @@ -100,7 +106,8 @@ public ColumnarBatch next() {
// buffers. One will be for the byte data and the second will be for the offsets. We will then
// write the data directly into those buffers using code generation in a child of this class.
// that implements fillBatch.
try (HostMemoryBuffer dataBuffer = HostMemoryBuffer.allocate(dataLength);
try (NvtxWithMetrics nvtx = new NvtxWithMetrics("RowToColumnar", NvtxColor.CYAN, totalTime);
HostMemoryBuffer dataBuffer = HostMemoryBuffer.allocate(dataLength);
HostMemoryBuffer offsetsBuffer =
HostMemoryBuffer.allocate(((long)numRowsEstimate + 1) * BYTES_PER_OFFSET)) {

Expand Down Expand Up @@ -135,12 +142,12 @@ public ColumnarBatch next() {
// Grab the semaphore because we are about to put data onto the GPU.
TaskContext tc = TaskContext.get();
if (tc != null) {
GpuSemaphore$.MODULE$.acquireIfNecessary(tc);
GpuSemaphore$.MODULE$.acquireIfNecessary(tc, semaphoreWaitTime);
}
if (totalTime != null) {
buildRange = new NvtxWithMetrics("RowToColumnar", NvtxColor.GREEN, totalTime);
if (gpuOpTime != null) {
buildRange = new NvtxWithMetrics("RowToColumnar: build", NvtxColor.GREEN, gpuOpTime);
} else {
buildRange = new NvtxRange("RowToColumnar", NvtxColor.GREEN);
buildRange = new NvtxRange("RowToColumnar: build", NvtxColor.GREEN);
}
devColumn = hostColumn.copyToDevice();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ object GpuMetric extends Logging {
val NUM_PARTITIONS = "numPartitions"
val TOTAL_TIME = "totalTime"
val OP_TIME = "opTime"
val GPU_OP_TIME = "gpuOpTime"
val SEMAPHORE_WAIT_TIME = "semaphoreWaitTime"
val PEAK_DEVICE_MEMORY = "peakDevMemory"
val COLLECT_TIME = "collectTime"
val CONCAT_TIME = "concatTime"
Expand All @@ -75,6 +77,8 @@ object GpuMetric extends Logging {
val DESCRIPTION_NUM_PARTITIONS = "partitions"
val DESCRIPTION_TOTAL_TIME = "total time"
val DESCRIPTION_OP_TIME = "op time"
val DESCRIPTION_GPU_OP_TIME = "GPU op time"
val DESCRIPTION_SEMAPHORE_WAIT_TIME = "GPU semaphore wait time"
val DESCRIPTION_PEAK_DEVICE_MEMORY = "peak device memory"
val DESCRIPTION_COLLECT_TIME = "collect batch time"
val DESCRIPTION_CONCAT_TIME = "concat batch time"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,9 @@ class RowToColumnarIterator(
totalTime: GpuMetric = NoopMetric,
numInputRows: GpuMetric = NoopMetric,
numOutputRows: GpuMetric = NoopMetric,
numOutputBatches: GpuMetric = NoopMetric) extends Iterator[ColumnarBatch] with Arm {
numOutputBatches: GpuMetric = NoopMetric,
semaphoreWaitTime: GpuMetric = NoopMetric,
gpuOpTime: GpuMetric = NoopMetric) extends Iterator[ColumnarBatch] with Arm {

private val targetSizeBytes = localGoal.targetSizeBytes
private var targetRows = 0
Expand All @@ -570,7 +572,8 @@ class RowToColumnarIterator(
buildBatch()
}

private def buildBatch(): ColumnarBatch = {
private def buildBatch(): ColumnarBatch = withResource(
new NvtxWithMetrics("RowToColumnar", NvtxColor.CYAN, totalTime)) { _ =>

// estimate the size of the first batch based on the schema
if (targetRows == 0) {
Expand Down Expand Up @@ -607,9 +610,11 @@ class RowToColumnarIterator(
// About to place data back on the GPU
// note that TaskContext.get() can return null during unit testing so we wrap it in an
// option here
Option(TaskContext.get()).foreach(GpuSemaphore.acquireIfNecessary)
Option(TaskContext.get())
.foreach(ctx => GpuSemaphore.acquireIfNecessary(ctx, semaphoreWaitTime))

val ret = withResource(new NvtxWithMetrics("RowToColumnar", NvtxColor.GREEN, totalTime)) { _=>
val ret = withResource(new NvtxWithMetrics("RowToColumnar", NvtxColor.GREEN,
gpuOpTime)) { _ =>
builders.build(rowCount)
}
numInputRows += rowCount
Expand All @@ -636,6 +641,8 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging {
def apply(input: Iterator[UnsafeRow],
schema: Array[Attribute],
goal: CoalesceSizeGoal,
semaphoreWaitTime: GpuMetric,
gpuOpTime: GpuMetric,
totalTime: GpuMetric,
numInputRows: GpuMetric,
numOutputRows: GpuMetric,
Expand All @@ -645,6 +652,8 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging {
ctx.addReferenceObj("iter", input, classOf[Iterator[UnsafeRow]].getName)
ctx.addReferenceObj("schema", schema, classOf[Array[Attribute]].getName)
ctx.addReferenceObj("goal", goal, classOf[CoalesceSizeGoal].getName)
ctx.addReferenceObj("semaphoreWaitTime", semaphoreWaitTime, classOf[GpuMetric].getName)
ctx.addReferenceObj("gpuOpTime", gpuOpTime, classOf[GpuMetric].getName)
ctx.addReferenceObj("totalTime", totalTime, classOf[GpuMetric].getName)
ctx.addReferenceObj("numInputRows", numInputRows, classOf[GpuMetric].getName)
ctx.addReferenceObj("numOutputRows", numOutputRows, classOf[GpuMetric].getName)
Expand Down Expand Up @@ -715,7 +724,9 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging {
| (com.nvidia.spark.rapids.GpuMetric)references[3],
| (com.nvidia.spark.rapids.GpuMetric)references[4],
| (com.nvidia.spark.rapids.GpuMetric)references[5],
| (com.nvidia.spark.rapids.GpuMetric)references[6]);
| (com.nvidia.spark.rapids.GpuMetric)references[6],
| (com.nvidia.spark.rapids.GpuMetric)references[7],
| (com.nvidia.spark.rapids.GpuMetric)references[8]);
| ${ctx.initMutableStates()}
| }
|
Expand Down Expand Up @@ -806,6 +817,8 @@ case class GpuRowToColumnarExec(child: SparkPlan, goal: CoalesceSizeGoal)
}

override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
SEMAPHORE_WAIT_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_SEMAPHORE_WAIT_TIME),
GPU_OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_GPU_OP_TIME),
TOTAL_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_TOTAL_TIME),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am tempted to drop TOTAL_TIME to DEBUG_LEVEL, but I don't know how you want to use it with benchmarks/etc so I don't know if that is a good idea or not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I would really like is OP_TIME. I should be able to calculate that as TOTAL_TIME - SEMAPHORE_WAIT_TIME - GPU_OP_TIME. I'll look at that next.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't that just be measuring mostly the time spent in earlier child execs within the stage? TOTAL_TIME includes time spent fetching inputs from child iterators. It wouldn't be very op-specific, and thus OP_TIME would be an odd name for it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's a good point. I would need to subtract the cost of the fetches too, but that might be expensive because this is a row-based iterator. I'll take a look and see what the options are.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is converting rows on the CPU to columns on the GPU. There is close to no processing on the CPU beyond fetching data from upstream and putting it into a buffer. If we try to measure the amount of time it takes to convert from UnsafeRow to CudfUnsafeRow, or to just put it into the arrow format in a buffer (depending on the code path we take), we are likely going to spend more time measuring than actually doing the conversion. Unless knowing that number is critically important we would propose that we just lump it all together.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I went ahead and changed the level of TOTAL_TIME to DEBUG_LEVEL.

NUM_INPUT_ROWS -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_ROWS)
)
Expand All @@ -817,6 +830,8 @@ case class GpuRowToColumnarExec(child: SparkPlan, goal: CoalesceSizeGoal)
val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val totalTime = gpuLongMetric(TOTAL_TIME)
val gpuOpTime = gpuLongMetric(GPU_OP_TIME)
val semaphoreWaitTime = gpuLongMetric(SEMAPHORE_WAIT_TIME)
val localGoal = goal
val rowBased = child.execute()

Expand All @@ -831,13 +846,13 @@ case class GpuRowToColumnarExec(child: SparkPlan, goal: CoalesceSizeGoal)
val localOutput = output
rowBased.mapPartitions(rowIter => GeneratedUnsafeRowToCudfRowIterator(
rowIter.asInstanceOf[Iterator[UnsafeRow]],
localOutput.toArray, localGoal, totalTime, numInputRows, numOutputRows,
numOutputBatches))
localOutput.toArray, localGoal, semaphoreWaitTime, gpuOpTime, totalTime,
numInputRows, numOutputRows, numOutputBatches))
} else {
val converters = new GpuRowToColumnConverter(localSchema)
rowBased.mapPartitions(rowIter => new RowToColumnarIterator(rowIter,
localSchema, localGoal, converters,
totalTime, numInputRows, numOutputRows, numOutputBatches))
totalTime, numInputRows, numOutputRows, numOutputBatches, semaphoreWaitTime, gpuOpTime))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,20 @@ object GpuSemaphore {
*/
def acquireIfNecessary(context: TaskContext): Unit = {
  // Back-compat entry point with no wait-time metric: delegate to the
  // instance, passing None so no metric is updated. A null context (e.g.
  // during unit tests) or a disabled semaphore makes this a no-op.
  if (enabled) {
    Option(context).foreach(ctx => getInstance.acquireIfNecessary(ctx, None))
  }
}

/**
 * Tasks must call this when they begin to use the GPU.
 * If the task has not already acquired the GPU semaphore then it is acquired,
 * blocking if necessary.
 * NOTE: A task completion listener will automatically be installed to ensure
 * the semaphore is always released by the time the task completes.
 *
 * @param context the task's context; a null context makes this a no-op
 * @param waitMetric metric that accumulates the time spent blocked waiting
 *                   for the semaphore
 */
def acquireIfNecessary(context: TaskContext, waitMetric: GpuMetric): Unit = {
  // No-op when the semaphore is disabled; otherwise forward the metric
  // wrapped in Some so the wait time gets recorded.
  if (enabled) {
    Option(context).foreach { ctx =>
      getInstance.acquireIfNecessary(ctx, Some(waitMetric))
    }
  }
}

Expand Down Expand Up @@ -103,8 +116,11 @@ private final class GpuSemaphore(tasksPerGpu: Int) extends Logging {
// Map to track which tasks have acquired the semaphore.
private val activeTasks = new ConcurrentHashMap[Long, MutableInt]

def acquireIfNecessary(context: TaskContext): Unit = {
val nvtxRange = new NvtxRange("Acquire GPU", NvtxColor.RED)
def acquireIfNecessary(context: TaskContext, waitMetric: Option[GpuMetric] = None): Unit = {
val nvtxRange = waitMetric match {
case Some(m) => new NvtxWithMetrics("Acquire GPU", NvtxColor.RED, m)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have done this in a few places. Would it be better to just have NvtxWithMetrics have a constructor that takes a Option[GpuMetric] and hide it internally?

case _ => new NvtxRange("Acquire GPU", NvtxColor.RED)
}
try {
val taskAttemptId = context.taskAttemptId()
val refs = activeTasks.get(taskAttemptId)
Expand Down