Support running CPU based UDF efficiently [databricks] #3897

Merged 18 commits on Oct 29, 2021
3 changes: 2 additions & 1 deletion docs/configs.md
Expand Up @@ -113,6 +113,7 @@ Name | Description | Default Value
<a name="sql.reader.batchSizeBytes"></a>spark.rapids.sql.reader.batchSizeBytes|Soft limit on the maximum number of bytes the reader reads per batch. The readers will read chunks of data until this limit is met or exceeded. Note that the reader may estimate the number of bytes that will be used on the GPU in some cases based on the schema and number of rows in each batch.|2147483647
<a name="sql.reader.batchSizeRows"></a>spark.rapids.sql.reader.batchSizeRows|Soft limit on the maximum number of rows the reader will read per batch. The orc and parquet readers will read row groups until this limit is met or exceeded. The limit is respected by the csv reader.|2147483647
<a name="sql.replaceSortMergeJoin.enabled"></a>spark.rapids.sql.replaceSortMergeJoin.enabled|Allow replacing sortMergeJoin with HashJoin|true
<a name="sql.rowBasedUDF.enabled"></a>spark.rapids.sql.rowBasedUDF.enabled|When set to true, optimizes a row-based UDF in a GPU operation by transferring only the data it needs between GPU and CPU inside a query operation, instead of falling this operation back to CPU. This is an experimental feature, and this config might be removed in the future.|false
<a name="sql.shuffle.spillThreads"></a>spark.rapids.sql.shuffle.spillThreads|Number of threads used to spill shuffle data to disk in the background.|6
<a name="sql.stableSort.enabled"></a>spark.rapids.sql.stableSort.enabled|Enable or disable stable sorting. Apache Spark's sorting is typically a stable sort, but sort stability cannot be guaranteed in distributed work loads because the order in which upstream data arrives to a task is not guaranteed. Sort stability then only matters when reading and sorting data from a file using a single task/partition. Because of limitations in the plugin when you enable stable sorting all of the data for a single task will be combined into a single batch before sorting. This currently disables spilling from GPU memory if the data size is too large.|false
<a name="sql.udfCompiler.enabled"></a>spark.rapids.sql.udfCompiler.enabled|When set to true, Scala UDFs will be considered for compilation as Catalyst expressions|false
Expand Down Expand Up @@ -264,7 +265,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.Rint"></a>spark.rapids.sql.expression.Rint|`rint`|Rounds up a double value to the nearest double equal to an integer|true|None|
<a name="sql.expression.Round"></a>spark.rapids.sql.expression.Round|`round`|Round an expression to d decimal places using HALF_UP rounding mode|true|None|
<a name="sql.expression.RowNumber"></a>spark.rapids.sql.expression.RowNumber|`row_number`|Window function that returns the index for the row within the aggregation window|true|None|
<a name="sql.expression.ScalaUDF"></a>spark.rapids.sql.expression.ScalaUDF| |User Defined Function, support requires the UDF to implement a RAPIDS accelerated interface|true|None|
<a name="sql.expression.ScalaUDF"></a>spark.rapids.sql.expression.ScalaUDF| |User Defined Function, the UDF can choose to implement a RAPIDS accelerated interface to get better performance.|true|None|
<a name="sql.expression.Second"></a>spark.rapids.sql.expression.Second|`second`|Returns the second component of the string/timestamp|true|None|
<a name="sql.expression.ShiftLeft"></a>spark.rapids.sql.expression.ShiftLeft|`shiftleft`|Bitwise shift left (<<)|true|None|
<a name="sql.expression.ShiftRight"></a>spark.rapids.sql.expression.ShiftRight|`shiftright`|Bitwise shift right (>>)|true|None|
Expand Down
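For context, a minimal usage sketch of the new `spark.rapids.sql.rowBasedUDF.enabled` config; the session settings, function, and column names below are illustrative and assume a plain Scala UDF with no RAPIDS accelerated implementation.

```scala
// Hypothetical usage sketch for spark.rapids.sql.rowBasedUDF.enabled; the
// function and column names are illustrative, not taken from this PR.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

val spark = SparkSession.builder()
  .appName("row-based-udf-sketch")
  // Experimental: keep the surrounding query on the GPU and move only the
  // UDF's input and output columns between GPU and CPU.
  .config("spark.rapids.sql.rowBasedUDF.enabled", "true")
  .getOrCreate()

// An ordinary row-based Scala UDF with no RAPIDS accelerated implementation.
val plusOne = udf((x: Long) => x + 1)

val df = spark.range(100).toDF("id")
// With the config enabled, the projection can stay on the GPU while only the
// `id` column is copied to the host for the row-by-row UDF evaluation.
df.select(plusOne(col("id")).as("id_plus_one")).show()
```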
2 changes: 1 addition & 1 deletion docs/supported_ops.md
Expand Up @@ -10192,7 +10192,7 @@ are limited.
<tr>
<td rowSpan="2">ScalaUDF</td>
<td rowSpan="2"> </td>
<td rowSpan="2">User Defined Function, support requires the UDF to implement a RAPIDS accelerated interface</td>
<td rowSpan="2">User Defined Function, the UDF can choose to implement a RAPIDS accelerated interface to get better performance.</td>
<td rowSpan="2">None</td>
<td rowSpan="2">project</td>
<td>param</td>
Expand Down
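For comparison, a sketch of what implementing "a RAPIDS accelerated interface" looks like, modeled on the plugin's UDF examples; it assumes the `com.nvidia.spark.RapidsUDF` interface with an `evaluateColumnar(ColumnVector...)` method, and the class name `PlusOneUDF` is made up.

```scala
// Hypothetical RAPIDS accelerated UDF: apply() is the row-based CPU path that
// Spark calls normally, while evaluateColumnar() lets the plugin evaluate the
// same logic on the GPU against cudf column vectors.
import ai.rapids.cudf.{ColumnVector, Scalar}
import com.nvidia.spark.RapidsUDF

class PlusOneUDF extends Function1[Int, Int] with RapidsUDF with Serializable {
  // CPU row-based path.
  override def apply(x: Int): Int = x + 1

  // GPU columnar path; the varargs signature is assumed from the plugin's
  // accelerated UDF examples.
  override def evaluateColumnar(args: ColumnVector*): ColumnVector = {
    require(args.length == 1, s"Expected one argument, found ${args.length}")
    val one = Scalar.fromInt(1)
    try {
      args.head.add(one)
    } finally {
      one.close()
    }
  }
}
```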
Expand Up @@ -456,7 +456,18 @@ class Spark320Shims extends Spark32XShims {
ParamCheck("windowSpec",
TypeSig.CALENDAR + TypeSig.NULL + TypeSig.integral + TypeSig.DECIMAL_64 +
TypeSig.DAYTIME, TypeSig.numericAndInterval))),
(windowExpression, conf, p, r) => new GpuWindowExpressionMeta(windowExpression, conf, p, r))
(windowExpression, conf, p, r) => new GpuWindowExpressionMeta(windowExpression, conf, p, r)),
GpuOverrides.expr[ScalaUDF](
"User Defined Function, the UDF can choose to implement a RAPIDS accelerated interface " +
"to get better performance.",
ExprChecks.projectOnly(
GpuUserDefinedFunction.udfTypeSig,
TypeSig.all,
repeatingParamCheck =
Some(RepeatingParamCheck("param", GpuUserDefinedFunction.udfTypeSig, TypeSig.all))),
(a, conf, p, r) => new BaseScalaUDFMeta(a, conf, p, r) {
override protected def outputEncoder: Option[ExpressionEncoder[_]] = a.outputEncoder
})
).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap

override def getExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = {
Expand Down
Expand Up @@ -56,7 +56,7 @@ import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.python._
import org.apache.spark.sql.execution.window.WindowExecBase
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.{GpuAbs, GpuAverage, GpuFileSourceScanExec, GpuStringReplace, GpuTimeSub}
import org.apache.spark.sql.rapids.{GpuAbs, GpuAverage, GpuFileSourceScanExec, GpuScalaUDF, GpuStringReplace, GpuTimeSub}
import org.apache.spark.sql.rapids.execution.{GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase, JoinTypeChecks, SerializeBatchDeserializeHostBuffer, SerializeConcatHostBuffersDeserializeBatch, TrampolineUtil}
import org.apache.spark.sql.rapids.execution.python._
import org.apache.spark.sql.rapids.execution.python.shims.v2._
Expand Down Expand Up @@ -343,7 +343,8 @@ abstract class SparkBaseShims extends Spark30XShims {
}
override def convertToGpu(lhs: Expression, regexp: Expression,
rep: Expression): GpuExpression = GpuStringReplace(lhs, regexp, rep)
})
}),
GpuScalaUDF.exprMeta30X
).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap
}

Expand Down
Expand Up @@ -52,7 +52,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNes
import org.apache.spark.sql.execution.python.{AggregateInPandasExec, ArrowEvalPythonExec, FlatMapGroupsInPandasExec, MapInPandasExec, WindowInPandasExec}
import org.apache.spark.sql.execution.window.WindowExecBase
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.{GpuAbs, GpuAverage, GpuFileSourceScanExec, GpuStringReplace, GpuTimeSub}
import org.apache.spark.sql.rapids.{GpuAbs, GpuAverage, GpuFileSourceScanExec, GpuScalaUDF, GpuStringReplace, GpuTimeSub}
import org.apache.spark.sql.rapids.execution.{GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase, JoinTypeChecks, SerializeBatchDeserializeHostBuffer, SerializeConcatHostBuffersDeserializeBatch}
import org.apache.spark.sql.rapids.execution.python._
import org.apache.spark.sql.rapids.execution.python.shims.v2._
Expand Down Expand Up @@ -306,7 +306,8 @@ abstract class SparkBaseShims extends Spark30XShims {
}
override def convertToGpu(lhs: Expression, regexp: Expression,
rep: Expression): GpuExpression = GpuStringReplace(lhs, regexp, rep)
})
}),
GpuScalaUDF.exprMeta30X
).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap
}

Expand Down
Expand Up @@ -367,6 +367,17 @@ abstract class SparkBaseShims extends Spark30XShims {
override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = {
GpuElementAt(lhs, rhs, SQLConf.get.ansiEnabled)
}
}),
GpuOverrides.expr[ScalaUDF](
"User Defined Function, the UDF can choose to implement a RAPIDS accelerated interface " +
"to get better performance.",
ExprChecks.projectOnly(
GpuUserDefinedFunction.udfTypeSig,
TypeSig.all,
repeatingParamCheck =
Some(RepeatingParamCheck("param", GpuUserDefinedFunction.udfTypeSig, TypeSig.all))),
(a, conf, p, r) => new BaseScalaUDFMeta(a, conf, p, r) {
override protected def outputEncoder: Option[ExpressionEncoder[_]] = a.outputEncoder
})
).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap

Expand Down
Expand Up @@ -365,6 +365,17 @@ abstract class SparkBaseShims extends Spark31XShims {
override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = {
GpuElementAt(lhs, rhs, SQLConf.get.ansiEnabled)
}
}),
GpuOverrides.expr[ScalaUDF](
"User Defined Function, the UDF can choose to implement a RAPIDS accelerated interface " +
"to get better performance.",
ExprChecks.projectOnly(
GpuUserDefinedFunction.udfTypeSig,
TypeSig.all,
repeatingParamCheck =
Some(RepeatingParamCheck("param", GpuUserDefinedFunction.udfTypeSig, TypeSig.all))),
(a, conf, p, r) => new BaseScalaUDFMeta(a, conf, p, r) {
override protected def outputEncoder: Option[ExpressionEncoder[_]] = a.outputEncoder
})
).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap

Expand Down
Expand Up @@ -231,7 +231,7 @@ private static void debugInteger(HostColumnVectorCore hostCol, DType intType) {
}
}

private static HostColumnVector.DataType convertFrom(DataType spark, boolean nullable) {
static HostColumnVector.DataType convertFrom(DataType spark, boolean nullable) {
if (spark instanceof ArrayType) {
ArrayType arrayType = (ArrayType) spark;
return new HostColumnVector.ListType(nullable,
Expand Down
Expand Up @@ -3198,8 +3198,7 @@ object GpuOverrides extends Logging {
(a, conf, p, r) => new ExprMeta[CreateMap](a, conf, p, r) {
override def convertToGpu(): GpuExpression = GpuCreateMap(childExprs.map(_.convertToGpu()))
}
),
GpuScalaUDF.exprMeta
)
).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap

// Shim expressions should be last to allow overrides with shim-specific versions
Expand Down
Expand Up @@ -65,7 +65,7 @@ private class GpuRowToColumnConverter(schema: StructType) extends Serializable w
}
}

private object GpuRowToColumnConverter {
private[rapids] object GpuRowToColumnConverter {
// Sizes estimates for different things
/*
* size of an offset entry. In general we have 1 more offset entry than rows, so
Expand All @@ -75,7 +75,7 @@ private object GpuRowToColumnConverter {
private[this] val VALIDITY = 0.125 // 1/8th of a byte (1 bit)
private[this] val VALIDITY_N_OFFSET = OFFSET + VALIDITY

private abstract class TypeConverter extends Serializable {
private[rapids] abstract class TypeConverter extends Serializable {
/** Append row value to the column builder and return the number of data bytes written */
def append(row: SpecializedGetters,
column: Int,
Expand All @@ -92,7 +92,7 @@ private object GpuRowToColumnConverter {
private def getConverterFor(field: StructField): TypeConverter =
getConverterForType(field.dataType, field.nullable)

private def getConverterForType(dataType: DataType, nullable: Boolean): TypeConverter = {
private[rapids] def getConverterForType(dataType: DataType, nullable: Boolean): TypeConverter = {
(dataType, nullable) match {
case (BooleanType, true) => BooleanConverter
case (BooleanType, false) => NotNullBooleanConverter
Expand Down
Expand Up @@ -16,13 +16,15 @@

package com.nvidia.spark.rapids

import ai.rapids.cudf.{NvtxColor, NvtxRange}
import ai.rapids.cudf.{HostColumnVector, NvtxColor, NvtxRange}
import com.nvidia.spark.RapidsUDF
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.shims.v2.ShimExpression

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.expressions.UserDefinedExpression
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UserDefinedExpression}
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.vectorized.ColumnarBatch

Expand Down Expand Up @@ -74,3 +76,63 @@ object GpuUserDefinedFunction {
val udfTypeSig: TypeSig = (TypeSig.commonCudfTypes + TypeSig.DECIMAL_64 + TypeSig.NULL +
TypeSig.BINARY + TypeSig.CALENDAR + TypeSig.ARRAY + TypeSig.MAP + TypeSig.STRUCT).nested()
}

/**
* Executes a row-based UDF efficiently by pulling back to the host only the columns the
* UDF needs and doing the processing on the CPU.
*/
trait GpuRowBasedUserDefinedFunction extends GpuExpression
with ShimExpression with UserDefinedExpression with Serializable with Logging {
/** name of the UDF function */
val name: String

/** True if the UDF is deterministic */
val udfDeterministic: Boolean

/** The row based function of the UDF. */
protected def evaluateRow(childrenRow: InternalRow): Any

private[this] lazy val inputTypesString = children.map(_.dataType.catalogString).mkString(", ")
private[this] lazy val outputType = dataType.catalogString

override lazy val deterministic: Boolean = udfDeterministic && children.forall(_.deterministic)

override def columnarEval(batch: ColumnarBatch): Any = {
val cpuUDFStart = System.nanoTime
// These child columns will be closed by `ColumnarToRowIterator`.
val argCols = children.safeMap(GpuExpressionsUtils.columnarEvalToColumn(_, batch))
val prepareArgsEnd = System.nanoTime
try {
// 1. Convert the argument columns to rows.
// 2. Evaluate the CPU UDF row by row and cache the results.
// 3. Build a result column from the cache.
val retConverter = GpuRowToColumnConverter.getConverterForType(dataType, nullable)
val retType = GpuColumnVector.convertFrom(dataType, nullable)
val retRow = new GenericInternalRow(size = 1)
closeOnExcept(new HostColumnVector.ColumnBuilder(retType, batch.numRows)) { builder =>
new ColumnarToRowIterator(
Iterator.single(new ColumnarBatch(argCols.toArray, batch.numRows())),
NoopMetric,
NoopMetric,
NoopMetric,
NoopMetric).foreach { row =>
retRow.update(0, evaluateRow(row))
retConverter.append(retRow, 0, builder)
}
closeOnExcept(builder.buildAndPutOnDevice()) { resultCol =>
val cpuRunningTime = System.nanoTime - prepareArgsEnd
// Use info-level logging to record the elapsed time of the UDF run until we
// figure out how to support Spark metrics in this expression.
logInfo(s"It took ${cpuRunningTime} ns to run UDF $name, and " +
s"${prepareArgsEnd - cpuUDFStart} ns to get the input from children.")
GpuColumnVector.from(resultCol, dataType)
}
}
} catch {
case e: Exception =>
throw new SparkException("Failed to execute user defined function: " +
s"($name: ($inputTypesString) => $outputType)", e)
}
} // end of `columnarEval`

}
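As a reading aid, a simplified, hypothetical subclass sketch of the trait above showing where `evaluateRow` fits; it is not the concrete class added by this PR and only handles primitive argument and return types, since it skips the catalyst-to-Scala conversions a real implementation would need.

```scala
// Hypothetical minimal subclass of GpuRowBasedUserDefinedFunction; only valid
// for primitive types because it passes internal row values straight through.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.DataType

case class SimpleRowBasedUdf(
    function: Any => Any,
    name: String,
    dataType: DataType,
    children: Seq[Expression],
    udfDeterministic: Boolean) extends GpuRowBasedUserDefinedFunction {

  override def nullable: Boolean = true

  // Pull the single argument out of the row in its internal representation and
  // hand it to the CPU function; columnarEval above appends the result to the
  // output column builder.
  override protected def evaluateRow(childrenRow: InternalRow): Any =
    function(childrenRow.get(0, children.head.dataType))
}
```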
12 changes: 12 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
Expand Up @@ -715,6 +715,16 @@ object RapidsConf {
.booleanConf
.createWithDefault(true)

// This is an experimental feature for now. Eventually it should be enabled or disabled
// based on criteria that have yet to be determined.
val ENABLE_CPU_BASED_UDF = conf("spark.rapids.sql.rowBasedUDF.enabled")
.doc("When set to true, optimizes a row-based UDF in a GPU operation by transferring " +
"only the data it needs between GPU and CPU inside a query operation, instead of falling " +
"this operation back to CPU. This is an experimental feature, and this config might be " +
"removed in the future.")
.booleanConf
.createWithDefault(false)

object ParquetReaderType extends Enumeration {
val AUTO, COALESCING, MULTITHREADED, PERFILE = Value
}
Expand Down Expand Up @@ -1715,6 +1725,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isRangeWindowLongEnabled: Boolean = get(ENABLE_RANGE_WINDOW_LONG)

lazy val isCpuBasedUDFEnabled: Boolean = get(ENABLE_CPU_BASED_UDF)

private val optimizerDefaults = Map(
// this is not accurate because CPU projections do have a cost due to appending values
// to each row that is produced, but this needs to be a really small number because
Expand Down
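A small sanity-check sketch of the new flag, using the primary constructor of `RapidsConf` shown above; purely illustrative.

```scala
// Illustrative only: build a RapidsConf from a plain key/value map and read
// the new flag; it defaults to false, so an empty map leaves the feature off.
val enabled = new RapidsConf(Map("spark.rapids.sql.rowBasedUDF.enabled" -> "true"))
assert(enabled.isCpuBasedUDFEnabled)

val defaults = new RapidsConf(Map.empty[String, String])
assert(!defaults.isCpuBasedUDFEnabled)
```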
15 changes: 3 additions & 12 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala
Expand Up @@ -72,18 +72,9 @@ object GpuScalar extends Arm with Logging {
* Resolves a cudf `HostColumnVector.DataType` from a Spark `DataType`.
* The returned type will be used by the `ColumnVector.fromXXX` family.
*/
private[rapids] def resolveElementType(dt: DataType): HostColumnVector.DataType = dt match {
case ArrayType(elementType, _) =>
new HostColumnVector.ListType(true, resolveElementType(elementType))
case StructType(fields) =>
new HostColumnVector.StructType(true, fields.map(f => resolveElementType(f.dataType)): _*)
case MapType(keyType, valueType, _) =>
new HostColumnVector.ListType(true,
new HostColumnVector.StructType(true,
resolveElementType(keyType),
resolveElementType(valueType)))
case other =>
new HostColumnVector.BasicType(true, GpuColumnVector.getNonNestedRapidsType(other))
private[rapids] def resolveElementType(dt: DataType,
nullable: Boolean = true): HostColumnVector.DataType = {
GpuColumnVector.convertFrom(dt, nullable)
}

/**
Expand Down
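For illustration, the refactored `resolveElementType` now just delegates to `GpuColumnVector.convertFrom`, so nested Spark types map to nested cudf host types in one place; the snippet below is a hypothetical usage (it would have to live inside the plugin's `rapids` packages because of the `private[rapids]` visibility).

```scala
// Hypothetical: array<int> with nullable = true maps to a cudf ListType
// wrapping a BasicType of INT32, the same result GpuColumnVector.convertFrom
// would produce directly.
import org.apache.spark.sql.types.{ArrayType, IntegerType}

val hostType = GpuScalar.resolveElementType(ArrayType(IntegerType), nullable = true)
```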