From 27cc72544fe6d35594de7c327c43f527f0645db8 Mon Sep 17 00:00:00 2001
From: Alessandro Bellina
Date: Tue, 21 Dec 2021 12:48:29 -0600
Subject: [PATCH] GpuShuffleCoalesceIterator acquire semaphore after host concat (#4396)

* GpuShuffleCoalesceIterator acquire semaphore after host concat

Signed-off-by: Alessandro Bellina

* Add semaphore acquire for batches without columns
---
 .../spark/rapids/GpuShuffleCoalesceExec.scala | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala
index 976c835eab3..7d6404259e3 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala
@@ -151,13 +151,14 @@ class GpuShuffleCoalesceIterator(
   }
 
   private def concatenateBatch(): ColumnarBatch = {
-    // about to start using the GPU in this task
-    GpuSemaphore.acquireIfNecessary(TaskContext.get(), semWaitTime)
-
     withResource(new MetricRange(opTimeMetric)) { _ =>
       val firstHeader = serializedTables.peekFirst().header
       val batch = withResource(new MetricRange(concatTimeMetric)) { _ =>
         if (firstHeader.getNumColumns == 0) {
+          // acquire the GPU unconditionally for now in this case, as a downstream exec
+          // may need the GPU, and the assumption is that it is acquired in the coalesce
+          // code.
+          GpuSemaphore.acquireIfNecessary(TaskContext.get(), semWaitTime)
           (0 until numTablesInBatch).foreach(_ => serializedTables.removeFirst())
           new ColumnarBatch(Array.empty, numRowsInBatch)
         } else {
@@ -194,8 +195,12 @@ class GpuShuffleCoalesceIterator(
       }
 
       withResource(new NvtxRange("Concat+Load Batch", NvtxColor.YELLOW)) { _ =>
-        withResource(JCudfSerialization.concatToContiguousTable(headers, buffers)) { table =>
-          GpuColumnVectorFromBuffer.from(table, sparkSchema)
+        withResource(JCudfSerialization.concatToHostBuffer(headers, buffers)) { hostConcatResult =>
+          // about to start using the GPU in this task
+          GpuSemaphore.acquireIfNecessary(TaskContext.get(), semWaitTime)
+          withResource(hostConcatResult.toContiguousTable) { contigTable =>
+            GpuColumnVectorFromBuffer.from(contigTable, sparkSchema)
+          }
         }
       }
     }
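
The patch moves the GpuSemaphore acquisition from the top of concatenateBatch() to just after the host-side concatenation, so a task no longer holds a GPU slot while it is still doing CPU-only work on serialized shuffle buffers. The sketch below is a minimal, self-contained analogue of that ordering change, not the plugin's actual code: it uses a plain java.util.concurrent.Semaphore in place of GpuSemaphore, and the names SemaphoreOrderingSketch, gpuSlots, hostConcat, and deviceLoad are hypothetical stand-ins for the semaphore, JCudfSerialization.concatToHostBuffer, and the device load done by GpuColumnVectorFromBuffer.from.

import java.util.concurrent.Semaphore

object SemaphoreOrderingSketch {
  // Stand-in for GpuSemaphore: limits how many tasks may use the GPU at once.
  private val gpuSlots = new Semaphore(4)

  // Hypothetical host-side concatenation: CPU-only work, no GPU needed yet.
  private def hostConcat(buffers: Seq[Array[Byte]]): Array[Byte] =
    buffers.foldLeft(Array.emptyByteArray)(_ ++ _)

  // Hypothetical device load: the first point where the GPU is actually required.
  private def deviceLoad(hostData: Array[Byte]): Long = hostData.length.toLong

  def concatenateBatch(buffers: Seq[Array[Byte]]): Long = {
    // Old ordering (before this patch): acquire up front, then hold a GPU slot
    // while doing CPU-only concatenation.
    // gpuSlots.acquire()

    val hostData = hostConcat(buffers) // other tasks can use the GPU meanwhile

    // New ordering (this patch): acquire only once GPU work is imminent.
    gpuSlots.acquire()
    try {
      deviceLoad(hostData)
    } finally {
      // The plugin releases the semaphore at task completion; released here only
      // to keep the sketch self-contained.
      gpuSlots.release()
    }
  }

  def main(args: Array[String]): Unit = {
    val loaded = concatenateBatch(Seq(Array[Byte](1, 2), Array[Byte](3)))
    println(s"loaded $loaded bytes")
  }
}

As the added comment in the zero-column branch notes, batches with rows but no columns still acquire the semaphore unconditionally, since a downstream exec may need the GPU and assumes the coalesce code has already acquired it.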