Debug utility method to dump a table or columnar batch to Parquet (#3646)

* gpu dump utils

Signed-off-by: Chong Gao <res_life@163.com>
Chong Gao authored Sep 29, 2021
1 parent d19fb1f commit df44623
Showing 2 changed files with 233 additions and 18 deletions.
45 changes: 27 additions & 18 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala
@@ -16,6 +16,8 @@

package com.nvidia.spark.rapids

import java.io.OutputStream

import scala.collection.mutable

import ai.rapids.cudf.{HostBufferConsumer, HostMemoryBuffer, NvtxColor, NvtxRange, Table, TableWriter}
@@ -79,27 +81,11 @@ abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext,
buffers += Tuple2(buffer, len)

def writeBufferedData(): Unit = {
-val toProcess = buffers.dequeueAll(_ => true)
-try {
-toProcess.foreach(ops => {
-val buffer = ops._1
-var len = ops._2
-var offset: Long = 0
-while (len > 0) {
-val toCopy = math.min(tempBuffer.length, len).toInt
-buffer.getBytes(tempBuffer, 0, offset, toCopy)
-outputStream.write(tempBuffer, 0, toCopy)
-len = len - toCopy
-offset = offset + toCopy
-}
-})
-} finally {
-toProcess.map(_._1).safeClose()
-}
+ColumnarOutputWriter.writeBufferedData(buffers, tempBuffer, outputStream)
}

/**
* Persists a columnar batch. Invoked on the executor side. When writing to dynamically
* partitioned tables, dynamic partition columns are not included in columns to be written.
* NOTE: It is the writer's responsibility to close the batch.
*/
@@ -180,3 +166,26 @@ abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext,
outputStream.close()
}
}

object ColumnarOutputWriter {
// Write the queued buffers to outputStream via tempBuffer, then close each buffer.
def writeBufferedData(buffers: mutable.Queue[(HostMemoryBuffer, Long)],
tempBuffer: Array[Byte], outputStream: OutputStream): Unit = {
val toProcess = buffers.dequeueAll(_ => true)
try {
toProcess.foreach { case (buffer, len) =>
var offset: Long = 0
var left = len
while (left > 0) {
val toCopy = math.min(tempBuffer.length, left).toInt
buffer.getBytes(tempBuffer, 0, offset, toCopy)
outputStream.write(tempBuffer, 0, toCopy)
left = left - toCopy
offset = offset + toCopy
}
}
} finally {
toProcess.map { case (buffer, _) => buffer }.safeClose()
}
}
}
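
For context, the extracted helper is the JVM side of cudf's HostBufferConsumer hand-off: the native Parquet writer fills host memory buffers and passes each one to handleBuffer, and writeBufferedData later drains the queue into an OutputStream through a reusable staging array, closing every buffer once it has been copied. Below is a minimal sketch of a standalone consumer built on the shared helper; SketchConsumer, its output path, and the buffer size are illustrative only, assuming the cudf Java API and this plugin's classes are on the classpath.

import java.io.{BufferedOutputStream, FileOutputStream, OutputStream}

import scala.collection.mutable

import ai.rapids.cudf.{HostBufferConsumer, HostMemoryBuffer}

import com.nvidia.spark.rapids.ColumnarOutputWriter

// Hypothetical consumer: queue buffers as cudf hands them over, then drain
// them with the shared helper introduced in this commit.
class SketchConsumer(path: String) extends HostBufferConsumer with AutoCloseable {
  private val out: OutputStream = new BufferedOutputStream(new FileOutputStream(path))
  private val staging = new Array[Byte](128 * 1024)
  private val buffers = mutable.Queue[(HostMemoryBuffer, Long)]()

  // cudf calls this with each filled host buffer and its length in bytes
  override def handleBuffer(buffer: HostMemoryBuffer, len: Long): Unit =
    buffers += ((buffer, len))

  // copy everything queued so far to the stream and close those buffers
  def drain(): Unit = ColumnarOutputWriter.writeBufferedData(buffers, staging, out)

  override def close(): Unit = {
    drain()
    out.close()
  }
}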
206 changes: 206 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala
@@ -0,0 +1,206 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids

import java.io.{File, FileOutputStream}
import java.util.Random

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{
ColumnView, CompressionType, DType, HostBufferConsumer, HostMemoryBuffer,
ParquetColumnWriterOptions, ParquetWriterOptions, Table, TableWriter
}
import ai.rapids.cudf.ParquetColumnWriterOptions.{listBuilder, structBuilder, NestedBuilder}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.vectorized.ColumnarBatch

object DumpUtils extends Logging with Arm {
/**
* Debug utility to dump a columnar batch to a Parquet file. <br>
* It runs on the GPU. Parquet column names are generated from the batch's type info. <br>
*
* @param columnarBatch the columnar batch to dump; it must be a GPU columnar batch
* @param filePrefix Parquet file prefix, e.g. /tmp/my-debug-prefix-
* @return the Parquet file path if the dump succeeds, e.g. /tmp/my-debug-prefix-123.parquet
*/
def dumpToParquetFile(columnarBatch: ColumnarBatch, filePrefix: String): Option[String] = {
if (columnarBatch.numCols() == 0) {
logWarning("dump to parquet failed, has no column, file prefix is " + filePrefix)
None
} else {
Some(dumpToParquetFileImpl(columnarBatch, filePrefix))
}
}

/**
* Debug utility to dump a table to a Parquet file. <br>
* It runs on the GPU. Parquet column names are generated from the table's column type info. <br>
*
* @param table the table to dump; it must be a GPU table
* @param filePrefix Parquet file prefix, e.g. /tmp/my-debug-prefix-
* @return the Parquet file path if the dump succeeds, e.g. /tmp/my-debug-prefix-123.parquet
*/
def dumpToParquetFile(table: Table, filePrefix: String): Option[String] = {
if (table.getNumberOfColumns == 0) {
logWarning("dump to parquet failed, has no column, file prefix is " + filePrefix)
None
} else {
Some(dumpToParquetFileImpl(table, filePrefix))
}
}

private def dumpToParquetFileImpl(table: Table, filePrefix: String): String = {
val path = genPath(filePrefix)
withResource(new ParquetDumper(path, table)) { dumper =>
dumper.writeTable(table)
path
}
}

private def dumpToParquetFileImpl(columnarBatch: ColumnarBatch, filePrefix: String): String = {
// convert the batch to a Table, then dump the Table
withResource(GpuColumnVector.from(columnarBatch)) { table =>
dumpToParquetFileImpl(table, filePrefix)
}
}

// Append a random integer to the prefix until a path that does not exist yet is found.
private def genPath(filePrefix: String): String = {
var path = ""
val random = new Random
var succeeded = false
while (!succeeded) {
path = filePrefix + random.nextInt(Int.MaxValue) + ".parquet"
if (!new File(path).exists()) {
succeeded = true
}
}
path
}
}

// Dumps a single cudf Table to a Parquet file through the chunked Parquet writer.
class ParquetDumper(path: String, table: Table) extends HostBufferConsumer
with Arm with AutoCloseable {
private[this] val outputStream = new FileOutputStream(path)
private[this] val tempBuffer = new Array[Byte](128 * 1024)
private[this] val buffers = mutable.Queue[(HostMemoryBuffer, Long)]()

val tableWriter: TableWriter = {
// avoid any type conversion, just dump the data as it is
val builder = ParquetDumper.parquetWriterOptionsFromTable(ParquetWriterOptions.builder(), table)
.withCompressionType(ParquetDumper.COMPRESS_TYPE)
Table.writeParquetChunked(builder.build(), this)
}

override
def handleBuffer(buffer: HostMemoryBuffer, len: Long): Unit =
buffers += Tuple2(buffer, len)

def writeBufferedData(): Unit = {
ColumnarOutputWriter.writeBufferedData(buffers, tempBuffer, outputStream)
}

def writeTable(table: Table): Unit = {
tableWriter.write(table)
writeBufferedData()
}

/**
* Closes the [[ParquetDumper]], flushing any remaining buffered data
* and closing the underlying output stream.
*/
def close(): Unit = {
tableWriter.close()
writeBufferedData()
outputStream.close()
}
}

// Running counter used to make the generated Parquet column names unique.
private class ColumnIndex() {
var i = 0

def inc(): Int = {
i = i + 1
i
}
}

object ParquetDumper extends Arm {
val COMPRESS_TYPE = CompressionType.SNAPPY

// Build writer options mirroring the table's schema; column names are
// synthesized from column types because a cudf Table carries no names.
def parquetWriterOptionsFromTable[T <: NestedBuilder[_, _], V <: ParquetColumnWriterOptions](
builder: ParquetColumnWriterOptions.NestedBuilder[T, V],
table: Table): T = {

val cIndex = new ColumnIndex
withResource(new ArrayBuffer[ColumnView]) { toClose =>
for (i <- 0 until table.getNumberOfColumns) {
parquetWriterOptionsFromColumnView(builder, table.getColumn(i), cIndex, toClose)
}

builder.asInstanceOf[T]
}
}

private def parquetWriterOptionsFromColumnView[T <: NestedBuilder[_, _],
V <: ParquetColumnWriterOptions](
builder: ParquetColumnWriterOptions.NestedBuilder[T, V],
cv: ColumnView,
cIndex: ColumnIndex,
toClose: ArrayBuffer[ColumnView]): T = {
val dType = cv.getType
if (dType.isDecimalType) {
if (dType.getTypeId == DType.DTypeEnum.DECIMAL32) {
builder.withDecimalColumn(getTypeName(dType) + cIndex.inc(),
DType.DECIMAL32_MAX_PRECISION, true)
} else if (dType.getTypeId == DType.DTypeEnum.DECIMAL64) {
builder.withDecimalColumn(getTypeName(dType) + cIndex.inc(),
DType.DECIMAL64_MAX_PRECISION, true)
} else {
// TODO: support DECIMAL128 and other decimal types
throw new UnsupportedOperationException("unsupported decimal type: " + dType.getTypeId)
}
} else if (dType == DType.STRUCT) {
val subBuilder = structBuilder("c_struct" + cIndex.inc(), true)
for (i <- 0 until cv.getNumChildren) {
val subCv = cv.getChildColumnView(i)
toClose += subCv
parquetWriterOptionsFromColumnView(subBuilder, subCv, cIndex, toClose)
}
builder.withStructColumn(subBuilder.build())
} else if (dType == DType.LIST) {
val subCv = cv.getChildColumnView(0)
toClose += subCv

builder.withListColumn(
parquetWriterOptionsFromColumnView(
listBuilder("c_list" + cIndex.inc(), true),
subCv,
cIndex,
toClose).build())
} else {
builder.withColumns(true, getTypeName(dType) + cIndex.inc())
}
builder.asInstanceOf[T]
}

private def getTypeName(t: DType): String = {
"c_" + t.toString.replace(' ', '_')
}
}
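
For reference, a usage sketch of the new utility follows; the wrapper object, method names, and the /tmp prefix are hypothetical, while DumpUtils and its dumpToParquetFile overloads come from the diff above. Dropping a call like this at a suspect point in GPU exec code writes the in-flight data to local disk for offline inspection.

import ai.rapids.cudf.Table
import com.nvidia.spark.rapids.DumpUtils

import org.apache.spark.sql.vectorized.ColumnarBatch

object DumpExample {
  // Hypothetical debug hook: any writable local directory works as a prefix.
  def inspect(batch: ColumnarBatch): Unit =
    DumpUtils.dumpToParquetFile(batch, "/tmp/my-debug-prefix-") match {
      case Some(path) => println(s"batch dumped to $path")
      case None => println("batch had no columns, nothing was dumped")
    }

  // A cudf Table can be dumped the same way.
  def inspect(table: Table): Unit =
    DumpUtils.dumpToParquetFile(table, "/tmp/my-debug-prefix-").foreach { path =>
      println(s"table dumped to $path")
    }
}

Because a cudf Table carries no field names, the dumped file's column names are synthesized from the column types plus a running counter, so the output preserves the data and nesting structure but not the original Spark schema names.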
