
Support running CPU based UDF efficiently [databricks] #3897

Merged (18 commits, Oct 29, 2021)
Changes from 1 commit
@@ -456,7 +456,17 @@ class Spark320Shims extends Spark32XShims {
ParamCheck("windowSpec",
TypeSig.CALENDAR + TypeSig.NULL + TypeSig.integral + TypeSig.DECIMAL_64 +
TypeSig.DAYTIME, TypeSig.numericAndInterval))),
(windowExpression, conf, p, r) => new GpuWindowExpressionMeta(windowExpression, conf, p, r))
(windowExpression, conf, p, r) => new GpuWindowExpressionMeta(windowExpression, conf, p, r)),
GpuOverrides.expr[ScalaUDF](
"User Defined Function, support requires the UDF to implement a RAPIDS accelerated interface",
ExprChecks.projectOnly(
GpuUserDefinedFunction.udfTypeSig,
TypeSig.all,
repeatingParamCheck =
Some(RepeatingParamCheck("param", GpuUserDefinedFunction.udfTypeSig, TypeSig.all))),
(a, conf, p, r) => new BaseScalaUDFMeta(a, conf, p, r) {
override protected def outputEncoder: Option[ExpressionEncoder[_]] = a.outputEncoder
})
).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap

override def getExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = {
@@ -56,7 +56,7 @@ import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.python._
import org.apache.spark.sql.execution.window.WindowExecBase
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.{GpuAbs, GpuAverage, GpuFileSourceScanExec, GpuStringReplace, GpuTimeSub}
import org.apache.spark.sql.rapids.{GpuAbs, GpuAverage, GpuFileSourceScanExec, GpuScalaUDF, GpuStringReplace, GpuTimeSub}
import org.apache.spark.sql.rapids.execution.{GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase, JoinTypeChecks, SerializeBatchDeserializeHostBuffer, SerializeConcatHostBuffersDeserializeBatch, TrampolineUtil}
import org.apache.spark.sql.rapids.execution.python._
import org.apache.spark.sql.rapids.execution.python.shims.v2._
@@ -343,7 +343,8 @@ abstract class SparkBaseShims extends Spark30XShims {
}
override def convertToGpu(lhs: Expression, regexp: Expression,
rep: Expression): GpuExpression = GpuStringReplace(lhs, regexp, rep)
})
}),
GpuScalaUDF.exprMeta30X
).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap
}

@@ -52,7 +52,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNes
import org.apache.spark.sql.execution.python.{AggregateInPandasExec, ArrowEvalPythonExec, FlatMapGroupsInPandasExec, MapInPandasExec, WindowInPandasExec}
import org.apache.spark.sql.execution.window.WindowExecBase
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.{GpuAbs, GpuAverage, GpuFileSourceScanExec, GpuStringReplace, GpuTimeSub}
import org.apache.spark.sql.rapids.{GpuAbs, GpuAverage, GpuFileSourceScanExec, GpuScalaUDF, GpuStringReplace, GpuTimeSub}
import org.apache.spark.sql.rapids.execution.{GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase, JoinTypeChecks, SerializeBatchDeserializeHostBuffer, SerializeConcatHostBuffersDeserializeBatch}
import org.apache.spark.sql.rapids.execution.python._
import org.apache.spark.sql.rapids.execution.python.shims.v2._
@@ -306,7 +306,8 @@ abstract class SparkBaseShims extends Spark30XShims {
}
override def convertToGpu(lhs: Expression, regexp: Expression,
rep: Expression): GpuExpression = GpuStringReplace(lhs, regexp, rep)
})
}),
GpuScalaUDF.exprMeta30X
).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap
}

@@ -367,6 +367,15 @@ abstract class SparkBaseShims extends Spark30XShims {
override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = {
GpuElementAt(lhs, rhs, SQLConf.get.ansiEnabled)
}
}),
GpuOverrides.expr[ScalaUDF](
"User Defined Function, support requires the UDF to implement a RAPIDS accelerated interface",
ExprChecks.projectOnly(
udfTypeSig,
TypeSig.all,
repeatingParamCheck = Some(RepeatingParamCheck("param", udfTypeSig, TypeSig.all))),
(a, conf, p, r) => new BaseScalaUDFMeta(a, conf, p, r) {
override protected def outputEncoder: Option[ExpressionEncoder[_]] = a.outputEncoder
})
).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap

