diff --git a/integration_tests/src/main/python/asserts.py b/integration_tests/src/main/python/asserts.py index 1f64f8e9fce..f07c9cdf0a4 100644 --- a/integration_tests/src/main/python/asserts.py +++ b/integration_tests/src/main/python/asserts.py @@ -401,13 +401,16 @@ def assert_gpu_and_cpu_row_counts_equal(func, conf={}, is_cpu_first=True): """ _assert_gpu_and_cpu_are_equal(func, 'COUNT', conf=conf, is_cpu_first=is_cpu_first) -def assert_gpu_and_cpu_are_equal_sql(df_fun, table_name, sql, conf=None, debug=False, is_cpu_first=True): +def assert_gpu_and_cpu_are_equal_sql(df_fun, table_name, sql, conf=None, debug=False, is_cpu_first=True, validate_execs_in_gpu_plan=[]): """ Assert that the specified SQL query produces equal results on CPU and GPU. :param df_fun: a function that will create the dataframe :param table_name: Name of table to be created with the dataframe :param sql: SQL query to be run on the specified table :param conf: Any user-specified confs. Empty by default. + :param debug: Boolean to indicate if the SQL output should be printed + :param is_cpu_first: Boolean to indicate if the CPU should be run first or not + :param validate_execs_in_gpu_plan: String list of expressions to be validated in the GPU plan. :return: Assertion failure, if results from CPU and GPU do not match. """ if conf is None: @@ -415,6 +418,9 @@ def assert_gpu_and_cpu_are_equal_sql(df_fun, table_name, sql, conf=None, debug=F def do_it_all(spark): df = df_fun(spark) df.createOrReplaceTempView(table_name) + # we hold off on setting the validate execs until after creating the temp view + + spark.conf.set('spark.rapids.sql.test.validateExecsInGpuPlan', ','.join(validate_execs_in_gpu_plan)) if debug: return data_gen.debug_df(spark.sql(sql)) else: diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index f157d3117fa..5e80cfc5aef 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -296,6 +296,7 @@ def test_window_running_no_part(b_gen, batch_size): 'select ' + ', '.join(query_parts) + ' from window_agg_table ', + validate_execs_in_gpu_plan = ['GpuRunningWindowExec'], conf = conf) # This is for aggregations that work with a running window optimization. They don't need to be batched @@ -319,6 +320,7 @@ def test_window_running(b_gen, c_gen, batch_size): 'select ' + ', '.join(query_parts) + ' from window_agg_table ', + validate_execs_in_gpu_plan = ['GpuRunningWindowExec'], conf = conf) @ignore_order @@ -527,6 +529,8 @@ def test_window_aggs_for_rows_collect_list(): # SortExec does not support array type, so sort the result locally. @ignore_order(local=True) +# This test is more directed at Databricks and their running window optimization instead of ours +# this is why we do not validate that we inserted in a GpuRunningWindowExec, yet. def test_running_window_function_exec_for_all_aggs(): assert_gpu_and_cpu_are_equal_sql( lambda spark : gen_df(spark, _gen_data_for_collect_list), @@ -554,7 +558,6 @@ def test_running_window_function_exec_for_all_aggs(): from window_collect_table ''') - # Generates some repeated values to test the deduplication of GpuCollectSet. # And GpuCollectSet does not yet support struct type. _gen_data_for_collect_set = [ diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala index b6681e9e23e..0aa5eff6b1b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala @@ -16,6 +16,7 @@ package com.nvidia.spark.rapids +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import ai.rapids.cudf.Scalar @@ -24,7 +25,7 @@ import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, CurrentRow, Expression, NamedExpression, RowFrame, SortOrder, UnboundedPreceding} +import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, AttributeSet, CurrentRow, Expression, NamedExpression, RowFrame, SortOrder, UnboundedPreceding} import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.window.WindowExec @@ -78,6 +79,9 @@ abstract class GpuBaseWindowExecMeta[WindowExecType <: SparkPlan] (windowExec: W val orderSpec: Seq[BaseExprMeta[SortOrder]] = getOrderSpecs.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + lazy val inputFields: Seq[BaseExprMeta[Attribute]] = + windowExec.children.head.output.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + override def tagPlanForGpu(): Unit = { // Implementation depends on receiving a `NamedExpression` wrapped WindowExpression. windowExpressions.map(meta => meta.wrapped) @@ -89,12 +93,35 @@ abstract class GpuBaseWindowExecMeta[WindowExecType <: SparkPlan] (windowExec: W override def convertToGpu(): GpuExec = { val resultColumnsOnly = getResultColumnsOnly - val gpuWindowExpressions = windowExpressions.map(_.convertToGpu()) + val gpuWindowExpressions = if (resultColumnsOnly) { + windowExpressions.map(_.convertToGpu().asInstanceOf[NamedExpression]) + } else { + (inputFields ++ windowExpressions).map(_.convertToGpu().asInstanceOf[NamedExpression]) + } + + val (pre, windowOps, post) = GpuWindowExec.splitAndDedup(gpuWindowExpressions) + // Order is not important for pre. It is unbound and we are inserting it in. + val isPreNeeded = + (AttributeSet(pre.map(_.toAttribute)) -- windowExec.children.head.output).nonEmpty + // To check if post is needed we first have to remove a layer of indirection that + // might not be needed. Here we want to maintain order, just to match Spark as closely + // as possible + val remappedWindowOps = GpuWindowExec.remapAttributes(windowOps, post) + val isPostNeeded = remappedWindowOps.length != post.length || + remappedWindowOps.zip(post).exists { + case (w, p) => w.exprId != p.exprId + } + val fixedUpWindowOps = if(isPostNeeded) { + windowOps + } else { + remappedWindowOps + } + // When we support multiple ways to avoid batching the input data like with // https://github.com/NVIDIA/spark-rapids/issues/1860 we should check if all of // the operations fit into one of the supported groups and then split them up into // multiple execs if they do, so that we can avoid batching on all of them. - val allBatchedRunning = gpuWindowExpressions.forall { + val allBatchedRunning = fixedUpWindowOps.forall { case GpuAlias(GpuWindowExpression(func, spec), _) => val isRunningFunc = func match { case _: GpuBatchedRunningWindowFunction[_] => true @@ -104,31 +131,40 @@ abstract class GpuBaseWindowExecMeta[WindowExecType <: SparkPlan] (windowExec: W // Running windows are limited to row based queries with a few changes we could make this // work for range based queries too https://github.com/NVIDIA/spark-rapids/issues/2708 isRunningFunc && GpuWindowExec.isRunningWindow(spec) - case GpuAlias(_ :AttributeReference, _) => - // If there are result columns only, then we are going to allow a few things through - // but in practice this could be anything and we need to walk through the expression - // tree and split it into expressions before the window operation, the window operation, - // and things after the window operation. - // https://github.com/NVIDIA/spark-rapids/issues/2688 - resultColumnsOnly - case _ => false + case GpuAlias(_: AttributeReference, _) | _: AttributeReference => + // We allow pure result columns for running windows + true + case other => + // This should only happen if we did something wrong in splitting/deduping + // the window expressions. + throw new IllegalArgumentException( + s"Found unexpected expression $other in window exec ${other.getClass}") + } + + val input = if (isPreNeeded) { + GpuProjectExec(pre.toList, childPlans.head.convertIfNeeded()) + } else { + childPlans.head.convertIfNeeded() } - if (allBatchedRunning) { + val windowExpr = if (allBatchedRunning) { GpuRunningWindowExec( - gpuWindowExpressions, + fixedUpWindowOps, partitionSpec.map(_.convertToGpu()), orderSpec.map(_.convertToGpu().asInstanceOf[SortOrder]), - childPlans.head.convertIfNeeded(), - resultColumnsOnly) + input) } else { GpuWindowExec( - gpuWindowExpressions, + fixedUpWindowOps, partitionSpec.map(_.convertToGpu()), orderSpec.map(_.convertToGpu().asInstanceOf[SortOrder]), - childPlans.head.convertIfNeeded(), - resultColumnsOnly - ) + input) + } + + if (isPostNeeded) { + GpuProjectExec(post.toList, windowExpr) + } else { + windowExpr } } } @@ -181,6 +217,137 @@ class GpuWindowExecMeta(windowExec: WindowExec, } object GpuWindowExec extends Arm { + /** + * As a part of `splitAndDedup` the dedup part adds a layer of indirection. This attempts to + * remove that layer of indirection. + * @param windowOps the windowOps output of splitAndDedup + * @param post the post output of splitAndDedup + * @return a version of windowOps that has removed as many un-needed temp aliases as possible. + */ + def remapAttributes(windowOps: Seq[NamedExpression], + post: Seq[NamedExpression]): Seq[NamedExpression] = { + val postRemapping = post.flatMap { + case a @ GpuAlias(attr: AttributeReference, _) => Some((attr.exprId, a)) + case _ => None + }.groupBy(_._1) + windowOps.map { + case a @ GpuAlias(child, _) + // We can only replace the mapping if there is one thing to map it to. + if postRemapping.get(a.exprId).exists(_.length == 1) => + val attr = postRemapping(a.exprId).head._2 + GpuAlias(child, attr.name)(attr.exprId, attr.qualifier) + case other => other + } + } + + private def hasGpuWindowFunction(expr: Expression): Boolean = + expr.find(_.isInstanceOf[GpuWindowExpression]).isDefined + + private def extractAndSave(expr: Expression, + saved: ArrayBuffer[NamedExpression], + deduped: mutable.HashMap[Expression, Attribute]): Expression = + expr match { + // Don't rename an already named expression + case ne: NamedExpression => + if (!saved.exists(_.exprId == ne.exprId)) { + saved += ne + } + ne.toAttribute + case e: Expression if e.foldable => + e // No need to create an attribute reference if it will be evaluated as a Literal. + case e: Expression => + // For other expressions, we extract it and replace it with an AttributeReference (with + // an internal column name, e.g. "_gpu_w0"). Deduping it as we go. + deduped.getOrElseUpdate(e, { + val withName = GpuAlias(e, s"_gpu_w${saved.length}")() + saved += withName + withName.toAttribute + }) + } + + /** + * In some distributions expressions passed into WindowExec can have more operations + * in them than just a WindowExpression wrapped in an GpuAlias. This is a problem if we + * want to try and do multiple window operations in a single pass to speed things up + * or if we need to add new transitive window functions when we are doing some memory + * optimizations, like running window. This will split the input expressions + * into three sets of expressions. The first set is a project with no window expressions in it at + * all. The second takes the first as input and will only have aliases to columns in the first or + * named expressions wrapping a single window function in it. The third uses the second as + * input and will do any final steps to combine window functions together. + * + * For example `SUM(a) - SUM(b + c) over (PARTITION BY d ORDER BY e) as result` would be + * transformed into + *
+ * Phase 1 (Pre project): + * a, b + c as _tmp0, d, e + * + * Phase 2 (Window Operations): + * SUM(a) over (PARTITION BY d ORDER BY e) as _tmp1, + * SUM(_tmp0) over (PARTITION BY d ORDER BY e) as _tmp2 + * + * Phase 3 (Post Project): + * (_tmp1 - _tmp2) as result + *+ * + * This assumes that there is not a window function of another window function, like + * `LAG(SUM(a), 2)` which appears to be something all distros split apart into separate + * window operations, so we are good. + * @param exprs the input expressions to a GpuWindowExec + */ + def splitAndDedup(exprs: Seq[NamedExpression]): + (Seq[NamedExpression], Seq[NamedExpression], Seq[NamedExpression]) = { + // This is based off of similar code in Apache Spark's `ExtractWindowExpressions.extract` but + // has been highly modified + val preProject = ArrayBuffer[NamedExpression]() + val preDedupe = mutable.HashMap[Expression, Attribute]() + val windowOps = ArrayBuffer[NamedExpression]() + val windowDedupe = mutable.HashMap[Expression, Attribute]() + val postProject = ArrayBuffer[NamedExpression]() + + val shims = ShimLoader.getSparkShims + + exprs.foreach { expr => + if (hasGpuWindowFunction(expr)) { + // First pass looks for GpuWindowFunctions and GpuWindowSpecDefinitions to build up + // the preProject phase + val firstPass = expr.transformDown { + case wf: GpuWindowFunction => + // All window functions, including those that are also aggregation functions, are + // wrapped in a GpuWindowExpression, so dedup and save their children into the pre + // stage, replacing them with aliases. + val newChildren = wf.children.map(extractAndSave(_, preProject, preDedupe)) + wf.withNewChildren(newChildren) + case wsc @ GpuWindowSpecDefinition(partitionSpec, orderSpec, _) => + // Extracts expressions from the partition spec and order spec to be sure that they + // show up in the pre stage. Because map is lazy we are going to force it to be + // materialized, by forcing it to go through an array that cannot be lazily created + val newPartitionSpec = partitionSpec.map( + extractAndSave(_, preProject, preDedupe)).toArray.toSeq + val newOrderSpec = orderSpec.map { so => + val newChild = extractAndSave(so.child, preProject, preDedupe) + shims.sortOrder(newChild, so.direction, so.nullOrdering) + }.toArray.toSeq + wsc.copy(partitionSpec = newPartitionSpec, orderSpec = newOrderSpec) + } + val secondPass = firstPass.transformDown { + case we: GpuWindowExpression => + // A window Expression holds a window function or an aggregate function, so put it into + // the windowOps phase, and create a new alias for it for the post phase + extractAndSave(we, windowOps, windowDedupe) + }.asInstanceOf[NamedExpression] + + postProject += secondPass + } else { + // There is no window function so pass the result through all of the phases (with deduping) + postProject += extractAndSave( + extractAndSave(expr, preProject, preDedupe), windowOps, windowDedupe) + .asInstanceOf[NamedExpression] + } + } + (preProject, windowOps, postProject) + } + def isRunningWindow(spec: GpuWindowSpecDefinition): Boolean = spec match { case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame(RowFrame, GpuSpecialFrameBoundary(UnboundedPreceding), GpuSpecialFrameBoundary(CurrentRow))) => true @@ -202,11 +369,11 @@ object GpuWindowExec extends Arm { def computeRunningNoPartitioning( iter: Iterator[ColumnarBatch], - boundProjectList: Seq[GpuExpression], + boundWindowOps: Seq[GpuExpression], numOutputBatches: GpuMetric, numOutputRows: GpuMetric, opTime: GpuMetric): Iterator[ColumnarBatch] = { - val fixers = fixerIndexMap(boundProjectList) + val fixers = fixerIndexMap(boundWindowOps) TaskContext.get().addTaskCompletionListener[Unit](_ => fixers.values.foreach(_.close())) iter.flatMap { cb => @@ -215,9 +382,9 @@ object GpuWindowExec extends Arm { numOutputRows += numRows withResource(new MetricRange(opTime)) { _ => if (numRows > 0) { - withResource(GpuProjectExec.projectAndClose(cb, boundProjectList, NoopMetric)) { full => + withResource(GpuProjectExec.projectAndClose(cb, boundWindowOps, NoopMetric)) { full => closeOnExcept(ArrayBuffer[ColumnVector]()) { newColumns => - boundProjectList.indices.foreach { idx => + boundWindowOps.indices.foreach { idx => val column = full.column(idx).asInstanceOf[GpuColumnVector] fixers.get(idx) match { case Some(fixer) => @@ -268,13 +435,13 @@ object GpuWindowExec extends Arm { def computeRunning( iter: Iterator[ColumnarBatch], - boundProjectList: Seq[GpuExpression], + boundWindowOps: Seq[GpuExpression], boundPartitionSpec: Seq[Expression], numOutputBatches: GpuMetric, numOutputRows: GpuMetric, opTime: GpuMetric): Iterator[ColumnarBatch] = { var lastParts: Array[Scalar] = Array.empty - val fixers = fixerIndexMap(boundProjectList) + val fixers = fixerIndexMap(boundWindowOps) def saveLastParts(newLastParts: Array[Scalar]): Unit = { lastParts.foreach(_.close()) @@ -293,12 +460,13 @@ object GpuWindowExec extends Arm { numOutputBatches += 1 numOutputRows += numRows withResource(new MetricRange(opTime)) { _ => - val fullProjectList = boundProjectList ++ boundPartitionSpec - withResource(GpuProjectExec.projectAndClose(cb, fullProjectList, NoopMetric)) { full => + val fullWindowProjectList = boundWindowOps ++ boundPartitionSpec + withResource( + GpuProjectExec.projectAndClose(cb, fullWindowProjectList, NoopMetric)) { full => // part columns are owned by full and do not need to be closed, but should not be used // if full is closed val partColumns = boundPartitionSpec.indices.map { idx => - full.column(idx + boundProjectList.length).asInstanceOf[GpuColumnVector].getBase + full.column(idx + boundWindowOps.length).asInstanceOf[GpuColumnVector].getBase } // We need to fix up the rows that are part of the same batch as the end of the @@ -306,7 +474,7 @@ object GpuWindowExec extends Arm { val partsEqual = arePartsEqual(lastParts, partColumns) try { closeOnExcept(ArrayBuffer[ColumnVector]()) { newColumns => - boundProjectList.indices.foreach { idx => + boundWindowOps.indices.foreach { idx => val column = full.column(idx).asInstanceOf[GpuColumnVector] val fixer = fixers.get(idx) if (fixer.isDefined) { @@ -336,10 +504,9 @@ object GpuWindowExec extends Arm { } trait GpuWindowBaseExec extends UnaryExecNode with GpuExec { - val resultColumnsOnly: Boolean - val windowExpressionAliases: Seq[Expression] + val windowOps: Seq[NamedExpression] val partitionSpec: Seq[Expression] - val orderSpec: Seq[SortOrder] + val orderSpec: Seq[SortOrder] import GpuMetric._ @@ -347,11 +514,7 @@ trait GpuWindowBaseExec extends UnaryExecNode with GpuExec { OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, OP_TIME) ) - override def output: Seq[Attribute] = if (resultColumnsOnly) { - windowExpressionAliases.map(_.asInstanceOf[NamedExpression].toAttribute) - } else { - child.output ++ windowExpressionAliases.map(_.asInstanceOf[NamedExpression].toAttribute) - } + override def output: Seq[Attribute] = windowOps.map(_.toAttribute) override def requiredChildDistribution: Seq[Distribution] = { if (partitionSpec.isEmpty) { @@ -379,11 +542,10 @@ trait GpuWindowBaseExec extends UnaryExecNode with GpuExec { } case class GpuRunningWindowExec( - windowExpressionAliases: Seq[Expression], + windowOps: Seq[NamedExpression], partitionSpec: Seq[Expression], orderSpec: Seq[SortOrder], - child: SparkPlan, - resultColumnsOnly: Boolean + child: SparkPlan ) extends GpuWindowBaseExec { override protected def doExecuteColumnar(): RDD[ColumnarBatch] = { @@ -391,14 +553,8 @@ case class GpuRunningWindowExec( val numOutputRows = gpuLongMetric(GpuMetric.NUM_OUTPUT_ROWS) val opTime = gpuLongMetric(GpuMetric.OP_TIME) - val projectList = if (resultColumnsOnly) { - windowExpressionAliases - } else { - child.output ++ windowExpressionAliases - } - - val boundProjectList = - GpuBindReferences.bindGpuReferences(projectList, child.output) + val boundWindowOps = + GpuBindReferences.bindGpuReferences(windowOps, child.output) val boundPartitionSpec = GpuBindReferences.bindGpuReferences(partitionSpec, child.output) @@ -406,25 +562,23 @@ case class GpuRunningWindowExec( if (partitionSpec.isEmpty) { child.executeColumnar().mapPartitions { iter => GpuWindowExec.computeRunningNoPartitioning(iter, - boundProjectList, - numOutputBatches, numOutputRows, opTime) + boundWindowOps, numOutputBatches, numOutputRows, opTime) } } else { child.executeColumnar().mapPartitions { iter => GpuWindowExec.computeRunning(iter, - boundProjectList, boundPartitionSpec, - numOutputBatches, numOutputRows, opTime) + boundWindowOps, boundPartitionSpec, numOutputBatches, + numOutputRows, opTime) } } } } case class GpuWindowExec( - windowExpressionAliases: Seq[Expression], + windowOps: Seq[NamedExpression], partitionSpec: Seq[Expression], orderSpec: Seq[SortOrder], - child: SparkPlan, - resultColumnsOnly: Boolean + child: SparkPlan ) extends GpuWindowBaseExec { override def childrenCoalesceGoal: Seq[CoalesceGoal] = Seq(outputBatching) @@ -440,19 +594,13 @@ case class GpuWindowExec( val numOutputRows = gpuLongMetric(GpuMetric.NUM_OUTPUT_ROWS) val opTime = gpuLongMetric(GpuMetric.OP_TIME) - val projectList = if (resultColumnsOnly) { - windowExpressionAliases - } else { - child.output ++ windowExpressionAliases - } - - val boundProjectList = - GpuBindReferences.bindGpuReferences(projectList, child.output) + val boundWindowOps = + GpuBindReferences.bindGpuReferences(windowOps, child.output) child.executeColumnar().map { cb => numOutputBatches += 1 numOutputRows += cb.numRows - GpuProjectExec.projectAndClose(cb, boundProjectList, opTime) + GpuProjectExec.projectAndClose(cb, boundWindowOps, opTime) } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala index ff6b641e097..439804c4e88 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala @@ -834,6 +834,10 @@ case class GpuSpecialFrameBoundary(boundary : SpecialFrameBoundary) } } +// This is here for now just to tag an expression as being a GpuWindowFunction and match +// Spark. This may expand in the future if other types of window functions show up. +trait GpuWindowFunction extends GpuUnevaluable + /** * GPU Counterpart of `AggregateWindowFunction`. * On the CPU this would extend `DeclarativeAggregate` and use the provided methods @@ -842,7 +846,7 @@ case class GpuSpecialFrameBoundary(boundary : SpecialFrameBoundary) * expressions. */ trait GpuAggregateWindowFunction[T <: Aggregation with RollingAggregation[T]] - extends GpuUnevaluable { + extends GpuWindowFunction { /** * Using child references, define the shape of the vectors sent to the window operations */