Improve test coverage for sorting structs (#2507)
* Improve test coverage for sorting structs

Relates to #1607

Signed-off-by: Gera Shegalov <gera@apache.org>

* cudf fix
gerashegalov authored May 27, 2021
Parent 3da9ba9 · commit 1cd417e
Showing 1 changed file with 58 additions and 10 deletions.
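
For context, a minimal editorial sketch (not part of the commit) of the struct-ordering pattern the new tests exercise. In the parametrized tests below, 'STABLE' maps to spark.rapids.sql.stableSort.enabled=true, while 'OUTOFCORE' leaves it false and exercises the default out-of-core sort. The session configs and the toy DataFrame here are illustrative assumptions; only the config key and the ordering API come from the diff:

# Editorial sketch: sorting by a struct column compares field by field.
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = (SparkSession.builder
    .config('spark.plugins', 'com.nvidia.spark.SQLPlugin')  # RAPIDS accelerator (illustrative setup)
    .config('spark.rapids.sql.stableSort.enabled', 'true')  # the 'STABLE' variant of the tests
    .getOrCreate())

df = spark.createDataFrame(
    [((i % 7, 'row%d' % i),) for i in range(100)],
    'a struct<child0: int, child1: string>')

# Orders by child0 first, then child1, matching Spark's struct comparison semantics.
df.orderBy(f.col('a').desc_nulls_last()).show(5)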
integration_tests/src/main/python/sort_test.py (58 additions & 10 deletions)
@@ -16,7 +16,6 @@

from asserts import assert_gpu_and_cpu_are_equal_collect
from data_gen import *
from marks import *
from pyspark.sql.types import *
import pyspark.sql.functions as f
from spark_session import is_before_spark_311
@@ -38,10 +37,7 @@ def test_single_orderby(data_gen, order):
    pytest.param(1),
    pytest.param(200)
])
-@pytest.mark.parametrize('stable_sort', [
-    pytest.param(True),
-    pytest.param(False)
-])
+@pytest.mark.parametrize('stable_sort', ['STABLE', 'OUTOFCORE'])
@pytest.mark.parametrize('data_gen', [
    pytest.param(all_basic_struct_gen),
    pytest.param(StructGen([['child0', all_basic_struct_gen]]),
@@ -68,7 +64,7 @@ def test_single_nested_orderby_plain(data_gen, order, shuffle_parts, stable_sort
            **allow_negative_scale_of_decimal_conf,
            **{
                'spark.sql.shuffle.partitions': shuffle_parts,
-                'spark.rapids.sql.stableSort.enabled': stable_sort
+                'spark.rapids.sql.stableSort.enabled': stable_sort == 'STABLE'
            }
        })

@@ -113,12 +109,31 @@ def test_single_sort_in_part(data_gen, order):
        lambda spark : unary_op_df(spark, data_gen).sortWithinPartitions(order),
        conf = allow_negative_scale_of_decimal_conf)


+@pytest.mark.parametrize('data_gen', [all_basic_struct_gen], ids=idfn)
+@pytest.mark.parametrize('order', [
+    pytest.param(f.col('a').asc()),
+    pytest.param(f.col('a').asc_nulls_first()),
+    pytest.param(f.col('a').asc_nulls_last(),
+        marks=pytest.mark.xfail(reason='opposite null order not supported')),
+    pytest.param(f.col('a').desc()),
+    pytest.param(f.col('a').desc_nulls_first(),
+        marks=pytest.mark.xfail(reason='opposite null order not supported')),
+    pytest.param(f.col('a').desc_nulls_last()),
+], ids=idfn)
+@pytest.mark.parametrize('stable_sort', ['STABLE', 'OUTOFCORE'], ids=idfn)
+def test_single_nested_sort_in_part(data_gen, order, stable_sort):
+    sort_conf = {'spark.rapids.sql.stableSort.enabled': stable_sort == 'STABLE'}
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark : unary_op_df(spark, data_gen).sortWithinPartitions(order),
+        conf={**allow_negative_scale_of_decimal_conf, **sort_conf})

orderable_gens_sort = [byte_gen, short_gen, int_gen, long_gen,
    pytest.param(float_gen, marks=pytest.mark.xfail(condition=is_before_spark_311(),
        reason='Spark has -0.0 < 0.0 before Spark 3.1')),
    pytest.param(double_gen, marks=pytest.mark.xfail(condition=is_before_spark_311(),
        reason='Spark has -0.0 < 0.0 before Spark 3.1')),
-    boolean_gen, timestamp_gen, date_gen, string_gen, null_gen] + decimal_gens
+    boolean_gen, timestamp_gen, date_gen, string_gen, null_gen, StructGen([('child0', long_gen)])] + decimal_gens
@pytest.mark.parametrize('data_gen', orderable_gens_sort, ids=idfn)
def test_multi_orderby(data_gen):
    assert_gpu_and_cpu_are_equal_collect(
@@ -146,6 +161,16 @@ def test_orderby_with_processing_and_limit(data_gen):
        # avoid ambiguity in the order by statement for floating point by including a as a backup ordering column
        lambda spark : unary_op_df(spark, data_gen).orderBy(f.lit(100) - f.col('a'), f.col('a')).limit(100))


+# We are not trying all possibilities, just doing a few with numbers so the query works.
+@pytest.mark.parametrize('data_gen', [StructGen([('child0', long_gen)])], ids=idfn)
+def test_single_nested_orderby_with_processing_and_limit(data_gen):
+    assert_gpu_and_cpu_are_equal_collect(
+        # avoid ambiguity in the order by statement for floating point by including a as a backup ordering column
+        lambda spark : unary_op_df(spark, data_gen)\
+            .orderBy(f.struct(f.lit(100) - f.col('a.child0')), f.col('a'))\
+            .limit(100))

# We are not trying all possibilities, just doing a few with numbers so the query works.
@pytest.mark.parametrize('data_gen', [byte_gen, long_gen, float_gen], ids=idfn)
def test_single_orderby_with_skew(data_gen):
@@ -160,15 +185,38 @@ def test_single_orderby_with_skew(data_gen):
            .selectExpr('a'),
        conf = allow_negative_scale_of_decimal_conf)


+# We are not trying all possibilities, just doing a few with numbers so the query works.
+@pytest.mark.parametrize('data_gen', [all_basic_struct_gen], ids=idfn)
+@pytest.mark.parametrize('stable_sort', ['STABLE', 'OUTOFCORE'], ids=idfn)
+def test_single_nested_orderby_with_skew(data_gen, stable_sort):
+    sort_conf = {'spark.rapids.sql.stableSort.enabled': stable_sort == 'STABLE'}
+    # When doing range partitioning the upstream data is sampled to try and get the bounds for cutoffs.
+    # If the data comes back with skewed partitions then those partitions will be resampled for more data.
+    # This is to try and trigger it to happen.
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark : unary_op_df(spark, data_gen) \
+            .selectExpr('a', 'random(1) > 0.5 as b') \
+            .repartition(f.col('b')) \
+            .orderBy(f.col('a')) \
+            .selectExpr('a'),
+        conf={**allow_negative_scale_of_decimal_conf, **sort_conf})


# This is primarily to test the out-of-core sort with multiple batches. For this we set the data size to
# be relatively large (1 MiB across all tasks) and the target batch size to be small (16 KiB), so we
# should see around 64 batches of data. The test is therefore most meaningful when there are fewer than
# 64 tasks in the cluster, but it should still work even then (see the sizing check after the diff).
-def test_large_orderby():
+@pytest.mark.parametrize('data_gen', [long_gen, StructGen([('child0', long_gen)])], ids=idfn)
+@pytest.mark.parametrize('stable_sort', ['STABLE', 'OUTOFCORE'], ids=idfn)
+def test_large_orderby(data_gen, stable_sort):
    assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : unary_op_df(spark, long_gen, length=1024*128)\
+        lambda spark : unary_op_df(spark, data_gen, length=1024*128)\
            .orderBy(f.col('a')),
-        conf = {'spark.rapids.sql.batchSizeBytes': '16384'})
+        conf={
+            'spark.rapids.sql.batchSizeBytes': '16384',
+            'spark.rapids.sql.stableSort.enabled': stable_sort == 'STABLE'
+        })

# This is similar to test_large_orderby, but here we want to test some types
# that are not being sorted on, but are going along with it
[… remainder of the file unchanged and collapsed …]
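
A back-of-envelope check (editorial, not part of the commit) of the sizing comment above test_large_orderby. The row count and batch size come from the diff; the arithmetic is just spelled out:

# Sizing math behind test_large_orderby (editorial note; numbers from the diff above).
rows = 1024 * 128                    # length passed to unary_op_df
bytes_per_long = 8                   # one 64-bit long per row in column 'a'
total_bytes = rows * bytes_per_long  # 1,048,576 bytes == 1 MiB across all tasks
target_batch = 16384                 # spark.rapids.sql.batchSizeBytes == 16 KiB
print(total_bytes // target_batch)   # -> 64, the expected number of batches

With 64 or more tasks, each task may end up holding only a single batch, so the multi-batch out-of-core path would not be exercised; that is why the comment prefers clusters with fewer than 64 tasks.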
