
Don't do an extra shuffle in some TopN cases #2536

Merged · 1 commit · Jun 1, 2021
7 changes: 7 additions & 0 deletions integration_tests/src/main/python/sort_test.py
@@ -147,6 +147,13 @@ def test_multi_orderby_with_limit(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : binary_op_df(spark, data_gen).orderBy(f.col('a'), f.col('b').desc()).limit(100))
 
+# We added in a partitioning optimization to take_ordered_and_project
+# This should trigger it.
+@pytest.mark.parametrize('data_gen', orderable_gens_sort_without_neg_decimal, ids=idfn)
+def test_multi_orderby_with_limit_single_part(data_gen):
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark : binary_op_df(spark, data_gen).coalesce(1).orderBy(f.col('a'), f.col('b').desc()).limit(100))
+
 # We are not trying all possibilities, just doing a few with numbers so the query works.
 @pytest.mark.parametrize('data_gen', [byte_gen, long_gen, float_gen], ids=idfn)
 def test_orderby_with_processing(data_gen):
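For context on what the new test exercises, here is a minimal standalone sketch (the object name and session setup are hypothetical, not part of the PR): `coalesce(1)` collapses the input to one partition, so the child of the planned take-ordered-and-project reports a single output partition and, per the change below, the GPU plan can keep a single GpuTopN with no shuffle.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Hypothetical standalone reproduction of the query shape the new test uses.
object SinglePartTopNExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("topn-single-part").getOrCreate()
    import spark.implicits._

    val df = (1 to 1000).map(i => (i % 7, i)).toDF("a", "b")

    // coalesce(1) => the sort's child has outputPartitioning.numPartitions == 1,
    // which is exactly the condition the new code checks.
    val topN = df.coalesce(1).orderBy(col("a"), col("b").desc).limit(100)

    // With the plugin enabled, the plan should show a single GpuTopN with no
    // GpuShuffleExchange between it and the coalesce.
    topN.explain()
    spark.stop()
  }
}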
@@ -2858,16 +2858,23 @@ object GpuOverrides {
         override val childExprs: Seq[BaseExprMeta[_]] = sortOrder ++ projectList
 
         override def convertToGpu(): GpuExec = {
-          // To avoid metrics confusion we split a single stage up into multiple parts
+          // To avoid metrics confusion we split a single stage up into multiple parts but only
+          // if there are multiple partitions to make it worth doing.
           val so = sortOrder.map(_.convertToGpu().asInstanceOf[SortOrder])
-          GpuTopN(takeExec.limit,
-            so,
-            projectList.map(_.convertToGpu().asInstanceOf[NamedExpression]),
-            ShimLoader.getSparkShims.getGpuShuffleExchangeExec(GpuSinglePartitioning,
-              GpuTopN(takeExec.limit,
-                so,
-                takeExec.child.output,
-                childPlans.head.convertIfNeeded())))
+          if (takeExec.child.outputPartitioning.numPartitions == 1) {
+            GpuTopN(takeExec.limit, so,
+              projectList.map(_.convertToGpu().asInstanceOf[NamedExpression]),
+              childPlans.head.convertIfNeeded())
+          } else {
+            GpuTopN(takeExec.limit,
+              so,
+              projectList.map(_.convertToGpu().asInstanceOf[NamedExpression]),
+              ShimLoader.getSparkShims.getGpuShuffleExchangeExec(GpuSinglePartitioning,
+                GpuTopN(takeExec.limit,
+                  so,
+                  takeExec.child.output,
+                  childPlans.head.convertIfNeeded())))
+          }
         }
       }),
     exec[LocalLimitExec](
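To make the shape of the change easier to see, here is a minimal sketch (simplified stand-in types, not the plugin's actual classes) of the planning decision above: the per-partition TopN, single-partition shuffle, and final TopN are only worth building when the child has more than one partition; with one partition the first TopN already produces the global answer.

// Simplified stand-in plan nodes, hypothetical and for illustration only.
sealed trait Plan { def numPartitions: Int }
case class Leaf(numPartitions: Int) extends Plan
case class TopN(limit: Int, child: Plan) extends Plan {
  // TopN keeps at most `limit` rows per partition; partitioning is unchanged.
  def numPartitions: Int = child.numPartitions
}
case class ShuffleToOnePartition(child: Plan) extends Plan {
  def numPartitions: Int = 1
}

// Mirrors the convertToGpu logic: skip the shuffle for a single-partition child.
def planTakeOrdered(limit: Int, child: Plan): Plan =
  if (child.numPartitions == 1) {
    // One partition: its top N rows are already the global top N.
    TopN(limit, child)
  } else {
    // Cut each partition down to N rows, gather everything to one partition,
    // then cut the merged result down to the final N rows.
    TopN(limit, ShuffleToOnePartition(TopN(limit, child)))
  }

For example, `planTakeOrdered(100, Leaf(numPartitions = 1))` yields `TopN(100, Leaf(1))` with no shuffle node, which is the plan shape the new `coalesce(1)` test should produce, while a multi-partition leaf still gets the original TopN → shuffle → TopN chain.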