diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index b192424eb3c..ca1468c70e6 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -27,6 +27,10 @@ 'spark.rapids.sql.castStringToFloat.enabled': 'true' } +_no_nans_float_smallbatch_conf = _no_nans_float_conf.copy() +_no_nans_float_smallbatch_conf.update( + {'spark.rapids.sql.batchSizeBytes' : '1000'}) + _no_nans_float_conf_partial = _no_nans_float_conf.copy() _no_nans_float_conf_partial.update( {'spark.rapids.sql.hashAgg.replaceMode': 'partial'}) @@ -169,7 +173,7 @@ def get_params(init_list, marked_params=[]): # Run these tests with in 3 modes, all on the GPU, only partial aggregates on GPU and # only final aggregates on the GPU with conf for spark.rapids.sql.hasNans set to false/true -_confs = [_no_nans_float_conf, _no_nans_float_conf_final, _no_nans_float_conf_partial] +_confs = [_no_nans_float_conf, _no_nans_float_smallbatch_conf, _no_nans_float_conf_final, _no_nans_float_conf_partial] _confs_with_nans = [_nans_float_conf, _nans_float_conf_partial, _nans_float_conf_final] # Pytest marker for list of operators allowed to run on the CPU, diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala index aaa654e4617..6c2354c79f8 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala @@ -68,6 +68,7 @@ object GpuMetric extends Logging { val SPILL_AMOUNT = "spillData" val SPILL_AMOUNT_DISK = "spillDisk" val SPILL_AMOUNT_HOST = "spillHost" + val NUM_TASKS_FALL_BACKED = "numTasksFallBacked" // Metric Descriptions. val DESCRIPTION_BUFFER_TIME = "buffer time" @@ -96,6 +97,7 @@ object GpuMetric extends Logging { val DESCRIPTION_SPILL_AMOUNT = "bytes spilled from GPU" val DESCRIPTION_SPILL_AMOUNT_DISK = "bytes spilled to disk" val DESCRIPTION_SPILL_AMOUNT_HOST = "bytes spilled to host" + val DESCRIPTION_NUM_TASKS_FALL_BACKED = "number of sort fallback tasks" def unwrap(input: GpuMetric): SQLMetric = input match { case w :WrappedGpuMetric => w.sqlMetric diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuKeyBatchingIterator.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuKeyBatchingIterator.scala index 544bc00e637..d6a3e82e5ff 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuKeyBatchingIterator.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuKeyBatchingIterator.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch * target size. It assumes that the input batches will already be close to that size and does * not try to split them too much further. */ -class GpuKeyBatchingIterator private ( +class GpuKeyBatchingIterator( iter: Iterator[ColumnarBatch], sorter: GpuSorter, types: Array[DataType], diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index 316e8c7ace5..e788c93df93 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -16,7 +16,9 @@ package com.nvidia.spark.rapids -import scala.collection.mutable.ArrayBuffer +import java.util + +import scala.collection.mutable import ai.rapids.cudf import ai.rapids.cudf.{NvtxColor, Scalar} @@ -24,16 +26,19 @@ import com.nvidia.spark.rapids.GpuMetric._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ import org.apache.spark.TaskContext +import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSeq, AttributeSet, Expression, If, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeReference, AttributeSeq, AttributeSet, Expression, If, NamedExpression, NullsFirst} import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, HashPartitioning, Partitioning, UnspecifiedDistribution} import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.{ExplainUtils, SortExec, SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec, SortAggregateExec} import org.apache.spark.sql.rapids.{CudfAggregate, GpuAggregateExpression} -import org.apache.spark.sql.types.{ArrayType, LongType, MapType, StructType} +import org.apache.spark.sql.rapids.execution.TrampolineUtil +import org.apache.spark.sql.types.{ArrayType, DataType, LongType, MapType, StructType} import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} object AggregateUtils { @@ -68,484 +73,538 @@ object AggregateUtils { } } } -} -abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan]( - plan: INPUT, - aggRequiredChildDistributionExpressions: Option[Seq[Expression]], - conf: RapidsConf, - parent: Option[RapidsMeta[_, _, _]], - rule: DataFromReplacementRule) extends SparkPlanMeta[INPUT](plan, conf, parent, rule) { + /** + * Computes a target input batch size based on the assumption that computation can consume up to + * 4X the configured batch size. + * @param confTargetSize user-configured maximum desired batch size + * @param inputTypes input batch schema + * @param outputTypes output batch schema + * @param isReductionOnly true if this is a reduction-only aggregation without grouping + * @return maximum target batch size to keep computation under the 4X configured batch limit + */ + def computeTargetBatchSize( + confTargetSize: Long, + inputTypes: Seq[DataType], + outputTypes: Seq[DataType], + isReductionOnly: Boolean): Long = { + def typesToSize(types: Seq[DataType]): Long = + types.map(GpuBatchUtils.estimateGpuMemory(_, nullable = false, rowCount = 1)).sum + val inputRowSize = typesToSize(inputTypes) + val outputRowSize = typesToSize(outputTypes) + // The cudf hash table implementation allocates four 32-bit integers per input row. + val hashTableRowSize = 4 * 4 + + // Using the memory management for joins as a reference, target 4X batch size as a budget. + var totalBudget = 4 * confTargetSize + + // Compute the amount of memory being consumed per-row in the computation + var computationBytesPerRow = inputRowSize + hashTableRowSize + if (isReductionOnly) { + // Remove the lone output row size from the budget rather than track per-row in computation + totalBudget -= outputRowSize + } else { + // The worst-case memory consumption during a grouping aggregation is the case where the + // grouping does not combine any input rows, so just as many rows appear in the output. + computationBytesPerRow += outputRowSize + } - val agg: BaseAggregateExec + // Calculate the max rows that can be processed during computation within the budget + val maxRows = totalBudget / computationBytesPerRow - private val requiredChildDistributionExpressions: Option[Seq[BaseExprMeta[_]]] = - aggRequiredChildDistributionExpressions.map(_.map(GpuOverrides.wrapExpr(_, conf, Some(this)))) - private val groupingExpressions: Seq[BaseExprMeta[_]] = - agg.groupingExpressions.map(GpuOverrides.wrapExpr(_, conf, Some(this))) - private val aggregateExpressions: Seq[BaseExprMeta[_]] = - agg.aggregateExpressions.map(GpuOverrides.wrapExpr(_, conf, Some(this))) - private val aggregateAttributes: Seq[BaseExprMeta[_]] = - agg.aggregateAttributes.map(GpuOverrides.wrapExpr(_, conf, Some(this))) - private val resultExpressions: Seq[BaseExprMeta[_]] = - agg.resultExpressions.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + // Finally compute the input target batching size taking into account the cudf row limits + Math.min(inputRowSize * maxRows, Int.MaxValue) + } - override val childExprs: Seq[BaseExprMeta[_]] = - requiredChildDistributionExpressions.getOrElse(Seq.empty) ++ - groupingExpressions ++ - aggregateExpressions ++ - aggregateAttributes ++ - resultExpressions + /** + * Compute the aggregation modes and aggregate expressions for all aggregation expressions + * @param aggExpressions the aggregate expressions + * @param aggBufferAttributes attributes to be bound to the aggregate expressions + */ + def computeAggModeCudfAggregates( + aggExpressions: Seq[GpuAggregateExpression], + aggBufferAttributes: Seq[Attribute]): Seq[(AggregateMode, Seq[CudfAggregate])] = { + // + // update expressions are those performed on the raw input data + // e.g. for count it's count, and for average it's sum and count. + // + val updateExpressionsSeq = aggExpressions.map(_.aggregateFunction.updateExpressions) - override def tagPlanForGpu(): Unit = { - agg.groupingExpressions - .find(_.dataType match { - case _@(ArrayType(_, _) | MapType(_, _, _)) | _@StructType(_) => true - case _ => false - }) - .foreach(_ => - willNotWorkOnGpu("Nested types in grouping expressions are not supported")) - if (agg.resultExpressions.isEmpty) { - willNotWorkOnGpu("result expressions is empty") + // + // merge expressions are used while merging multiple batches, or while on final mode + // e.g. for count it's sum, and for average it's sum and sum. + // + val mergeExpressionsSeq = aggExpressions.map(_.aggregateFunction.mergeExpressions) + + aggExpressions.zipWithIndex.map { case (expr, modeIndex) => + val cudfAggregates = if (expr.mode == Partial || expr.mode == Complete) { + GpuBindReferences.bindGpuReferences(updateExpressionsSeq(modeIndex), aggBufferAttributes) + .asInstanceOf[Seq[CudfAggregate]] + } else { + GpuBindReferences.bindGpuReferences(mergeExpressionsSeq(modeIndex), aggBufferAttributes) + .asInstanceOf[Seq[CudfAggregate]] + } + (expr.mode, cudfAggregates) } + } +} - tagForReplaceMode() +/** Utility class to hold all of the metrics related to hash aggregation */ +case class GpuHashAggregateMetrics( + numOutputRows: GpuMetric, + numOutputBatches: GpuMetric, + numTasksFallBacked: GpuMetric, + computeAggTime: GpuMetric, + concatTime: GpuMetric, + sortTime: GpuMetric, + spillCallback: RapidsBuffer.SpillCallback) + +/** Utility class to convey information on the aggregation modes being used */ +case class AggregateModeInfo( + uniqueModes: Seq[AggregateMode], + hasPartialMode: Boolean, + hasPartialMerge: Boolean, + hasFinalMode: Boolean, + hasCompleteMode: Boolean) + +object AggregateModeInfo { + def apply(uniqueModes: Seq[AggregateMode]): AggregateModeInfo = { + val hasPartialMerge = uniqueModes.contains(PartialMerge) + AggregateModeInfo( + uniqueModes = uniqueModes, + hasPartialMode = hasPartialMerge || uniqueModes.contains(Partial), + hasPartialMerge = hasPartialMerge, + hasFinalMode = uniqueModes.contains(Final), + hasCompleteMode = uniqueModes.contains(Complete) + ) + } +} - if (agg.aggregateExpressions.exists(expr => expr.isDistinct) - && agg.aggregateExpressions.exists(expr => expr.filter.isDefined)) { - // Distinct with Filter is not supported on the GPU currently, - // This makes sure that if we end up here, the plan falls back to the CPU - // which will do the right thing. - willNotWorkOnGpu( - "DISTINCT and FILTER cannot be used in aggregate functions at the same time") +/** + * Iterator that takes another columnar batch iterator as input and emits new columnar batches that + * are aggregated based on the specified grouping and aggregation expressions. This iterator tries + * to perform a hash-based aggregation but is capable of falling back to a sort-based aggregation + * which can operate on data that is either larger than can be represented by a cudf column or + * larger than can fit in GPU memory. + * + * The iterator starts by pulling all batches from the input iterator, performing an initial + * projection and aggregation on each individual batch via `aggregateInputBatches()`. The resulting + * aggregated batches are cached in memory as spillable batches. Once all input batches have been + * aggregated, `tryMergeAggregatedBatches()` is called to attempt a merge of the aggregated batches + * into a single batch. If this is successful then the resulting batch can be returned, otherwise + * `buildSortFallbackIterator` is used to sort the aggregated batches by the grouping keys and + * performs a final merge aggregation pass on the sorted batches. + * + * @param cbIter iterator providing the nput columnar batches + * @param groupingExpressions expressions used for producing the grouping keys + * @param aggregateExpressions GPU aggregate expressions used to produce the aggregations + * @param aggregateAttributes attribute references to each aggregate expression + * @param resultExpressions output expression for the aggregation + * @param childOutput input attributes to identify the input columns from the input batches + * @param modeInfo identifies which aggregation modes are being used + * @param metrics metrics that will be updated during aggregation + * @param configuredTargetBatchSize user-specified value for the targeted input batch size + */ +class GpuHashAggregateIterator( + cbIter: Iterator[ColumnarBatch], + groupingExpressions: Seq[Expression], + aggregateExpressions: Seq[GpuAggregateExpression], + aggregateAttributes: Seq[Attribute], + resultExpressions: Seq[NamedExpression], + childOutput: Seq[Attribute], + modeInfo: AggregateModeInfo, + metrics: GpuHashAggregateMetrics, + configuredTargetBatchSize: Long) + extends Iterator[ColumnarBatch] with Arm with AutoCloseable with Logging { + // Partial mode: + // 1. boundInputReferences: picks column from raw input + // 2. boundUpdateAgg: performs the partial half of the aggregates (GpuCount => CudfCount) + // 3. boundMergeAgg: (if needed) perform a merge of partial aggregates (CudfCount => CudfSum) + // 4. boundResultReferences: is a pass-through of the merged aggregate + // + // Final mode: + // 1. boundInputReferences: is a pass-through of the merged aggregate + // 2. boundMergeAgg: perform merges of incoming, and subsequent batches if required. + // 3. boundFinalProjections: on merged batches, finalize aggregates + // (GpuAverage => CudfSum/CudfCount) + // 4. boundResultReferences: project the result expressions Spark expects in the output. + // + // Complete mode: + // 1. boundInputReferences: picks column from raw input + // 2. boundUpdateAgg: performs the partial half of the aggregates (GpuCount => CudfCount) + // 3. boundMergeAgg: (if needed) perform a merge of partial aggregates (CudfCount => CudfSum) + // 4. boundResultReferences: project the result expressions Spark expects in the output. + private case class BoundExpressionsModeAggregates( + boundInputReferences: Seq[GpuExpression], + boundFinalProjections: Option[Seq[GpuExpression]], + boundResultReferences: Seq[Expression], + aggModeCudfAggregates: Seq[(AggregateMode, Seq[CudfAggregate])]) + + Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => close())) + + private[this] val isReductionOnly = groupingExpressions.isEmpty + private[this] val boundExpressions = setupReferences(childOutput) + private[this] val targetMergeBatchSize = computeTargetMergeBatchSize(configuredTargetBatchSize) + private[this] val aggregatedBatches = new util.ArrayDeque[LazySpillableColumnarBatch] + private[this] var outOfCoreIter: Option[GpuOutOfCoreSortIterator] = None + + /** Iterator for fetching aggregated batches if a sort-based fallback has occurred */ + private[this] var sortFallbackIter: Option[Iterator[ColumnarBatch]] = None + + /** Whether a batch is pending for a reduction-only aggregation */ + private[this] var hasReductionOnlyBatch: Boolean = isReductionOnly + + override def hasNext: Boolean = { + sortFallbackIter.map(_.hasNext).getOrElse { + // reductions produce a result even if the input is empty + hasReductionOnlyBatch || !aggregatedBatches.isEmpty || cbIter.hasNext } } - /** Tagging checks tied to configs that control the aggregation modes that are replaced */ - private def tagForReplaceMode(): Unit = { - val hashAggModes = agg.aggregateExpressions.map(_.mode).distinct - val hashAggReplaceMode = conf.hashAggReplaceMode.toLowerCase - hashAggReplaceMode match { - case "all" => - case "partial" => - if (hashAggModes.contains(Final) || hashAggModes.contains(Complete)) { - // replacing only Partial hash aggregates, so a Final or Complete one should not replace - willNotWorkOnGpu("Replacing Final or Complete hash aggregates disabled") - } - // In partial mode, if there are non-distinct functions and multiple distinct functions, - // non-distinct functions are computed using the First operator. The final result would be - // incorrect for non-distinct functions for partition size > 1. Reason for this is - if - // the first batch computed and sent to CPU doesn't contain all the rows required to - // compute non-distinct function(s), then Spark would consider that value as final result - // (due to First). Fall back to CPU in this case. - if (AggregateUtils.shouldFallbackMultiDistinct(agg.aggregateExpressions)) { - willNotWorkOnGpu("Aggregates of non-distinct functions with multiple distinct " + - "functions are non-deterministic for non-distinct functions as it is " + - "computed using First.") - } - case "final" => - if (hashAggModes.contains(Partial) || hashAggModes.contains(Complete)) { - // replacing only Final hash aggregates, so a Partial or Complete one should not replace - willNotWorkOnGpu("Replacing Partial or Complete hash aggregates disabled") + override def next(): ColumnarBatch = { + val batch = sortFallbackIter.map(_.next()).getOrElse { + // aggregate and merge all pending inputs + if (cbIter.hasNext) { + aggregateInputBatches() + tryMergeAggregatedBatches() + } + + if (aggregatedBatches.size() > 1) { + // Unable to merge to a single output, so must fall back to a sort-based approach. + sortFallbackIter = Some(buildSortFallbackIterator()) + sortFallbackIter.get.next() + } else if (aggregatedBatches.isEmpty) { + if (hasReductionOnlyBatch) { + hasReductionOnlyBatch = false + generateEmptyReductionBatch() + } else { + throw new NoSuchElementException("batches exhausted") } - case "complete" => - if (hashAggModes.contains(Partial) || hashAggModes.contains(Final)) { - // replacing only Complete hash aggregates, so a Partial or Final one should not replace - willNotWorkOnGpu("Replacing Partial or Final hash aggregates disabled") + } else { + // this will be the last batch + hasReductionOnlyBatch = false + withResource(aggregatedBatches.pop()) { lazyBatch => + GpuColumnVector.incRefCounts(lazyBatch.getBatch) } - case _ => - throw new IllegalArgumentException(s"The hash aggregate replacement mode " + - s"$hashAggReplaceMode is not valid. Valid options are: 'partial', " + - s"'final', 'complete', or 'all'") + } } - if (!conf.partialMergeDistinctEnabled && hashAggModes.contains(PartialMerge)) { - willNotWorkOnGpu("Replacing Partial Merge aggregates disabled. " + - s"Set ${conf.partialMergeDistinctEnabled} to true if desired") - } + finalProjectBatch(batch) } - override def convertToGpu(): GpuExec = { - GpuHashAggregateExec( - requiredChildDistributionExpressions.map(_.map(_.convertToGpu())), - groupingExpressions.map(_.convertToGpu()), - aggregateExpressions.map(_.convertToGpu()).asInstanceOf[Seq[GpuAggregateExpression]], - aggregateAttributes.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]], - resultExpressions.map(_.convertToGpu()).asInstanceOf[Seq[NamedExpression]], - childPlans.head.convertIfNeeded()) + override def close(): Unit = { + aggregatedBatches.forEach(_.safeClose()) + aggregatedBatches.clear() + outOfCoreIter.foreach(_.close()) + outOfCoreIter = None + sortFallbackIter = None + hasReductionOnlyBatch = false } -} - -class GpuHashAggregateMeta( - override val agg: HashAggregateExec, - conf: RapidsConf, - parent: Option[RapidsMeta[_, _, _]], - rule: DataFromReplacementRule) - extends GpuBaseAggregateMeta(agg, agg.requiredChildDistributionExpressions, - conf, parent, rule) -class GpuSortAggregateMeta( - override val agg: SortAggregateExec, - conf: RapidsConf, - parent: Option[RapidsMeta[_, _, _]], - rule: DataFromReplacementRule) - extends GpuBaseAggregateMeta(agg, agg.requiredChildDistributionExpressions, - conf, parent, rule) { - override def tagPlanForGpu(): Unit = { - super.tagPlanForGpu() + private def computeTargetMergeBatchSize(confTargetSize: Long): Long = { + val aggregates = boundExpressions.aggModeCudfAggregates.flatMap(_._2) + val mergedTypes = groupingExpressions.map(_.dataType) ++ aggregates.map(_.dataType) + AggregateUtils.computeTargetBatchSize(confTargetSize, mergedTypes, mergedTypes,isReductionOnly) + } - // Make sure this is the last check - if this is SortAggregate, the children can be sorts and we - // want to validate they can run on GPU and remove them before replacing this with a - // HashAggregate. We don't want to do this if there is a first or last aggregate, - // because dropping the sort will make them no longer deterministic. - // In the future we might be able to pull the sort functionality into the aggregate so - // we can sort a single batch at a time and sort the combined result as well which would help - // with data skew. - val hasFirstOrLast = agg.aggregateExpressions.exists { agg => - agg.aggregateFunction match { - case _: First | _: Last => true - case _ => false + /** Aggregate all input batches and place the results in the aggregatedBatches queue. */ + private def aggregateInputBatches(): Unit = { + while (cbIter.hasNext) { + val (childCvs, isLastInputBatch) = withResource(cbIter.next()) { inputBatch => + val isLast = GpuColumnVector.isTaggedAsFinalBatch(inputBatch) + (processIncomingBatch(inputBatch), isLast) + } + withResource(childCvs) { _ => + withResource(computeAggregate(childCvs, merge = false)) { aggBatch => + val batch = LazySpillableColumnarBatch(aggBatch, metrics.spillCallback, "aggbatch") + // Avoid making batch spillable for the common case of the last and only batch + if (!(isLastInputBatch && aggregatedBatches.isEmpty)) { + batch.allowSpilling() + } + aggregatedBatches.add(batch) + } } } - if (canThisBeReplaced && !hasFirstOrLast) { - childPlans.foreach { plan => - if (plan.wrapped.isInstanceOf[SortExec]) { - if (!plan.canThisBeReplaced) { - willNotWorkOnGpu("one of the preceding SortExec's cannot be replaced") - } else { - plan.shouldBeRemoved("replacing sort aggregate with hash aggregate") + } + + /** + * Attempt to merge adjacent batches in the aggregatedBatches queue until either there is only + * one batch or merging adjacent batches would exceed the target batch size. + */ + private def tryMergeAggregatedBatches(): Unit = { + while (aggregatedBatches.size() > 1) { + val concatTime = metrics.concatTime + withResource(new NvtxWithMetrics("agg merge pass", NvtxColor.BLUE, concatTime)) { _ => + // continue merging as long as some batches are able to be combined + if (!mergePass()) { + if (aggregatedBatches.size() > 1 && isReductionOnly) { + // We were unable to merge the aggregated batches within the target batch size limit, + // which means normally we would fallback to a sort-based approach. However for + // reduction-only aggregation there are no keys to use for a sort. The only way this + // can work is if all batches are merged. This will exceed the target batch size limit, + // but at this point it is either risk an OOM/cudf error and potentially work or + // not work at all. + logWarning(s"Unable to merge reduction-only aggregated batches within " + + s"target batch limit of $targetMergeBatchSize, attempting to merge remaining " + + s"${aggregatedBatches.size()} batches beyond limit") + withResource(mutable.ArrayBuffer[LazySpillableColumnarBatch]()) { batchesToConcat => + aggregatedBatches.forEach(b => batchesToConcat += b) + aggregatedBatches.clear() + val batch = concatenateAndMerge(batchesToConcat) + // batch does not need to be marked spillable since it is the last and only batch + // and will be immediately retrieved on the next() call. + aggregatedBatches.add(batch) + } } + return } } } } -} -/** - * GpuHashAggregateExec - is the GPU version of HashAggregateExec, with some major differences: - * - it doesn't support spilling to disk - * - it doesn't support strings in the grouping key - * - it doesn't support count(col1, col2, ..., colN) - * - it doesn't support distinct - * @param requiredChildDistributionExpressions this is unchanged by the GPU. It is used in - * EnsureRequirements to be able to add shuffle nodes - * @param groupingExpressions The expressions that, when applied to the input batch, return the - * grouping key - * @param aggregateExpressions The GpuAggregateExpression instances for this node - * @param aggregateAttributes References to each GpuAggregateExpression (attribute references) - * @param resultExpressions the expected output expression of this hash aggregate (which this - * node should project) - * @param child incoming plan (where we get input columns from) - */ -case class GpuHashAggregateExec( - requiredChildDistributionExpressions: Option[Seq[Expression]], - groupingExpressions: Seq[Expression], - aggregateExpressions: Seq[GpuAggregateExpression], - aggregateAttributes: Seq[Attribute], - resultExpressions: Seq[NamedExpression], - child: SparkPlan) extends UnaryExecNode with GpuExec with Arm { + /** + * Perform a single pass over the aggregated batches attempting to merge adjacent batches. + * @return true if at least one merge operation occurred + */ + private def mergePass(): Boolean = { + val batchesToConcat: mutable.ArrayBuffer[LazySpillableColumnarBatch] = mutable.ArrayBuffer.empty + var wasBatchMerged = false + // Current size in bytes of the batches targeted for the next concatenation + var concatSize: Long = 0L + var batchesLeftInPass = aggregatedBatches.size() + + while (batchesLeftInPass > 0) { + closeOnExcept(batchesToConcat) { _ => + var isConcatSearchFinished = false + // Old batches are picked up at the front of the queue and freshly merged batches are + // appended to the back of the queue. Although tempting to allow the pass to "wrap around" + // and pick up batches freshly merged in this pass, it's avoided to prevent changing the + // order of aggregated batches. + while (batchesLeftInPass > 0 && !isConcatSearchFinished) { + val candidate = aggregatedBatches.getFirst + val potentialSize = concatSize + candidate.deviceMemorySize + isConcatSearchFinished = concatSize > 0 && potentialSize > targetMergeBatchSize + if (!isConcatSearchFinished) { + batchesLeftInPass -= 1 + batchesToConcat += aggregatedBatches.removeFirst() + concatSize = potentialSize + } + } + } - override def verboseStringWithOperatorId(): String = { - s""" - |$formattedNodeName - |${ExplainUtils.generateFieldString("Input", child.output)} - |${ExplainUtils.generateFieldString("Keys", groupingExpressions)} - |${ExplainUtils.generateFieldString("Functions", aggregateExpressions)} - |${ExplainUtils.generateFieldString("Aggregate Attributes", aggregateAttributes)} - |${ExplainUtils.generateFieldString("Results", resultExpressions)} - |""".stripMargin - } + val mergedBatch = if (batchesToConcat.length > 1) { + wasBatchMerged = true + val batch = concatenateAndMerge(batchesToConcat) + batch.allowSpilling() + batch + } else { + // Unable to find a neighboring buffer to produce a valid merge in this pass, + // so simply put this buffer back on the queue for other passes. + batchesToConcat.remove(0) + } - case class BoundExpressionsModeAggregates(boundInputReferences: Seq[GpuExpression] , - boundFinalProjections: Option[scala.Seq[GpuExpression]], - boundResultReferences: scala.Seq[Expression] , - aggModeCudfAggregates: scala.Seq[(AggregateMode, scala.Seq[CudfAggregate])]) - // This handles GPU hash aggregation without spilling. - // - // The CPU version of this is (assume no fallback to sort-based aggregation) - // Re: TungstenAggregationIterator.scala, and AggregationIterator.scala - // - // 1) Obtaining an input row and finding a buffer (by hash of the grouping key) where - // to aggregate on. - // 2) Once it has a buffer, it calls processRow on it with the incoming row. - // 3) This will in turn update the buffer, for each row received, agg function by agg function - // 4) In the happy case, we never spill, and an iterator (from the HashMap) is produced s.t. - // downstream nodes can access the aggregated and projected results. - // 5) Spill case (not handled in gpu case) - // a) we create an external sorter [[UnsafeKVExternalSorter]] from the hash map (spilling) - // this leaves room for more aggregations to happen. The external sorter first sorts, and - // then stores the results to disk, and last it clears the in-memory hash map. - // b) we merge external sorters as we spill the map further. - // c) after the hash agg is done with its incoming rows, it will switch to sort-based - // aggregation, because it detected a spill - // d) sort based aggregation is then the mode forward, and not covered in this. - override def doExecuteColumnar(): RDD[ColumnarBatch] = { - val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS) - val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES) - val totalTime = gpuLongMetric(TOTAL_TIME) - val computeAggTime = gpuLongMetric(AGG_TIME) - val concatTime = gpuLongMetric(CONCAT_TIME) - val rdd = child.executeColumnar() + // Add the merged batch to the end of the aggregated batch queue. Only a single pass over + // the batches is being performed due to the batch count check above, so the single-pass + // loop will terminate before picking up this new batch. + aggregatedBatches.addLast(mergedBatch) + batchesToConcat.clear() + concatSize = 0 + } - // cache in a local variable to avoid serializing the full child plan - val childOutput = child.output + wasBatchMerged + } - rdd.mapPartitions { cbIter => { - var batch: ColumnarBatch = null // incoming batch - // - // aggregated[Input]Cb - // This is the equivalent of the aggregation buffer for the cpu case with the grouping key. - // Its columns are: [key1, key2, ..., keyN, cudfAgg1, cudfAgg2, ..., cudfAggN] - // - // For aggregate expressions that are multiple cudf aggregates (like average), - // aggregated[Input]Cb can have one or more cudf aggregate columns. This isn't different than - // the cpu version, other than in the cpu version everything is a catalyst expression. - var aggregatedInputCb: ColumnarBatch = null // aggregated raw incoming batch - var aggregatedCb: ColumnarBatch = null // aggregated concatenated batches - - var finalCb: ColumnarBatch = null // batch after the final projection for each aggregator - var resultCb: ColumnarBatch = null // after the result projection given in resultExpressions - var success = false - - var childCvs: Seq[GpuColumnVector] = null - var concatCvs: Seq[GpuColumnVector] = null - var resultCvs: Seq[GpuColumnVector] = null - - // - // For aggregate exec there are four stages of operation: - // 1) extract columns from input - // 2) aggregation (update/merge) - // 3) finalize aggregations (avg = sum/count) - // 4) result projection (resolve any expressions in the output) - // - // In the CPU hash aggregate, Spark is using a buffer to aggregate the results. - // This buffer has room for each aggregate it is computing (based on type). - // Note that some AggregateExpressions are more than one slot in this buffer - // (avg is a sum and a count slot). - // - // In the GPU, we don't have an aggregation buffer in Spark code (this happens behind the - // scenes in cudf), but we still need to be able to pick out columns out of the input CB (for - // aggregation) - // and the aggregated CB (for the result projection). - // - // Partial mode: - // 1. boundInputReferences: picks column from raw input - // 2. boundUpdateAgg: performs the partial half of the aggregates (GpuCount => CudfCount) - // 3. boundMergeAgg: (if needed) perform a merge of partial aggregates (CudfCount => CudfSum) - // 4. boundResultReferences: is a pass-through of the merged aggregate - // - // Final mode: - // 1. boundInputReferences: is a pass-through of the merged aggregate - // 2. boundMergeAgg: perform merges of incoming, and subsequent batches if required. - // 3. boundFinalProjections: on merged batches, finalize aggregates - // (GpuAverage => CudfSum/CudfCount) - // 4. boundResultReferences: project the result expressions Spark expects in the output. - val boundExpression = setupReferences(childOutput, groupingExpressions, aggregateExpressions) - try { - while (cbIter.hasNext) { - // 1) Consume the raw incoming batch, evaluating nested expressions - // (e.g. avg(col1 + col2)), obtaining ColumnVectors that can be aggregated - batch = cbIter.next() - if (batch.numRows() == 0) { - batch.close() - batch = null - } else { - withResource(new NvtxWithMetrics("Hash Aggregate Batch", NvtxColor.YELLOW, - totalTime)) { _ => - childCvs = processIncomingBatch(batch, boundExpression.boundInputReferences) - - // done with the batch, clean it as soon as possible - batch.close() - batch = null - - // 2) When a partition gets multiple batches, we need to do two things: - // a) if this is the first batch, run aggregation and store the aggregated result - // b) if this is a subsequent batch, we need to merge the previously aggregated - // results with the incoming batch - // c) also update total time and aggTime metrics - aggregatedInputCb = computeAggregate(childCvs, groupingExpressions, - boundExpression.aggModeCudfAggregates, merge = false, computeAggTime) - - childCvs.safeClose() - childCvs = null - - if (aggregatedCb == null) { - // this is the first batch, regardless of mode. - aggregatedCb = aggregatedInputCb - aggregatedInputCb = null - } else { - // this is a subsequent batch, and we must: - // 1) concatenate aggregatedInputCb with the prior result (aggregatedCb) - // 2) perform a merge aggregate on the concatenated columns - // - // In the future, we could plugin in spilling here, where if the concatenated - // batch sizes would go over a threshold, we'd spill the aggregatedCb, - // and perform aggregation on the new batch (which would need to be merged, with the - // spilled aggregates) - // Please note that in order for first/last to work properly we have to maintain - // the order of the input batches. - concatCvs = concatenateBatches(aggregatedCb, aggregatedInputCb, concatTime) - aggregatedCb.close() - aggregatedCb = null - aggregatedInputCb.close() - aggregatedInputCb = null - - // 3) Compute aggregate. In subsequent iterations we'll use this result - // to concatenate against incoming batches (step 2) - aggregatedCb = computeAggregate(concatCvs, groupingExpressions, - boundExpression.aggModeCudfAggregates, merge = true, computeAggTime) - concatCvs.safeClose() - concatCvs = null - } - } - } + /** + * Concatenate batches together and perform a merge aggregation on the result. The input batches + * will be closed as part of this operation. + * @param batches batches to concatenate and merge aggregate + * @return lazy spillable batch which has NOT been marked spillable + */ + private def concatenateAndMerge( + batches: mutable.ArrayBuffer[LazySpillableColumnarBatch]): LazySpillableColumnarBatch = { + withResource(batches) { _ => + withResource(concatenateBatches(batches)) { concatVectors => + withResource(computeAggregate(concatVectors, merge = true)) { mergedBatch => + LazySpillableColumnarBatch(mergedBatch, metrics.spillCallback, "agg merged batch") } + } + } + } - // if - the input iterator was empty && - // we weren't grouping (reduction op) - // We need to return a single row, that contains the initial values of each - // aggregator. - // Note: for grouped aggregates, we will eventually return an empty iterator. - withResource(new NvtxWithMetrics("Final column eval", NvtxColor.YELLOW, - totalTime)) { _ => - if (aggregatedCb == null && groupingExpressions.isEmpty) { - val aggregateFunctions = aggregateExpressions.map(_.aggregateFunction) - val defaultValues = - aggregateFunctions.flatMap(_.initialValues) - val vecs = defaultValues.safeMap { ref => - withResource(GpuScalar.from(ref.asInstanceOf[GpuLiteral].value, ref.dataType)) { - scalar => GpuColumnVector.from(scalar, 1, ref.dataType) - } - } - aggregatedCb = new ColumnarBatch(vecs.toArray, 1) - } + /** Build an iterator that uses a sort-based approach to merge aggregated batches together. */ + private def buildSortFallbackIterator(): Iterator[ColumnarBatch] = { + logInfo("Falling back to sort-based aggregation with ${aggregatedBatches.size()} batches") + metrics.numTasksFallBacked += 1 + val aggregatedBatchIter = new Iterator[ColumnarBatch] { + override def hasNext: Boolean = !aggregatedBatches.isEmpty - // 4) Finally, project the result to the expected layout that Spark expects - // i.e.: select avg(foo) from bar group by baz will produce: - // Partial mode: 3 columns => [bar, sum(foo) as sum_foo, count(foo) as count_foo] - // Final mode: 2 columns => [bar, sum(sum_foo) / sum(count_foo)] - finalCb = if (boundExpression.boundFinalProjections.isDefined) { - if (aggregatedCb != null) { - val finalCvs = - boundExpression.boundFinalProjections.get.map { ref => - // aggregatedCb is made up of ColumnVectors - // and the final projections from the aggregates won't change that, - // so we can assume they will be vectors after we eval - ref.columnarEval(aggregatedCb).asInstanceOf[GpuColumnVector] - } - aggregatedCb.close() - aggregatedCb = null - new ColumnarBatch(finalCvs.toArray, finalCvs.head.getRowCount.toInt) - } else { - null // this was a grouped aggregate, with an empty input - } - } else { - aggregatedCb - } + override def next(): ColumnarBatch = { + withResource(aggregatedBatches.removeFirst()) { lazyBatch => + GpuColumnVector.incRefCounts(lazyBatch.getBatch) + } + } + } - aggregatedCb = null + if (isReductionOnly) { + // Normally this should never happen because `tryMergeAggregatedBatches` should have done + // a last-ditch effort to concatenate all batches together regardless of target limits. + throw new IllegalStateException("Unable to fallback to sort-based aggregation " + + "without grouping keys") + } - if (finalCb != null) { - // Perform the last project to get the correct shape that Spark expects. Note this will - // add things like literals, that were not part of the aggregate into the batch. - resultCvs = boundExpression.boundResultReferences.map { ref => - // Result references can be virtually anything, we need to coerce - // them to be vectors since this is going into a ColumnarBatch - GpuExpressionsUtils.columnarEvalToColumn(ref, finalCb) - } - finalCb.close() - finalCb = null - resultCb = if (resultCvs.isEmpty) { - new ColumnarBatch(Seq().toArray, 0) - } else { - numOutputRows += resultCvs.head.getBase.getRowCount - new ColumnarBatch(resultCvs.toArray, resultCvs.head.getBase.getRowCount.toInt) - } - numOutputBatches += 1 - success = true - new Iterator[ColumnarBatch] { - TaskContext.get().addTaskCompletionListener[Unit] { _ => - if (resultCb != null) { - resultCb.close() - resultCb = null - } - } + val shims = ShimLoader.getSparkShims + val ordering = groupingExpressions.map(shims.sortOrder(_, Ascending, NullsFirst)) + val groupingAttributes = groupingExpressions.map(_.asInstanceOf[NamedExpression].toAttribute) + val aggBufferAttributes = groupingAttributes ++ + aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) + val sorter = new GpuSorter(ordering, aggBufferAttributes) + val aggregates = boundExpressions.aggModeCudfAggregates.flatMap(_._2) + val aggBatchTypes = groupingExpressions.map(_.dataType) ++ aggregates.map(_.dataType) + + // Use the out of core sort iterator to sort the batches by grouping key + outOfCoreIter = Some(GpuOutOfCoreSortIterator( + aggregatedBatchIter, + sorter, + LazilyGeneratedOrdering.forSchema(TrampolineUtil.fromAttributes(groupingAttributes)), + configuredTargetBatchSize, + totalTime = NoopMetric, + sortTime = metrics.sortTime, + outputBatches = NoopMetric, + outputRows = NoopMetric, + peakDevMemory = NoopMetric, + spillCallback = metrics.spillCallback)) + + // The out of core sort iterator does not guarantee that a batch contains all of the values + // for a particular key, so add a key batching iterator to enforce this. That allows each batch + // to be merge-aggregated safely since all values associated with a particular key are + // guaranteed to be in the same batch. + val keyBatchingIter = new GpuKeyBatchingIterator( + outOfCoreIter.get, + sorter, + aggBatchTypes.toArray, + configuredTargetBatchSize, + numInputRows = NoopMetric, + numInputBatches = NoopMetric, + numOutputRows = NoopMetric, + numOutputBatches = NoopMetric, + collectTime = NoopMetric, + concatTime = metrics.concatTime, + totalTime = NoopMetric, + peakDevMemory = NoopMetric, + spillCallback = metrics.spillCallback) + + // Finally wrap the key batching iterator with a merge aggregation on the output batches. + new Iterator[ColumnarBatch] { + override def hasNext: Boolean = keyBatchingIter.hasNext + + override def next(): ColumnarBatch = { + // batches coming out of the sort need to be merged + withResource(keyBatchingIter.next()) { batch => + // TODO: Normally we would want to hint to cudf that the data is already sorted on the + // grouping keys here, but this doesn't always produce the expected result due to a bug. + // See https://github.com/rapidsai/cudf/issues/8717 for details. + computeAggregate(GpuColumnVector.extractColumns(batch), merge = true, isSorted = false) + } + } + } + } - override def hasNext: Boolean = resultCb != null + /** + * Generates the result of a reduction-only aggregation on empty input by emitting the + * initial value of each aggregator. + */ + private def generateEmptyReductionBatch(): ColumnarBatch = { + val aggregateFunctions = aggregateExpressions.map(_.aggregateFunction) + val defaultValues = + aggregateFunctions.flatMap(_.initialValues) + val vecs = defaultValues.safeMap { ref => + withResource(GpuScalar.from(ref.asInstanceOf[GpuLiteral].value, ref.dataType)) { + scalar => GpuColumnVector.from(scalar, 1, ref.dataType) + } + } + new ColumnarBatch(vecs.toArray, 1) + } - override def next(): ColumnarBatch = { - val out = resultCb - resultCb = null - out - } - } - } else { - // we had a grouped aggregate, without input - Iterator.empty + /** + * Project a merged aggregated batch result to the layout that Spark expects + * i.e.: select avg(foo) from bar group by baz will produce: + * Partial mode: 3 columns => [bar, sum(foo) as sum_foo, count(foo) as count_foo] + * Final mode: 2 columns => [bar, sum(sum_foo) / sum(count_foo)] + */ + private def finalProjectBatch(batch: ColumnarBatch): ColumnarBatch = { + val aggTime = metrics.computeAggTime + withResource(new NvtxWithMetrics("finalize agg", NvtxColor.DARK_GREEN, aggTime)) { _ => + val finalBatch = if (boundExpressions.boundFinalProjections.isDefined) { + withResource(batch) { _ => + val finalCvs = boundExpressions.boundFinalProjections.get.map { ref => + // aggregatedCb is made up of ColumnVectors + // and the final projections from the aggregates won't change that, + // so we can assume they will be vectors after we eval + ref.columnarEval(batch).asInstanceOf[GpuColumnVector] } + new ColumnarBatch(finalCvs.toArray, finalCvs.head.getRowCount.toInt) } - } finally { - if (!success) { - if (resultCvs != null) { - resultCvs.safeClose() - } + } else { + batch + } + + // Perform the last project to get the correct shape that Spark expects. Note this may + // add things like literals that were not part of the aggregate into the batch. + val resultCvs = withResource(finalBatch) { _ => + boundExpressions.boundResultReferences.safeMap { ref => + // Result references can be virtually anything, we need to coerce + // them to be vectors since this is going into a ColumnarBatch + GpuExpressionsUtils.columnarEvalToColumn(ref, finalBatch) } - childCvs.safeClose() - concatCvs.safeClose() - Seq(batch, aggregatedInputCb, aggregatedCb, finalCb) - .safeClose() } - }} + closeOnExcept(resultCvs) { _ => + val rowCount = if (resultCvs.isEmpty) 0 else resultCvs.head.getRowCount.toInt + metrics.numOutputRows += rowCount + metrics.numOutputBatches += 1 + new ColumnarBatch(resultCvs.toArray, rowCount) + } + } } - private def processIncomingBatch(batch: ColumnarBatch, - boundInputReferences: Seq[Expression]): Seq[GpuColumnVector] = { - boundInputReferences.safeMap { ref => - val childCv = GpuExpressionsUtils.columnarEvalToColumn(ref, batch) - if (childCv.dataType == ref.dataType) { - childCv - } else { - withResource(childCv) { childCv => - val rapidsType = GpuColumnVector.getNonNestedRapidsType(ref.dataType) - GpuColumnVector.from(childCv.getBase.castTo(rapidsType), ref.dataType) + /** Perform the initial projection on the input batch and extract the result columns */ + private def processIncomingBatch(batch: ColumnarBatch): Seq[GpuColumnVector] = { + val aggTime = metrics.computeAggTime + withResource(new NvtxWithMetrics("prep agg batch", NvtxColor.CYAN, aggTime)) { _ => + boundExpressions.boundInputReferences.safeMap { ref => + val childCv = GpuExpressionsUtils.columnarEvalToColumn(ref, batch) + if (childCv.dataType == ref.dataType) { + childCv + } else { + withResource(childCv) { childCv => + val rapidsType = GpuColumnVector.getNonNestedRapidsType(ref.dataType) + GpuColumnVector.from(childCv.getBase.castTo(rapidsType), ref.dataType) + } } } } } /** - * concatenateBatches - given two ColumnarBatch instances, return a sequence of GpuColumnVector - * that is the concatenated columns of the two input batches. - * @param aggregatedInputCb this is an incoming batch - * @param aggregatedCb this is a batch that was kept for concatenation - * @return Seq[GpuColumnVector] with concatenated vectors + * Concatenates batches by concatenating the corresponding column vectors within the batches. + * @note the input batches are not closed as part of this operation + * @param batchesToConcat batches to concatenate + * @return concatenated vectors that together represent the concatenated batch result */ - private def concatenateBatches(aggregatedInputCb: ColumnarBatch, - aggregatedCb: ColumnarBatch, - concatTime: GpuMetric): Seq[GpuColumnVector] = { + private def concatenateBatches( + batchesToConcat: mutable.ArrayBuffer[LazySpillableColumnarBatch]): Seq[GpuColumnVector] = { + val concatTime = metrics.concatTime withResource(new NvtxWithMetrics("concatenateBatches", NvtxColor.BLUE, concatTime)) { _ => - // get tuples of columns to concatenate - - val zipped = (0 until aggregatedCb.numCols()).map { i => - (aggregatedInputCb.column(i), aggregatedCb.column(i)) - } - - zipped.map { - case (col1, col2) => - GpuColumnVector.from( - cudf.ColumnVector.concatenate( - col1.asInstanceOf[GpuColumnVector].getBase, - col2.asInstanceOf[GpuColumnVector].getBase), col1.dataType()) + val numCols = batchesToConcat.head.numCols + (0 until numCols).safeMap { i => + val columnType = batchesToConcat.head.getBatch.column(i).dataType() + val columnsToConcat = batchesToConcat.map { + _.getBatch.column(i).asInstanceOf[GpuColumnVector].getBase + } + GpuColumnVector.from(cudf.ColumnVector.concatenate(columnsToConcat: _*), columnType) } } } - private lazy val allModes: Seq[AggregateMode] = aggregateExpressions.map(_.mode) - private lazy val uniqueModes: Seq[AggregateMode] = allModes.distinct - private lazy val partialMode = uniqueModes.contains(Partial) || uniqueModes.contains(PartialMerge) - private lazy val finalMode = uniqueModes.contains(Final) - private lazy val completeMode = uniqueModes.contains(Complete) - /** * getCudfAggregates returns a sequence of `cudf.Aggregate`, given the current mode * `AggregateMode`, and a sequence of all expressions for this [[GpuHashAggregateExec]] @@ -560,39 +619,13 @@ case class GpuHashAggregateExec( * @return Seq of `cudf.Aggregate`, with one or more aggregates that correspond to each * expression in allExpressions */ - def setupReferences(childAttr: AttributeSeq, - groupingExpressions: Seq[Expression], - aggregateExpressions: Seq[GpuAggregateExpression]): BoundExpressionsModeAggregates = { - + private def setupReferences(childAttr: AttributeSeq): BoundExpressionsModeAggregates = { val groupingAttributes = groupingExpressions.map(_.asInstanceOf[NamedExpression].toAttribute) val aggBufferAttributes = groupingAttributes ++ - aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) + aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) - // - // update expressions are those performed on the raw input data - // e.g. for count it's count, and for average it's sum and count. - // - val updateExpressionsSeq = - aggregateExpressions.map( - _.aggregateFunction.updateExpressions) - // - // merge expressions are used while merging multiple batches, or while on final mode - // e.g. for count it's sum, and for average it's sum and sum. - // - val mergeExpressionsSeq = - aggregateExpressions.map( - _.aggregateFunction.mergeExpressions) - - val aggModeCudfAggregates = aggregateExpressions.zipWithIndex.map { case (expr, modeIndex) => - val cudfAggregates = if (expr.mode == Partial || expr.mode == Complete) { - GpuBindReferences.bindGpuReferences(updateExpressionsSeq(modeIndex), aggBufferAttributes) - .asInstanceOf[Seq[CudfAggregate]] - } else { - GpuBindReferences.bindGpuReferences(mergeExpressionsSeq(modeIndex), aggBufferAttributes) - .asInstanceOf[Seq[CudfAggregate]] - } - (expr.mode, cudfAggregates) - } + val aggModeCudfAggregates = + AggregateUtils.computeAggModeCudfAggregates(aggregateExpressions, aggBufferAttributes) // // expressions to pick input to the aggregate, and finalize the output to the result projection. @@ -611,21 +644,21 @@ case class GpuHashAggregateExec( // Pick merge non-distinct for PartialMerge val mergeExpressionsNonDistinct = nonDistinctAggExpressions - .flatMap(_.aggregateFunction.mergeExpressions) - .map(_.asInstanceOf[CudfAggregate].ref) + .flatMap(_.aggregateFunction.mergeExpressions) + .map(_.asInstanceOf[CudfAggregate].ref) val mergeAttributesNonDistinct = nonDistinctAggExpressions.flatMap( _.aggregateFunction.aggBufferAttributes) // Partial with no distinct or when modes are empty val inputProjections: Seq[Expression] = groupingExpressions ++ aggregateExpressions - .flatMap(_.aggregateFunction.inputProjection) + .flatMap(_.aggregateFunction.inputProjection) var distinctAttributes = Seq[Attribute]() var distinctExpressions = Seq[Expression]() var nonDistinctAttributes = Seq[Attribute]() var nonDistinctExpressions = Seq[Expression]() - uniqueModes.foreach { + modeInfo.uniqueModes.foreach { case PartialMerge => nonDistinctAttributes = mergeAttributesNonDistinct nonDistinctExpressions = mergeExpressionsNonDistinct @@ -633,7 +666,7 @@ case class GpuHashAggregateExec( // Partial with distinct case val updateExpressionsCudfAggsDistinct = updateExpressionsDistinct.filter(_.isInstanceOf[CudfAggregate]) - .map(_.asInstanceOf[CudfAggregate].ref) + .map(_.asInstanceOf[CudfAggregate].ref) if (inputProjectionsDistinct.exists(p => !p.isInstanceOf[NamedExpression])) { // Case of distinct average we need to evaluate the "GpuCast and GpuIsNotNull" columns. // Refer to how input projections are setup for GpuAverage. @@ -652,7 +685,7 @@ case class GpuHashAggregateExec( val resultingBindAttributes = groupingAttributes ++ distinctAttributes ++ nonDistinctAttributes val finalProjections = groupingExpressions ++ - aggregateExpressions.map(_.aggregateFunction.evaluateExpression) + aggregateExpressions.map(_.aggregateFunction.evaluateExpression) // boundInputReferences is used to pick out of the input batch the appropriate columns // for aggregation @@ -662,15 +695,15 @@ case class GpuHashAggregateExec( // - Partial, PartialMerge mode: we use the inputProjections or distinct update expressions // for Partial and non distinct merge expressions for PartialMerge. // - Final mode: we pick the columns in the order as handed to us. - val boundInputReferences = if (uniqueModes.contains(PartialMerge)) { + val boundInputReferences = if (modeInfo.hasPartialMerge) { GpuBindReferences.bindGpuReferences(inputBindExpressions, resultingBindAttributes) - } else if (finalMode) { + } else if (modeInfo.hasFinalMode) { GpuBindReferences.bindGpuReferences(childAttr.attrs.asInstanceOf[Seq[Expression]], childAttr) } else { GpuBindReferences.bindGpuReferences(inputProjections, childAttr) } - val boundFinalProjections = if (finalMode || completeMode) { + val boundFinalProjections = if (modeInfo.hasFinalMode || modeInfo.hasCompleteMode) { Some(GpuBindReferences.bindGpuReferences(finalProjections, aggBufferAttributes)) } else { None @@ -686,11 +719,11 @@ case class GpuHashAggregateExec( // out of the node as is. // - Final or Complete mode: we use resultExpressions to pick out the correct columns that // finalReferences has pre-processed for us - val boundResultReferences = if (partialMode) { + val boundResultReferences = if (modeInfo.hasPartialMode) { GpuBindReferences.bindGpuReferences( resultExpressions, resultExpressions.map(_.toAttribute)) - } else if (finalMode || completeMode) { + } else if (modeInfo.hasFinalMode || modeInfo.hasCompleteMode) { GpuBindReferences.bindGpuReferences( resultExpressions, finalAttributes) @@ -703,11 +736,19 @@ case class GpuHashAggregateExec( boundResultReferences, aggModeCudfAggregates) } - def computeAggregate(toAggregateCvs: Seq[GpuColumnVector], - groupingExpressions: Seq[Expression], - aggModeCudfAggregates : Seq[(AggregateMode, Seq[CudfAggregate])], - merge : Boolean, - computeAggTime: GpuMetric): ColumnarBatch = { + /** + * Compute the aggregations on the projected input columns. + * @param toAggregateCvs column vectors representing the input batch to aggregate + * @param merge true indicates a merge aggregation should be performed + * @param isSorted true indicates the data is already sorted by the grouping keys + * @return aggregated batch + */ + private def computeAggregate( + toAggregateCvs: Seq[GpuColumnVector], + merge: Boolean, + isSorted: Boolean = false): ColumnarBatch = { + val aggModeCudfAggregates = boundExpressions.aggModeCudfAggregates + val computeAggTime = metrics.computeAggTime withResource(new NvtxWithMetrics("computeAggregate", NvtxColor.CYAN, computeAggTime)) { _ => if (groupingExpressions.nonEmpty) { // Perform group by aggregation @@ -725,8 +766,12 @@ case class GpuHashAggregateExec( aggregates.map(a => a.mergeAggregate.onColumn(a.getOrdinal(a.ref))) } } + val groupOptions = cudf.GroupByOptions.builder() + .withIgnoreNullKeys(false) + .withKeysSorted(isSorted) + .build() val result = withResource(new cudf.Table(toAggregateCvs.map(_.getBase): _*)) { tbl => - tbl.groupBy(groupingExpressions.indices: _*).aggregate(cudfAggregates: _*) + tbl.groupBy(groupOptions, groupingExpressions.indices: _*).aggregate(cudfAggregates: _*) } withResource(result) { result => // Turn aggregation into a ColumnarBatch for the result evaluation @@ -742,7 +787,7 @@ case class GpuHashAggregateExec( // later. Cast here to the type that the aggregate expects (e.g. Long in case of count) val dataTypes = groupingExpressions.map(_.dataType) ++ aggregates.map(_.dataType) - val resCols = new ArrayBuffer[ColumnVector](result.getNumberOfColumns) + val resCols = new mutable.ArrayBuffer[ColumnVector](result.getNumberOfColumns) for (i <- 0 until result.getNumberOfColumns) { val rapidsType = GpuColumnVector.getNonNestedRapidsType(dataTypes(i)) // cast will be cheap if type matches, only does refCount++ in that case @@ -756,7 +801,7 @@ case class GpuHashAggregateExec( // Reduction aggregate // we ask the appropriate merge or update CudfAggregates, what their // reduction merge or update aggregates functions are - val cvs = ArrayBuffer[GpuColumnVector]() + val cvs = mutable.ArrayBuffer[GpuColumnVector]() aggModeCudfAggregates.foreach { case (mode, aggs) => aggs.foreach { agg => val aggFn = if ((mode == Partial || mode == Complete) && !merge) { @@ -787,14 +832,237 @@ case class GpuHashAggregateExec( } } } +} + +abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan]( + plan: INPUT, + aggRequiredChildDistributionExpressions: Option[Seq[Expression]], + conf: RapidsConf, + parent: Option[RapidsMeta[_, _, _]], + rule: DataFromReplacementRule) extends SparkPlanMeta[INPUT](plan, conf, parent, rule) { + + val agg: BaseAggregateExec + + private val requiredChildDistributionExpressions: Option[Seq[BaseExprMeta[_]]] = + aggRequiredChildDistributionExpressions.map(_.map(GpuOverrides.wrapExpr(_, conf, Some(this)))) + private val groupingExpressions: Seq[BaseExprMeta[_]] = + agg.groupingExpressions.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + private val aggregateExpressions: Seq[BaseExprMeta[_]] = + agg.aggregateExpressions.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + private val aggregateAttributes: Seq[BaseExprMeta[_]] = + agg.aggregateAttributes.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + private val resultExpressions: Seq[BaseExprMeta[_]] = + agg.resultExpressions.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + + override val childExprs: Seq[BaseExprMeta[_]] = + requiredChildDistributionExpressions.getOrElse(Seq.empty) ++ + groupingExpressions ++ + aggregateExpressions ++ + aggregateAttributes ++ + resultExpressions + + override def tagPlanForGpu(): Unit = { + agg.groupingExpressions + .find(_.dataType match { + case _@(ArrayType(_, _) | MapType(_, _, _)) | _@StructType(_) => true + case _ => false + }) + .foreach(_ => + willNotWorkOnGpu("Nested types in grouping expressions are not supported")) + if (agg.resultExpressions.isEmpty) { + willNotWorkOnGpu("result expressions is empty") + } + + tagForReplaceMode() + + if (agg.aggregateExpressions.exists(expr => expr.isDistinct) + && agg.aggregateExpressions.exists(expr => expr.filter.isDefined)) { + // Distinct with Filter is not supported on the GPU currently, + // This makes sure that if we end up here, the plan falls back to the CPU + // which will do the right thing. + willNotWorkOnGpu( + "DISTINCT and FILTER cannot be used in aggregate functions at the same time") + } + } + + /** Tagging checks tied to configs that control the aggregation modes that are replaced */ + private def tagForReplaceMode(): Unit = { + val hashAggModes = agg.aggregateExpressions.map(_.mode).distinct + val hashAggReplaceMode = conf.hashAggReplaceMode.toLowerCase + hashAggReplaceMode match { + case "all" => + case "partial" => + if (hashAggModes.contains(Final) || hashAggModes.contains(Complete)) { + // replacing only Partial hash aggregates, so a Final or Complete one should not replace + willNotWorkOnGpu("Replacing Final or Complete hash aggregates disabled") + } + // In partial mode, if there are non-distinct functions and multiple distinct functions, + // non-distinct functions are computed using the First operator. The final result would be + // incorrect for non-distinct functions for partition size > 1. Reason for this is - if + // the first batch computed and sent to CPU doesn't contain all the rows required to + // compute non-distinct function(s), then Spark would consider that value as final result + // (due to First). Fall back to CPU in this case. + if (AggregateUtils.shouldFallbackMultiDistinct(agg.aggregateExpressions)) { + willNotWorkOnGpu("Aggregates of non-distinct functions with multiple distinct " + + "functions are non-deterministic for non-distinct functions as it is " + + "computed using First.") + } + case "final" => + if (hashAggModes.contains(Partial) || hashAggModes.contains(Complete)) { + // replacing only Final hash aggregates, so a Partial or Complete one should not replace + willNotWorkOnGpu("Replacing Partial or Complete hash aggregates disabled") + } + case "complete" => + if (hashAggModes.contains(Partial) || hashAggModes.contains(Final)) { + // replacing only Complete hash aggregates, so a Partial or Final one should not replace + willNotWorkOnGpu("Replacing Partial or Final hash aggregates disabled") + } + case _ => + throw new IllegalArgumentException(s"The hash aggregate replacement mode " + + s"$hashAggReplaceMode is not valid. Valid options are: 'partial', " + + s"'final', 'complete', or 'all'") + } + + if (!conf.partialMergeDistinctEnabled && hashAggModes.contains(PartialMerge)) { + willNotWorkOnGpu("Replacing Partial Merge aggregates disabled. " + + s"Set ${conf.partialMergeDistinctEnabled} to true if desired") + } + } + + override def convertToGpu(): GpuExec = { + GpuHashAggregateExec( + requiredChildDistributionExpressions.map(_.map(_.convertToGpu())), + groupingExpressions.map(_.convertToGpu()), + aggregateExpressions.map(_.convertToGpu()).asInstanceOf[Seq[GpuAggregateExpression]], + aggregateAttributes.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]], + resultExpressions.map(_.convertToGpu()).asInstanceOf[Seq[NamedExpression]], + childPlans.head.convertIfNeeded(), + conf.gpuTargetBatchSizeBytes) + } +} + +class GpuHashAggregateMeta( + override val agg: HashAggregateExec, + conf: RapidsConf, + parent: Option[RapidsMeta[_, _, _]], + rule: DataFromReplacementRule) + extends GpuBaseAggregateMeta(agg, agg.requiredChildDistributionExpressions, + conf, parent, rule) + +class GpuSortAggregateMeta( + override val agg: SortAggregateExec, + conf: RapidsConf, + parent: Option[RapidsMeta[_, _, _]], + rule: DataFromReplacementRule) + extends GpuBaseAggregateMeta(agg, agg.requiredChildDistributionExpressions, + conf, parent, rule) { + override def tagPlanForGpu(): Unit = { + super.tagPlanForGpu() + + // Make sure this is the last check - if this is SortAggregate, the children can be sorts and we + // want to validate they can run on GPU and remove them before replacing this with a + // HashAggregate. We don't want to do this if there is a first or last aggregate, + // because dropping the sort will make them no longer deterministic. + // In the future we might be able to pull the sort functionality into the aggregate so + // we can sort a single batch at a time and sort the combined result as well which would help + // with data skew. + val hasFirstOrLast = agg.aggregateExpressions.exists { agg => + agg.aggregateFunction match { + case _: First | _: Last => true + case _ => false + } + } + if (canThisBeReplaced && !hasFirstOrLast) { + childPlans.foreach { plan => + if (plan.wrapped.isInstanceOf[SortExec]) { + if (!plan.canThisBeReplaced) { + willNotWorkOnGpu("one of the preceding SortExec's cannot be replaced") + } else { + plan.shouldBeRemoved("replacing sort aggregate with hash aggregate") + } + } + } + } + } +} + +/** + * The GPU version of HashAggregateExec + * + * @param requiredChildDistributionExpressions this is unchanged by the GPU. It is used in + * EnsureRequirements to be able to add shuffle nodes + * @param groupingExpressions The expressions that, when applied to the input batch, return the + * grouping key + * @param aggregateExpressions The GpuAggregateExpression instances for this node + * @param aggregateAttributes References to each GpuAggregateExpression (attribute references) + * @param resultExpressions the expected output expression of this hash aggregate (which this + * node should project) + * @param child incoming plan (where we get input columns from) + * @param configuredTargetBatchSize user-configured maximum device memory size of a batch + */ +case class GpuHashAggregateExec( + requiredChildDistributionExpressions: Option[Seq[Expression]], + groupingExpressions: Seq[Expression], + aggregateExpressions: Seq[GpuAggregateExpression], + aggregateAttributes: Seq[Attribute], + resultExpressions: Seq[NamedExpression], + child: SparkPlan, + configuredTargetBatchSize: Long) extends UnaryExecNode with GpuExec with Arm { + private lazy val uniqueModes: Seq[AggregateMode] = aggregateExpressions.map(_.mode).distinct protected override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL protected override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL - override lazy val additionalMetrics = Map( - TOTAL_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_TOTAL_TIME), + override lazy val additionalMetrics: Map[String, GpuMetric] = Map( + NUM_TASKS_FALL_BACKED -> createMetric(MODERATE_LEVEL, DESCRIPTION_NUM_TASKS_FALL_BACKED), AGG_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_AGG_TIME), - CONCAT_TIME-> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_CONCAT_TIME) - ) + CONCAT_TIME-> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_CONCAT_TIME), + SORT_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_SORT_TIME) + ) ++ spillMetrics + + override def verboseStringWithOperatorId(): String = { + s""" + |$formattedNodeName + |${ExplainUtils.generateFieldString("Input", child.output)} + |${ExplainUtils.generateFieldString("Keys", groupingExpressions)} + |${ExplainUtils.generateFieldString("Functions", aggregateExpressions)} + |${ExplainUtils.generateFieldString("Aggregate Attributes", aggregateAttributes)} + |${ExplainUtils.generateFieldString("Results", resultExpressions)} + |""".stripMargin + } + + override def doExecuteColumnar(): RDD[ColumnarBatch] = { + val aggMetrics = GpuHashAggregateMetrics( + numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS), + numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES), + numTasksFallBacked = gpuLongMetric(NUM_TASKS_FALL_BACKED), + computeAggTime = gpuLongMetric(AGG_TIME), + concatTime = gpuLongMetric(CONCAT_TIME), + sortTime = gpuLongMetric(SORT_TIME), + makeSpillCallback(allMetrics)) + + // cache in a local variable to avoid serializing the full child plan + val childOutput = child.output + val groupingExprs = groupingExpressions + val aggregateExprs = aggregateExpressions + val aggregateAttrs = aggregateAttributes + val resultExprs = resultExpressions + val modeInfo = AggregateModeInfo(uniqueModes) + + val rdd = child.executeColumnar() + + rdd.mapPartitions { cbIter => + new GpuHashAggregateIterator( + cbIter, + groupingExprs, + aggregateExprs, + aggregateAttrs, + resultExprs, + childOutput, + modeInfo, + aggMetrics, + configuredTargetBatchSize) + } + } protected def outputExpressions: Seq[NamedExpression] = resultExpressions