Fix leaks in the test cases of CachedBatchWriterSuite #5938

Merged 1 commit on Jul 1, 2022
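The leak fix follows a single pattern: every cudf ColumnVector the suite creates is owned by an AutoCloseable TestResources holder, and each test body runs inside withResource so the holder is closed even when an assertion or Mockito verification fails. Below is a minimal, self-contained sketch of that pattern. The withResource helper and the Resource class here are simplified stand-ins written for illustration; the suite's real withResource comes from the plugin and the real resources wrap native cudf memory.

object WithResourceSketch {

  // Simplified stand-in for the suite's withResource helper: close() is guaranteed
  // to run even if the body throws, which is what keeps the test vectors from leaking.
  def withResource[T <: AutoCloseable, V](resource: T)(body: T => V): V = {
    try {
      body(resource)
    } finally {
      resource.close()
    }
  }

  // Stand-in for TestResources: owns everything a test needs and closes it in one place.
  class Resource extends AutoCloseable {
    var closed = false
    override def close(): Unit = { closed = true }
  }

  def main(args: Array[String]): Unit = {
    val r = new Resource
    withResource(r) { res =>
      assert(!res.closed) // the resource stays open while the test body runs
    }
    assert(r.closed)      // and is closed deterministically afterwards
  }
}

The multi-column row-iterator test below nests three such scopes, one per spied vector, which keeps each resource's lifetime explicit at the cost of some nesting.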
Changes from all commits
@@ -17,6 +17,7 @@
package com.nvidia.spark.rapids

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{ColumnVector, CompressionType, DType, Table, TableWriter}
import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
@@ -36,49 +37,78 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
*/
class CachedBatchWriterSuite extends SparkQueryCompareTestSuite {

test("convert large columnar batch to cachedbatch on single col table") {
class TestResources extends AutoCloseable {
val byteCv1 = ColumnVector.fromBytes(1)
val byteCv3 = ColumnVector.fromBytes(3)
val byteCv456 = ColumnVector.fromBytes(4, 5, 6)

override def close(): Unit = {
byteCv1.close()
byteCv3.close()
byteCv456.close()
}
}

test("convert large columnar batch to cached batch on single col table") {
if (!withCpuSparkSession(s => s.version < "3.1.0")) {
val (spyCol0, spyGpuCol0) = getCudfAndGpuVectors()
testCompressColBatch(Array(spyCol0), Array(spyGpuCol0))
verify(spyCol0).split(2086912)
withResource(new TestResources()) { resources =>
val (spyCol0, spyGpuCol0) = getCudfAndGpuVectors(resources)
val splitAt = 2086912
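// 2086912 == BYTES_ALLOWED_PER_BATCH / 1024: with one column whose mocked type is
// 1024 bytes wide, that is the row count at which the batch must be split.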
testCompressColBatch(resources, Array(spyCol0), Array(spyGpuCol0), splitAt)
verify(spyCol0).split(splitAt)
}
}
}

test("convert large columnar batch to cachedbatch on multi-col table") {
test("convert large columnar batch to cached batch on multi-col table") {
if (!withCpuSparkSession(s => s.version < "3.1.0")) {
val (spyCol0, spyGpuCol0) = getCudfAndGpuVectors()
val (spyCol1, spyGpuCol1) = getCudfAndGpuVectors()
val (spyCol2, spyGpuCol2) = getCudfAndGpuVectors()
testCompressColBatch(Array(spyCol0, spyCol1, spyCol2),
Array(spyGpuCol0, spyGpuCol1, spyGpuCol2))
val splitAt = Seq(695637, 1391274, 2086911, 2782548)
verify(spyCol0).split(splitAt: _*)
verify(spyCol1).split(splitAt: _*)
verify(spyCol2).split(splitAt: _*)
withResource(new TestResources()) { resources =>
val (spyCol0, spyGpuCol0) = getCudfAndGpuVectors(resources)
val splitAt = Seq(695637, 1391274, 2086911, 2782548)
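// Multiples of 695637 == BYTES_ALLOWED_PER_BATCH / (3 * 1024), using integer division:
// with three 1024-byte columns, a new batch is needed every 695637 rows, so the
// 3M-row batch is split at four points.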
testCompressColBatch(resources, Array(spyCol0, spyCol0, spyCol0),
Array(spyGpuCol0, spyGpuCol0, spyGpuCol0), splitAt: _*)
verify(spyCol0, times(3)).split(splitAt: _*)
}
}
}

test("convert large InternalRow iterator to cached batch single col") {
val (_, spyGpuCol0) = getCudfAndGpuVectors()
val cb = new ColumnarBatch(Array(spyGpuCol0), ROWS)
val mockByteType = mock(classOf[ByteType])
when(mockByteType.defaultSize).thenReturn(1024)
val schema = Seq(AttributeReference("field0", mockByteType, true)())
testColumnarBatchToCachedBatchIterator(cb, schema)
// Allow printing "A HOST BUFFER WAS LEAKED"
// see comments in ColumnarBatchToCachedBatchIterator
// TaskContext.get is null in unit test
// Option(TaskContext.get).foreach(_.addTaskCompletionListener[Unit](_ => hostBatch.close()))
withResource(new TestResources()) { resources =>
val (_, spyGpuCol0) = getCudfAndGpuVectors(resources)
val cb = new ColumnarBatch(Array(spyGpuCol0), ROWS)
val mockByteType = mock(classOf[ByteType])
when(mockByteType.defaultSize).thenReturn(1024)
val schema = Seq(AttributeReference("field0", mockByteType, true)())
testColumnarBatchToCachedBatchIterator(cb, schema)
}
}

test("convert large InternalRow iterator to cached batch multi-col") {
val (_, spyGpuCol0) = getCudfAndGpuVectors()
val (_, spyGpuCol1) = getCudfAndGpuVectors()
val (_, spyGpuCol2) = getCudfAndGpuVectors()
val cb = new ColumnarBatch(Array(spyGpuCol0, spyGpuCol1, spyGpuCol2), ROWS)
val mockByteType = mock(classOf[ByteType])
when(mockByteType.defaultSize).thenReturn(1024)
val schema = Seq(AttributeReference("field0", mockByteType, true)(),
AttributeReference("field1", mockByteType, true)(),
AttributeReference("field2", mockByteType, true)())

testColumnarBatchToCachedBatchIterator(cb, schema)
// Allow printing "A HOST BUFFER WAS LEAKED"
// see comments in ColumnarBatchToCachedBatchIterator
// TaskContext.get is null in unit test
// Option(TaskContext.get).foreach(_.addTaskCompletionListener[Unit](_ => hostBatch.close()))
withResource(new TestResources()) { resources1 =>
val (_, spyGpuCol0) = getCudfAndGpuVectors(resources1)
withResource(new TestResources()) { resources2 =>
val (_, spyGpuCol1) = getCudfAndGpuVectors(resources2)
withResource(new TestResources()) { resources3 =>
val (_, spyGpuCol2) = getCudfAndGpuVectors(resources3)
val cb = new ColumnarBatch(Array(spyGpuCol0, spyGpuCol1, spyGpuCol2), ROWS)
val mockByteType = mock(classOf[ByteType])
when(mockByteType.defaultSize).thenReturn(1024)
val schema = Seq(AttributeReference("field0", mockByteType, true)(),
AttributeReference("field1", mockByteType, true)(),
AttributeReference("field2", mockByteType, true)())

testColumnarBatchToCachedBatchIterator(cb, schema)
}
}
}
}

test("test useCompression conf is honored") {
@@ -97,22 +127,22 @@ class CachedBatchWriterSuite extends SparkQueryCompareTestSuite {

val ROWS = 3 * 1024 * 1024

private def getCudfAndGpuVectors(onHost: Boolean = false): (ColumnVector, GpuColumnVector)= {
val spyCol = spy(ColumnVector.fromBytes(1))
private def getCudfAndGpuVectors(resources: TestResources): (ColumnVector, GpuColumnVector) = {
val spyCol = spy(resources.byteCv1)
when(spyCol.getRowCount).thenReturn(ROWS)
val mockDtype = mock(classOf[DType])
when(mockDtype.getSizeInBytes).thenReturn(1024)
val spyGpuCol = spy(GpuColumnVector.from(spyCol, ByteType))
when(spyCol.getDeviceMemorySize).thenReturn(1024L * ROWS)

(spyCol, spyGpuCol)
}

val _2GB = 2L * 1024 * 1024 * 1024
val APPROX_PAR_META_DATA = 10 * 1024 * 1024 // we are estimating 10MB
val BYTES_ALLOWED_PER_BATCH = _2GB - APPROX_PAR_META_DATA
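// BYTES_ALLOWED_PER_BATCH is the per-batch byte budget handed to
// compressColumnarBatchWithParquet: just under 2GB, with ~10MB reserved for the
// estimated parquet metadata.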

private def whenSplitCalled(cb: ColumnarBatch): Unit = {
private def whenSplitCalled(cb: ColumnarBatch, testResources: TestResources,
splitPoints: Int*): Unit = {
val rows = cb.numRows()
val eachRowSize = cb.numCols() * 1024
val rowsAllowedInABatch = BYTES_ALLOWED_PER_BATCH / eachRowSize
@@ -121,24 +151,34 @@ class CachedBatchWriterSuite extends SparkQueryCompareTestSuite {
scala.Range(0, cb.numCols()).indices.foreach { i =>
val spyCol = cb.column(i).asInstanceOf[GpuColumnVector].getBase
val splitCols0 = splitRange.indices.map { _ =>
val spySplitCol = spy(ColumnVector.fromBytes(4, 5, 6))
val spySplitCol = spy(testResources.byteCv456)
when(spySplitCol.getRowCount()).thenReturn(rowsAllowedInABatch)
spySplitCol
}
val splitCols = if (spillOver > 0) {
val splitCol = spy(ColumnVector.fromBytes(3))
val splitCol = spy(testResources.byteCv3)
when(splitCol.getRowCount()).thenReturn(spillOver)
splitCols0 :+ splitCol
} else {
splitCols0
}
when(spyCol.split(any())).thenReturn(splitCols.toArray)

// hand out copies of splitCols because ParquetCachedBatchSerializer.compressColumnarBatchWithParquet
// is responsible for closing the splits it receives
doAnswer(_ => copyOf(splitCols)).when(spyCol).split(splitPoints: _*)
}
}

def copyOf(in: Seq[ColumnVector]): Array[ColumnVector] = {
val buffers = ArrayBuffer[ColumnVector]()
in.foreach(e => buffers += e.copyToColumnVector())
buffers.toArray
}
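// Exercises ParquetCachedBatchSerializer.compressColumnarBatchWithParquet against a
// no-op mocked TableWriter, so only the splitting and batching logic is under test.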

private def testCompressColBatch(
testResources: TestResources,
cudfCols: Array[ColumnVector],
gpuCols: Array[org.apache.spark.sql.vectorized.ColumnVector]): Unit = {
gpuCols: Array[org.apache.spark.sql.vectorized.ColumnVector], splitAt: Int*): Unit = {
// mock static method for Table
val theTableMock = mockStatic(classOf[Table], (_: InvocationOnMock) =>
new TableWriter {
@@ -153,19 +193,16 @@ class CachedBatchWriterSuite extends SparkQueryCompareTestSuite {
// noop
}
})

withResource(cudfCols) { _ =>
val cb = new ColumnarBatch(gpuCols, ROWS)
whenSplitCalled(cb)
val ser = new ParquetCachedBatchSerializer
val dummySchema = new StructType(
Array(StructField("empty", ByteType, false),
StructField("empty", ByteType, false),
StructField("empty", ByteType, false)))
ser.compressColumnarBatchWithParquet(cb, dummySchema, dummySchema,
BYTES_ALLOWED_PER_BATCH, false)
theTableMock.close()
}
val cb = new ColumnarBatch(gpuCols, ROWS)
whenSplitCalled(cb, testResources, splitAt: _*)
val ser = new ParquetCachedBatchSerializer
val dummySchema = new StructType(
Array(StructField("empty", ByteType, false),
StructField("empty", ByteType, false),
StructField("empty", ByteType, false)))
ser.compressColumnarBatchWithParquet(cb, dummySchema, dummySchema,
BYTES_ALLOWED_PER_BATCH, false)
theTableMock.close()
}

private def testColumnarBatchToCachedBatchIterator(