diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala
index e3b73932fb3..d5c6761c43d 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala
@@ -16,6 +16,8 @@
 package com.nvidia.spark.rapids
 
+import java.io.OutputStream
+
 import scala.collection.mutable
 
 import ai.rapids.cudf.{HostBufferConsumer, HostMemoryBuffer, NvtxColor, NvtxRange, Table, TableWriter}
@@ -79,27 +81,11 @@ abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext,
     buffers += Tuple2(buffer, len)
 
   def writeBufferedData(): Unit = {
-    val toProcess = buffers.dequeueAll(_ => true)
-    try {
-      toProcess.foreach(ops => {
-        val buffer = ops._1
-        var len = ops._2
-        var offset: Long = 0
-        while (len > 0) {
-          val toCopy = math.min(tempBuffer.length, len).toInt
-          buffer.getBytes(tempBuffer, 0, offset, toCopy)
-          outputStream.write(tempBuffer, 0, toCopy)
-          len = len - toCopy
-          offset = offset + toCopy
-        }
-      })
-    } finally {
-      toProcess.map(_._1).safeClose()
-    }
+    ColumnarOutputWriter.writeBufferedData(buffers, tempBuffer, outputStream)
   }
 
   /**
-   * Persists a columnar batch. Invoked on the executor side. When writing to dynamically
+   * Persists a columnar batch. Invoked on the executor side. When writing to dynamically
    * partitioned tables, dynamic partition columns are not included in columns to be written.
    * NOTE: It is the writer's responsibility to close the batch.
    */
@@ -180,3 +166,26 @@ abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext,
     outputStream.close()
   }
 }
+
+object ColumnarOutputWriter {
+  // Writes the queued buffers to outputStream, copying through tempBuffer, then closes them.
+  def writeBufferedData(buffers: mutable.Queue[(HostMemoryBuffer, Long)],
+      tempBuffer: Array[Byte], outputStream: OutputStream): Unit = {
+    val toProcess = buffers.dequeueAll(_ => true)
+    try {
+      toProcess.foreach { case (buffer, len) =>
+        var offset: Long = 0
+        var left = len
+        while (left > 0) {
+          val toCopy = math.min(tempBuffer.length, left).toInt
+          buffer.getBytes(tempBuffer, 0, offset, toCopy)
+          outputStream.write(tempBuffer, 0, toCopy)
+          left = left - toCopy
+          offset = offset + toCopy
+        }
+      }
+    } finally {
+      toProcess.map { case (buffer, _) => buffer }.safeClose()
+    }
+  }
+}
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala
new file mode 100644
index 00000000000..0f676203be8
--- /dev/null
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala
@@ -0,0 +1,206 @@
+/*
+ * Copyright (c) 2021, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.nvidia.spark.rapids
+
+import java.io.{File, FileOutputStream}
+import java.util.Random
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import ai.rapids.cudf.{
+  ColumnView, CompressionType, DType, HostBufferConsumer, HostMemoryBuffer,
+  ParquetColumnWriterOptions, ParquetWriterOptions, Table, TableWriter
+}
+import ai.rapids.cudf.ParquetColumnWriterOptions.{listBuilder, structBuilder, NestedBuilder}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+object DumpUtils extends Logging with Arm {
+  /**
+   * Debug utility to dump a columnar batch to a parquet file.
+   * It runs on the GPU. Parquet column names are generated from the batch's type info.
+   *
+   * @param columnarBatch the columnar batch to be dumped, should be a GPU columnar batch
+   * @param filePrefix parquet file prefix, e.g. /tmp/my-debug-prefix-
+   * @return parquet file path if the dump succeeds, e.g. /tmp/my-debug-prefix-123.parquet
+   */
+  def dumpToParquetFile(columnarBatch: ColumnarBatch, filePrefix: String): Option[String] = {
+    if (columnarBatch.numCols() == 0) {
+      logWarning("dump to parquet failed: the batch has no columns, file prefix is " + filePrefix)
+      None
+    } else {
+      Some(dumpToParquetFileImpl(columnarBatch, filePrefix))
+    }
+  }
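+
+  // A hedged usage sketch, not a call site added by this PR; "batch" stands for whatever
+  // GPU ColumnarBatch is being debugged at the call site:
+  //   DumpUtils.dumpToParquetFile(batch, "/tmp/my-debug-prefix-") match {
+  //     case Some(path) => logWarning("dumped batch to " + path)
+  //     case None => // nothing was written, the batch had no columns
+  //   }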
+
+  /**
+   * Debug utility to dump a table to a parquet file.
+   * It runs on the GPU. Parquet column names are generated from the table's column type info.
+   *
+   * @param table the table to be dumped, should be a GPU table
+   * @param filePrefix parquet file prefix, e.g. /tmp/my-debug-prefix-
+   * @return parquet file path if the dump succeeds, e.g. /tmp/my-debug-prefix-123.parquet
+   */
+  def dumpToParquetFile(table: Table, filePrefix: String): Option[String] = {
+    if (table.getNumberOfColumns == 0) {
+      logWarning("dump to parquet failed: the table has no columns, file prefix is " + filePrefix)
+      None
+    } else {
+      Some(dumpToParquetFileImpl(table, filePrefix))
+    }
+  }
+
+  private def dumpToParquetFileImpl(table: Table, filePrefix: String): String = {
+    val path = genPath(filePrefix)
+    withResource(new ParquetDumper(path, table)) { dumper =>
+      dumper.writeTable(table)
+      path
+    }
+  }
+
+  private def dumpToParquetFileImpl(columnarBatch: ColumnarBatch, filePrefix: String): String = {
+    // convert the batch to a table, then dump the table
+    withResource(GpuColumnVector.from(columnarBatch)) { table =>
+      dumpToParquetFileImpl(table, filePrefix)
+    }
+  }
+
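+  // Generates an unused path by appending a random integer suffix and ".parquet" to the
+  // prefix, retrying while a file with the candidate name already exists.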
+  private def genPath(filePrefix: String): String = {
+    var path = ""
+    val random = new Random
+    var succeeded = false
+    while (!succeeded) {
+      path = filePrefix + random.nextInt(Int.MaxValue) + ".parquet"
+      if (!new File(path).exists()) {
+        succeeded = true
+      }
+    }
+    path
+  }
+}
+
+/**
+ * Debug helper that writes a cudf Table to a local parquet file. It implements
+ * HostBufferConsumer so the chunked parquet writer can hand encoded buffers back on the
+ * host, which are then streamed to the target file.
+ */
+class ParquetDumper(path: String, table: Table) extends HostBufferConsumer
+  with Arm with AutoCloseable {
+  private[this] val outputStream = new FileOutputStream(path)
+  private[this] val tempBuffer = new Array[Byte](128 * 1024)
+  private[this] val buffers = mutable.Queue[(HostMemoryBuffer, Long)]()
+
+  val tableWriter: TableWriter = {
+    // avoid any type conversion, just dump the table as it is
+    val builder = ParquetDumper.parquetWriterOptionsFromTable(ParquetWriterOptions.builder(), table)
+      .withCompressionType(ParquetDumper.COMPRESS_TYPE)
+    Table.writeParquetChunked(builder.build(), this)
+  }
+
+  override
+  def handleBuffer(buffer: HostMemoryBuffer, len: Long): Unit =
+    buffers += Tuple2(buffer, len)
+
+  def writeBufferedData(): Unit = {
+    ColumnarOutputWriter.writeBufferedData(buffers, tempBuffer, outputStream)
+  }
+
+  def writeTable(table: Table): Unit = {
+    tableWriter.write(table)
+    writeBufferedData()
+  }
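+
+  // The chunked writer allows writeTable to be called multiple times before close(); each
+  // call appends the rows of the given table to the same parquet file.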
+
+  /**
+   * Closes the [[ParquetDumper]]. Must be called after all tables have been written so
+   * that any remaining buffered data is flushed and the output stream is closed.
+   */
+  def close(): Unit = {
+    tableWriter.close()
+    writeBufferedData()
+    outputStream.close()
+  }
+}
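+
+// A hedged usage sketch; "table" stands for any cudf Table already on the GPU and the path
+// is just the doc's example. DumpUtils.dumpToParquetFile above wraps this same pattern:
+//   withResource(new ParquetDumper("/tmp/my-debug-prefix-123.parquet", table)) { dumper =>
+//     dumper.writeTable(table)
+//   }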
+
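+// Running counter used to give each generated parquet column name a unique numeric suffix.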
+private class ColumnIndex() {
+  var i = 0
+
+  def inc(): Int = {
+    i = i + 1
+    i
+  }
+}
+
+object ParquetDumper extends Arm {
+  val COMPRESS_TYPE = CompressionType.SNAPPY
+
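+  // Builds parquet writer options mirroring the table's schema. Column names are synthesized
+  // from type info: leaf columns get "c_" + type name + running index, nested columns get
+  // "c_struct" / "c_list" + running index.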
+  def parquetWriterOptionsFromTable[T <: NestedBuilder[_, _], V <: ParquetColumnWriterOptions](
+      builder: ParquetColumnWriterOptions.NestedBuilder[T, V],
+      table: Table): T = {
+
+    val cIndex = new ColumnIndex
+    withResource(new ArrayBuffer[ColumnView]) { toClose =>
+      for (i <- 0 until table.getNumberOfColumns) {
+        parquetWriterOptionsFromColumnView(builder, table.getColumn(i), cIndex, toClose)
+      }
+
+      builder.asInstanceOf[T]
+    }
+  }
+
+  private def parquetWriterOptionsFromColumnView[T <: NestedBuilder[_, _],
+      V <: ParquetColumnWriterOptions](
+      builder: ParquetColumnWriterOptions.NestedBuilder[T, V],
+      cv: ColumnView,
+      cIndex: ColumnIndex,
+      toClose: ArrayBuffer[ColumnView]): T = {
+    val dType = cv.getType
+    if (dType.isDecimalType) {
+      if (dType.getTypeId == DType.DTypeEnum.DECIMAL32) {
+        builder.withDecimalColumn(getTypeName(dType) + cIndex.inc(),
+          DType.DECIMAL32_MAX_PRECISION, true)
+      } else if (dType.getTypeId == DType.DTypeEnum.DECIMAL64) {
+        builder.withDecimalColumn(getTypeName(dType) + cIndex.inc(),
+          DType.DECIMAL64_MAX_PRECISION, true)
+      } else {
+        // TODO: handle DECIMAL128 and any other decimal types
+        throw new UnsupportedOperationException("unsupported decimal type " + dType.getTypeId)
+      }
+    } else if (dType == DType.STRUCT) {
+      val subBuilder = structBuilder("c_struct" + cIndex.inc(), true)
+      for (i <- 0 until cv.getNumChildren) {
+        val subCv = cv.getChildColumnView(i)
+        toClose += subCv
+        parquetWriterOptionsFromColumnView(subBuilder, subCv, cIndex, toClose)
+      }
+      builder.withStructColumn(subBuilder.build())
+    } else if (dType == DType.LIST) {
+      val subCv = cv.getChildColumnView(0)
+      toClose += subCv
+
+      builder.withListColumn(
+        parquetWriterOptionsFromColumnView(
+          listBuilder("c_list" + cIndex.inc(), true),
+          subCv,
+          cIndex,
+          toClose).build())
+    } else {
+      builder.withColumns(true, getTypeName(dType) + cIndex.inc())
+    }
+    builder.asInstanceOf[T]
+  }
+
+  private def getTypeName(t: DType): String = {
+    "c_" + t.toString.replace(' ', '_')
+  }
+}