Skip to content

Commit

Permalink
Moved commonality to base class for Gpu window execs.
Browse files Browse the repository at this point in the history
  • Loading branch information
mythrocks committed Jun 8, 2021
1 parent f4968cc commit 86b995a
Showing 1 changed file with 78 additions and 30 deletions.
108 changes: 78 additions & 30 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,82 @@ import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.vectorized.ColumnarBatch

class GpuWindowExecMeta(windowExec: WindowExec,
/**
* Base class for GPU Execs that implement window functions. This abstracts the method
* by which the window function's input expressions, partition specs, order-by specs, etc.
* are extracted from the specific WindowExecType.
*
* @tparam WindowExecType The Exec class that implements window functions
* (E.g. o.a.s.sql.execution.window.WindowExec.)
*/
abstract class GpuBaseWindowExecMeta[WindowExecType <: SparkPlan] (windowExec: WindowExecType,
conf: RapidsConf,
parent: Option[RapidsMeta[_, _, _]],
rule: DataFromReplacementRule)
extends SparkPlanMeta[WindowExec](windowExec, conf, parent, rule) {
extends SparkPlanMeta[WindowExecType](windowExec, conf, parent, rule) {

/**
* Extracts window-expression from WindowExecType.
* The implementation varies, depending on the WindowExecType class.
*/
def getInputWindowExpressions: Seq[NamedExpression]

/**
* Extracts partition-spec from WindowExecType.
* The implementation varies, depending on the WindowExecType class.
*/
def getPartitionSpecs: Seq[Expression]

/**
* Extracts order-by spec from WindowExecType.
* The implementation varies, depending on the WindowExecType class.
*/
def getOrderSpecs: Seq[SortOrder]

/**
* Indicates the output column semantics for the WindowExecType,
* i.e. whether to only return the window-expression result columns (as in some Spark
* distributions) or also include the input columns (as in Apache Spark).
*/
def getResultColumnsOnly: Boolean

val windowExpressions: Seq[BaseExprMeta[NamedExpression]] =
getInputWindowExpressions.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
val partitionSpec: Seq[BaseExprMeta[Expression]] =
getPartitionSpecs.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
val orderSpec: Seq[BaseExprMeta[SortOrder]] =
getOrderSpecs.map(GpuOverrides.wrapExpr(_, conf, Some(this)))

override def tagPlanForGpu(): Unit = {
// Implementation depends on receiving a `NamedExpression` wrapped WindowExpression.
windowExpressions.map(meta => meta.wrapped)
.filter(expr => !expr.isInstanceOf[NamedExpression])
.foreach(_ => willNotWorkOnGpu(because = "Unexpected query plan with Windowing functions; " +
"cannot convert for GPU execution. " +
"(Detail: WindowExpression not wrapped in `NamedExpression`.)"))
}

override def convertToGpu(): GpuExec = {
GpuWindowExec(
windowExpressions.map(_.convertToGpu()),
partitionSpec.map(_.convertToGpu()),
orderSpec.map(_.convertToGpu().asInstanceOf[SortOrder]),
childPlans.head.convertIfNeeded(),
getResultColumnsOnly
)
}
}

/**
* Specialization of GpuBaseWindowExecMeta for org.apache.spark.sql.window.WindowExec.
* This class implements methods to extract the window-expressions, partition columns,
* order-by columns, etc. from WindowExec.
*/
class GpuWindowExecMeta(windowExec: WindowExec,
conf: RapidsConf,
parent: Option[RapidsMeta[_, _, _]],
rule: DataFromReplacementRule)
extends GpuBaseWindowExecMeta[WindowExec](windowExec, conf, parent, rule) {

/**
* Fetches WindowExpressions in input `windowExec`, via reflection.
Expand Down Expand Up @@ -58,35 +129,12 @@ class GpuWindowExecMeta(windowExec: WindowExec,
(expr, resultColumnsOnly)
}

private val (inputWindowExpressions, resultColumnsOnly) = getWindowExpression

val windowExpressions: Seq[BaseExprMeta[NamedExpression]] =
inputWindowExpressions.map(GpuOverrides.wrapExpr(_, conf, Some(this)))

val partitionSpec: Seq[BaseExprMeta[Expression]] =
windowExec.partitionSpec.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
val orderSpec: Seq[BaseExprMeta[SortOrder]] =
windowExec.orderSpec.map(GpuOverrides.wrapExpr(_, conf, Some(this)))

override def tagPlanForGpu(): Unit = {
// Implementation depends on receiving a `NamedExpression` wrapped WindowExpression.
windowExpressions.map(meta => meta.wrapped)
.filter(expr => !expr.isInstanceOf[NamedExpression])
.foreach(_ => willNotWorkOnGpu(because = "Unexpected query plan with Windowing functions; " +
"cannot convert for GPU execution. " +
"(Detail: WindowExpression not wrapped in `NamedExpression`.)"))

}
private lazy val (inputWindowExpressions, resultColumnsOnly) = getWindowExpression

override def convertToGpu(): GpuExec = {
GpuWindowExec(
windowExpressions.map(_.convertToGpu()),
partitionSpec.map(_.convertToGpu()),
orderSpec.map(_.convertToGpu().asInstanceOf[SortOrder]),
childPlans.head.convertIfNeeded(),
resultColumnsOnly
)
}
override def getInputWindowExpressions: Seq[NamedExpression] = inputWindowExpressions
override def getPartitionSpecs: Seq[Expression] = windowExec.partitionSpec
override def getOrderSpecs: Seq[SortOrder] = windowExec.orderSpec
override def getResultColumnsOnly: Boolean = resultColumnsOnly
}

case class GpuWindowExec(
Expand Down

0 comments on commit 86b995a

Please sign in to comment.