Skip to content

Commit

Permalink
Remove collectTime and standardize on fetchTime
Browse files Browse the repository at this point in the history
Signed-off-by: Andy Grove <andygrove@nvidia.com>
  • Loading branch information
andygrove committed May 26, 2021
1 parent 3b718f8 commit 72d204e
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ abstract class AbstractGpuCoalesceIterator(
numInputBatches: GpuMetric,
numOutputRows: GpuMetric,
numOutputBatches: GpuMetric,
collectTime: GpuMetric,
fetchTime: GpuMetric,
concatTime: GpuMetric,
totalTime: GpuMetric,
opName: String) extends Iterator[ColumnarBatch] with Arm with Logging {
Expand Down Expand Up @@ -171,11 +171,11 @@ abstract class AbstractGpuCoalesceIterator(
Option(TaskContext.get())
.foreach(_.addTaskCompletionListener[Unit](_ => clearOnDeck()))

private def iterHasNext: Boolean = withResource(new MetricRange(collectTime)) { _ =>
private def iterHasNext: Boolean = withResource(new MetricRange(fetchTime)) { _ =>
iter.hasNext
}

private def iterNext(): ColumnarBatch = withResource(new MetricRange(collectTime)) { _ =>
private def iterNext(): ColumnarBatch = withResource(new MetricRange(fetchTime)) { _ =>
iter.next()
}

Expand Down Expand Up @@ -345,7 +345,7 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
numInputBatches: GpuMetric,
numOutputRows: GpuMetric,
numOutputBatches: GpuMetric,
collectTime: GpuMetric,
fetchTime: GpuMetric,
concatTime: GpuMetric,
totalTime: GpuMetric,
peakDevMemory: GpuMetric,
Expand All @@ -357,7 +357,7 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
numInputBatches,
numOutputRows,
numOutputBatches,
collectTime,
fetchTime,
concatTime,
totalTime,
opName) with Arm {
Expand Down Expand Up @@ -465,7 +465,7 @@ case class GpuCoalesceBatches(child: SparkPlan, goal: CoalesceGoal)
TOTAL_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_TOTAL_TIME),
NUM_INPUT_ROWS -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_ROWS),
NUM_INPUT_BATCHES -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_BATCHES),
COLLECT_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_COLLECT_TIME),
FETCH_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FETCH_TIME),
CONCAT_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_CONCAT_TIME),
PEAK_DEVICE_MEMORY -> createSizeMetric(MODERATE_LEVEL, DESCRIPTION_PEAK_DEVICE_MEMORY)
) ++ spillMetrics
Expand All @@ -485,7 +485,7 @@ case class GpuCoalesceBatches(child: SparkPlan, goal: CoalesceGoal)
val numInputBatches = gpuLongMetric(NUM_INPUT_BATCHES)
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
val collectTime = gpuLongMetric(COLLECT_TIME)
val fetchTime = gpuLongMetric(FETCH_TIME)
val concatTime = gpuLongMetric(CONCAT_TIME)
val totalTime = gpuLongMetric(TOTAL_TIME)
val peakDevMemory = gpuLongMetric(PEAK_DEVICE_MEMORY)
Expand All @@ -503,7 +503,7 @@ case class GpuCoalesceBatches(child: SparkPlan, goal: CoalesceGoal)
} else {
val callback = GpuMetric.makeSpillCallback(allMetrics)
new GpuCoalesceIterator(iter, outputSchema, goal, decompressMemoryTarget,
numInputRows, numInputBatches, numOutputRows, numOutputBatches, collectTime,
numInputRows, numInputBatches, numOutputRows, numOutputBatches, fetchTime,
concatTime, totalTime, peakDevMemory, callback, "GpuCoalesceBatches")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ object GpuMetric extends Logging {
val GPU_OP_TIME = "gpuOpTime"
val FETCH_TIME = "fetchTime"
val PEAK_DEVICE_MEMORY = "peakDevMemory"
val COLLECT_TIME = "collectTime"
val CONCAT_TIME = "concatTime"
val SORT_TIME = "sortTime"
val AGG_TIME = "computeAggTime"
Expand All @@ -78,7 +77,6 @@ object GpuMetric extends Logging {
val DESCRIPTION_GPU_OP_TIME = "GPU time"
val DESCRIPTION_FETCH_TIME = "fetch time"
val DESCRIPTION_PEAK_DEVICE_MEMORY = "peak device memory"
val DESCRIPTION_COLLECT_TIME = "collect batch time"
val DESCRIPTION_CONCAT_TIME = "concat batch time"
val DESCRIPTION_SORT_TIME = "sort time"
val DESCRIPTION_AGG_TIME = "aggregation time"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ case class GpuShuffleCoalesceExec(child: SparkPlan, targetBatchByteSize: Long)
TOTAL_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_TOTAL_TIME),
NUM_INPUT_ROWS -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_ROWS),
NUM_INPUT_BATCHES -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_BATCHES),
COLLECT_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_COLLECT_TIME),
FETCH_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FETCH_TIME),
CONCAT_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_CONCAT_TIME)
)

