diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index ba220eb3417..6e0c70c7b44 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -198,8 +198,9 @@ def test_window_aggs_for_ranges_numeric_long_overflow(data_gen): ' range between 9223372036854775807 preceding and 9223372036854775807 following) as sum_c_asc, ' 'from window_agg_table') - -@ignore_order +# In a distributed setup the order of the partitions returned might be different, so we must ignore the order +# but small batch sizes can make sort very slow, so do the final order by locally +@ignore_order(local=True) @pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches @pytest.mark.parametrize('data_gen', [ _grpkey_byte_with_nulls, @@ -246,7 +247,9 @@ def test_window_aggs_for_range_numeric_date(data_gen, batch_size): 'from window_agg_table ', conf = conf) -@ignore_order +# In a distributed setup the order of the partitions returned might be different, so we must ignore the order +# but small batch sizes can make sort very slow, so do the final order by locally +@ignore_order(local=True) @pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches @pytest.mark.parametrize('data_gen', [_grpkey_longs_with_no_nulls, _grpkey_longs_with_nulls, @@ -281,10 +284,8 @@ def test_window_aggs_for_rows(data_gen, batch_size): # This is for aggregations that work with a running window optimization. They don't need to be batched -# specially, but it only works if all of the aggregations can support this. Right now this is just -# row number, but will expand to others in the future (rank and dense_rank). -@ignore_order -@approximate_float +# specially, but it only works if all of the aggregations can support this. 
+# the order returned should be consistent because the data ends up in a single task (no partitioning) @pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches @pytest.mark.parametrize('b_gen', all_basic_gens_no_nans + [decimal_gen_scale_precision], ids=meta_idfn('data:')) def test_window_running_no_part(b_gen, batch_size): @@ -295,6 +296,9 @@ def test_window_running_no_part(b_gen, batch_size): 'count(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as count_col', 'min(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as min_col', 'max(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as max_col'] + if isinstance(b_gen.data_type, NumericType) and not isinstance(b_gen, FloatGen) and not isinstance(b_gen, DoubleGen): + query_parts.append('sum(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as sum_col') + assert_gpu_and_cpu_are_equal_sql( lambda spark : two_col_df(spark, LongRangeGen(), b_gen, length=1024 * 14), "window_agg_table", @@ -304,21 +308,55 @@ def test_window_running_no_part(b_gen, batch_size): validate_execs_in_gpu_plan = ['GpuRunningWindowExec'], conf = conf) +# Test that we can do a running window sum on floats and doubles. This becomes problematic because we do the agg in parallel +# which means that the result can switch back and forth from Inf to not Inf depending on the order of aggregations. +# We test this by limiting the range of the values in the sum to never hit Inf, and by using abs so we don't have +# positive and negative values that interfere with each other. 
+# the order returned should be consistent because the data ends up in a single task (no partitioning) +@approximate_float +@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches +def test_running_float_sum_no_part(batch_size): + conf = {'spark.rapids.sql.batchSizeBytes': batch_size, + 'spark.rapids.sql.variableFloatAgg.enabled': True, + 'spark.rapids.sql.castFloatToDecimal.enabled': True} + query_parts = ['a', + 'sum(cast(b as double)) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as shrt_dbl_sum', + 'sum(abs(dbl)) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as dbl_sum', + 'sum(cast(b as float)) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as shrt_flt_sum', + 'sum(abs(flt)) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as flt_sum'] + + gen = StructGen([('a', LongRangeGen()),('b', short_gen),('flt', float_gen),('dbl', double_gen)], nullable=False) + assert_gpu_and_cpu_are_equal_sql( + lambda spark : gen_df(spark, gen, length=1024 * 14), + "window_agg_table", + 'select ' + + ', '.join(query_parts) + + ' from window_agg_table ', + validate_execs_in_gpu_plan = ['GpuRunningWindowExec'], + conf = conf) + # This is for aggregations that work with a running window optimization. They don't need to be batched -# specially, but it only works if all of the aggregations can support this. Right now this is just -# row number, but will expand to others in the future (rank and dense_rank). -@ignore_order +# specially, but it only works if all of the aggregations can support this. 
+# In a distributed setup the order of the partitions returned might be different, so we must ignore the order +# but small batch sizes can make sort very slow, so do the final order by locally +@ignore_order(local=True) @pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches @pytest.mark.parametrize('b_gen, c_gen', [(long_gen, x) for x in running_part_and_order_gens] + [(x, long_gen) for x in all_basic_gens_no_nans + [decimal_gen_scale_precision]], ids=idfn) def test_window_running(b_gen, c_gen, batch_size): conf = {'spark.rapids.sql.batchSizeBytes': batch_size, 'spark.rapids.sql.hasNans': False, + 'spark.rapids.sql.variableFloatAgg.enabled': True, 'spark.rapids.sql.castFloatToDecimal.enabled': True} query_parts = ['row_number() over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as row_num', 'count(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as count_col', 'min(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as min_col', 'max(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as max_col'] + + # Decimal precision can grow too large. 
Float and Double can get odd results for Inf/-Inf because of ordering + if isinstance(c_gen.data_type, NumericType) and (not isinstance(c_gen, FloatGen)) and (not isinstance(c_gen, DoubleGen)) and (not isinstance(c_gen, DecimalGen)): + query_parts.append('sum(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as sum_col') + assert_gpu_and_cpu_are_equal_sql( lambda spark : three_col_df(spark, LongRangeGen(), RepeatSeqGen(b_gen, length=100), c_gen, length=1024 * 14), "window_agg_table", @@ -328,8 +366,40 @@ def test_window_running(b_gen, c_gen, batch_size): validate_execs_in_gpu_plan = ['GpuRunningWindowExec'], conf = conf) +# Test that we can do a running window sum on floats and doubles and decimal. This becomes problematic because we do the agg in parallel +# which means that the result can switch back and forth from Inf to not Inf depending on the order of aggregations. +# We test this by limiting the range of the values in the sum to never hit Inf, and by using abs so we don't have +# positive and negative values that interfere with each other. +# decimal is problematic if the precision is so high it falls back to the CPU. +# In a distributed setup the order of the partitions returned might be different, so we must ignore the order +# but small batch sizes can make sort very slow, so do the final order by locally +@ignore_order(local=True) +@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches +def test_window_running_float_decimal_sum(batch_size): + conf = {'spark.rapids.sql.batchSizeBytes': batch_size, + 'spark.rapids.sql.variableFloatAgg.enabled': True, + 'spark.rapids.sql.castFloatToDecimal.enabled': True} + # TODO need a way to insert NaNs... 
+ query_parts = ['b', 'a', + 'sum(cast(c as double)) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as dbl_sum', + 'sum(abs(dbl)) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as dbl_sum', + 'sum(cast(c as float)) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as flt_sum', + 'sum(abs(flt)) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as flt_sum', + 'sum(cast(c as Decimal(6,1))) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as dec_sum'] + + gen = StructGen([('a', LongRangeGen()),('b', RepeatSeqGen(int_gen, length=1000)),('c', short_gen),('flt', float_gen),('dbl', double_gen)], nullable=False) + assert_gpu_and_cpu_are_equal_sql( + lambda spark : gen_df(spark, gen, length=1024 * 14), + "window_agg_table", + 'select ' + + ', '.join(query_parts) + + ' from window_agg_table ', + validate_execs_in_gpu_plan = ['GpuRunningWindowExec'], + conf = conf) -@ignore_order +# In a distributed setup the order of the partitions returned might be different, so we must ignore the order +# but small batch sizes can make sort very slow, so do the final order by locally +@ignore_order(local=True) @approximate_float @pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches @pytest.mark.parametrize('c_gen', lead_lag_data_gens, ids=idfn) @@ -454,7 +524,9 @@ def test_window_aggs_for_rows_lead_lag_on_arrays(a_gen, b_gen, c_gen, d_gen, bat # lead and lag don't currently work for string columns, so redo the tests, but just for strings # without lead and lag -@ignore_order +# In a distributed setup the order of the partitions returned might be different, so we must ignore the order +# but small batch sizes can make sort very slow, so do the final order by locally +@ignore_order(local=True) @approximate_float @pytest.mark.parametrize('c_gen', [string_gen], 
ids=idfn) @pytest.mark.parametrize('a_b_gen', part_and_order_gens, ids=meta_idfn('partAndOrderBy:')) @@ -483,7 +555,9 @@ def do_it(spark): # Test for RANGE queries, with timestamp order-by expressions. -@ignore_order +# In a distributed setup the order of the partitions returned might be different, so we must ignore the order +# but small batch sizes can make sort very slow, so do the final order by locally +@ignore_order(local=True) @pytest.mark.parametrize('data_gen', [_grpkey_longs_with_timestamps, pytest.param(_grpkey_longs_with_nullable_timestamps)], ids=idfn) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala index a875696e7d1..7b94a37b231 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import ai.rapids.cudf.{AggregationOverWindow, DType, NvtxColor, Scalar, Table, WindowOptions} +import ai.rapids.cudf.{Aggregation, AggregationOnColumn, AggregationOverWindow, DType, GroupByOptions, NullPolicy, NvtxColor, ReplacePolicy, ReplacePolicyWithColumn, Scalar, ScanType, Table, WindowOptions} import org.apache.spark.TaskContext import org.apache.spark.internal.Logging @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistrib import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.rapids.GpuAggregateExpression -import org.apache.spark.sql.types.{BooleanType, ByteType, CalendarIntervalType, DataType, IntegerType, LongType, ShortType} +import org.apache.spark.sql.types.{ArrayType, BooleanType, ByteType, CalendarIntervalType, DataType, IntegerType, LongType, ShortType, StructType} import 
org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} import org.apache.spark.unsafe.types.CalendarInterval @@ -127,8 +127,8 @@ abstract class GpuBaseWindowExecMeta[WindowExecType <: SparkPlan] (windowExec: W val allBatchedRunning = fixedUpWindowOps.forall { case GpuAlias(GpuWindowExpression(func, spec), _) => val isRunningFunc = func match { - case _: GpuBatchedRunningWindowFunction[_] => true - case GpuAggregateExpression(_: GpuBatchedRunningWindowFunction[_], _, _, _ , _) => true + case _: GpuBatchedRunningWindowWithFixer => true + case GpuAggregateExpression(_: GpuBatchedRunningWindowWithFixer, _, _, _ , _) => true case _ => false } // Running windows are limited to row based queries with a few changes we could make this @@ -366,7 +366,7 @@ trait GpuWindowBaseExec extends UnaryExecNode with GpuExec { import GpuMetric._ override lazy val additionalMetrics: Map[String, GpuMetric] = Map( - OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, OP_TIME) + OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME) ) override def output: Seq[Attribute] = windowOps.map(_.toAttribute) @@ -400,7 +400,34 @@ trait GpuWindowBaseExec extends UnaryExecNode with GpuExec { * The class represents a window function and the locations of its deduped inputs after an initial * projection. 
*/ -case class BoundGpuWindowFunction(windowFunc: GpuWindowFunction, boundInputLocations: Array[Int]) { +case class BoundGpuWindowFunction( + windowFunc: GpuWindowFunction, + boundInputLocations: Array[Int]) extends Arm { + + def scanAggregation: Aggregation = { + val aggFunc = windowFunc.asInstanceOf[GpuRunningWindowFunction] + aggFunc.scanAggregation + } + + def scanReplaceNulls: Option[ReplacePolicy] = { + val aggFunc = windowFunc.asInstanceOf[GpuRunningWindowFunction] + aggFunc.scanReplaceNulls + } + + def groupByScan(cb: ColumnarBatch): AggregationOnColumn[Nothing] = { + val aggFunc = windowFunc.asInstanceOf[GpuRunningWindowFunction] + val inputs = boundInputLocations.map { pos => + (cb.column(pos).asInstanceOf[GpuColumnVector].getBase, pos) + } + aggFunc.groupByScanAggregation(inputs).asInstanceOf[AggregationOnColumn[Nothing]] + } + + def groupByReplaceNulls(index: Int): Option[ReplacePolicyWithColumn] = { + val aggFunc = windowFunc.asInstanceOf[GpuRunningWindowFunction] + aggFunc.groupByReplaceNulls(index) + } + + def aggOverWindow(cb: ColumnarBatch, windowOpts: WindowOptions): AggregationOverWindow[Nothing] = { val aggFunc = windowFunc.asInstanceOf[GpuAggregateWindowFunction[_]] @@ -416,6 +443,22 @@ case class BoundGpuWindowFunction(windowFunc: GpuWindowFunction, boundInputLocat case class ParsedBoundary(isUnbounded: Boolean, valueAsLong: Long) object GroupedAggregations extends Arm { + // In some cases a scan or a group by scan produces a different type than window would for the + // same aggregation. A lot of this is because scan has a limited set of aggregations so we can + // end up using a SUM aggregation to work around other issues, and cudf rightly makes the output + // an INT64 instead of an INT32. This is here to fix that up. 
+ private def castIfNeeded( + col: ai.rapids.cudf.ColumnVector, + dataType: DataType): GpuColumnVector = { + dataType match { + case _: ArrayType | _: StructType => + GpuColumnVector.from(col, dataType).incRefCount() + case other => + val dtype = GpuColumnVector.getNonNestedRapidsType(other) + GpuColumnVector.from(col.castTo(dtype), dataType) + } + } + /** * Get the window options for an aggregation * @param orderSpec the order by spec @@ -569,6 +612,10 @@ object GroupedAggregations extends Arm { } } +/** + * Window aggregations that are grouped together. It holds the aggregation and the offsets of + * its input columns, along with the output columns it should write the result to. + */ class GroupedAggregations extends Arm { import GroupedAggregations._ @@ -576,9 +623,25 @@ class GroupedAggregations extends Arm { private val data = mutable.HashMap[GpuSpecifiedWindowFrame, mutable.HashMap[BoundGpuWindowFunction, ArrayBuffer[Int]]]() + // This is similar to data but specific to running windows. We don't divide it up by the + // window frame because the frame is the same for all of them: unbounded rows preceding to + // the current row. + private val runningWindowOptimizedData = + mutable.HashMap[BoundGpuWindowFunction, ArrayBuffer[Int]]() + + /** + * Add an aggregation. + * @param win the window this aggregation is over. + * @param inputLocs the locations of the input columns for this aggregation. + * @param outputIndex the output index this will write to in the final output. 
+ */ def addAggregation(win: GpuWindowExpression, inputLocs: Array[Int], outputIndex: Int): Unit = { - val forSpec = + val forSpec = if (win.isOptimizedRunningWindow) { + runningWindowOptimizedData + } else { data.getOrElseUpdate(win.normalizedFrameSpec, mutable.HashMap.empty) + } + forSpec.getOrElseUpdate(BoundGpuWindowFunction(win.wrappedWindowFunc, inputLocs), ArrayBuffer.empty) += outputIndex } @@ -658,24 +721,149 @@ class GroupedAggregations extends Arm { (groupBy, aggs) => groupBy.aggregateWindowsOverRanges(aggs: _*)) } + private final def doRunningWindowScan( + inputCb: ColumnarBatch, + outputColumns: Array[ColumnVector]): Unit = { + runningWindowOptimizedData.foreach { + case (func, outputIndexes) => + val agg = func.scanAggregation + val replace = func.scanReplaceNulls + require(func.boundInputLocations.length == 1) + val inputColIndex = func.boundInputLocations.head + val inputCol = inputCb.column(inputColIndex).asInstanceOf[GpuColumnVector].getBase + val replaced = withResource(inputCol.scan(agg, ScanType.INCLUSIVE, NullPolicy.EXCLUDE)) { + scanned => + // For scans when nulls are excluded, each input row that has a null in it produces an + // output row that also has a null in it. Typically this is not what we want, because + // for windows that only happens if the first values are nulls. So we will then call + // replace nulls as needed to fix that up. Typically the replacement policy is + // preceding. 
+ if (replace.isDefined) { + scanned.replaceNulls(replace.get) + } else { + scanned.incRefCount() + } + } + + withResource(replaced) { replaced => + withResource(castIfNeeded(replaced, func.dataType)) { retCol => + outputIndexes.foreach { outIndex => + outputColumns(outIndex) = retCol.incRefCount() + } + } + } + } + } + + private final def doRunningWindowGroupedScan( + partByPositions: Array[Int], + inputCb: ColumnarBatch, + outputColumns: Array[ColumnVector]): Unit = { + val allAggs = runningWindowOptimizedData.map { + case (func, _) => + func.groupByScan(inputCb) + }.toSeq + + // Part by is always ascending with nulls first, which is the default for group by options too + val sortedGroupingOpts = GroupByOptions.builder() + .withKeysSorted(true) + .build() + + val scanned = withResource(GpuColumnVector.from(inputCb)) { initProjTab => + initProjTab.groupBy(sortedGroupingOpts, partByPositions: _*).scan(allAggs: _*) + } + withResource(scanned) { scanned => + // This gets a little complicated, because scan does not typically treat nulls the + // way window treats nulls. So in some cases we need to do another group by and replace + // the nulls to make them match what we want. But this is not all of the time, so we + // keep track of which aggregations need to have a replace called on them, and where + // we need to copy the results back out to. This is a little hard because the output of + // scan has the group by columns first followed by the scan columns, and the output of + // replace has the group by columns first followed by the replaced columns. 
+ val allReplace = ArrayBuffer[ReplacePolicyWithColumn]() + val copyOutAfterReplace = ArrayBuffer[(Int, DataType, ArrayBuffer[Int])]() + var afterReplaceIndex = partByPositions.length + runningWindowOptimizedData.zipWithIndex.foreach { + case ((func, finalOutputIndices), zipIndex) => + val inputIndex = zipIndex + partByPositions.length + val replace = func.groupByReplaceNulls(inputIndex) + if (replace.isDefined) { + allReplace.append(replace.get) + copyOutAfterReplace.append((afterReplaceIndex, func.dataType, finalOutputIndices)) + afterReplaceIndex += 1 + } else { + withResource(castIfNeeded(scanned.getColumn(inputIndex), func.dataType)) { col => + finalOutputIndices.foreach { i => + outputColumns(i) = col.incRefCount() + } + } + } + } + + if (allReplace.nonEmpty) { + withResource(scanned + .groupBy(sortedGroupingOpts, partByPositions.indices: _*) + .replaceNulls(allReplace: _*)) { replaced => + copyOutAfterReplace.foreach { + case (inputIndex, dt, outputIndices) => + withResource(castIfNeeded(replaced.getColumn(inputIndex), dt)) { col => + outputIndices.foreach { i => + outputColumns(i) = col.incRefCount() + } + } + } + } + } + } + } + + private def doRunningWindowOptimizedAggs( + partByPositions: Array[Int], + inputCb: ColumnarBatch, + outputColumns: Array[ColumnVector]): Unit = { + if (runningWindowOptimizedData.nonEmpty) { + if (partByPositions.isEmpty) { + // This is implemented in terms of a scan on a column + doRunningWindowScan(inputCb, outputColumns) + } else { + doRunningWindowGroupedScan(partByPositions, inputCb, outputColumns) + } + } + } + def doAggs(boundOrderSpec: Seq[SortOrder], orderByPositions: Array[Int], partByPositions: Array[Int], inputCb: ColumnarBatch, outputColumns: Array[ColumnVector]): Unit = { + doRunningWindowOptimizedAggs(partByPositions, inputCb, outputColumns) doRowAggs(boundOrderSpec, orderByPositions, partByPositions, inputCb, outputColumns) doRangeAggs(boundOrderSpec, orderByPositions, partByPositions, inputCb, outputColumns) } } 
/** - * Calculates the results of window operations + * Calculates the results of window operations. It assumes that any batching of the data + * or fixups after the fact to get the right answer is done outside of this. */ trait BasicWindowCalc extends Arm { val boundWindowOps: Seq[GpuExpression] val boundPartitionSpec: Seq[GpuExpression] val boundOrderSpec: Seq[SortOrder] + // In order to dedupe aggregations we take a slightly different approach from + // group by aggregations. Instead of using named expressions to line up different + // parts of the aggregation (pre-processing, aggregation, post-processing) we + // keep track of the offsets directly. This is quite a bit more complex, but lets us + // see that 5 aggregations want a column of just 1 and we dedupe it so it is only + // materialized once. + // `initialProjections` are a list of projections that provide the inputs to the `aggregations` + // The order of these matter and `aggregations` is keeping track of them + // `passThrough` are columns that go directly from the input to the output. The first value + // is the index in the original input batch. The second value is the index in the final output + // batch + // `orderByPositions` and `partByPositions` are the positions in `initialProjections` for + // the order by columns and the part by columns respectively. private val (initialProjections, passThrough, aggregations, @@ -700,8 +888,8 @@ trait BasicWindowCalc extends Arm { case (GpuBoundReference(inputIndex, _, _), outputIndex) => passThrough.append((inputIndex, outputIndex)) case (GpuAlias(win: GpuWindowExpression, _), outputIndex) => - val inputLocations = win.wrappedWindowFunc. 
- windowInputProjection.map(getOrAddInitialProjectionIndex).toArray + val inputLocations = win.initialProjections + .map(getOrAddInitialProjectionIndex).toArray aggregations.addAggregation(win, inputLocations, outputIndex) case _ => throw new IllegalArgumentException("Unexpected operation found in window expression") @@ -832,9 +1020,9 @@ class GpuRunningWindowIterator( boundWindowOps.zipWithIndex.flatMap { case (GpuAlias(GpuWindowExpression(func, _), _), index) => func match { - case f: GpuBatchedRunningWindowFunction[_] => + case f: GpuBatchedRunningWindowWithFixer => Some((index, f.newFixer())) - case GpuAggregateExpression(f: GpuBatchedRunningWindowFunction[_], _, _, _, _) => + case GpuAggregateExpression(f: GpuBatchedRunningWindowWithFixer, _, _, _, _) => Some((index, f.newFixer())) case _ => None } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala index 110d8c5c5eb..9a865f8f81d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala @@ -20,7 +20,7 @@ import java.util.concurrent.TimeUnit import scala.language.{existentials, implicitConversions} -import ai.rapids.cudf.{Aggregation, AggregationOnColumn, BinaryOp, ColumnVector, RollingAggregation, Scalar} +import ai.rapids.cudf.{Aggregation, AggregationOnColumn, BinaryOp, ColumnVector, DType, ReplacePolicy, ReplacePolicyWithColumn, RollingAggregation, Scalar} import ai.rapids.cudf.Aggregation.{LagAggregation, LeadAggregation, RowNumberAggregation} import com.nvidia.spark.rapids.GpuOverrides.wrapExpr @@ -44,6 +44,7 @@ class GpuWindowExpressionMeta( literal.dataType match { case IntegerType => literal.value.asInstanceOf[Int] + literal.value.asInstanceOf[Int] case t => willNotWorkOnGpu(s"unsupported window boundary type $t") -1 @@ -197,6 +198,42 @@ case class 
GpuWindowExpression(windowFunction: Expression, windowSpec: GpuWindow case other => throw new IllegalStateException(s"${other.getClass} is not a supported window function") } + + private[this] lazy val optimizedRunningWindow: Option[GpuRunningWindowFunction] = { + if (normalizedFrameSpec.frameType == RowFrame && + GpuWindowExec.isRunningWindow(windowSpec) && + wrappedWindowFunc.isInstanceOf[GpuRunningWindowFunction]) { + val runningWin = wrappedWindowFunc.asInstanceOf[GpuRunningWindowFunction] + val isSupported = if (windowSpec.partitionSpec.isEmpty) { + runningWin.isScanSupported + } else { + runningWin.isGroupByScanSupported + } + if (isSupported) { + Some(runningWin) + } else { + None + } + } else { + None + } + } + + lazy val isOptimizedRunningWindow: Boolean = optimizedRunningWindow.isDefined + + lazy val initialProjections: Seq[Expression] = { + val running = optimizedRunningWindow + if (running.isDefined) { + val r = running.get + if (windowSpec.partitionSpec.isEmpty) { + Seq(r.scanInputProjection) + } else { + r.groupByScanInputProjection + } + } else { + wrappedWindowFunc.windowInputProjection + } + } } class GpuWindowSpecDefinitionMeta( @@ -596,6 +633,18 @@ trait GpuAggregateWindowFunction[T <: Aggregation with RollingAggregation[T]] def windowAggregation(inputs: Seq[(ColumnVector, Int)]): AggregationOnColumn[T] } +trait GpuRunningWindowFunction extends GpuWindowFunction { + def groupByScanInputProjection: Seq[Expression] + def groupByScanAggregation(inputs: Seq[(ColumnVector, Int)]): AggregationOnColumn[_] + def groupByReplaceNulls(index: Int): Option[ReplacePolicyWithColumn] + def isGroupByScanSupported = true + + def scanInputProjection: Expression + def scanAggregation: Aggregation + def scanReplaceNulls: Option[ReplacePolicy] + def isScanSupported = true +} + /** * Provides a way to process running window operations without needing to buffer and split the * batches on partition by boundaries. 
When this happens part of a partition by key set may @@ -673,12 +722,9 @@ trait BatchedRunningWindowFixer extends AutoCloseable { * For many operations a running window (unbounded preceding to current row) can * process the data without dividing the data up into batches that contain all of the data * for a given group by key set. Instead we store a small amount of state from a previous result - * and use it to fix the final result. This is a memory optimization. Any - * GpuAggregateWindowFunction can still work for running window queries. This just reduces the - * maximum amount of memory needed to process them. + * and use it to fix the final result. This is a memory optimization. */ -trait GpuBatchedRunningWindowFunction[T <: Aggregation with RollingAggregation[T]] - extends GpuAggregateWindowFunction[T] { +trait GpuBatchedRunningWindowWithFixer { /** * Get a new class that can be used to fix up batched RunningWindowOperations. @@ -728,12 +774,105 @@ class BatchedRunningWindowBinaryFixer(val binOp: BinaryOp, val name: String) override def close(): Unit = previousResult.foreach(_.close()) } -case class GpuRowNumber() extends GpuBatchedRunningWindowFunction[RowNumberAggregation] { +/** + * This class fixes up batched running windows for sum. Sum is a lot like other binary op + * fixers, but it has to special case nulls and that is not super generic. In the future we + * might be able to make this more generic but we need to see what the use case really is. + */ +class SumBinaryFixer extends BatchedRunningWindowFixer with Arm with Logging { + private val name = "sum" + private val binOp = BinaryOp.ADD + private var previousResult: Option[Scalar] = None + + override def updateState(finalOutputColumn: GpuColumnVector): Unit = { + logDebug(s"$name: updateState from $previousResult to...") + previousResult.foreach(_.close) + previousResult = + Some(finalOutputColumn.getBase.getScalarElement(finalOutputColumn.getRowCount.toInt - 1)) + logDebug(s"$name: ... 
$previousResult") + } + + private def makeZeroScalar(dt: DataType): Scalar = dt match { + case ByteType => Scalar.fromByte(0.toByte) + case ShortType => Scalar.fromShort(0.toShort) + case IntegerType => Scalar.fromInt(0) + case LongType => Scalar.fromLong(0) + case FloatType => Scalar.fromFloat(0.0f) + case DoubleType => Scalar.fromDouble(0.0) + case dec: DecimalType => + if (dec.precision <= DType.DECIMAL32_MAX_PRECISION) { + Scalar.fromDecimal(-dec.scale, 0) + } else { + Scalar.fromDecimal(-dec.scale, 0L) + } + case other => + throw new IllegalArgumentException(s"Making a zero scalar for $other is not supported") + } + + override def fixUp(samePartitionMask: Either[GpuColumnVector, Boolean], + windowedColumnOutput: GpuColumnVector): GpuColumnVector = { + logDebug(s"$name: fix up $previousResult $samePartitionMask") + (previousResult, samePartitionMask) match { + case (None, _) => windowedColumnOutput.incRefCount() + case (Some(prev), scala.util.Right(mask)) => + if (mask) { + // ADD is not null safe, so we have to replace NULL with 0 if and only if prev is also + // not null + val base = windowedColumnOutput.getBase + if (prev.isValid) { + val nullsReplaced = withResource(base.isNull) { nulls => + withResource(makeZeroScalar(windowedColumnOutput.dataType())) { zero => + nulls.ifElse(zero, base) + } + } + withResource(nullsReplaced) { nullsReplaced => + GpuColumnVector.from(nullsReplaced.binaryOp(binOp, prev, prev.getType), + windowedColumnOutput.dataType()) + } + } else { + // prev is NULL but NULL + something == NULL which we don't want + windowedColumnOutput.incRefCount() + } + } else { + // The mask is all false so do nothing + windowedColumnOutput.incRefCount() + } + case (Some(prev), scala.util.Left(mask)) => + val base = windowedColumnOutput.getBase + if (prev.isValid) { + val nullsReplaced = withResource(base.isNull) { nulls => + withResource(nulls.and(mask.getBase)) { shouldReplace => + withResource(makeZeroScalar(windowedColumnOutput.dataType())) { zero 
=> + shouldReplace.ifElse(zero, base) + } + } + } + withResource(nullsReplaced) { nullsReplaced => + withResource(nullsReplaced.binaryOp(binOp, prev, prev.getType)) { updated => + GpuColumnVector.from(mask.getBase.ifElse(updated, base), + windowedColumnOutput.dataType()) + } + } + } else { + // prev is NULL but NULL + something == NULL which we don't want + windowedColumnOutput.incRefCount() + } + } + } + + override def close(): Unit = previousResult.foreach(_.close()) +} + +case class GpuRowNumber() extends GpuAggregateWindowFunction[RowNumberAggregation] + with GpuBatchedRunningWindowWithFixer + with GpuRunningWindowFunction { override def nullable: Boolean = false override def dataType: DataType = IntegerType override def children: Seq[Expression] = Nil + // GENERAL WINDOW FUNCTION + override val windowInputProjection: Seq[Expression] = Nil override def windowAggregation( @@ -742,8 +881,24 @@ case class GpuRowNumber() extends GpuBatchedRunningWindowFunction[RowNumberAggre Aggregation.rowNumber().onColumn(0) } + // RUNNING WINDOW + override def newFixer(): BatchedRunningWindowFixer = new BatchedRunningWindowBinaryFixer(BinaryOp.ADD, "row_number") + + // For group by scans cudf does not support ROW_NUMBER so we will do a SUM + // on a column of 1s. 
We could do a COUNT_ALL too, but it would not be as consistent + // with the non group by scan + override val groupByScanInputProjection: Seq[Expression] = Seq(GpuLiteral(1, IntegerType)) + override def groupByScanAggregation(inputs: Seq[(ColumnVector, Int)]): AggregationOnColumn[_] = + Aggregation.sum().onColumn(inputs.head._2) + override def groupByReplaceNulls(index: Int): Option[ReplacePolicyWithColumn] = None + + // For regular scans cudf does not support ROW_NUMBER, nor does it support COUNT_ALL + // so we will do a SUM on a column of 1s + override val scanInputProjection: Expression = groupByScanInputProjection.head + override def scanAggregation: Aggregation = Aggregation.sum() + override val scanReplaceNulls: Option[ReplacePolicy] = None } abstract class OffsetWindowFunctionMeta[INPUT <: OffsetWindowFunction] ( diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala index c1ca7ff2d0b..cdb640a796a 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.rapids import ai.rapids.cudf -import ai.rapids.cudf.{Aggregation, AggregationOnColumn, BinaryOp, ColumnVector, DType, NullPolicy, RollingAggregation} +import ai.rapids.cudf.{Aggregation, AggregationOnColumn, BinaryOp, ColumnVector, DType, NullPolicy, ReplacePolicy, ReplacePolicyWithColumn, RollingAggregation} import ai.rapids.cudf.Aggregation.{CollectListAggregation, CollectSetAggregation, CountAggregation, MaxAggregation, MeanAggregation, MinAggregation, SumAggregation} import com.nvidia.spark.rapids._ @@ -48,8 +48,6 @@ trait GpuAggregateFunction extends GpuExpression with GpuUnevaluable { */ def defaultResult: Option[GpuLiteral] = None - // TODO: Do we need toAggregateExpression methods? 
- def sql(isDistinct: Boolean): String = { val distinct = if (isDistinct) "DISTINCT " else "" s"$prettyName($distinct${children.map(_.sql).mkString(", ")})" @@ -287,8 +285,10 @@ class CudfLastExcludeNulls(ref: Expression) extends CudfFirstLastBase(ref) { override val offset: Int = -1 } -case class GpuMin(child: Expression) extends GpuAggregateFunction with - GpuBatchedRunningWindowFunction[MinAggregation] { +case class GpuMin(child: Expression) extends GpuAggregateFunction + with GpuBatchedRunningWindowWithFixer + with GpuAggregateWindowFunction[MinAggregation] + with GpuRunningWindowFunction { private lazy val cudfMin = AttributeReference("min", child.dataType)() override lazy val inputProjection: Seq[Expression] = Seq(child) @@ -307,18 +307,46 @@ case class GpuMin(child: Expression) extends GpuAggregateFunction with override def checkInputDataTypes(): TypeCheckResult = TypeUtils.checkForOrderingExpr(child.dataType, "function gpu min") - // WINDOW FUNCTION + // GENERAL WINDOW FUNCTION override lazy val windowInputProjection: Seq[Expression] = inputProjection override def windowAggregation( inputs: Seq[(ColumnVector, Int)]): AggregationOnColumn[MinAggregation] = Aggregation.min().onColumn(inputs.head._2) + // RUNNING WINDOW override def newFixer(): BatchedRunningWindowFixer = new BatchedRunningWindowBinaryFixer(BinaryOp.NULL_MIN, "min") + + override lazy val groupByScanInputProjection: Seq[Expression] = inputProjection + + override def groupByScanAggregation( + inputs: Seq[(ColumnVector, Int)]): AggregationOnColumn[MinAggregation] = + windowAggregation(inputs) + + override def groupByReplaceNulls(index: Int): Option[ReplacePolicyWithColumn] = + Some(ReplacePolicy.PRECEDING.onColumn(index)) + + override def isGroupByScanSupported: Boolean = child.dataType match { + case StringType | TimestampType | DateType => false + case _ => true + } + + override lazy val scanInputProjection: Expression = inputProjection.head + override def scanAggregation: MinAggregation = 
Aggregation.min() + override val scanReplaceNulls: Option[ReplacePolicy] = Some(ReplacePolicy.PRECEDING) + + override def isScanSupported: Boolean = child.dataType match { + // String type does not work because of https://github.com/rapidsai/cudf/issues/8684 + case TimestampType | DateType => false + case _ => true + } + } case class GpuMax(child: Expression) extends GpuAggregateFunction - with GpuBatchedRunningWindowFunction[MaxAggregation] { + with GpuBatchedRunningWindowWithFixer + with GpuAggregateWindowFunction[MaxAggregation] + with GpuRunningWindowFunction { private lazy val cudfMax = AttributeReference("max", child.dataType)() override lazy val inputProjection: Seq[Expression] = Seq(child) @@ -337,19 +365,46 @@ case class GpuMax(child: Expression) extends GpuAggregateFunction override def checkInputDataTypes(): TypeCheckResult = TypeUtils.checkForOrderingExpr(child.dataType, "function gpu max") - // WINDOW FUNCTION + // GENERAL WINDOW FUNCTION override lazy val windowInputProjection: Seq[Expression] = inputProjection override def windowAggregation( inputs: Seq[(ColumnVector, Int)]): AggregationOnColumn[MaxAggregation] = Aggregation.max().onColumn(inputs.head._2) + // RUNNING WINDOW override def newFixer(): BatchedRunningWindowFixer = new BatchedRunningWindowBinaryFixer(BinaryOp.NULL_MAX, "max") + + override lazy val groupByScanInputProjection: Seq[Expression] = inputProjection + + override def groupByScanAggregation( + inputs: Seq[(ColumnVector, Int)]): AggregationOnColumn[MaxAggregation] = + windowAggregation(inputs) + + override def groupByReplaceNulls(index: Int): Option[ReplacePolicyWithColumn] = + Some(ReplacePolicy.PRECEDING.onColumn(index)) + + override def isGroupByScanSupported: Boolean = child.dataType match { + case StringType | TimestampType | DateType => false + case _ => true + } + + override lazy val scanInputProjection: Expression = inputProjection.head + override def scanAggregation: MaxAggregation = Aggregation.max() + override val 
scanReplaceNulls: Option[ReplacePolicy] = Some(ReplacePolicy.PRECEDING) + + override def isScanSupported: Boolean = child.dataType match { + // String type does not work because of https://github.com/rapidsai/cudf/issues/8684 + case TimestampType | DateType => false + case _ => true + } } case class GpuSum(child: Expression, resultType: DataType) extends GpuAggregateFunction with ImplicitCastInputTypes - with GpuAggregateWindowFunction[SumAggregation] { + with GpuBatchedRunningWindowWithFixer + with GpuAggregateWindowFunction[SumAggregation] + with GpuRunningWindowFunction { private lazy val cudfSum = AttributeReference("sum", resultType)() @@ -370,11 +425,25 @@ case class GpuSum(child: Expression, resultType: DataType) override def checkInputDataTypes(): TypeCheckResult = TypeUtils.checkForNumericExpr(child.dataType, "function gpu sum") - // WINDOW FUNCTION + // GENERAL WINDOW FUNCTION override lazy val windowInputProjection: Seq[Expression] = inputProjection override def windowAggregation( inputs: Seq[(ColumnVector, Int)]): AggregationOnColumn[SumAggregation] = Aggregation.sum().onColumn(inputs.head._2) + + // RUNNING WINDOW + override def newFixer(): BatchedRunningWindowFixer = + new SumBinaryFixer() + + override def groupByScanInputProjection: Seq[Expression] = inputProjection + override def groupByScanAggregation(inputs: Seq[(ColumnVector, Int)]): AggregationOnColumn[_] = + Aggregation.sum().onColumn(inputs.head._2) + override def groupByReplaceNulls(index: Int): Option[ReplacePolicyWithColumn] = + Some(ReplacePolicy.PRECEDING.onColumn(index)) + + override def scanInputProjection: Expression = child + override def scanAggregation: Aggregation = Aggregation.sum() + override def scanReplaceNulls: Option[ReplacePolicy] = Some(ReplacePolicy.PRECEDING) } /* @@ -469,7 +538,9 @@ case class GpuPivotFirst( } case class GpuCount(children: Seq[Expression]) extends GpuAggregateFunction - with GpuBatchedRunningWindowFunction[CountAggregation] { + with 
GpuBatchedRunningWindowWithFixer + with GpuAggregateWindowFunction[CountAggregation] + with GpuRunningWindowFunction { // counts are Long private lazy val cudfCount = AttributeReference("count", LongType)() @@ -486,7 +557,7 @@ case class GpuCount(children: Seq[Expression]) extends GpuAggregateFunction override def nullable: Boolean = false override def dataType: DataType = LongType - // WINDOW FUNCTION + // GENERAL WINDOW FUNCTION // countDistinct is not supported for window functions in spark right now. // we could support it by doing an `Aggregation.nunique(false)` override lazy val windowInputProjection: Seq[Expression] = inputProjection @@ -494,8 +565,35 @@ case class GpuCount(children: Seq[Expression]) extends GpuAggregateFunction inputs: Seq[(ColumnVector, Int)]): AggregationOnColumn[CountAggregation] = Aggregation.count(NullPolicy.EXCLUDE).onColumn(inputs.head._2) + // RUNNING WINDOW override def newFixer(): BatchedRunningWindowFixer = new BatchedRunningWindowBinaryFixer(BinaryOp.ADD, "count") + + // Scan and group by scan do not support COUNT with nulls excluded. 
+ // one of them does not even support count at all, so we are going to SUM + // ones and zeros based off of the validity + override def groupByScanInputProjection: Seq[Expression] = Seq(scanInputProjection) + + override def groupByScanAggregation(inputs: Seq[(ColumnVector, Int)]): AggregationOnColumn[_] = { + Aggregation.sum().onColumn(inputs.head._2) + } + + override def groupByReplaceNulls(index: Int): Option[ReplacePolicyWithColumn] = None + + override def scanInputProjection: Expression = { + // There can be only one child according to requirements for count right now + require(children.length == 1) + val child = children.head + if (child.nullable) { + GpuIf(GpuIsNull(child), GpuLiteral(0, IntegerType), GpuLiteral(1, IntegerType)) + } else { + GpuLiteral(1, IntegerType) + } + } + + override def scanAggregation: Aggregation = Aggregation.sum() + + override val scanReplaceNulls: Option[ReplacePolicy] = None } case class GpuAverage(child: Expression) extends GpuAggregateFunction diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/WindowFunctionSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/WindowFunctionSuite.scala index 2500d19e3a4..835d38b5770 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/WindowFunctionSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/WindowFunctionSuite.scala @@ -160,6 +160,16 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite { windowAggregationTesterForDecimal(rowsWindow, scale = -1) } + testSparkResultsAreEqual("[Window] [ROWS] [UNBOUNDED PRECEDING, CURRENT ROW] [NO PART]", + windowTestDfOrc, + new SparkConf().set("spark.sql.legacy.allowNegativeScaleOfDecimal", "true")) { + val rowsWindow = Window + .orderBy("uid", "dateLong", "dollars") + .rowsBetween(Window.unboundedPreceding, 0) + windowAggregationTester(rowsWindow) + windowAggregationTesterForDecimal(rowsWindow, scale = -1) + } + testSparkResultsAreEqual("[Window] [ROWS] [UNBOUNDED PRECEDING, CURRENT ROW] [ROW_NUMBER]", 
windowTestDfOrc) { val rowsWindow = Window.partitionBy("uid")