Prevent approx_percentile aggregate from being split between CPU and GPU #3862

Merged · 21 commits · Nov 2, 2021
24 changes: 12 additions & 12 deletions docs/supported_ops.md
@@ -412,11 +412,11 @@ Accelerator supports are described below.
<td>S</td>
<td><em>PS<br/>max DECIMAL precision of 18</em></td>
<td>S</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -556,11 +556,11 @@ Accelerator supports are described below.
<td>S</td>
<td><em>PS<br/>max DECIMAL precision of 18</em></td>
<td>S</td>
<td><em>PS<br/>only allowed when aggregate buffers can be converted between CPU and GPU</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS<br/>not allowed for grouping expressions;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions if containing Array or Map as child;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions if containing Array or Map as child;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -580,11 +580,11 @@ Accelerator supports are described below.
<td>S</td>
<td><em>PS<br/>max DECIMAL precision of 18</em></td>
<td>S</td>
<td><em>PS<br/>only allowed when aggregate buffers can be converted between CPU and GPU</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS<br/>not allowed for grouping expressions;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions if containing Array or Map as child;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions if containing Array or Map as child;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td><b>NS</b></td>
</tr>
<tr>
2 changes: 2 additions & 0 deletions integration_tests/src/main/python/asserts.py
@@ -90,6 +90,8 @@ def _assert_equal(cpu, gpu, float_check, path):
assert cpu == gpu, "GPU and CPU boolean values are different at {}".format(path)
elif isinstance(cpu, Decimal):
assert cpu == gpu, "GPU and CPU decimal values are different at {}".format(path)
elif isinstance(cpu, bytearray):
assert cpu == gpu, "GPU and CPU bytearray values are different at {}".format(path)
elif (cpu == None):
assert cpu == gpu, "GPU and CPU are not both null at {}".format(path)
else:
26 changes: 23 additions & 3 deletions integration_tests/src/main/python/hash_aggregate_test.py
@@ -598,7 +598,6 @@ def test_hash_groupby_collect_partial_replace_fallback(data_gen,
# Databricks runtime: GPU(Final&Complete) -> CPU(PartialMerge)
'final|partialMerge&partial|final&complete',
]
# TODO: add param of use_obj_hash_agg after https://github.com/NVIDIA/spark-rapids/issues/3367 is fixed
@ignore_order(local=True)
@allow_non_gpu('ObjectHashAggregateExec', 'SortAggregateExec',
'ShuffleExchangeExec', 'HashPartitioning', 'SortExec',
@@ -608,11 +607,14 @@
@pytest.mark.parametrize('data_gen', _gen_data_for_collect_op, ids=idfn)
@pytest.mark.parametrize('replace_mode', _replace_modes_single_distinct, ids=idfn)
@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
@pytest.mark.parametrize('use_obj_hash_agg', ['false', 'true'], ids=idfn)
def test_hash_groupby_collect_partial_replace_with_distinct_fallback(data_gen,
replace_mode,
aqe_enabled):
aqe_enabled,
use_obj_hash_agg):
conf = {'spark.rapids.sql.hashAgg.replaceMode': replace_mode,
'spark.sql.adaptive.enabled': aqe_enabled}
'spark.sql.adaptive.enabled': aqe_enabled,
'spark.sql.execution.useObjectHashAggregateExec': use_obj_hash_agg}
# test with single Distinct
assert_cpu_and_gpu_are_equal_collect_with_capture(
lambda spark: gen_df(spark, data_gen, length=100)
@@ -1164,6 +1166,24 @@ def test_hash_groupby_approx_percentile_double_scalar():
('v', DoubleGen())], length=100),
0.05)

@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
@ignore_order(local=True)
@allow_non_gpu('TakeOrderedAndProjectExec', 'Alias', 'Cast', 'ObjectHashAggregateExec', 'AggregateExpression',
'ApproximatePercentile', 'Literal', 'ShuffleExchangeExec', 'HashPartitioning', 'CollectLimitExec')
def test_hash_groupby_approx_percentile_partial_fallback_to_cpu(aqe_enabled):
conf = copy_and_update(_approx_percentile_conf, {
'spark.rapids.sql.hashAgg.replaceMode': 'partial',
'spark.sql.adaptive.enabled': aqe_enabled
})

def approx_percentile_query(spark):
df = gen_df(spark, [('k', StringGen(nullable=False)),
('v', DoubleGen())], length=100)
df.createOrReplaceTempView("t")
return spark.sql("select k, approx_percentile(v, array(0.1, 0.2)) from t group by k")

assert_gpu_fallback_collect(lambda spark: approx_percentile_query(spark), 'ApproximatePercentile', conf)

# The percentile approx tests differ from other tests because we do not expect the CPU and GPU to produce the same
# results due to the different algorithms being used. Instead we compute an exact percentile on the CPU and then
# compute approximate percentiles on CPU and GPU and assert that the GPU numbers are accurate within some percentage
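
The new fallback test above pins down the scenario this PR fixes: with replaceMode restricted to 'partial', the ApproximatePercentile aggregate must fall back to the CPU as a whole rather than run its partial stage on the GPU and its final stage on the CPU. A minimal Scala sketch of the same scenario, assuming the RAPIDS Accelerator is on the classpath; the two config keys are the ones the test sets, while the session setup and data are illustrative:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("approx-percentile-fallback-sketch") // illustrative
  .config("spark.rapids.sql.hashAgg.replaceMode", "partial") // only partial aggregates may be replaced
  .config("spark.sql.adaptive.enabled", "true")
  .getOrCreate()

spark.range(100)
  .selectExpr("cast(id % 10 as string) as k", "cast(id as double) as v")
  .createOrReplaceTempView("t")

// Before this fix the partial aggregation could be placed on the GPU while the final
// aggregation stayed on the CPU, and the two sides disagree on the aggregation buffer
// format. With the fix the whole aggregate runs on the CPU, which the test asserts.
spark.sql("select k, approx_percentile(v, array(0.1, 0.2)) from t group by k").show()
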
16 changes: 16 additions & 0 deletions integration_tests/src/main/python/sort_test.py
@@ -27,6 +27,22 @@
DecimalGen(precision=7, scale=-3, nullable=False), DecimalGen(precision=7, scale=3, nullable=False),
DecimalGen(precision=7, scale=7, nullable=False), DecimalGen(precision=12, scale=2, nullable=False)]

@allow_non_gpu('SortExec', 'ShuffleExchangeExec', 'RangePartitioning', 'SortOrder')
@pytest.mark.parametrize('data_gen', [StringGen(nullable=False)], ids=idfn)
@pytest.mark.parametrize('order', [f.col('a').cast(BinaryType())], ids=idfn)
def test_sort_binary_fallback(data_gen, order):
assert_gpu_fallback_collect(
lambda spark : unary_op_df(spark, data_gen).orderBy(order),
"SortExec")

@allow_non_gpu('ProjectExec', 'ShuffleExchangeExec', 'RangePartitioning')
@pytest.mark.parametrize('data_gen', [StringGen(nullable=False)], ids=idfn)
def test_sort_nonbinary_carry_binary(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen)
.withColumn("binary_string", f.col("a").cast(BinaryType()))
.orderBy(f.col('a')))

@pytest.mark.parametrize('data_gen', orderable_gens + orderable_not_null_gen, ids=idfn)
@pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn)
def test_single_orderby(data_gen, order):
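
The two new sort tests draw a distinction that is easy to miss: a BINARY sort key still forces SortExec onto the CPU, while a BINARY column that is merely carried through a sort on a supported key can stay on the GPU, because this PR adds TypeSig.BINARY to the SortExec input/output signature but not to SortOrder. A hedged Scala sketch of the same pair of queries; it assumes an active session `spark` with the plugin enabled and `import spark.implicits._` in scope:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.BinaryType

val df = Seq("a", "b", "c").toDF("a") // illustrative data

// Sorting BY a binary value: SortOrder does not support BINARY, so SortExec falls
// back to the CPU (the behavior test_sort_binary_fallback asserts).
df.orderBy(col("a").cast(BinaryType)).collect()

// Sorting by a supported key while CARRYING a binary column: the sort stays on the
// GPU (the behavior test_sort_nonbinary_carry_binary asserts).
df.withColumn("binary_string", col("a").cast(BinaryType))
  .orderBy(col("a"))
  .collect()
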
@@ -28,6 +28,17 @@ class GpuCustomShuffleReaderMeta(reader: CustomShuffleReaderExec,
rule: DataFromReplacementRule)
extends SparkPlanMeta[CustomShuffleReaderExec](reader, conf, parent, rule) {

override def checkExistingTags(): Unit = {
// CoalesceShufflePartitions performs a transformUp and may replace ShuffleQueryStageExec
// with CustomShuffleReaderExec, causing tags to be copied from ShuffleQueryStageExec to
// CustomShuffleReaderExec, including the "no need to replace ShuffleQueryStageExec" tag.
wrapped.getTagValue(RapidsMeta.gpuSupportedTag)
.foreach(_.diff(cannotBeReplacedReasons.get)
.filterNot(_ == s"there is no need to replace ${classOf[ShuffleQueryStageExec]}")
.foreach(willNotWorkOnGpu))
}

override def tagPlanForGpu(): Unit = {
if (!reader.child.supportsColumnar) {
willNotWorkOnGpu(
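
The filterNot above matters because the tag replay is otherwise indiscriminate: when CoalesceShufflePartitions swaps a ShuffleQueryStageExec for a CustomShuffleReaderExec it copies every tag across, including a "no need to replace" reason that only ever applied to the query stage itself. A self-contained sketch of the set arithmetic, with hypothetical reason strings and println standing in for willNotWorkOnGpu:

val stageExecName = "ShuffleQueryStageExec" // stand-in for classOf[ShuffleQueryStageExec]
val taggedReasons = Set(
  "expression Foo is not supported",             // already recorded by this meta
  "expression Bar is not supported",             // genuinely new: should be replayed
  s"there is no need to replace $stageExecName") // copied over by CoalesceShufflePartitions

val alreadyRecorded = Set("expression Foo is not supported")

taggedReasons
  .diff(alreadyRecorded)                                         // drop reasons already known
  .filterNot(_ == s"there is no need to replace $stageExecName") // drop the stage-only reason
  .foreach(reason => println(s"willNotWorkOnGpu: $reason"))      // prints only the Bar reason
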
@@ -3530,9 +3530,13 @@ object GpuOverrides extends Logging {
exec[ObjectHashAggregateExec](
"The backend for hash based aggregations supporting TypedImperativeAggregate functions",
ExecChecks(
// note that binary input is allowed here but there are additional checks later on to
// check that we can support binary in the context of aggregate buffer conversions
(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 +
TypeSig.MAP + TypeSig.ARRAY + TypeSig.STRUCT)
TypeSig.MAP + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.BINARY)
.nested()
.withPsNote(TypeEnum.BINARY, "only allowed when aggregate buffers can be " +
"converted between CPU and GPU")
.withPsNote(TypeEnum.ARRAY, "not allowed for grouping expressions")
.withPsNote(TypeEnum.MAP, "not allowed for grouping expressions")
.withPsNote(TypeEnum.STRUCT,
@@ -3543,8 +3547,10 @@
"The backend for sort based aggregations",
ExecChecks(
(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 +
TypeSig.MAP + TypeSig.ARRAY + TypeSig.STRUCT)
TypeSig.MAP + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.BINARY)
.nested()
.withPsNote(TypeEnum.BINARY, "only allowed when aggregate buffers can be " +
"converted between CPU and GPU")
.withPsNote(TypeEnum.ARRAY, "not allowed for grouping expressions")
.withPsNote(TypeEnum.MAP, "not allowed for grouping expressions")
.withPsNote(TypeEnum.STRUCT,
@@ -3556,7 +3562,7 @@
// The SortOrder TypeSig will govern what types can actually be used as sorting key data type.
// The types below are allowed as inputs and outputs.
ExecChecks(pluginSupportedOrderableSig + (TypeSig.ARRAY + TypeSig.STRUCT +
TypeSig.MAP).nested(), TypeSig.all),
TypeSig.MAP + TypeSig.BINARY).nested(), TypeSig.all),
(sort, conf, p, r) => new GpuSortMeta(sort, conf, p, r)),
exec[ExpandExec](
"The backend for the expand operator",
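
These ExecChecks signatures are where the docs/supported_ops.md rows at the top of this page come from: each withPsNote becomes the italic PS text in the matching type column of the generated table. The construction in isolation, using only calls that appear in this diff (the imports are the plugin's own):

import com.nvidia.spark.rapids.{TypeEnum, TypeSig}

// Input-type signature for the hash/sort aggregate execs above. The BINARY note is
// rendered as the "PS" cell of the Binary column in docs/supported_ops.md.
val aggInputSig = (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 +
    TypeSig.MAP + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.BINARY)
  .nested()
  .withPsNote(TypeEnum.BINARY,
    "only allowed when aggregate buffers can be converted between CPU and GPU")
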
21 changes: 19 additions & 2 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.QueryStageExec
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
import org.apache.spark.sql.execution.command.DataWritingCommand
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
@@ -53,6 +54,10 @@ final class NoRuleDataFromReplacementRule extends DataFromReplacementRule {
override def getChecks: Option[TypeChecks[_]] = None
}

object RapidsMeta {
val gpuSupportedTag = TreeNodeTag[Set[String]]("rapids.gpu.supported")
}

/**
* Holds metadata about a stage in the physical plan that is separate from the plan itself.
* This is helpful in deciding when to replace part of the plan with a GPU enabled version.
@@ -111,15 +116,15 @@ abstract class RapidsMeta[INPUT <: BASE, BASE, OUTPUT <: BASE](
*/
def convertToCpu(): BASE = wrapped

private var cannotBeReplacedReasons: Option[mutable.Set[String]] = None
protected var cannotBeReplacedReasons: Option[mutable.Set[String]] = None
private var mustBeReplacedReasons: Option[mutable.Set[String]] = None
private var cannotReplaceAnyOfPlanReasons: Option[mutable.Set[String]] = None
private var shouldBeRemovedReasons: Option[mutable.Set[String]] = None
private var typeConversionReasons: Option[mutable.Set[String]] = None
protected var cannotRunOnGpuBecauseOfSparkPlan: Boolean = false
protected var cannotRunOnGpuBecauseOfCost: Boolean = false

val gpuSupportedTag = TreeNodeTag[Set[String]]("rapids.gpu.supported")
import RapidsMeta.gpuSupportedTag

/**
* Recursively force a section of the plan back onto CPU, stopping once a plan
@@ -677,9 +682,21 @@ abstract class SparkPlanMeta[INPUT <: SparkPlan](plan: INPUT,
willNotWorkOnGpu("not all data writing commands can be replaced")
}

checkExistingTags()

tagPlanForGpu()
}

/**
* When AQE is enabled and we are planning a new query stage, we need to look at metadata
* previously stored on the Spark plan to determine whether this operator can run on the GPU.
*/
def checkExistingTags(): Unit = {
wrapped.getTagValue(RapidsMeta.gpuSupportedTag)
.foreach(_.diff(cannotBeReplacedReasons.get)
.foreach(willNotWorkOnGpu))
}

/**
* Called to verify that this plan will work on the GPU. Generic checks will have already been
* done. In general this method should only tag this operator as bad. If it needs to tag
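
Hoisting gpuSupportedTag into the RapidsMeta companion object lets other classes, such as GpuCustomShuffleReaderMeta above, reference the tag key statically instead of through a meta instance. A hedged sketch of the tag round-trip; setTagValue and getTagValue are real TreeNode methods, while the helper names are illustrative:

import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.execution.SparkPlan

// Same tag key as RapidsMeta.gpuSupportedTag; the two helpers are hypothetical
// stand-ins for plugin code not shown in this diff.
val gpuSupportedTag = TreeNodeTag[Set[String]]("rapids.gpu.supported")

// An earlier planning pass records why a node cannot run on the GPU.
def recordReasons(plan: SparkPlan, reasons: Set[String]): Unit =
  plan.setTagValue(gpuSupportedTag, reasons)

// When AQE re-plans a query stage, checkExistingTags reads the reasons back and
// replays any that this meta has not already recorded itself.
def replayReasons(plan: SparkPlan, alreadyRecorded: Set[String]): Set[String] =
  plan.getTagValue(gpuSupportedTag)
    .map(_.diff(alreadyRecorded))
    .getOrElse(Set.empty)
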
@@ -251,9 +251,6 @@ class GpuBroadcastMeta(
"with a GPU version of BroadcastHashJoinExec or BroadcastNestedLoopJoinExec")
}
}
// when AQE is enabled and we are planning a new query stage, we need to look at meta-data
// previously stored on the spark plan to determine whether this exchange can run on GPU
wrapped.getTagValue(gpuSupportedTag).foreach(_.foreach(willNotWorkOnGpu))
}

override def convertToGpu(): GpuExec = {
@@ -79,9 +79,6 @@ class GpuShuffleMeta(
childPlans.head.availableRuntimeDataTransition

override def tagPlanForGpu(): Unit = {
// when AQE is enabled and we are planning a new query stage, we need to look at meta-data
// previously stored on the spark plan to determine whether this exchange can run on GPU
wrapped.getTagValue(gpuSupportedTag).foreach(_.foreach(willNotWorkOnGpu))

shuffle.outputPartitioning match {
case _: RoundRobinPartitioning