
Prevent approx_percentile aggregate from being split between CPU and GPU #3862

Merged — 21 commits, merged Nov 2, 2021
25 changes: 25 additions & 0 deletions integration_tests/src/main/python/hash_aggregate_test.py
@@ -1164,6 +1164,31 @@ def test_hash_groupby_approx_percentile_double_scalar():
('v', DoubleGen())], length=100),
0.05)

@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
@ignore_order(local=True)
@allow_non_gpu('TakeOrderedAndProjectExec', 'Alias', 'Cast', 'ObjectHashAggregateExec', 'AggregateExpression',
               'ApproximatePercentile', 'Literal', 'ShuffleExchangeExec', 'HashPartitioning', 'CollectLimitExec')
def test_hash_groupby_approx_percentile_partial_fallback_to_cpu(aqe_enabled):
    conf = copy_and_update(_approx_percentile_conf, {
        'spark.sql.adaptive.enabled': aqe_enabled,
        'spark.rapids.sql.explain': 'ALL'
Review comment (Collaborator): debug?

Review comment (Collaborator): Could this test work in a similar way to assert_cpu_and_gpu_are_equal_collect_with_capture, where a list of "exist" and "non_exist" classes is used to assert that the query has indeed fallen back? Thinking of the case where the cast gets "pushed up" to a projection after the hash agg in the future.
    })

    def create_and_show_df(spark):
        df = gen_df(spark, [('k', StringGen(nullable=False)),
                            ('v', DoubleGen())], length=100)
        df.createOrReplaceTempView("t")
        df2 = spark.sql("SELECT k, approx_percentile(v, array(0.1, 0.2)) from t group by k")

        # the "show" introduces a `CAST(approx_percentile(...) AS string)` on the final aggregate,
        # which is not supported on GPU and so falls back to CPU; the purpose of this test is to
        # make sure that the partial aggregate also falls back to CPU
        df2.show()

        return df2

    run_with_cpu_and_gpu(create_and_show_df, 'COLLECT', conf)
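The reviewer's suggestion of asserting on "exist" and "non_exist" operator classes could be realized with a plan-text check along these lines. This is a hypothetical helper for illustration only — `assert_plan_fallback` and its signature are assumptions, not the suite's actual `assert_cpu_and_gpu_are_equal_collect_with_capture` implementation:

```python
# Hypothetical sketch: verify a fallback by checking which operator names
# appear (or must not appear) in the captured physical plan text.
def assert_plan_fallback(plan_string, exist_classes, non_exist_classes):
    """Assert every name in exist_classes occurs in the plan text and
    none of the names in non_exist_classes do."""
    for cls in exist_classes:
        assert cls in plan_string, f"expected {cls} in plan"
    for cls in non_exist_classes:
        assert cls not in plan_string, f"did not expect {cls} in plan"
```

With such a helper, the test could assert that the CPU `ObjectHashAggregateExec` is present and that no GPU aggregate appears, which would keep the test robust even if the cast later moves into a projection above the hash aggregate.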

# The percentile approx tests differ from other tests because we do not expect the CPU and GPU to produce the same
# results due to the different algorithms being used. Instead we compute an exact percentile on the CPU and then
# compute approximate percentiles on CPU and GPU and assert that the GPU numbers are accurate within some percentage
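The tolerance-based comparison described in that comment can be sketched as follows. This is a standalone illustration — the helper name and the 5% tolerance are assumptions, not the suite's actual check:

```python
import random
import statistics

def within_tolerance(exact, approx, rel_tol):
    """True when approx deviates from exact by at most rel_tol (a fraction)."""
    if exact == 0:
        return abs(approx) <= rel_tol
    return abs(approx - exact) / abs(exact) <= rel_tol

# Compute an exact median, then accept an approximate answer within 5%.
random.seed(0)
values = [random.gauss(100.0, 15.0) for _ in range(1000)]
exact_p50 = statistics.median(values)
approx_p50 = exact_p50 * 1.01  # stand-in for an approximate algorithm's answer
assert within_tolerance(exact_p50, approx_p50, rel_tol=0.05)
```

The point is that CPU and GPU approximate results are each validated against the exact CPU percentile, rather than compared to one another bit-for-bit.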
11 changes: 11 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.QueryStageExec
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
import org.apache.spark.sql.execution.command.DataWritingCommand
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
@@ -627,6 +628,16 @@ abstract class SparkPlanMeta[INPUT <: SparkPlan](plan: INPUT,
}
}

  def recursivelyCheckTags(): Unit = {
    if (wrapped.isInstanceOf[QueryStageExec] ||
        ShimLoader.getSparkShims.isCustomReaderExec(wrapped)) {
      // stop recursion once we hit an already-executed query stage or a reader for it
    } else {
      wrapped.getTagValue(gpuSupportedTag).foreach(_.foreach(willNotWorkOnGpu))
      childPlans.foreach(_.recursivelyCheckTags())
    }
  }
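The walk above surfaces each node's stored fallback reasons and recurses into children, stopping at already-executed query stages so their stale tags are not re-applied. The shape of the recursion can be modeled generically — this is illustrative Python with made-up class and field names, not the plugin's Scala types:

```python
# Illustrative model of the recursivelyCheckTags walk: each node reports its
# own stored fallback reasons, then recurses into children, but the walk stops
# at nodes representing already-executed query stages (or readers for them).
class PlanMeta:
    def __init__(self, name, is_stage_boundary=False, tagged_reasons=None, children=()):
        self.name = name
        self.is_stage_boundary = is_stage_boundary
        self.tagged_reasons = list(tagged_reasons or [])
        self.children = list(children)
        self.cpu_reasons = []  # reasons this node will run on CPU

    def will_not_work_on_gpu(self, reason):
        self.cpu_reasons.append(reason)

    def recursively_check_tags(self):
        if self.is_stage_boundary:
            return  # stop: stage already executed, its tags no longer apply
        for reason in self.tagged_reasons:
            self.will_not_work_on_gpu(reason)
        for child in self.children:
            child.recursively_check_tags()
```

In the PR's scenario, this is what lets a CPU-only final aggregate force the partial aggregate below the exchange onto the CPU as well, instead of only checking the exchange's own tags.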

/**
* Run rules that happen for the entire tree after it has been tagged initially.
*/
@@ -253,7 +253,7 @@ class GpuBroadcastMeta(
}
// when AQE is enabled and we are planning a new query stage, we need to look at meta-data
// previously stored on the spark plan to determine whether this exchange can run on GPU
-    wrapped.getTagValue(gpuSupportedTag).foreach(_.foreach(willNotWorkOnGpu))
+    recursivelyCheckTags()
}

override def convertToGpu(): GpuExec = {
@@ -81,7 +81,7 @@ class GpuShuffleMeta(
override def tagPlanForGpu(): Unit = {
// when AQE is enabled and we are planning a new query stage, we need to look at meta-data
// previously stored on the spark plan to determine whether this exchange can run on GPU
-    wrapped.getTagValue(gpuSupportedTag).foreach(_.foreach(willNotWorkOnGpu))
+    recursivelyCheckTags()

shuffle.outputPartitioning match {
case _: RoundRobinPartitioning