@@ -365,6 +365,16 @@ abstract class SparkBaseShims extends Spark31XShims {
override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = {
GpuElementAt(lhs, rhs, SQLConf.get.ansiEnabled)
}
}),
GpuOverrides.expr[ScalaUDF](
"User Defined Function, support requires the UDF to implement a RAPIDS accelerated interface",
ExprChecks.projectOnly(
GpuUserDefinedFunction.udfTypeSig,
TypeSig.all,
repeatingParamCheck =
Some(RepeatingParamCheck("param", GpuUserDefinedFunction.udfTypeSig, TypeSig.all))),
(a, conf, p, r) => new BaseScalaUDFMeta(a, conf, p, r) {
override protected def outputEncoder: Option[ExpressionEncoder[_]] = a.outputEncoder
})
).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap

@@ -3236,8 +3236,7 @@ object GpuOverrides extends Logging {
(a, conf, p, r) => new ExprMeta[CreateMap](a, conf, p, r) {
override def convertToGpu(): GpuExpression = GpuCreateMap(childExprs.map(_.convertToGpu()))
}
),
GpuScalaUDF.exprMeta
)
).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap

// Shim expressions should be last to allow overrides with shim-specific versions
@@ -16,15 +16,19 @@

package com.nvidia.spark.rapids

import ai.rapids.cudf.{NvtxColor, NvtxRange}
import ai.rapids.cudf.{ColumnVector, HostColumnVector, NvtxColor, NvtxRange}
import com.nvidia.spark.RapidsUDF
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.shims.v2.ShimExpression

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UserDefinedExpression
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.unsafe.types.UTF8String

/** Common implementation across all RAPIDS accelerated UDF types */
trait GpuUserDefinedFunction extends GpuExpression
@@ -43,8 +47,8 @@ trait GpuUserDefinedFunction extends GpuExpression

private[this] val nvtxRangeName = s"UDF: $name"
private[this] lazy val funcCls = TrampolineUtil.getSimpleName(function.getClass)
private[this] lazy val inputTypesString = children.map(_.dataType.catalogString).mkString(", ")
private[this] lazy val outputType = dataType.catalogString
protected lazy val inputTypesString = children.map(_.dataType.catalogString).mkString(", ")
protected lazy val outputType = dataType.catalogString

