diff --git a/docs/additional-functionality/advanced_configs.md b/docs/additional-functionality/advanced_configs.md
index 883aab24cdb..0ce1a964ca7 100644
--- a/docs/additional-functionality/advanced_configs.md
+++ b/docs/additional-functionality/advanced_configs.md
@@ -383,6 +383,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
spark.rapids.sql.expression.Last|`last`, `last_value`|last aggregate operator|true|None|
spark.rapids.sql.expression.Max|`max`|Max aggregate operator|true|None|
spark.rapids.sql.expression.Min|`min`|Min aggregate operator|true|None|
+spark.rapids.sql.expression.Percentile|`percentile`|Aggregation computing exact percentile|true|None|
spark.rapids.sql.expression.PivotFirst| |PivotFirst operator|true|None|
spark.rapids.sql.expression.StddevPop|`stddev_pop`|Aggregation computing population standard deviation|true|None|
spark.rapids.sql.expression.StddevSamp|`stddev_samp`, `std`, `stddev`|Aggregation computing sample standard deviation|true|None|
diff --git a/docs/supported_ops.md b/docs/supported_ops.md
index 48949ab00ef..f0358163cc0 100644
--- a/docs/supported_ops.md
+++ b/docs/supported_ops.md
@@ -17629,6 +17629,180 @@ are limited.
UDT |
+Percentile |
+`percentile` |
+Aggregation computing exact percentile |
+None |
+aggregation |
+input |
+ |
+S |
+S |
+S |
+S |
+S |
+S |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+
+
+percentage |
+ |
+ |
+ |
+ |
+ |
+ |
+PS Literal value only |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+S |
+ |
+ |
+ |
+
+
+frequency |
+ |
+ |
+ |
+ |
+S |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+S |
+ |
+ |
+ |
+
+
+result |
+ |
+ |
+ |
+ |
+ |
+ |
+S |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+S |
+ |
+ |
+ |
+
+
+reduction |
+input |
+ |
+S |
+S |
+S |
+S |
+S |
+S |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+
+
+percentage |
+ |
+ |
+ |
+ |
+ |
+ |
+PS Literal value only |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+S |
+ |
+ |
+ |
+
+
+frequency |
+ |
+ |
+ |
+ |
+S |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+S |
+ |
+ |
+ |
+
+
+result |
+ |
+ |
+ |
+ |
+ |
+ |
+S |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+S |
+ |
+ |
+ |
+
+
PivotFirst |
|
PivotFirst operator |
@@ -17894,6 +18068,32 @@ are limited.
|
+Expression |
+SQL Function(s) |
+Description |
+Notes |
+Context |
+Param/Output |
+BOOLEAN |
+BYTE |
+SHORT |
+INT |
+LONG |
+FLOAT |
+DOUBLE |
+DATE |
+TIMESTAMP |
+STRING |
+DECIMAL |
+NULL |
+BINARY |
+CALENDAR |
+ARRAY |
+MAP |
+STRUCT |
+UDT |
+
+
StddevSamp |
`stddev_samp`, `std`, `stddev` |
Aggregation computing sample standard deviation |
@@ -18027,32 +18227,6 @@ are limited.
|
-Expression |
-SQL Functions(s) |
-Description |
-Notes |
-Context |
-Param/Output |
-BOOLEAN |
-BYTE |
-SHORT |
-INT |
-LONG |
-FLOAT |
-DOUBLE |
-DATE |
-TIMESTAMP |
-STRING |
-DECIMAL |
-NULL |
-BINARY |
-CALENDAR |
-ARRAY |
-MAP |
-STRUCT |
-UDT |
-
-
Sum |
`sum` |
Sum aggregate operator |
@@ -18319,6 +18493,32 @@ are limited.
|
+Expression |
+SQL Function(s) |
+Description |
+Notes |
+Context |
+Param/Output |
+BOOLEAN |
+BYTE |
+SHORT |
+INT |
+LONG |
+FLOAT |
+DOUBLE |
+DATE |
+TIMESTAMP |
+STRING |
+DECIMAL |
+NULL |
+BINARY |
+CALENDAR |
+ARRAY |
+MAP |
+STRUCT |
+UDT |
+
+
VarianceSamp |
`var_samp`, `variance` |
Aggregation computing sample variance |
@@ -18452,32 +18652,6 @@ are limited.
|
-Expression |
-SQL Functions(s) |
-Description |
-Notes |
-Context |
-Param/Output |
-BOOLEAN |
-BYTE |
-SHORT |
-INT |
-LONG |
-FLOAT |
-DOUBLE |
-DATE |
-TIMESTAMP |
-STRING |
-DECIMAL |
-NULL |
-BINARY |
-CALENDAR |
-ARRAY |
-MAP |
-STRUCT |
-UDT |
-
-
NormalizeNaNAndZero |
|
Normalize NaN and zero |
diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py
index 4f58278360c..a9300a51c79 100644
--- a/integration_tests/src/main/python/hash_aggregate_test.py
+++ b/integration_tests/src/main/python/hash_aggregate_test.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import math
import pytest
from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_row_counts_equal,\
@@ -883,6 +884,170 @@ def test_hash_groupby_collect_partial_replace_with_distinct_fallback(data_gen,
group by a""",
conf=conf)
+
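+# Data generators for the exact percentile tests: plain and repeated sequences of each
+# numeric type, plus float/double generators heavily weighted toward NaN/Infinity specials.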
+exact_percentile_data_gen = [ByteGen(), ShortGen(), IntegerGen(), LongGen(), FloatGen(), DoubleGen(),
+ RepeatSeqGen(ByteGen(), length=100),
+ RepeatSeqGen(ShortGen(), length=100),
+ RepeatSeqGen(IntegerGen(), length=100),
+ RepeatSeqGen(LongGen(), length=100),
+ RepeatSeqGen(FloatGen(), length=100),
+ RepeatSeqGen(DoubleGen(), length=100),
+ FloatGen().with_special_case(math.nan, 500.0)
+ .with_special_case(math.inf, 500.0),
+ DoubleGen().with_special_case(math.nan, 500.0)
+ .with_special_case(math.inf, 500.0)]
+
+exact_percentile_reduction_data_gen = [
+ [('val', data_gen),
+ ('freq', LongGen(min_val=0, max_val=1000000, nullable=False)
+ .with_special_case(0, weight=100))]
+ for data_gen in exact_percentile_data_gen]
+
+def exact_percentile_reduction(df):
+ return df.selectExpr(
+ 'percentile(val, 0.1)',
+ 'percentile(val, 0)',
+ 'percentile(val, 1)',
+ 'percentile(val, array(0.1))',
+ 'percentile(val, array())',
+ 'percentile(val, array(0.1, 0.5, 0.9))',
+ 'percentile(val, array(0, 0.0001, 0.5, 0.9999, 1))',
+ # There is an issue with the Python data generation that still produces negative values
+ # for freq. See https://github.com/NVIDIA/spark-rapids/issues/9452.
+ # Thus, freq needs to be wrapped in abs.
+ 'percentile(val, 0.1, abs(freq))',
+ 'percentile(val, 0, abs(freq))',
+ 'percentile(val, 1, abs(freq))',
+ 'percentile(val, array(0.1), abs(freq))',
+ 'percentile(val, array(), abs(freq))',
+ 'percentile(val, array(0.1, 0.5, 0.9), abs(freq))',
+ 'percentile(val, array(0, 0.0001, 0.5, 0.9999, 1), abs(freq))'
+ )
+
+@pytest.mark.parametrize('data_gen', exact_percentile_reduction_data_gen, ids=idfn)
+def test_exact_percentile_reduction(data_gen):
+ assert_gpu_and_cpu_are_equal_collect(
+ lambda spark: exact_percentile_reduction(gen_df(spark, data_gen))
+ )
+
+exact_percentile_reduction_cpu_fallback_data_gen = [
+ [('val', data_gen),
+ ('freq', LongGen(min_val=0, max_val=1000000, nullable=False)
+ .with_special_case(0, weight=100))]
+ for data_gen in [IntegerGen(), DoubleGen()]]
+
+@allow_non_gpu('ObjectHashAggregateExec', 'SortAggregateExec', 'ShuffleExchangeExec', 'HashPartitioning',
+ 'AggregateExpression', 'Alias', 'Cast', 'Literal', 'ProjectExec',
+ 'Percentile')
+@pytest.mark.parametrize('data_gen', exact_percentile_reduction_cpu_fallback_data_gen, ids=idfn)
+@pytest.mark.parametrize('replace_mode', ['partial', 'final|complete'], ids=idfn)
+@pytest.mark.parametrize('use_obj_hash_agg', ['false', 'true'], ids=idfn)
+@pytest.mark.xfail(condition=is_databricks104_or_later(), reason='https://github.com/NVIDIA/spark-rapids/issues/9494')
+def test_exact_percentile_reduction_partial_fallback_to_cpu(data_gen, replace_mode,
+ use_obj_hash_agg):
+ cpu_clz, gpu_clz = ['Percentile'], ['GpuPercentileDefault']
+ exist_clz, non_exist_clz = [], []
+ # For aggregations without distinct, the Databricks runtime removes the partial Aggregate
+ # stage (map-side combine), so only a single AggregateExec exists in the plan. Thus, we
+ # need to set the expected exist_classes according to the runtime.
+ if is_databricks_runtime():
+ if replace_mode == 'partial':
+ exist_clz, non_exist_clz = cpu_clz, gpu_clz
+ else:
+ exist_clz, non_exist_clz = gpu_clz, cpu_clz
+ else:
+ exist_clz = cpu_clz + gpu_clz
+
+ assert_cpu_and_gpu_are_equal_collect_with_capture(
+ lambda spark: gen_df(spark, data_gen).selectExpr(
+ 'percentile(val, 0.1)',
+ 'percentile(val, array(0, 0.0001, 0.5, 0.9999, 1))',
+ 'percentile(val, 0.1, abs(freq))',
+ 'percentile(val, array(0, 0.0001, 0.5, 0.9999, 1), abs(freq))'),
+ exist_classes=','.join(exist_clz),
+ non_exist_classes=','.join(non_exist_clz),
+ conf={'spark.rapids.sql.hashAgg.replaceMode': replace_mode,
+ 'spark.sql.execution.useObjectHashAggregateExec': use_obj_hash_agg}
+ )
+
+
+exact_percentile_groupby_data_gen = [
+ [('key', RepeatSeqGen(IntegerGen(), length=100)),
+ ('val', data_gen),
+ ('freq', LongGen(min_val=0, max_val=1000000, nullable=False)
+ .with_special_case(0, weight=100))]
+ for data_gen in exact_percentile_data_gen]
+
+def exact_percentile_groupby(df):
+ return df.groupby('key').agg(
+ f.expr('percentile(val, 0.1)'),
+ f.expr('percentile(val, 0)'),
+ f.expr('percentile(val, 1)'),
+ f.expr('percentile(val, array(0.1))'),
+ f.expr('percentile(val, array())'),
+ f.expr('percentile(val, array(0.1, 0.5, 0.9))'),
+ f.expr('percentile(val, array(0, 0.0001, 0.5, 0.9999, 1))'),
+ # There is an issue with the Python data generation that still produces negative values
+ # for freq. See https://github.com/NVIDIA/spark-rapids/issues/9452.
+ # Thus, freq needs to be wrapped in abs.
+ f.expr('percentile(val, 0.1, abs(freq))'),
+ f.expr('percentile(val, 0, abs(freq))'),
+ f.expr('percentile(val, 1, abs(freq))'),
+ f.expr('percentile(val, array(0.1), abs(freq))'),
+ f.expr('percentile(val, array(), abs(freq))'),
+ f.expr('percentile(val, array(0.1, 0.5, 0.9), abs(freq))'),
+ f.expr('percentile(val, array(0, 0.0001, 0.5, 0.9999, 1), abs(freq))')
+ )
+
+@ignore_order
+@pytest.mark.parametrize('data_gen', exact_percentile_groupby_data_gen, ids=idfn)
+def test_exact_percentile_groupby(data_gen):
+ assert_gpu_and_cpu_are_equal_collect(
+ lambda spark: exact_percentile_groupby(gen_df(spark, data_gen))
+ )
+
+exact_percentile_groupby_cpu_fallback_data_gen = [
+ [('key', RepeatSeqGen(IntegerGen(), length=100)),
+ ('val', data_gen),
+ ('freq', LongGen(min_val=0, max_val=1000000, nullable=False)
+ .with_special_case(0, weight=100))]
+ for data_gen in [IntegerGen(), DoubleGen()]]
+
+@ignore_order
+@allow_non_gpu('ObjectHashAggregateExec', 'SortAggregateExec', 'ShuffleExchangeExec', 'HashPartitioning',
+ 'AggregateExpression', 'Alias', 'Cast', 'Literal', 'ProjectExec',
+ 'Percentile')
+@pytest.mark.parametrize('data_gen', exact_percentile_groupby_cpu_fallback_data_gen, ids=idfn)
+@pytest.mark.parametrize('replace_mode', ['partial', 'final|complete'], ids=idfn)
+@pytest.mark.parametrize('use_obj_hash_agg', ['false', 'true'], ids=idfn)
+@pytest.mark.xfail(condition=is_databricks104_or_later(), reason='https://github.com/NVIDIA/spark-rapids/issues/9494')
+def test_exact_percentile_groupby_partial_fallback_to_cpu(data_gen, replace_mode, use_obj_hash_agg):
+ cpu_clz, gpu_clz = ['Percentile'], ['GpuPercentileDefault']
+ exist_clz, non_exist_clz = [], []
+ # For aggregations without distinct, the Databricks runtime removes the partial Aggregate
+ # stage (map-side combine), so only a single AggregateExec exists in the plan. Thus, we
+ # need to set the expected exist_classes according to the runtime.
+ if is_databricks_runtime():
+ if replace_mode == 'partial':
+ exist_clz, non_exist_clz = cpu_clz, gpu_clz
+ else:
+ exist_clz, non_exist_clz = gpu_clz, cpu_clz
+ else:
+ exist_clz = cpu_clz + gpu_clz
+
+ assert_cpu_and_gpu_are_equal_collect_with_capture(
+ lambda spark: gen_df(spark, data_gen).groupby('key').agg(
+ f.expr('percentile(val, 0.1)'),
+ f.expr('percentile(val, array(0, 0.0001, 0.5, 0.9999, 1))'),
+ f.expr('percentile(val, 0.1, abs(freq))'),
+ f.expr('percentile(val, array(0, 0.0001, 0.5, 0.9999, 1), abs(freq))')),
+ exist_classes=','.join(exist_clz),
+ non_exist_classes=','.join(non_exist_clz),
+ conf={'spark.rapids.sql.hashAgg.replaceMode': replace_mode,
+ 'spark.sql.execution.useObjectHashAggregateExec': use_obj_hash_agg}
+ )
+
+
@ignore_order(local=True)
@allow_non_gpu('ObjectHashAggregateExec', 'ShuffleExchangeExec',
'HashAggregateExec', 'HashPartitioning',
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 92469f1768d..5ce6b57259a 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -3348,13 +3348,13 @@ object GpuOverrides extends Logging {
"Collect a set of unique elements, not supported in reduction",
ExprChecks.fullAgg(
TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 +
- TypeSig.NULL + TypeSig.STRUCT + TypeSig.ARRAY),
+ TypeSig.NULL + TypeSig.STRUCT + TypeSig.ARRAY),
TypeSig.ARRAY.nested(TypeSig.all),
Seq(ParamCheck("input",
(TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 +
- TypeSig.NULL +
- TypeSig.STRUCT +
- TypeSig.ARRAY).nested(),
+ TypeSig.NULL +
+ TypeSig.STRUCT +
+ TypeSig.ARRAY).nested(),
TypeSig.all))),
(c, conf, p, r) => new TypedImperativeAggExprMeta[CollectSet](c, conf, p, r) {
@@ -3422,6 +3422,73 @@ object GpuOverrides extends Logging {
GpuVarianceSamp(childExprs.head, !legacyStatisticalAggregate)
}
}),
+ expr[Percentile](
+ "Aggregation computing exact percentile",
+ ExprChecks.reductionAndGroupByAgg(
+ // The output can be a single number or an array, depending on whether the percentage
+ // parameter is a single number or an array.
+ TypeSig.DOUBLE + TypeSig.ARRAY.nested(TypeSig.DOUBLE),
+ TypeSig.DOUBLE + TypeSig.ARRAY.nested(TypeSig.DOUBLE),
+ Seq(
+ // ANSI interval types are new in Spark 3.2.0 and are not yet supported by the
+ // current GPU implementation.
+ ParamCheck("input", TypeSig.integral + TypeSig.fp, TypeSig.integral + TypeSig.fp),
+ ParamCheck("percentage",
+ TypeSig.lit(TypeEnum.DOUBLE) + TypeSig.ARRAY.nested(TypeSig.lit(TypeEnum.DOUBLE)),
+ TypeSig.DOUBLE + TypeSig.ARRAY.nested(TypeSig.DOUBLE)),
+ ParamCheck("frequency",
+ TypeSig.LONG + TypeSig.ARRAY.nested(TypeSig.LONG),
+ TypeSig.LONG + TypeSig.ARRAY.nested(TypeSig.LONG)))),
+ (c, conf, p, r) => new TypedImperativeAggExprMeta[Percentile](c, conf, p, r) {
+ override def tagAggForGpu(): Unit = {
+ // Check if the input percentage can be supported on GPU.
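+ // For example (a sketch of the resulting behavior):
+ //   percentile(v, 0.5)             -> supported (literal percentage)
+ //   percentile(v, array(0.1, 0.9)) -> supported (literal array of percentages)
+ //   percentile(v, some_col)        -> falls back to CPU (non-literal percentage)
+ //   percentile(v, 1.5)             -> falls back to CPU (outside [0, 1])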
+ GpuOverrides.extractLit(childExprs(1).wrapped.asInstanceOf[Expression]) match {
+ case None =>
+ willNotWorkOnGpu("percentile on GPU only supports literal percentages")
+ case Some(Literal(null, _)) =>
+ willNotWorkOnGpu("percentile on GPU only supports non-null literal percentages")
+ case Some(Literal(a: ArrayData, _)) => {
+ if ((0 until a.numElements).exists(a.isNullAt)) {
+ willNotWorkOnGpu(
+ "percentile on GPU does not support percentage arrays containing nulls")
+ }
+ if (a.toDoubleArray().exists(percentage => percentage < 0.0 || percentage > 1.0)) {
+ willNotWorkOnGpu(
+ "percentile requires the input percentages given in the range [0, 1]")
+ }
+ }
+ case Some(_) => // This is fine
+ }
+ }
+
+ override def convertToGpu(childExprs: Seq[Expression]): GpuExpression = {
+ val exprMeta = p.get.asInstanceOf[BaseExprMeta[_]]
+ val isReduction = exprMeta.context match {
+ case ReductionAggExprContext => true
+ case GroupByAggExprContext => false
+ case _ => throw new IllegalStateException(
+ s"Invalid aggregation context: ${exprMeta.context}")
+ }
+ GpuPercentile(childExprs.head, childExprs(1).asInstanceOf[GpuLiteral], childExprs(2),
+ isReduction)
+ }
+ // Declare the data type of the internal buffer so it can be serialized and
+ // deserialized correctly during shuffling.
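+ // For example, with a DOUBLE input column the buffer type is
+ // ARRAY<STRUCT<value: DOUBLE, frequency: LONG>>.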
+ override def aggBufferAttribute: AttributeReference = {
+ val aggBuffer = c.aggBufferAttributes.head
+ val dataType: DataType = ArrayType(StructType(Seq(
+ StructField("value", childExprs.head.dataType),
+ StructField("frequency", LongType))), containsNull = false)
+ aggBuffer.copy(dataType = dataType)(aggBuffer.exprId, aggBuffer.qualifier)
+ }
+
+ override val needsAnsiCheck: Boolean = false
+ override val supportBufferConversion: Boolean = true
+ override def createCpuToGpuBufferConverter(): CpuToGpuAggregateBufferConverter =
+ CpuToGpuPercentileBufferConverter(childExprs.head.dataType)
+ override def createGpuToCpuBufferConverter(): GpuToCpuAggregateBufferConverter =
+ GpuToCpuPercentileBufferConverter(childExprs.head.dataType)
+ }),
expr[ApproximatePercentile](
"Approximate percentile",
ExprChecks.reductionAndGroupByAgg(
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/GpuPercentile.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/GpuPercentile.scala
new file mode 100644
index 00000000000..ddab05814f6
--- /dev/null
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/GpuPercentile.scala
@@ -0,0 +1,280 @@
+/*
+ * Copyright (c) 2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.rapids.aggregate
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+
+import ai.rapids.cudf
+import ai.rapids.cudf.{DType, GroupByAggregation, ReductionAggregation}
+import com.nvidia.spark.rapids._
+import com.nvidia.spark.rapids.Arm.{withResource, withResourceIfAllowed}
+import com.nvidia.spark.rapids.RapidsPluginImplicits.ReallyAGpuExpression
+import com.nvidia.spark.rapids.jni.Histogram
+import com.nvidia.spark.rapids.shims.ShimExpression
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, UnsafeArrayData, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+case class CudfHistogram(override val dataType: DataType) extends CudfAggregate {
+ override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar =
+ (input: cudf.ColumnVector) => input.reduce(ReductionAggregation.histogram(), DType.LIST)
+ override lazy val groupByAggregate: GroupByAggregation = GroupByAggregation.histogram()
+ override val name: String = "CudfHistogram"
+}
+
+case class CudfMergeHistogram(override val dataType: DataType)
+ extends CudfAggregate {
+ override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar =
+ (input: cudf.ColumnVector) =>
+ input.getType match {
+ // This is called from updateAggregate in GpuPercentileWithFrequency.
+ case DType.STRUCT => input.reduce(ReductionAggregation.mergeHistogram(), DType.LIST)
+
+ // This is always called from mergeAggregate.
+ case DType.LIST => withResource(input.getChildColumnView(0)) { histogram =>
+ histogram.reduce(ReductionAggregation.mergeHistogram(), DType.LIST)
+ }
+
+ case _ => throw new IllegalStateException("Invalid input in CudfMergeHistogram.")
+ }
+
+ override lazy val groupByAggregate: GroupByAggregation = GroupByAggregation.mergeHistogram()
+ override val name: String = "CudfMergeHistogram"
+}
+
+/**
+ * Perform the final evaluation step to compute percentiles from histograms.
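+ *
+ * As a sketch of the semantics (matching Spark's CPU Percentile): for N accumulated values
+ * in sorted order, the exact percentile p is obtained by linear interpolation at rank
+ * p * (N - 1).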
+ */
+case class GpuPercentileEvaluation(childExpr: Expression, percentage: Either[Double, Array[Double]],
+ outputType: DataType, isReduction: Boolean)
+ extends GpuExpression with ShimExpression {
+ override def dataType: DataType = outputType
+ override def prettyName: String = "percentile_evaluation"
+ override def nullable: Boolean = true
+ override def children: Seq[Expression] = Seq(childExpr)
+
+ private lazy val percentageArray = percentage match {
+ case Left(p) => Array(p)
+ case Right(p) => p
+ }
+ private lazy val outputAsList = outputType match {
+ case _: ArrayType => true
+ case _ => false
+ }
+
+ override def columnarEval(batch: ColumnarBatch): GpuColumnVector = {
+ withResourceIfAllowed(childExpr.columnarEval(batch)) { histograms =>
+ val percentiles = Histogram.percentileFromHistogram(histograms.getBase,
+ percentageArray, outputAsList)
+ GpuColumnVector.from(percentiles, outputType)
+ }
+ }
+}
+
+abstract class GpuPercentile(childExpr: Expression, percentageLit: GpuLiteral,
+ isReduction: Boolean)
+ extends GpuAggregateFunction with Serializable {
+ protected lazy val histogramBufferType: DataType =
+ ArrayType(StructType(Seq(StructField("value", childExpr.dataType),
+ StructField("frequency", LongType))),
+ containsNull = false)
+ protected lazy val histogramBuffer: AttributeReference =
+ AttributeReference("histogramBuff", histogramBufferType)()
+ override def aggBufferAttributes: Seq[AttributeReference] = histogramBuffer :: Nil
+
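+ // The aggregation starts from an empty histogram: an empty ARRAY<STRUCT> literal that
+ // the update and merge steps below accumulate into.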
+ override lazy val initialValues: Seq[Expression] =
+ Seq(GpuLiteral.create(new GenericArrayData(Array.empty[Any]), histogramBufferType))
+ override lazy val mergeAggregates: Seq[CudfAggregate] =
+ Seq(CudfMergeHistogram(histogramBufferType))
+ override lazy val evaluateExpression: Expression =
+ GpuPercentileEvaluation(histogramBuffer, percentages, outputType, isReduction)
+
+ override def dataType: DataType = histogramBufferType
+ override def prettyName: String = "percentile"
+ override def nullable: Boolean = true
+ override def children: Seq[Expression] = Seq(childExpr, percentageLit)
+
+ private lazy val (returnPercentileArray, percentages): (Boolean, Either[Double, Array[Double]]) =
+ percentageLit.value match {
+ case null => (false, Right(Array()))
+ case num: Double => (false, Left(num))
+ case arrayData: ArrayData => (true, Right(arrayData.toDoubleArray()))
+ case other => throw new IllegalStateException(s"Invalid percentage expression: $other")
+ }
+ private lazy val outputType: DataType =
+ if (returnPercentileArray) ArrayType(DoubleType, containsNull = false) else DoubleType
+}
+
+/**
+ * Compute percentiles from just the input values.
+ */
+case class GpuPercentileDefault(childExpr: Expression, percentage: GpuLiteral,
+ isReduction: Boolean)
+ extends GpuPercentile(childExpr, percentage, isReduction) {
+ override lazy val inputProjection: Seq[Expression] = Seq(childExpr)
+ override lazy val updateAggregates: Seq[CudfAggregate] = Seq(CudfHistogram(histogramBufferType))
+}
+
+/**
+ * Compute percentiles from the input values associated with frequencies.
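+ * For example, `percentile(val, 0.5, freq)` treats each row's `val` as if it occurred
+ * `freq` times.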
+ */
+case class GpuPercentileWithFrequency(childExpr: Expression, percentage: GpuLiteral,
+ frequencyExpr: Expression, isReduction: Boolean)
+ extends GpuPercentile(childExpr, percentage, isReduction) {
+ override lazy val inputProjection: Seq[Expression] = {
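+ // cudf's mergeHistogram reduction consumes a STRUCT<value, frequency> column directly,
+ // while the group-by form consumes the LIST-of-STRUCT histogram buffer (see the type
+ // dispatch in CudfMergeHistogram above).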
+ val outputType: DataType = if (isReduction) {
+ StructType(Seq(StructField("value", childExpr.dataType), StructField("frequency", LongType)))
+ } else {
+ histogramBufferType
+ }
+ Seq(GpuCreateHistogramIfValid(childExpr, frequencyExpr, isReduction, outputType))
+ }
+ override lazy val updateAggregates: Seq[CudfAggregate] =
+ Seq(CudfMergeHistogram(histogramBufferType))
+}
+
+/**
+ * Create a histogram buffer from the input values and frequencies.
+ *
+ * The frequencies are also checked to ensure that they are non-negative. If a negative frequency
+ * exists, an exception will be thrown.
+ */
+case class GpuCreateHistogramIfValid(valuesExpr: Expression, frequenciesExpr: Expression,
+ isReduction: Boolean, outputType: DataType)
+ extends GpuExpression with ShimExpression {
+ override def dataType: DataType = outputType
+ override def prettyName: String = "create_histogram_if_valid"
+ override def nullable: Boolean = false
+ override def children: Seq[Expression] = Seq(valuesExpr, frequenciesExpr)
+
+ override def columnarEval(batch: ColumnarBatch): GpuColumnVector = {
+ withResourceIfAllowed(valuesExpr.columnarEval(batch)) { values =>
+ withResourceIfAllowed(frequenciesExpr.columnarEval(batch)) { frequencies =>
+ // If a negative frequency exists, an exception will be thrown from here.
+ val histograms = Histogram.createHistogramIfValid(values.getBase, frequencies.getBase,
+ /*outputAsLists = */ !isReduction)
+ GpuColumnVector.from(histograms, outputType)
+ }
+ }
+ }
+}
+
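+/**
+ * Factory selecting the aggregation flavor: Spark parses `percentile(v, p)` with an implicit
+ * frequency of `Literal(1L)`, which maps to GpuPercentileDefault; any other frequency
+ * expression maps to GpuPercentileWithFrequency.
+ */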
+object GpuPercentile {
+ def apply(childExpr: Expression, percentageLit: GpuLiteral, frequencyExpr: Expression,
+ isReduction: Boolean): GpuPercentile = {
+ frequencyExpr match {
+ case GpuLiteral(freq, LongType) if freq == 1 =>
+ GpuPercentileDefault(childExpr, percentageLit, isReduction)
+ case _ =>
+ GpuPercentileWithFrequency(childExpr, percentageLit, frequencyExpr, isReduction)
+ }
+ }
+}
+
+/**
+ * Convert the incoming byte stream received from the Spark CPU into the internal histogram
+ * buffer format.
+ */
+case class CpuToGpuPercentileBufferConverter(elementType: DataType)
+ extends CpuToGpuAggregateBufferConverter {
+ override def createExpression(child: Expression): CpuToGpuBufferTransition =
+ CpuToGpuPercentileBufferTransition(child, elementType)
+}
+
+case class CpuToGpuPercentileBufferTransition(override val child: Expression, elementType: DataType)
+ extends CpuToGpuBufferTransition {
+ override def dataType: DataType =
+ ArrayType(StructType(Seq(StructField("value", elementType),
+ StructField("frequency", LongType))),
+ containsNull = false)
+
+ // Deserializes the input byte stream into the internal histogram buffer format.
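+ // A sketch of the expected layout (what Spark's CPU Percentile writes when serializing
+ // its aggregation buffer): [rowSize: Int][UnsafeRow bytes], repeated per histogram entry,
+ // terminated by a negative Int.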
+ override protected def nullSafeEval(input: Any): ArrayData = {
+ val bytes = input.asInstanceOf[Array[Byte]]
+ val bis = new ByteArrayInputStream(bytes)
+ val ins = new DataInputStream(bis)
+
+ // Accumulate rows of STRUCT<value, frequency> that make up the histogram.
+ val histogram = ArrayBuffer[InternalRow]()
+ val row = new UnsafeRow(2)
+
+ try {
+ var sizeOfNextRow = ins.readInt()
+ while (sizeOfNextRow >= 0) {
+ val bs = new Array[Byte](sizeOfNextRow)
+ ins.readFully(bs)
+ row.pointTo(bs, sizeOfNextRow)
+ val element = row.get(0, elementType)
+ val count = row.get(1, LongType).asInstanceOf[Long]
+ histogram.append(InternalRow.apply(element, count))
+ sizeOfNextRow = ins.readInt()
+ }
+ ArrayData.toArrayData(histogram)
+ } finally {
+ ins.close()
+ bis.close()
+ }
+ }
+}
+
+/**
+ * Convert the internal histogram buffer into a byte stream that can be deserialized by the
+ * Spark CPU.
+ */
+case class GpuToCpuPercentileBufferConverter(elementType: DataType)
+ extends GpuToCpuAggregateBufferConverter {
+ override def createExpression(child: Expression): GpuToCpuBufferTransition =
+ GpuToCpuPercentileBufferTransition(child, elementType)
+}
+
+case class GpuToCpuPercentileBufferTransition(override val child: Expression, elementType: DataType)
+ extends GpuToCpuBufferTransition {
+ // Serializes the internal histogram buffer into a byte array.
+ override protected def nullSafeEval(input: Any): Array[Byte] = {
+ val buffer = new Array[Byte](4 << 10) // 4K
+ val bos = new ByteArrayOutputStream()
+ val out = new DataOutputStream(bos)
+
+ val histogram = input.asInstanceOf[UnsafeArrayData]
+ val projection = UnsafeProjection.create(Array[DataType](elementType, LongType))
+
+ try {
+ (0 until histogram.numElements()).foreach { i =>
+ val row = histogram.getStruct(i, 2)
+ val element = row.get(0, elementType)
+ // The internal histogram buffer may contain null elements. We need to skip them, as the
+ // Spark CPU does not process nulls after the updateAggregates step.
+ if (element != null) {
+ val unsafeRow = projection.apply(row)
+ out.writeInt(unsafeRow.getSizeInBytes)
+ unsafeRow.writeToStream(out, buffer)
+ }
+ }
+ // Need to write a negative integer to indicate the end of the stream.
+ out.writeInt(-1)
+ out.flush()
+ bos.toByteArray
+ } finally {
+ out.close()
+ bos.close()
+ }
+ }
+}
diff --git a/tools/generated_files/operatorsScore.csv b/tools/generated_files/operatorsScore.csv
index 0c583853490..b904921ca2b 100644
--- a/tools/generated_files/operatorsScore.csv
+++ b/tools/generated_files/operatorsScore.csv
@@ -182,6 +182,7 @@ NthValue,4
OctetLength,4
Or,4
PercentRank,4
+Percentile,4
PivotFirst,4
Pmod,4
PosExplode,4
diff --git a/tools/generated_files/supportedExprs.csv b/tools/generated_files/supportedExprs.csv
index 5b16293d07a..38c504f8cd1 100644
--- a/tools/generated_files/supportedExprs.csv
+++ b/tools/generated_files/supportedExprs.csv
@@ -668,6 +668,14 @@ Min,S,`min`,None,reduction,input,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,PS,NA,PS,NS
Min,S,`min`,None,reduction,result,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,PS,NA,PS,NS
Min,S,`min`,None,window,input,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,NS,NA,NS,NS
Min,S,`min`,None,window,result,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,NS,NA,NS,NS
+Percentile,S,`percentile`,None,aggregation,input,NA,S,S,S,S,S,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
+Percentile,S,`percentile`,None,aggregation,percentage,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA
+Percentile,S,`percentile`,None,aggregation,frequency,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA
+Percentile,S,`percentile`,None,aggregation,result,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA
+Percentile,S,`percentile`,None,reduction,input,NA,S,S,S,S,S,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
+Percentile,S,`percentile`,None,reduction,percentage,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA
+Percentile,S,`percentile`,None,reduction,frequency,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA
+Percentile,S,`percentile`,None,reduction,result,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA
PivotFirst,S, ,None,aggregation,pivotColumn,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,NS,NS,NS,NS
PivotFirst,S, ,None,aggregation,valueColumn,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,NS,NS,NS,NS
PivotFirst,S, ,None,aggregation,result,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,PS,NS,NS,NS