Support running CPU based UDF efficiently [databricks] (#3897)
* Support running CPU based UDF.

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* Address the comments

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* Add more tests

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* Add log for the time of running the UDF.

along with some small refactors.

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* Add comments for disabling the input encoder

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* Comment update

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* Fix a build error on DB

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* Address the new comments

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* Update comments

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* Update sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala

Update the config doc

Co-authored-by: Jason Lowe <jlowe@nvidia.com>

* Use the code of 3.0 for input type conversion

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* comment update

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* Doc update

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* Address the new comments

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* Add shims for GpuRowBasedScalaUDF

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* Update the test file name

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

Co-authored-by: Jason Lowe <jlowe@nvidia.com>
firestarman and jlowe authored Oct 29, 2021
1 parent bfaf73b commit 96ca730
Showing 17 changed files with 963 additions and 54 deletions.
3 changes: 2 additions & 1 deletion docs/configs.md
@@ -113,6 +113,7 @@ Name | Description | Default Value
<a name="sql.reader.batchSizeBytes"></a>spark.rapids.sql.reader.batchSizeBytes|Soft limit on the maximum number of bytes the reader reads per batch. The readers will read chunks of data until this limit is met or exceeded. Note that the reader may estimate the number of bytes that will be used on the GPU in some cases based on the schema and number of rows in each batch.|2147483647
<a name="sql.reader.batchSizeRows"></a>spark.rapids.sql.reader.batchSizeRows|Soft limit on the maximum number of rows the reader will read per batch. The orc and parquet readers will read row groups until this limit is met or exceeded. The limit is respected by the csv reader.|2147483647
<a name="sql.replaceSortMergeJoin.enabled"></a>spark.rapids.sql.replaceSortMergeJoin.enabled|Allow replacing sortMergeJoin with HashJoin|true
<a name="sql.rowBasedUDF.enabled"></a>spark.rapids.sql.rowBasedUDF.enabled|When set to true, a row-based UDF in a GPU operation is optimized by transferring only the data it needs between the GPU and the CPU inside the query operation, instead of falling the whole operation back to the CPU. This is an experimental feature, and this config might be removed in the future.|false
<a name="sql.shuffle.spillThreads"></a>spark.rapids.sql.shuffle.spillThreads|Number of threads used to spill shuffle data to disk in the background.|6
<a name="sql.stableSort.enabled"></a>spark.rapids.sql.stableSort.enabled|Enable or disable stable sorting. Apache Spark's sorting is typically a stable sort, but sort stability cannot be guaranteed in distributed work loads because the order in which upstream data arrives to a task is not guaranteed. Sort stability then only matters when reading and sorting data from a file using a single task/partition. Because of limitations in the plugin when you enable stable sorting all of the data for a single task will be combined into a single batch before sorting. This currently disables spilling from GPU memory if the data size is too large.|false
<a name="sql.udfCompiler.enabled"></a>spark.rapids.sql.udfCompiler.enabled|When set to true, Scala UDFs will be considered for compilation as Catalyst expressions|false
@@ -264,7 +265,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.Rint"></a>spark.rapids.sql.expression.Rint|`rint`|Rounds up a double value to the nearest double equal to an integer|true|None|
<a name="sql.expression.Round"></a>spark.rapids.sql.expression.Round|`round`|Round an expression to d decimal places using HALF_UP rounding mode|true|None|
<a name="sql.expression.RowNumber"></a>spark.rapids.sql.expression.RowNumber|`row_number`|Window function that returns the index for the row within the aggregation window|true|None|
<a name="sql.expression.ScalaUDF"></a>spark.rapids.sql.expression.ScalaUDF| |User Defined Function, support requires the UDF to implement a RAPIDS accelerated interface|true|None|
<a name="sql.expression.ScalaUDF"></a>spark.rapids.sql.expression.ScalaUDF| |User Defined Function; the UDF can choose to implement a RAPIDS accelerated interface to get better performance.|true|None|
<a name="sql.expression.Second"></a>spark.rapids.sql.expression.Second|`second`|Returns the second component of the string/timestamp|true|None|
<a name="sql.expression.ShiftLeft"></a>spark.rapids.sql.expression.ShiftLeft|`shiftleft`|Bitwise shift left (<<)|true|None|
<a name="sql.expression.ShiftRight"></a>spark.rapids.sql.expression.ShiftRight|`shiftright`|Bitwise shift right (>>)|true|None|
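For context, below is a minimal usage sketch of the new `spark.rapids.sql.rowBasedUDF.enabled` config described above; it is not part of this commit. With the config on, a plain Scala UDF that does not implement a RAPIDS accelerated interface stays inside the GPU plan and only the UDF body is evaluated row by row on the CPU. The application name and data are illustrative, and the example assumes the RAPIDS Accelerator jar is on the classpath.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object RowBasedUdfExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("row-based-udf-example")
      // Assumes the RAPIDS Accelerator jar is already on the driver/executor classpath.
      .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
      .config("spark.rapids.sql.enabled", "true")
      // The new, experimental config added by this commit.
      .config("spark.rapids.sql.rowBasedUDF.enabled", "true")
      .getOrCreate()
    import spark.implicits._

    // A plain CPU UDF: it does not implement a RAPIDS accelerated interface.
    val plusOne = udf((x: Long) => x + 1)

    spark.range(0, 1000).toDF("id")
      .select(plusOne($"id").as("id_plus_one"))
      .show(5)

    spark.stop()
  }
}
```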
2 changes: 1 addition & 1 deletion docs/supported_ops.md
@@ -10192,7 +10192,7 @@ are limited.
<tr>
<td rowSpan="2">ScalaUDF</td>
<td rowSpan="2"> </td>
<td rowSpan="2">User Defined Function, support requires the UDF to implement a RAPIDS accelerated interface</td>
<td rowSpan="2">User Defined Function; the UDF can choose to implement a RAPIDS accelerated interface to get better performance.</td>
<td rowSpan="2">None</td>
<td rowSpan="2">project</td>
<td>param</td>
@@ -456,7 +456,8 @@ class Spark320Shims extends Spark32XShims {
ParamCheck("windowSpec",
TypeSig.CALENDAR + TypeSig.NULL + TypeSig.integral + TypeSig.DECIMAL_64 +
TypeSig.DAYTIME, TypeSig.numericAndInterval))),
(windowExpression, conf, p, r) => new GpuWindowExpressionMeta(windowExpression, conf, p, r))
(windowExpression, conf, p, r) => new GpuWindowExpressionMeta(windowExpression, conf, p, r)),
GpuScalaUDFMeta.exprMeta
).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap

override def getExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = {
@@ -343,7 +343,8 @@ abstract class SparkBaseShims extends Spark30XShims {
}
override def convertToGpu(lhs: Expression, regexp: Expression,
rep: Expression): GpuExpression = GpuStringReplace(lhs, regexp, rep)
})
}),
GpuScalaUDFMeta.exprMeta
).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap
}

@@ -0,0 +1,74 @@
/*
 * Copyright (c) 2021, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.nvidia.spark.rapids.shims.v2

import com.nvidia.spark.rapids.{ExprChecks, ExprRule, GpuOverrides, GpuUserDefinedFunction, RepeatingParamCheck, TypeSig}

import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUDF}
import org.apache.spark.sql.rapids.{GpuRowBasedScalaUDFBase, ScalaUDFMetaBase}
import org.apache.spark.sql.types.DataType

/** Run a row-based UDF in a GPU operation */
case class GpuRowBasedScalaUDF(
    sparkFunc: AnyRef,
    dataType: DataType,
    children: Seq[Expression],
    inputEncoders: Seq[Option[ExpressionEncoder[_]]],
    udfName: Option[String],
    nullable: Boolean,
    udfDeterministic: Boolean)
  extends GpuRowBasedScalaUDFBase(sparkFunc, dataType, children, inputEncoders, None, udfName) {

  override def createInputConverter(i: Int, dataType: DataType): Any => Any = {
    if (inputEncoders.isEmpty) {
      // for untyped Scala UDF
      CatalystTypeConverters.createToScalaConverter(dataType)
    } else {
      val encoder = inputEncoders(i)
      if (encoder.isDefined && encoder.get.isSerializedAsStructForTopLevel) {
        val fromRow = encoder.get.resolveAndBind().createDeserializer()
        row: Any => fromRow(row.asInstanceOf[InternalRow])
      } else {
        CatalystTypeConverters.createToScalaConverter(dataType)
      }
    }
  }
}

object GpuScalaUDFMeta {
  def exprMeta: ExprRule[ScalaUDF] = GpuOverrides.expr[ScalaUDF](
    "User Defined Function; the UDF can choose to implement a RAPIDS accelerated interface " +
      "to get better performance.",
    ExprChecks.projectOnly(
      GpuUserDefinedFunction.udfTypeSig,
      TypeSig.all,
      repeatingParamCheck =
        Some(RepeatingParamCheck("param", GpuUserDefinedFunction.udfTypeSig, TypeSig.all))),
    (expr, conf, p, r) => new ScalaUDFMetaBase(expr, conf, p, r) {
      override protected def rowBasedScalaUDF: GpuRowBasedScalaUDFBase =
        GpuRowBasedScalaUDF(
          expr.function,
          expr.dataType,
          childExprs.map(_.convertToGpu()),
          expr.inputEncoders,
          expr.udfName,
          expr.nullable,
          expr.udfDeterministic)
    })
}
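The following sketch is not part of the diff; it only illustrates the check in `createInputConverter` above. A case-class argument is encoded as a top-level struct, so the raw value reaching the UDF is an `InternalRow` that has to be deserialized back into the case class, while other inputs go through `CatalystTypeConverters`. The `Point` class is hypothetical.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

object StructTopLevelSketch {
  case class Point(x: Int, y: Int)

  def main(args: Array[String]): Unit = {
    val enc = ExpressionEncoder[Point]()
    // A case class is serialized as a struct at the top level ...
    assert(enc.isSerializedAsStructForTopLevel)
    // ... so the converter must turn the incoming InternalRow back into a Point.
    val fromRow = enc.resolveAndBind().createDeserializer()
    println(fromRow(InternalRow(3, 4))) // Point(3,4)
  }
}
```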
@@ -306,7 +306,8 @@ abstract class SparkBaseShims extends Spark30XShims {
}
override def convertToGpu(lhs: Expression, regexp: Expression,
rep: Expression): GpuExpression = GpuStringReplace(lhs, regexp, rep)
})
}),
GpuScalaUDFMeta.exprMeta
).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap
}

@@ -0,0 +1,100 @@
/*
 * Copyright (c) 2021, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.nvidia.spark.rapids.shims.v2

import com.nvidia.spark.rapids.{ExprChecks, ExprRule, GpuOverrides, GpuUserDefinedFunction, RepeatingParamCheck, TypeSig}

import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression, GenericInternalRow, ScalaUDF}
import org.apache.spark.sql.rapids.{GpuRowBasedScalaUDFBase, ScalaUDFMetaBase}
import org.apache.spark.sql.types.DataType

/** Run a row-based UDF in a GPU operation */
case class GpuRowBasedScalaUDF(
    sparkFunc: AnyRef,
    dataType: DataType,
    children: Seq[Expression],
    inputEncoders: Seq[Option[ExpressionEncoder[_]]],
    outputEncoder: Option[ExpressionEncoder[_]],
    udfName: Option[String],
    nullable: Boolean,
    udfDeterministic: Boolean)
  extends GpuRowBasedScalaUDFBase(sparkFunc, dataType, children, inputEncoders, outputEncoder,
    udfName) {

  override def createInputConverter(i: Int, dataType: DataType): Any => Any =
    scalaConverter(i, dataType)._1

  /**
   * Create the converter that converts the Catalyst data type to the Scala data type.
   * We use `CatalystTypeConverters` to create the converter for:
   *   - UDF which doesn't provide inputEncoders, e.g., untyped Scala UDF and Java UDF
   *   - type which isn't supported by `ExpressionEncoder`, e.g., Any
   *   - primitive types, in order to use `identity` for better performance
   * For other cases like case class, Option[T], we use `ExpressionEncoder` instead since
   * `CatalystTypeConverters` doesn't support these data types.
   *
   * @param i the index of the child
   * @param dataType the output data type of the i-th child
   * @return the converter and a boolean value to indicate whether the converter is
   *         created by using `ExpressionEncoder`.
   */
  private def scalaConverter(i: Int, dataType: DataType): (Any => Any, Boolean) = {
    val useEncoder =
      !(inputEncoders.isEmpty || // for untyped Scala UDF and Java UDF
        inputEncoders(i).isEmpty || // for types that aren't supported by the encoder, e.g. Any
        inputPrimitives(i)) // for primitive types

    if (useEncoder) {
      val enc = inputEncoders(i).get
      val fromRow = enc.createDeserializer()
      val converter = if (enc.isSerializedAsStructForTopLevel) {
        row: Any => fromRow(row.asInstanceOf[InternalRow])
      } else {
        val inputRow = new GenericInternalRow(1)
        value: Any => inputRow.update(0, value); fromRow(inputRow)
      }
      (converter, true)
    } else { // use CatalystTypeConverters
      (CatalystTypeConverters.createToScalaConverter(dataType), false)
    }
  }
}

object GpuScalaUDFMeta {
  def exprMeta: ExprRule[ScalaUDF] = GpuOverrides.expr[ScalaUDF](
    "User Defined Function; the UDF can choose to implement a RAPIDS accelerated interface " +
      "to get better performance.",
    ExprChecks.projectOnly(
      GpuUserDefinedFunction.udfTypeSig,
      TypeSig.all,
      repeatingParamCheck =
        Some(RepeatingParamCheck("param", GpuUserDefinedFunction.udfTypeSig, TypeSig.all))),
    (expr, conf, p, r) => new ScalaUDFMetaBase(expr, conf, p, r) {
      override protected def rowBasedScalaUDF: GpuRowBasedScalaUDFBase =
        GpuRowBasedScalaUDF(
          expr.function,
          expr.dataType,
          childExprs.map(_.convertToGpu()),
          expr.inputEncoders,
          expr.outputEncoder,
          expr.udfName,
          expr.nullable,
          expr.udfDeterministic)
    })
}
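To make the converter choice above concrete, here is a small sketch (again, not part of the diff) of the `CatalystTypeConverters` path: converting a primitive such as an integer is effectively `identity`, while a `StringType` value needs an internal-to-Scala conversion from `UTF8String` to `String`.

```scala
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.unsafe.types.UTF8String

object CatalystConverterSketch {
  def main(args: Array[String]): Unit = {
    val intConverter = CatalystTypeConverters.createToScalaConverter(IntegerType)
    val strConverter = CatalystTypeConverters.createToScalaConverter(StringType)

    println(intConverter(42))                             // 42 (effectively identity)
    println(strConverter(UTF8String.fromString("hello"))) // hello (UTF8String -> String)
  }
}
```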
@@ -367,7 +367,8 @@ abstract class SparkBaseShims extends Spark30XShims {
override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = {
GpuElementAt(lhs, rhs, SQLConf.get.ansiEnabled)
}
})
}),
GpuScalaUDFMeta.exprMeta
).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap

override def getExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = {
@@ -365,7 +365,8 @@ abstract class SparkBaseShims extends Spark31XShims {
override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = {
GpuElementAt(lhs, rhs, SQLConf.get.ansiEnabled)
}
})
}),
GpuScalaUDFMeta.exprMeta
).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap

override def getExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = {
@@ -231,7 +231,7 @@ private static void debugInteger(HostColumnVectorCore hostCol, DType intType) {
}
}

private static HostColumnVector.DataType convertFrom(DataType spark, boolean nullable) {
static HostColumnVector.DataType convertFrom(DataType spark, boolean nullable) {
if (spark instanceof ArrayType) {
ArrayType arrayType = (ArrayType) spark;
return new HostColumnVector.ListType(nullable,
@@ -3198,8 +3198,7 @@ object GpuOverrides extends Logging {
(a, conf, p, r) => new ExprMeta[CreateMap](a, conf, p, r) {
override def convertToGpu(): GpuExpression = GpuCreateMap(childExprs.map(_.convertToGpu()))
}
),
GpuScalaUDF.exprMeta
)
).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap

// Shim expressions should be last to allow overrides with shim-specific versions
@@ -65,7 +65,7 @@ private class GpuRowToColumnConverter(schema: StructType) extends Serializable w
}
}

private object GpuRowToColumnConverter {
private[rapids] object GpuRowToColumnConverter {
// Sizes estimates for different things
/*
* size of an offset entry. In general we have 1 more offset entry than rows, so
@@ -75,7 +75,7 @@
private[this] val VALIDITY = 0.125 // 1/8th of a byte (1 bit)
private[this] val VALIDITY_N_OFFSET = OFFSET + VALIDITY

private abstract class TypeConverter extends Serializable {
private[rapids] abstract class TypeConverter extends Serializable {
/** Append row value to the column builder and return the number of data bytes written */
def append(row: SpecializedGetters,
column: Int,
@@ -92,7 +92,7 @@ private object GpuRowToColumnConverter {
private def getConverterFor(field: StructField): TypeConverter =
getConverterForType(field.dataType, field.nullable)

private def getConverterForType(dataType: DataType, nullable: Boolean): TypeConverter = {
private[rapids] def getConverterForType(dataType: DataType, nullable: Boolean): TypeConverter = {
(dataType, nullable) match {
case (BooleanType, true) => BooleanConverter
case (BooleanType, false) => NotNullBooleanConverter
@@ -16,13 +16,15 @@

package com.nvidia.spark.rapids

import ai.rapids.cudf.{NvtxColor, NvtxRange}
import ai.rapids.cudf.{HostColumnVector, NvtxColor, NvtxRange}
import com.nvidia.spark.RapidsUDF
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.shims.v2.ShimExpression

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.expressions.UserDefinedExpression
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UserDefinedExpression}
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.vectorized.ColumnarBatch

@@ -74,3 +76,63 @@ object GpuUserDefinedFunction {
  val udfTypeSig: TypeSig = (TypeSig.commonCudfTypes + TypeSig.DECIMAL_64 + TypeSig.NULL +
    TypeSig.BINARY + TypeSig.CALENDAR + TypeSig.ARRAY + TypeSig.MAP + TypeSig.STRUCT).nested()
}

/**
 * Execute a row-based UDF efficiently by pulling back to the host only the columns the UDF
 * needs, and doing the processing on the CPU.
 */
trait GpuRowBasedUserDefinedFunction extends GpuExpression
    with ShimExpression with UserDefinedExpression with Serializable with Logging {
  /** Name of the UDF function */
  val name: String

  /** True if the UDF is deterministic */
  val udfDeterministic: Boolean

  /** The row-based function of the UDF. */
  protected def evaluateRow(childrenRow: InternalRow): Any

  private[this] lazy val inputTypesString = children.map(_.dataType.catalogString).mkString(", ")
  private[this] lazy val outputType = dataType.catalogString

  override lazy val deterministic: Boolean = udfDeterministic && children.forall(_.deterministic)

  override def columnarEval(batch: ColumnarBatch): Any = {
    val cpuUDFStart = System.nanoTime
    // These child columns will be closed by `ColumnarToRowIterator`.
    val argCols = children.safeMap(GpuExpressionsUtils.columnarEvalToColumn(_, batch))
    val prepareArgsEnd = System.nanoTime
    try {
      // 1. Convert the argument columns to rows.
      // 2. Evaluate the CPU UDF row by row and cache the results.
      // 3. Build a result column from the cached results.
      val retConverter = GpuRowToColumnConverter.getConverterForType(dataType, nullable)
      val retType = GpuColumnVector.convertFrom(dataType, nullable)
      val retRow = new GenericInternalRow(size = 1)
      closeOnExcept(new HostColumnVector.ColumnBuilder(retType, batch.numRows)) { builder =>
        new ColumnarToRowIterator(
          Iterator.single(new ColumnarBatch(argCols.toArray, batch.numRows())),
          NoopMetric,
          NoopMetric,
          NoopMetric,
          NoopMetric).foreach { row =>
          retRow.update(0, evaluateRow(row))
          retConverter.append(retRow, 0, builder)
        }
        closeOnExcept(builder.buildAndPutOnDevice()) { resultCol =>
          val cpuRunningTime = System.nanoTime - prepareArgsEnd
          // Log the elapsed time of the UDF run until Spark metrics are supported
          // in this expression.
          logDebug(s"It took ${cpuRunningTime} ns to run UDF $name, and " +
            s"${prepareArgsEnd - cpuUDFStart} ns to get the input from children.")
          GpuColumnVector.from(resultCol, dataType)
        }
      }
    } catch {
      case e: Exception =>
        throw new SparkException("Failed to execute user defined function: " +
          s"($name: ($inputTypesString) => $outputType)", e)
    }
  } // end of `columnarEval`

}
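As a plain-Scala illustration of the three steps `columnarEval` performs above (convert the argument columns to rows, evaluate the UDF row by row, rebuild a result column), here is a self-contained sketch; it deliberately uses simple arrays with made-up data instead of the plugin's column types.

```scala
object RowLoopSketch {
  def main(args: Array[String]): Unit = {
    // 1. Argument "columns", already pulled back to the host.
    val argCol0 = Array(1L, 2L, 3L)
    val argCol1 = Array(10L, 20L, 30L)

    // 2. The row-based UDF body (what evaluateRow would invoke per row).
    val udfBody: (Long, Long) => Long = _ + _

    // 3. Evaluate row by row and collect the results into a new "column".
    val resultCol = argCol0.indices.map(i => udfBody(argCol0(i), argCol1(i))).toArray

    println(resultCol.mkString(", ")) // 11, 22, 33
  }
}
```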