Commit

Prevent approx_percentile aggregate from being split between CPU and GPU (#3862)

* Implement fix and basic test

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* improve test based on PR feedback

* check tags more consistently

* Add test that does not depend on CAST of array falling back to CPU

* add license header

* simplify test to use spark.rapids.sql.hashAgg.replaceMode

* Update comments

* revert plugin changes

* fix some regressions

* WIP temporarily allow ObjectHashAggregate/SortAggregate/Sort to allow binary input to see what other issues remain

* scalastyle

* add placeholders for BinaryType checks

* ps notes and type checks

* enable more tests

* remove redundant and untested type check

* add test for sort fallback to cpu with binary input

* test for SortExec with BinaryType

* revert changes to aggregate.scala

* remove ps note for SortExec BinaryType
andygrove authored Nov 2, 2021
1 parent 3949168 commit e03c66b
Showing 9 changed files with 92 additions and 26 deletions.
24 changes: 12 additions & 12 deletions docs/supported_ops.md
@@ -412,11 +412,11 @@ Accelerator supports are described below.
<td>S</td>
<td><em>PS<br/>max DECIMAL precision of 18</em></td>
<td>S</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -556,11 +556,11 @@ Accelerator supports are described below.
<td>S</td>
<td><em>PS<br/>max DECIMAL precision of 18</em></td>
<td>S</td>
<td><em>PS<br/>only allowed when aggregate buffers can be converted between CPU and GPU</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS<br/>not allowed for grouping expressions;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions if containing Array or Map as child;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions if containing Array or Map as child;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -580,11 +580,11 @@ Accelerator supports are described below.
<td>S</td>
<td><em>PS<br/>max DECIMAL precision of 18</em></td>
<td>S</td>
<td><em>PS<br/>only allowed when aggregate buffers can be converted between CPU and GPU</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS<br/>not allowed for grouping expressions;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions if containing Array or Map as child;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions if containing Array or Map as child;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td><b>NS</b></td>
</tr>
<tr>
2 changes: 2 additions & 0 deletions integration_tests/src/main/python/asserts.py
@@ -90,6 +90,8 @@ def _assert_equal(cpu, gpu, float_check, path):
assert cpu == gpu, "GPU and CPU boolean values are different at {}".format(path)
elif isinstance(cpu, Decimal):
assert cpu == gpu, "GPU and CPU decimal values are different at {}".format(path)
elif isinstance(cpu, bytearray):
assert cpu == gpu, "GPU and CPU bytearray values are different at {}".format(path)
elif (cpu == None):
assert cpu == gpu, "GPU and CPU are not both null at {}".format(path)
else:
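The new `bytearray` branch follows the same pattern as the other scalar branches in `_assert_equal`. A minimal sketch of that type-dispatched comparison is shown below, assuming binary values are collected into Python as `bytearray` objects. The function name `assert_equal_sketch` and its reduced set of branches are illustrative and are not the repository's implementation.

```python
from decimal import Decimal


def assert_equal_sketch(cpu, gpu, path="root"):
    """Recursively compare a CPU result with a GPU result (illustrative only)."""
    if isinstance(cpu, list):
        assert isinstance(gpu, list) and len(cpu) == len(gpu), \
            "CPU and GPU list lengths differ at {}".format(path)
        for index, (c, g) in enumerate(zip(cpu, gpu)):
            assert_equal_sketch(c, g, "{}[{}]".format(path, index))
    elif isinstance(cpu, Decimal):
        assert cpu == gpu, "GPU and CPU decimal values are different at {}".format(path)
    elif isinstance(cpu, bytearray):
        # Binary values compare byte for byte, like any other exact scalar.
        assert cpu == gpu, "GPU and CPU bytearray values are different at {}".format(path)
    elif cpu is None:
        assert cpu == gpu, "GPU and CPU are not both null at {}".format(path)
    else:
        # Without a dedicated branch, an unknown type is rejected rather than compared.
        assert False, "Found unexpected type {} at {}".format(type(cpu), path)
```

Without a branch for `bytearray`, a comparison helper of this shape would reject binary values as an unexpected type even when the CPU and GPU results match.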
26 changes: 23 additions & 3 deletions integration_tests/src/main/python/hash_aggregate_test.py
@@ -598,7 +598,6 @@ def test_hash_groupby_collect_partial_replace_fallback(data_gen,
# Databricks runtime: GPU(Final&Complete) -> CPU(PartialMerge)
'final|partialMerge&partial|final&complete',
]
# TODO: add param of use_obj_hash_agg after https://github.com/NVIDIA/spark-rapids/issues/3367 getting fixed
@ignore_order(local=True)
@allow_non_gpu('ObjectHashAggregateExec', 'SortAggregateExec',
'ShuffleExchangeExec', 'HashPartitioning', 'SortExec',
@@ -608,11 +607,14 @@ def test_hash_groupby_collect_partial_replace_fallback(data_gen,
@pytest.mark.parametrize('data_gen', _gen_data_for_collect_op, ids=idfn)
@pytest.mark.parametrize('replace_mode', _replace_modes_single_distinct, ids=idfn)
@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
@pytest.mark.parametrize('use_obj_hash_agg', ['false', 'true'], ids=idfn)
def test_hash_groupby_collect_partial_replace_with_distinct_fallback(data_gen,
replace_mode,
aqe_enabled):
aqe_enabled,
use_obj_hash_agg):
conf = {'spark.rapids.sql.hashAgg.replaceMode': replace_mode,
'spark.sql.adaptive.enabled': aqe_enabled}
'spark.sql.adaptive.enabled': aqe_enabled,
'spark.sql.execution.useObjectHashAggregateExec': use_obj_hash_agg}
# test with single Distinct
assert_cpu_and_gpu_are_equal_collect_with_capture(
lambda spark: gen_df(spark, data_gen, length=100)
@@ -1164,6 +1166,24 @@ def test_hash_groupby_approx_percentile_double_scalar():
('v', DoubleGen())], length=100),
0.05)

@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
@ignore_order(local=True)
@allow_non_gpu('TakeOrderedAndProjectExec', 'Alias', 'Cast', 'ObjectHashAggregateExec', 'AggregateExpression',
'ApproximatePercentile', 'Literal', 'ShuffleExchangeExec', 'HashPartitioning', 'CollectLimitExec')
def test_hash_groupby_approx_percentile_partial_fallback_to_cpu(aqe_enabled):
conf = copy_and_update(_approx_percentile_conf, {
'spark.rapids.sql.hashAgg.replaceMode': 'partial',
'spark.sql.adaptive.enabled': aqe_enabled
})

def approx_percentile_query(spark):
df = gen_df(spark, [('k', StringGen(nullable=False)),
('v', DoubleGen())], length=100)
df.createOrReplaceTempView("t")
return spark.sql("select k, approx_percentile(v, array(0.1, 0.2)) from t group by k")

assert_gpu_fallback_collect(lambda spark: approx_percentile_query(spark), 'ApproximatePercentile', conf)
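The test above layers two settings on top of a shared base configuration, once per `aqe_enabled` value. The sketch below shows how such a merge might behave. This `copy_and_update` is a hypothetical stand-in for the repository helper of the same name, and `base_conf` is a placeholder because the contents of `_approx_percentile_conf` are not shown in this diff.

```python
def copy_and_update(conf, *more_confs):
    """Return a merged copy of conf (hypothetical stand-in for the test helper)."""
    merged = dict(conf)
    for more in more_confs:
        merged.update(more)
    return merged


# Placeholder base settings; the real _approx_percentile_conf is not shown here.
base_conf = {'example.base.setting': 'true'}


def build_conf(aqe_enabled):
    # Force only the partial aggregation stage to be replaced, with AQE on or off.
    return copy_and_update(base_conf, {
        'spark.rapids.sql.hashAgg.replaceMode': 'partial',
        'spark.sql.adaptive.enabled': aqe_enabled,
    })


# One configuration per parametrized value, mirroring the pytest parametrization.
confs = [build_conf(aqe) for aqe in ['false', 'true']]
```

Copying before updating keeps the shared base dictionary unchanged across parametrized runs.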

# The percentile approx tests differ from other tests because we do not expect the CPU and GPU to produce the same
# results due to the different algorithms being used. Instead we compute an exact percentile on the CPU and then
# compute approximate percentiles on CPU and GPU and assert that the GPU numbers are accurate within some percentage
16 changes: 16 additions & 0 deletions integration_tests/src/main/python/sort_test.py
@@ -27,6 +27,22 @@
DecimalGen(precision=7, scale=-3, nullable=False), DecimalGen(precision=7, scale=3, nullable=False),
DecimalGen(precision=7, scale=7, nullable=False), DecimalGen(precision=12, scale=2, nullable=False)]

@allow_non_gpu('SortExec', 'ShuffleExchangeExec', 'RangePartitioning', 'SortOrder')
@pytest.mark.parametrize('data_gen', [StringGen(nullable=False)], ids=idfn)
@pytest.mark.parametrize('order', [f.col('a').cast(BinaryType())], ids=idfn)
def test_sort_binary_fallback(data_gen, order):
assert_gpu_fallback_collect(
lambda spark : unary_op_df(spark, data_gen).orderBy(order),
"SortExec")

@allow_non_gpu('ProjectExec', 'ShuffleExchangeExec', 'RangePartitioning')
@pytest.mark.parametrize('data_gen', [StringGen(nullable=False)], ids=idfn)
def test_sort_nonbinary_carry_binary(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen)
.withColumn("binary_string", f.col("a").cast(BinaryType()))
.orderBy(f.col('a')))

@pytest.mark.parametrize('data_gen', orderable_gens + orderable_not_null_gen, ids=idfn)
@pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn)
def test_single_orderby(data_gen, order):
@@ -28,6 +28,17 @@ class GpuCustomShuffleReaderMeta(reader: CustomShuffleReaderExec,
rule: DataFromReplacementRule)
extends SparkPlanMeta[CustomShuffleReaderExec](reader, conf, parent, rule) {


override def checkExistingTags(): Unit = {
// CoalesceShufflePartitions performs a transformUp and may replace ShuffleQueryStageExec
// with CustomShuffleReaderExec, causing tags to be copied from ShuffleQueryStageExec to
// CustomShuffleReaderExec, including the "no need to replace ShuffleQueryStageExec" tag.
wrapped.getTagValue(RapidsMeta.gpuSupportedTag)
.foreach(_.diff(cannotBeReplacedReasons.get)
.filterNot(_ == s"there is no need to replace ${classOf[ShuffleQueryStageExec]}")
.foreach(willNotWorkOnGpu))
}

override def tagPlanForGpu(): Unit = {
if (!reader.child.supportsColumnar) {
willNotWorkOnGpu(
@@ -3528,9 +3528,13 @@ object GpuOverrides extends Logging {
exec[ObjectHashAggregateExec](
"The backend for hash based aggregations supporting TypedImperativeAggregate functions",
ExecChecks(
// note that binary input is allowed here but there are additional checks later on to
// check that we can support binary in the context of aggregate buffer conversions
(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 +
TypeSig.MAP + TypeSig.ARRAY + TypeSig.STRUCT)
TypeSig.MAP + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.BINARY)
.nested()
.withPsNote(TypeEnum.BINARY, "only allowed when aggregate buffers can be " +
"converted between CPU and GPU")
.withPsNote(TypeEnum.ARRAY, "not allowed for grouping expressions")
.withPsNote(TypeEnum.MAP, "not allowed for grouping expressions")
.withPsNote(TypeEnum.STRUCT,
@@ -3541,8 +3545,10 @@ object GpuOverrides extends Logging {
"The backend for sort based aggregations",
ExecChecks(
(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 +
TypeSig.MAP + TypeSig.ARRAY + TypeSig.STRUCT)
TypeSig.MAP + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.BINARY)
.nested()
.withPsNote(TypeEnum.BINARY, "only allowed when aggregate buffers can be " +
"converted between CPU and GPU")
.withPsNote(TypeEnum.ARRAY, "not allowed for grouping expressions")
.withPsNote(TypeEnum.MAP, "not allowed for grouping expressions")
.withPsNote(TypeEnum.STRUCT,
@@ -3554,7 +3560,7 @@ object GpuOverrides extends Logging {
// The SortOrder TypeSig will govern what types can actually be used as sorting key data type.
// The types below are allowed as inputs and outputs.
ExecChecks(pluginSupportedOrderableSig + (TypeSig.ARRAY + TypeSig.STRUCT +
TypeSig.MAP).nested(), TypeSig.all),
TypeSig.MAP + TypeSig.BINARY).nested(), TypeSig.all),
(sort, conf, p, r) => new GpuSortMeta(sort, conf, p, r)),
exec[ExpandExec](
"The backend for the expand operator",
21 changes: 19 additions & 2 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.QueryStageExec
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
import org.apache.spark.sql.execution.command.DataWritingCommand
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
@@ -53,6 +54,10 @@ final class NoRuleDataFromReplacementRule extends DataFromReplacementRule {
override def getChecks: Option[TypeChecks[_]] = None
}

object RapidsMeta {
val gpuSupportedTag = TreeNodeTag[Set[String]]("rapids.gpu.supported")
}

/**
* Holds metadata about a stage in the physical plan that is separate from the plan itself.
* This is helpful in deciding when to replace part of the plan with a GPU enabled version.
@@ -111,15 +116,15 @@ abstract class RapidsMeta[INPUT <: BASE, BASE, OUTPUT <: BASE](
*/
def convertToCpu(): BASE = wrapped

private var cannotBeReplacedReasons: Option[mutable.Set[String]] = None
protected var cannotBeReplacedReasons: Option[mutable.Set[String]] = None
private var mustBeReplacedReasons: Option[mutable.Set[String]] = None
private var cannotReplaceAnyOfPlanReasons: Option[mutable.Set[String]] = None
private var shouldBeRemovedReasons: Option[mutable.Set[String]] = None
private var typeConversionReasons: Option[mutable.Set[String]] = None
protected var cannotRunOnGpuBecauseOfSparkPlan: Boolean = false
protected var cannotRunOnGpuBecauseOfCost: Boolean = false

val gpuSupportedTag = TreeNodeTag[Set[String]]("rapids.gpu.supported")
import RapidsMeta.gpuSupportedTag

/**
* Recursively force a section of the plan back onto CPU, stopping once a plan
@@ -677,9 +682,21 @@ abstract class SparkPlanMeta[INPUT <: SparkPlan](plan: INPUT,
willNotWorkOnGpu("not all data writing commands can be replaced")
}

checkExistingTags()

tagPlanForGpu()
}

/**
* When AQE is enabled and we are planning a new query stage, we need to look at meta-data
* previously stored on the spark plan to determine whether this operator can run on GPU
*/
def checkExistingTags(): Unit = {
wrapped.getTagValue(RapidsMeta.gpuSupportedTag)
.foreach(_.diff(cannotBeReplacedReasons.get)
.foreach(willNotWorkOnGpu))
}
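The tag check amounts to a set difference: reasons stored on the plan node during an earlier planning pass are re-applied unless the meta already knows them, and the `GpuCustomShuffleReaderMeta` override additionally drops one reason that was copied over from `ShuffleQueryStageExec`. The Python sketch below models that logic only. `FakeMeta`, `reasons_from_tags`, and the abbreviated reason string are illustrative assumptions, not plugin APIs.

```python
# Placeholder for the reason string; the real one embeds the ShuffleQueryStageExec class.
NO_NEED_TO_REPLACE = "there is no need to replace ShuffleQueryStageExec"


def reasons_from_tags(stored, already_known, ignored=frozenset()):
    """Return the stored reasons that still need to be applied to this node."""
    if stored is None:
        # No tag on the node: nothing was recorded by an earlier planning pass.
        return set()
    return (set(stored) - set(already_known)) - set(ignored)


class FakeMeta:
    """Tiny model of a plan meta that accumulates 'cannot be replaced' reasons."""

    def __init__(self, stored, ignored=frozenset()):
        self.stored = stored
        self.ignored = ignored
        self.cannot_be_replaced_reasons = set()

    def will_not_work_on_gpu(self, reason):
        self.cannot_be_replaced_reasons.add(reason)

    def check_existing_tags(self):
        pending = reasons_from_tags(
            self.stored, self.cannot_be_replaced_reasons, self.ignored)
        for reason in sorted(pending):
            self.will_not_work_on_gpu(reason)


# Base behaviour: every stored reason is re-applied.
base = FakeMeta({"unsupported type", NO_NEED_TO_REPLACE})
base.check_existing_tags()

# Override behaviour: the copied-over tag is filtered out before re-applying.
reader = FakeMeta({"unsupported type", NO_NEED_TO_REPLACE},
                  ignored={NO_NEED_TO_REPLACE})
reader.check_existing_tags()
```

In this model the base meta ends up with both reasons, while the reader meta keeps only `"unsupported type"`, which is the effect the `filterNot` in the override is meant to have.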

/**
* Called to verify that this plan will work on the GPU. Generic checks will have already been
* done. In general this method should only tag this operator as bad. If it needs to tag
@@ -251,9 +251,6 @@ class GpuBroadcastMeta(
"with a GPU version of BroadcastHashJoinExec or BroadcastNestedLoopJoinExec")
}
}
// when AQE is enabled and we are planning a new query stage, we need to look at meta-data
// previously stored on the spark plan to determine whether this exchange can run on GPU
wrapped.getTagValue(gpuSupportedTag).foreach(_.foreach(willNotWorkOnGpu))
}

override def convertToGpu(): GpuExec = {
@@ -79,9 +79,6 @@ class GpuShuffleMeta(
childPlans.head.availableRuntimeDataTransition

override def tagPlanForGpu(): Unit = {
// when AQE is enabled and we are planning a new query stage, we need to look at meta-data
// previously stored on the spark plan to determine whether this exchange can run on GPU
wrapped.getTagValue(gpuSupportedTag).foreach(_.foreach(willNotWorkOnGpu))

shuffle.outputPartitioning match {
case _: RoundRobinPartitioning