Expand Down Expand Up @@ -88,7 +88,7 @@ class GpuShuffleCoalesceIterator(
private[this] val inputRowsMetric = metricsMap(GpuMetric.NUM_INPUT_ROWS)
private[this] val outputBatchesMetric = metricsMap(GpuMetric.NUM_OUTPUT_BATCHES)
private[this] val outputRowsMetric = metricsMap(GpuMetric.NUM_OUTPUT_ROWS)
private[this] val collectTimeMetric = metricsMap("collectTime")
private[this] val fetchTimeMetric = metricsMap("fetchTime")
private[this] val concatTimeMetric = metricsMap("concatTime")
private[this] val serializedTables = new util.ArrayDeque[SerializedTableColumn]
private[this] var numTablesInBatch: Int = 0
Expand Down Expand Up @@ -119,13 +119,13 @@ class GpuShuffleCoalesceIterator(
}

private def batchIterHasNext: Boolean = {
withResource(new MetricRange(collectTimeMetric)) { _ =>
withResource(new MetricRange(fetchTimeMetric)) { _ =>
batchIter.hasNext
}
}

private def batchIterNext(): ColumnarBatch = {
withResource(new MetricRange(collectTimeMetric)) { _ =>
withResource(new MetricRange(fetchTimeMetric)) { _ =>
batchIter.next()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ class HostToGpuCoalesceIterator(iter: Iterator[ColumnarBatch],
numInputBatches: GpuMetric,
numOutputRows: GpuMetric,
numOutputBatches: GpuMetric,
collectTime: GpuMetric,
fetchTime: GpuMetric,
concatTime: GpuMetric,
totalTime: GpuMetric,
peakDevMemory: GpuMetric,
Expand All @@ -265,7 +265,7 @@ class HostToGpuCoalesceIterator(iter: Iterator[ColumnarBatch],
numInputBatches,
numOutputRows,
numOutputBatches,
collectTime,
fetchTime,
concatTime,
totalTime,
opName) {
Expand Down Expand Up @@ -385,7 +385,7 @@ case class HostColumnarToGpu(child: SparkPlan, goal: CoalesceGoal)
NUM_INPUT_ROWS -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_ROWS),
NUM_INPUT_BATCHES -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_BATCHES),
TOTAL_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_TOTAL_TIME),
COLLECT_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_COLLECT_TIME),
FETCH_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FETCH_TIME),
CONCAT_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_CONCAT_TIME),
PEAK_DEVICE_MEMORY -> createMetric(MODERATE_LEVEL, DESCRIPTION_PEAK_DEVICE_MEMORY)
)
Expand Down Expand Up @@ -415,7 +415,7 @@ case class HostColumnarToGpu(child: SparkPlan, goal: CoalesceGoal)
val numInputBatches = gpuLongMetric(NUM_INPUT_BATCHES)
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
val collectTime = gpuLongMetric(COLLECT_TIME)
val fetchTime = gpuLongMetric(FETCH_TIME)
val concatTime = gpuLongMetric(CONCAT_TIME)
val totalTime = gpuLongMetric(TOTAL_TIME)
val peakDevMemory = gpuLongMetric(PEAK_DEVICE_MEMORY)
Expand All @@ -428,7 +428,7 @@ case class HostColumnarToGpu(child: SparkPlan, goal: CoalesceGoal)
val confUseArrow = new RapidsConf(child.conf).useArrowCopyOptimization
batches.mapPartitions { iter =>
new HostToGpuCoalesceIterator(iter, goal, outputSchema,
numInputRows, numInputBatches, numOutputRows, numOutputBatches, collectTime, concatTime,
numInputRows, numInputBatches, numOutputRows, numOutputBatches, fetchTime, concatTime,
totalTime, peakDevMemory, "HostColumnarToGpu", confUseArrow)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ abstract class GpuBroadcastExchangeExecBase(
override lazy val additionalMetrics = Map(
TOTAL_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_TOTAL_TIME),
"dataSize" -> createSizeMetric(ESSENTIAL_LEVEL, "data size"),
COLLECT_TIME -> createNanoTimingMetric(ESSENTIAL_LEVEL, DESCRIPTION_COLLECT_TIME),
FETCH_TIME -> createNanoTimingMetric(ESSENTIAL_LEVEL, DESCRIPTION_FETCH_TIME),
BUILD_TIME -> createNanoTimingMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_TIME),
"broadcastTime" -> createNanoTimingMetric(ESSENTIAL_LEVEL, "time to broadcast"))

