Don't do an extra shuffle in some TopN cases (#2536)
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
revans2 authored Jun 1, 2021
1 parent a44211c commit 061095f
Showing 2 changed files with 23 additions and 9 deletions.
7 changes: 7 additions & 0 deletions integration_tests/src/main/python/sort_test.py
@@ -147,6 +147,13 @@ def test_multi_orderby_with_limit(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : binary_op_df(spark, data_gen).orderBy(f.col('a'), f.col('b').desc()).limit(100))
 
+# We added in a partitioning optimization to take_ordered_and_project
+# This should trigger it.
+@pytest.mark.parametrize('data_gen', orderable_gens_sort_without_neg_decimal, ids=idfn)
+def test_multi_orderby_with_limit_single_part(data_gen):
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark : binary_op_df(spark, data_gen).coalesce(1).orderBy(f.col('a'), f.col('b').desc()).limit(100))
+
 # We are not trying all possibilities, just doing a few with numbers so the query works.
 @pytest.mark.parametrize('data_gen', [byte_gen, long_gen, float_gen], ids=idfn)
 def test_orderby_with_processing(data_gen):
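The new test's coalesce(1) collapses the input to a single partition, which is exactly the case the GpuOverrides change below optimizes: when the child of TakeOrderedAndProject already has one partition, the extra single-partition shuffle buys nothing. A minimal Scala sketch of the same query shape, assuming only a local SparkSession (the object name is illustrative; the RAPIDS plugin is not required just to build and inspect the plan):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object TopNSinglePartitionExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("topn-single-partition")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((3, 7L), (1, 9L), (2, 8L)).toDF("a", "b")

    // coalesce(1) leaves the data in a single partition, so the
    // TakeOrderedAndProject produced by orderBy(...).limit(...) needs
    // no additional single-partition shuffle.
    val q = df.coalesce(1).orderBy(col("a"), col("b").desc).limit(100)

    // With the RAPIDS plugin enabled, the physical plan is where a
    // GpuShuffleExchangeExec would, before this commit, still appear.
    q.explain()
    q.show()

    spark.stop()
  }
}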
25 changes: 16 additions & 9 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -2850,16 +2850,23 @@ object GpuOverrides {
         override val childExprs: Seq[BaseExprMeta[_]] = sortOrder ++ projectList
 
         override def convertToGpu(): GpuExec = {
-          // To avoid metrics confusion we split a single stage up into multiple parts
+          // To avoid metrics confusion we split a single stage up into multiple parts but only
+          // if there are multiple partitions to make it worth doing.
           val so = sortOrder.map(_.convertToGpu().asInstanceOf[SortOrder])
-          GpuTopN(takeExec.limit,
-            so,
-            projectList.map(_.convertToGpu().asInstanceOf[NamedExpression]),
-            ShimLoader.getSparkShims.getGpuShuffleExchangeExec(GpuSinglePartitioning,
-              GpuTopN(takeExec.limit,
-                so,
-                takeExec.child.output,
-                childPlans.head.convertIfNeeded())))
+          if (takeExec.child.outputPartitioning.numPartitions == 1) {
+            GpuTopN(takeExec.limit, so,
+              projectList.map(_.convertToGpu().asInstanceOf[NamedExpression]),
+              childPlans.head.convertIfNeeded())
+          } else {
+            GpuTopN(takeExec.limit,
+              so,
+              projectList.map(_.convertToGpu().asInstanceOf[NamedExpression]),
+              ShimLoader.getSparkShims.getGpuShuffleExchangeExec(GpuSinglePartitioning,
+                GpuTopN(takeExec.limit,
+                  so,
+                  takeExec.child.output,
+                  childPlans.head.convertIfNeeded())))
+          }
         }
       }),
     exec[LocalLimitExec](
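To make the two plan shapes concrete, here is a simplified, self-contained Scala sketch of the decision convertToGpu now makes. The Plan, Scan, TopN, and SinglePartitionShuffle types are hypothetical stand-ins for the plugin's real nodes (GpuTopN, GpuShuffleExchangeExec); only the branch on the child's partition count mirrors the committed code:

object TopNPlanSketch {
  // Hypothetical stand-ins for the plugin's real plan nodes.
  sealed trait Plan { def numPartitions: Int }
  case class Scan(numPartitions: Int) extends Plan
  // A TopN sorts and truncates to `limit` rows within each of its
  // child's partitions, so it preserves the child's partitioning.
  case class TopN(limit: Int, child: Plan) extends Plan {
    def numPartitions: Int = child.numPartitions
  }
  // Gathers all input partitions into a single partition.
  case class SinglePartitionShuffle(child: Plan) extends Plan {
    def numPartitions: Int = 1
  }

  // Mirrors the branch added in convertToGpu.
  def planTopN(limit: Int, child: Plan): Plan =
    if (child.numPartitions == 1) {
      // Already a single partition: one TopN suffices, no shuffle.
      TopN(limit, child)
    } else {
      // Per-partition TopN, gather to one partition, then a final
      // TopN to merge the per-partition results.
      TopN(limit, SinglePartitionShuffle(TopN(limit, child)))
    }

  def main(args: Array[String]): Unit = {
    println(planTopN(100, Scan(numPartitions = 1)))
    println(planTopN(100, Scan(numPartitions = 8)))
  }
}

In the multi-partition branch, each partition first reduces itself to at most limit rows, so the shuffle moves at most numPartitions × limit rows before the final merge; in the single-partition branch even that shuffle is skipped.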
