Support Sort on nested struct #3034

Merged: 3 commits, Jul 29, 2021
8 changes: 4 additions & 4 deletions docs/supported_ops.md
@@ -399,7 +399,7 @@ Accelerator supports are described below.
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -14126,7 +14126,7 @@ Accelerator support is described below.
<td><b>NS</b></td>
<td><b>NS</b></td>
<td> </td>
<td><em>PS* (missing nested BINARY, CALENDAR, ARRAY, STRUCT, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, ARRAY, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -14147,7 +14147,7 @@ Accelerator support is described below.
<td><b>NS</b></td>
<td><b>NS</b></td>
<td> </td>
<td><em>PS* (missing nested BINARY, CALENDAR, ARRAY, STRUCT, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, ARRAY, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -20873,7 +20873,7 @@ as `a` don't show up in the table. They are controlled by the rules for
<td><b>NS</b></td>
<td><b>NS</b></td>
<td> </td>
<td><em>PS* (Only supported for a single partition; missing nested BINARY, CALENDAR, ARRAY, STRUCT, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, ARRAY, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
30 changes: 24 additions & 6 deletions integration_tests/src/main/python/sort_test.py
@@ -40,8 +40,7 @@ def test_single_orderby(data_gen, order):
@pytest.mark.parametrize('stable_sort', ['STABLE', 'OUTOFCORE'])
@pytest.mark.parametrize('data_gen', [
pytest.param(all_basic_struct_gen),
pytest.param(StructGen([['child0', all_basic_struct_gen]]),
marks=pytest.mark.xfail(reason='second-level structs are not supported')),
pytest.param(StructGen([['child0', all_basic_struct_gen]])),
pytest.param(ArrayGen(string_gen),
marks=pytest.mark.xfail(reason="arrays are not supported")),
pytest.param(MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen),
@@ -68,6 +67,23 @@ def test_single_nested_orderby_plain(data_gen, order, shuffle_parts, stable_sort
}
})

# only the default null ordering for a given sort direction is supported for nested types
@pytest.mark.allow_non_gpu('SortExec', 'ShuffleExchangeExec', 'RangePartitioning', 'SortOrder')
@pytest.mark.parametrize('data_gen', [
pytest.param(all_basic_struct_gen),
pytest.param(StructGen([['child0', all_basic_struct_gen]])),
], ids=idfn)
@pytest.mark.parametrize('order', [
pytest.param(f.col('a').asc_nulls_last()),
pytest.param(f.col('a').desc_nulls_first()),
], ids=idfn)
def test_single_nested_orderby_fallback_for_nullorder(data_gen, order):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).orderBy(order),
conf = {
**allow_negative_scale_of_decimal_conf,
})

# SPARK CPU itself has issue with negative scale for take ordered and project
orderable_without_neg_decimal = [n for n in (orderable_gens + orderable_not_null_gen) if not (isinstance(n, DecimalGen) and n.scale < 0)]
@pytest.mark.parametrize('data_gen', orderable_without_neg_decimal, ids=idfn)
@@ -78,8 +94,7 @@ def test_single_orderby_with_limit(data_gen, order):

@pytest.mark.parametrize('data_gen', [
pytest.param(all_basic_struct_gen),
pytest.param(StructGen([['child0', all_basic_struct_gen]]),
marks=pytest.mark.xfail(reason='second-level structs are not supported')),
pytest.param(StructGen([['child0', all_basic_struct_gen]])),
pytest.param(ArrayGen(string_gen),
marks=pytest.mark.xfail(reason="arrays are not supported")),
pytest.param(MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen),
@@ -110,7 +125,10 @@ def test_single_sort_in_part(data_gen, order):
lambda spark : unary_op_df(spark, data_gen, num_slices=12).sortWithinPartitions(order),
conf = allow_negative_scale_of_decimal_conf)

@pytest.mark.parametrize('data_gen', [all_basic_struct_gen], ids=idfn)
@pytest.mark.parametrize('data_gen', [
pytest.param(all_basic_struct_gen),
pytest.param(StructGen([['child0', all_basic_struct_gen]])),
], ids=idfn)
@pytest.mark.parametrize('order', [
pytest.param(f.col('a').asc()),
pytest.param(f.col('a').asc_nulls_first()),
@@ -194,7 +212,7 @@ def test_single_orderby_with_skew(data_gen):


# We are not trying all possibilities, just doing a few with numbers so the query works.
@pytest.mark.parametrize('data_gen', [all_basic_struct_gen], ids=idfn)
@pytest.mark.parametrize('data_gen', [all_basic_struct_gen, StructGen([['child0', all_basic_struct_gen]])], ids=idfn)
@pytest.mark.parametrize('stable_sort', ['STABLE', 'OUTOFCORE'], ids=idfn)
def test_single_nested_orderby_with_skew(data_gen, stable_sort):
sort_conf = {'spark.rapids.sql.stableSort.enabled': stable_sort == 'STABLE'}
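Not from the PR itself: a minimal PySpark sketch, under stated assumptions, of the query shapes the updated sort_test.py covers. The column name `a`, the literal data, and the session setup are hypothetical. What is taken from the tests is that a second-level struct (the StructGen([['child0', all_basic_struct_gen]]) case) is now exercised without an xfail mark, and that only Spark's default null ordering for a direction (asc implies nulls first, desc implies nulls last) is expected to stay on the GPU; the non-default variants are covered by the new fallback test.

```python
# Hedged sketch only: column name, data, and session setup are illustrative, and the
# GPU-vs-CPU placement assumes the RAPIDS Accelerator is on the classpath and enabled.
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()

# A second-level struct column, mirroring StructGen([['child0', all_basic_struct_gen]]).
df = spark.createDataFrame(
    [(((1, "a"),),), ((None,),), (None,)],
    "a struct<child0: struct<x: int, y: string>>")

# Default null ordering for the direction (asc == asc_nulls_first): the case this PR
# allows to be planned on the GPU.
df.orderBy(f.col("a").asc()).explain()

# Non-default null ordering on a nested type: the new fallback test expects this to
# run on the CPU SortExec instead.
df.orderBy(f.col("a").asc_nulls_last()).explain()
```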
@@ -1916,11 +1916,11 @@ object GpuOverrides {
expr[SortOrder](
"Sort order",
ExprChecks.projectOnly(
pluginSupportedOrderableSig,
pluginSupportedOrderableSig + TypeSig.STRUCT.nested(),
TypeSig.orderable,
Seq(ParamCheck(
"input",
pluginSupportedOrderableSig,
pluginSupportedOrderableSig + TypeSig.STRUCT.nested(),
TypeSig.orderable))),
(sortOrder, conf, p, r) => new BaseExprMeta[SortOrder](sortOrder, conf, p, r) {
override def tagExprForGpu(): Unit = {
@@ -2775,8 +2775,7 @@ object GpuOverrides {
part[RangePartitioning](
"Range partitioning",
PartChecks(RepeatingParamCheck("order_key",
pluginSupportedOrderableSig +
TypeSig.psNote(TypeEnum.STRUCT, "Only supported for a single partition"),
pluginSupportedOrderableSig + TypeSig.STRUCT.nested(),
TypeSig.orderable)),
(rp, conf, p, r) => new PartMeta[RangePartitioning](rp, conf, p, r) {
override val childExprs: Seq[BaseExprMeta[_]] =
@@ -2903,7 +2902,7 @@ object GpuOverrides {
}),
exec[TakeOrderedAndProjectExec](
"Take the first limit elements as defined by the sortOrder, and do projection if needed.",
ExecChecks(pluginSupportedOrderableSig, TypeSig.all),
ExecChecks(pluginSupportedOrderableSig + TypeSig.STRUCT.nested(), TypeSig.all),
(takeExec, conf, p, r) =>
new SparkPlanMeta[TakeOrderedAndProjectExec](takeExec, conf, p, r) {
val sortOrder: Seq[BaseExprMeta[SortOrder]] =
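For context on the GpuOverrides change above: the same widened signature, pluginSupportedOrderableSig + TypeSig.STRUCT.nested(), is applied in three places: the SortOrder expression check, the RangePartitioning part check (which drops its "Only supported for a single partition" note), and the TakeOrderedAndProjectExec exec check. Below is a hedged PySpark sketch of query shapes that exercise each of them, reusing the hypothetical df from the previous sketch; the plan node names are stock Spark physical-plan names.

```python
# Global sort on a struct column: planned as an Exchange with range partitioning
# followed by a sort, so both the SortOrder and RangePartitioning checks apply.
df.orderBy("a").explain()

# Sort followed by a limit: planned as TakeOrderedAndProjectExec, the third place
# where the signature gains TypeSig.STRUCT.nested().
df.orderBy("a").limit(10).explain()

# sortWithinPartitions only needs the SortOrder check (no range exchange involved).
df.sortWithinPartitions(f.col("a").desc()).explain()
```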