Expand Down Expand Up @@ -274,7 +274,7 @@ abstract class GpuBroadcastExchangeExecBase(
val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val totalTime = gpuLongMetric(TOTAL_TIME)
val collectTime = gpuLongMetric(COLLECT_TIME)
val fetchTime = gpuLongMetric(FETCH_TIME)
val buildTime = gpuLongMetric(BUILD_TIME)
val broadcastTime = gpuLongMetric("broadcastTime")

Expand All @@ -289,7 +289,7 @@ abstract class GpuBroadcastExchangeExecBase(
sparkContext.setJobGroup(_runId.toString, s"broadcast exchange (runId ${_runId})",
interruptOnCancel = true)
val collectRange = new NvtxWithMetrics("broadcast collect", NvtxColor.GREEN,
collectTime)
fetchTime)
val batch = try {
val data = child.executeColumnar().map(cb => try {
new SerializeBatchDeserializeHostBuffer(cb)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ case class GpuBroadcastToCpuExec(override val mode: BroadcastMode, child: SparkP
// relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val collectTime = gpuLongMetric(COLLECT_TIME)
val fetchTime = gpuLongMetric(FETCH_TIME)
val buildTime = gpuLongMetric(BUILD_TIME)
val broadcastTime = gpuLongMetric("broadcastTime")
val totalTime = gpuLongMetric(TOTAL_TIME)
Expand All @@ -73,7 +73,7 @@ case class GpuBroadcastToCpuExec(override val mode: BroadcastMode, child: SparkP

// run code on executors to serialize batches
val serializedBatches: Array[SerializeBatchDeserializeHostBuffer] = withResource(
new NvtxWithMetrics("broadcast collect", NvtxColor.GREEN, collectTime)) { _ =>
new NvtxWithMetrics("broadcast collect", NvtxColor.GREEN, fetchTime)) { _ =>
val data = child.executeColumnar().map(cb => try {
new SerializeBatchDeserializeHostBuffer(cb)
} finally {
Expand Down

0 comments on commit 72d204e

Please sign in to comment.