override def columnarEval(batch: ColumnarBatch): Any = {
val cols = children.safeMap(GpuExpressionsUtils.columnarEvalToColumn(_, batch))
@@ -74,3 +78,106 @@ object GpuUserDefinedFunction {
val udfTypeSig: TypeSig = (TypeSig.commonCudfTypes + TypeSig.DECIMAL_64 + TypeSig.NULL +
TypeSig.BINARY + TypeSig.CALENDAR + TypeSig.ARRAY + TypeSig.MAP + TypeSig.STRUCT).nested()
}

/**
* Try to execute a UDF efficiently by
* 1) running the UDF on the GPU if it is an instance of RapidsUDF, otherwise
* 2) pulling back only the columns the UDF needs to the host and doing the processing on the CPU.
*/
trait RapidsUserDefinedFunction extends GpuUserDefinedFunction with Logging {

/** The RapidsUDF instance if this UDF implements the RapidsUDF interface, otherwise None. */
protected def rapidsFunc: Option[RapidsUDF]

/** The row based function of the UDF. */
protected def evaluateRow(childrenRow: InternalRow): Any

override final val function: RapidsUDF = rapidsFunc.orNull

override def columnarEval(batch: ColumnarBatch): Any = {
rapidsFunc.map { _ =>
// It is a RapidsUDF instance.
super.columnarEval(batch)
}.getOrElse {
logInfo(s"Begin to execute the UDF($name) row by row.")
jlowe marked this conversation as resolved.
Show resolved Hide resolved
// It is only a CPU based UDF
// These child columns will be closed by `ColumnarToRowIterator`.
val argCols = children.safeMap(GpuExpressionsUtils.columnarEvalToColumn(_, batch))
try {
// 1) Convert the argument columns to rows.
// 2) Evaluate the CPU UDF row by row and cache the results.
// 3) Build a result column from the cache.
closeOnExcept(new RowDataBuffer(dataType, batch.numRows)) { buffer =>
new ColumnarToRowIterator(
Iterator.single(new ColumnarBatch(argCols.toArray, batch.numRows())),
NoopMetric,
NoopMetric,
NoopMetric,
NoopMetric).foreach { row =>
buffer.append(evaluateRow(row))
}
closeOnExcept(buffer.copyToDevice()) { resultCol =>
GpuColumnVector.from(resultCol, dataType)
}
}
} catch {
case e: Exception =>
throw new SparkException("Failed to execute user defined function: " +
s"($name: ($inputTypesString) => $outputType)", e)
}
}
} // end of `columnarEval`

}

/** A wrapper around a `ColumnBuilder` that supports appending an `Any`. */
class RowDataBuffer(rowType: DataType, estimatedRows: Int) extends AutoCloseable {
private val builder =
new HostColumnVector.ColumnBuilder(GpuScalar.resolveElementType(rowType), estimatedRows)

/**
* Append a value of Catalyst type to this buffer.
* The value `row` should be compatible with the `rowType` used to create this buffer.
*/
def append(row: Any): this.type = {
if (row == null) {
builder.appendNull()
} else {
// Boxed primitive values are unboxed here; this is safe since nulls were handled above.
rowType match {
case NullType => builder.appendNull()
case ByteType => builder.append(row.asInstanceOf[Byte])
case ShortType => builder.append(row.asInstanceOf[Short])
case FloatType => builder.append(row.asInstanceOf[Float])
case DoubleType => builder.append(row.asInstanceOf[Double])
case BooleanType => builder.append(row.asInstanceOf[Boolean])
case IntegerType | DateType => builder.append(row.asInstanceOf[Int])
case LongType | TimestampType => builder.append(row.asInstanceOf[Long])
case StringType => builder.appendUTF8String(row.asInstanceOf[UTF8String].getBytes)
case dt: DecimalType =>
GpuScalar.convertDecimalTo(row.asInstanceOf[Decimal], dt) match {
case Left(dec32AsInt) => builder.append(dec32AsInt)
case Right(dec64AsLong) => builder.append(dec64AsLong)
}
case StructType(_) =>
val dataAsStruct = GpuScalar.convertElementTo(row, rowType)
builder.appendStructValues(dataAsStruct.asInstanceOf[HostColumnVector.StructData])
case ArrayType(_, _) | MapType(_, _, _) =>
val dataAsList = GpuScalar.convertElementTo(row, rowType)
builder.appendLists(dataAsList.asInstanceOf[java.util.List[_]])
case ot =>
throw new IllegalArgumentException(s"Failed to append the value ('$row'), " +
s"because of unsupported data type ($ot).")
}
}
this
}

/**
* Copy the data collected so far to the device.
* The returned column should be closed when no longer needed.
*/
def copyToDevice(): ColumnVector = builder.buildAndPutOnDevice()

override def close(): Unit = builder.close()
}
13 changes: 13 additions & 0 deletions in sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -715,6 +715,17 @@ object RapidsConf {
.booleanConf
.createWithDefault(true)

// FIXME Should this be an internal conf, since it is an experimental feature for now?
// Eventually it should be enabled or disabled based on criteria that are still to be
// figured out.
val CPU_BASED_UDF_ENABLED = conf("spark.rapids.sql.cpuBasedUDF.enabled")
.doc("When set to true, supports to run a CPU based UDF efficiently by transferring " +
"only the data it needs between GPU and CPU inside a plan, instead of falling the whole " +
jlowe marked this conversation as resolved.
Show resolved Hide resolved
"plan back to CPU. This is an experimental feature, and may be changed in the future.")
.internal()
.booleanConf
.createWithDefault(false)

object ParquetReaderType extends Enumeration {
val AUTO, COALESCING, MULTITHREADED, PERFILE = Value
}
@@ -1715,6 +1726,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isRangeWindowLongEnabled: Boolean = get(ENABLE_RANGE_WINDOW_LONG)

lazy val isCpuBasedUDFEnabled: Boolean = get(CPU_BASED_UDF_ENABLED)

private val optimizerDefaults = Map(
// this is not accurate because CPU projections do have a cost due to appending values
// to each row that is produced, but this needs to be a really small number because
@@ -95,7 +95,7 @@ object GpuScalar extends Arm with Logging {
* 'validateLiteralValue' in Spark.
*/
/** Converts a decimal, `dec` should not be null. */
private def convertDecimalTo(dec: Decimal, dt: DecimalType): Either[Integer, JLong] = {
private[rapids] def convertDecimalTo(dec: Decimal, dt: DecimalType): Either[Integer, JLong] = {
if (dec.scale > dt.scale) {
throw new IllegalArgumentException(s"Unexpected decimals rounding.")
}
Expand All @@ -118,8 +118,8 @@ object GpuScalar extends Arm with Logging {
}
}

/** Converts an element for nested lists */
private def convertElementTo(element: Any, elementType: DataType): Any = elementType match {
/** Converts an element from Catalyst type to cuDF type */
private[rapids] def convertElementTo(element: Any, elemType: DataType): Any = elemType match {
case _ if element == null => null
case StringType => element.asInstanceOf[UTF8String].getBytes
case dt: DecimalType => convertDecimalTo(element.asInstanceOf[Decimal], dt) match {