ANSI check for aggregates #3597

Merged · 6 commits · Sep 22, 2021
127 changes: 104 additions & 23 deletions integration_tests/src/main/python/hash_aggregate_test.py
@@ -182,6 +182,14 @@
_grpkey_floats_with_nulls_and_nans]


# Used to test ANSI-mode fallback
_no_overflow_ansi_gens = [
    ByteGen(min_val=1, max_val=10, special_cases=[]),
    ShortGen(min_val=1, max_val=100, special_cases=[]),
    IntegerGen(min_val=1, max_val=1000, special_cases=[]),
    LongGen(min_val=1, max_val=3000, special_cases=[])]


def get_params(init_list, marked_params=[]):
"""
A method to build the test inputs along with their passed in markers to allow testing
@@ -259,29 +267,6 @@ def test_hash_grpby_avg(data_gen, conf):
conf=conf
)

@ignore_order
@pytest.mark.parametrize('data_gen', [_grpkey_strings_with_extra_nulls], ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
@pytest.mark.parametrize('ansi_enabled', ['true', 'false'])
def test_hash_grpby_avg_nulls(data_gen, conf, ansi_enabled):
local_conf = copy_and_update(conf, {'spark.sql.ansi.enabled': ansi_enabled})
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=100).groupby('a')
.agg(f.avg('c')),
conf=local_conf
)

@ignore_order
@pytest.mark.parametrize('data_gen', [_grpkey_strings_with_extra_nulls], ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
@pytest.mark.parametrize('ansi_enabled', ['true', 'false'])
def test_hash_reduction_avg_nulls(data_gen, conf, ansi_enabled):
local_conf = copy_and_update(conf, {'spark.sql.ansi.enabled': ansi_enabled})
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=100)
.agg(f.avg('c')),
conf=local_conf
)

# tracks https://github.com/NVIDIA/spark-rapids/issues/154
@approximate_float
@@ -1088,3 +1073,99 @@ def do_it(spark):
df = two_col_df(spark, StringGen('k{1,5}'), ArrayGen(MapGen(StringGen('a{1,5}', nullable=False), StringGen('[ab]{1,5}'))))
return df.groupBy('a').agg(f.min(df.b[1]["a"]))
assert_gpu_and_cpu_are_equal_collect(do_it)

@ignore_order
@pytest.mark.parametrize('data_gen', [_grpkey_strings_with_extra_nulls], ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_hash_grpby_avg_nulls(data_gen, conf):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=100).groupby('a')
.agg(f.avg('c')),
conf=conf
)

@ignore_order
@allow_non_gpu('HashAggregateExec', 'Alias', 'AggregateExpression', 'Cast',
'HashPartitioning', 'ShuffleExchangeExec', 'Average')
@pytest.mark.parametrize('data_gen', [_grpkey_strings_with_extra_nulls], ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_hash_grpby_avg_nulls_ansi(data_gen, conf):
local_conf = copy_and_update(conf, {'spark.sql.ansi.enabled': 'true'})
assert_gpu_fallback_collect(
lambda spark: gen_df(spark, data_gen, length=100).groupby('a')
.agg(f.avg('c')),
'Average',
conf=local_conf
)

@ignore_order
@pytest.mark.parametrize('data_gen', [_grpkey_strings_with_extra_nulls], ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_hash_reduction_avg_nulls(data_gen, conf):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=100)
.agg(f.avg('c')),
conf=conf
)

@ignore_order
@allow_non_gpu('HashAggregateExec', 'Alias', 'AggregateExpression', 'Cast',
'HashPartitioning', 'ShuffleExchangeExec', 'Average')
@pytest.mark.parametrize('data_gen', [_grpkey_strings_with_extra_nulls], ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_hash_reduction_avg_nulls_ansi(data_gen, conf):
local_conf = copy_and_update(conf, {'spark.sql.ansi.enabled': 'true'})
assert_gpu_fallback_collect(
lambda spark: gen_df(spark, data_gen, length=100)
.agg(f.avg('c')),
'Average',
conf=local_conf
)


@allow_non_gpu('HashAggregateExec', 'Alias', 'AggregateExpression', 'Cast',
'HashPartitioning', 'ShuffleExchangeExec', 'Sum')
@pytest.mark.parametrize('data_gen', _no_overflow_ansi_gens, ids=idfn)
def test_sum_fallback_when_ansi_enabled(data_gen):
def do_it(spark):
df = gen_df(spark, [('a', data_gen), ('b', data_gen)], length=100)
return df.groupBy('a').agg(f.sum("b"))

assert_gpu_fallback_collect(do_it, 'Sum',
conf={'spark.sql.ansi.enabled': 'true'})


@allow_non_gpu('HashAggregateExec', 'Alias', 'AggregateExpression', 'Cast',
'HashPartitioning', 'ShuffleExchangeExec', 'Average')
@pytest.mark.parametrize('data_gen', _no_overflow_ansi_gens, ids=idfn)
def test_avg_fallback_when_ansi_enabled(data_gen):
def do_it(spark):
df = gen_df(spark, [('a', data_gen), ('b', data_gen)], length=100)
return df.groupBy('a').agg(f.avg("b"))

assert_gpu_fallback_collect(do_it, 'Average',
conf={'spark.sql.ansi.enabled': 'true'})


@allow_non_gpu('HashAggregateExec', 'Alias', 'AggregateExpression',
'HashPartitioning', 'ShuffleExchangeExec', 'Count', 'Literal')
@pytest.mark.parametrize('data_gen', _no_overflow_ansi_gens, ids=idfn)
def test_count_fallback_when_ansi_enabled(data_gen):
def do_it(spark):
df = gen_df(spark, [('a', data_gen), ('b', data_gen)], length=100)
return df.groupBy('a').agg(f.count("b"), f.count("*"))

assert_gpu_fallback_collect(do_it, 'Count',
conf={'spark.sql.ansi.enabled': 'true'})


@pytest.mark.parametrize('data_gen', _no_overflow_ansi_gens, ids=idfn)
def test_no_fallback_when_ansi_enabled(data_gen):
def do_it(spark):
df = gen_df(spark, [('a', data_gen), ('b', data_gen)], length=100)
        # coalesce to a single partition and sort, because first/last are not deterministic otherwise
df = df.coalesce(1).orderBy("a", "b")
return df.groupBy('a').agg(f.first("b"), f.last("b"), f.min("b"), f.max("b"))

assert_gpu_and_cpu_are_equal_collect(do_it,
conf={'spark.sql.ansi.enabled': 'true'})
@@ -228,12 +228,16 @@ class Spark320Shims extends Spark32XShims {
Seq(ParamCheck("input", TypeSig.integral + TypeSig.fp + TypeSig.NULL,
TypeSig.numericAndInterval + TypeSig.NULL))),
(a, conf, p, r) => new AggExprMeta[Average](a, conf, p, r) {
override def tagExprForGpu(): Unit = {
override def tagAggForGpu(): Unit = {
val dataType = a.child.dataType
GpuOverrides.checkAndTagFloatAgg(dataType, conf, this)
}

override def convertToGpu(child: Expression): GpuExpression = GpuAverage(child)
override def convertToGpu(childExprs: Seq[Expression]): GpuExpression =
GpuAverage(childExprs.head)

// Average is not supported in ANSI mode right now, no matter the type
override val ansiTypeToCheck: Option[DataType] = None
}),
GpuOverrides.expr[Abs](
"Absolute value",
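The `tagAggForGpu` and `ansiTypeToCheck` members being overridden here belong to the shared `AggExprMeta` base class, which is not part of this diff. For context, below is a minimal sketch of how such a base class could wire the hook into expression tagging; the `needsAnsiCheck` flag, the `mayOverflow` helper, and the fallback messages are assumptions for illustration, not code taken from this PR.

// Sketch only: assumed to live in com.nvidia.spark.rapids, so the plugin
// types (RapidsConf, RapidsMeta, ExprMeta, GpuExpression) are in scope.
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ByteType, DataType, DecimalType, IntegerType, LongType, ShortType}

abstract class AggExprMeta[INPUT <: AggregateFunction](
    expr: INPUT,
    conf: RapidsConf,
    parent: Option[RapidsMeta[_, _, _]],
    rule: DataFromReplacementRule)
  extends ExprMeta[INPUT](expr, conf, parent, rule) {

  // Shims override this hook instead of tagExprForGpu, so the ANSI check
  // below always runs and cannot be skipped by accident.
  def tagAggForGpu(): Unit = {}

  // Assumed opt-out for aggregates that can never overflow (min, max,
  // first, last), which the tests expect to stay on the GPU in ANSI mode.
  val needsAnsiCheck: Boolean = true

  // Some(dt): fall back only if aggregating dt can overflow under ANSI mode.
  // None: fall back whenever ANSI mode is on, regardless of the input type,
  // which is what the Average metas in this diff declare.
  val ansiTypeToCheck: Option[DataType] = Some(expr.dataType)

  def convertToGpu(childExprs: Seq[Expression]): GpuExpression

  final override def tagExprForGpu(): Unit = {
    tagAggForGpu()
    if (needsAnsiCheck && SQLConf.get.ansiEnabled) {
      ansiTypeToCheck match {
        case Some(dt) if mayOverflow(dt) =>
          willNotWorkOnGpu(s"ANSI mode aggregation on $dt is not supported")
        case None =>
          willNotWorkOnGpu("this aggregate is not supported in ANSI mode")
        case _ => // aggregating this type cannot overflow; keep it on the GPU
      }
    }
  }

  // Hypothetical helper: integral and decimal aggregation can overflow, and
  // ANSI mode turns that into an error the GPU kernels do not raise yet.
  private def mayOverflow(dt: DataType): Boolean = dt match {
    case ByteType | ShortType | IntegerType | LongType => true
    case _: DecimalType => true
    case _ => false
  }
}

Under this reading, declaring `ansiTypeToCheck = None` makes Average fall back unconditionally when `spark.sql.ansi.enabled` is set, matching the `test_avg_fallback_when_ansi_enabled` expectations above; the same override is repeated in each Spark-version shim below.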
@@ -310,12 +310,16 @@ abstract class SparkBaseShims extends Spark30XShims {
TypeSig.DOUBLE, TypeSig.DOUBLE + TypeSig.DECIMAL_128_FULL,
Seq(ParamCheck("input", TypeSig.integral + TypeSig.fp, TypeSig.numeric))),
(a, conf, p, r) => new AggExprMeta[Average](a, conf, p, r) {
override def tagExprForGpu(): Unit = {
override def tagAggForGpu(): Unit = {
val dataType = a.child.dataType
GpuOverrides.checkAndTagFloatAgg(dataType, conf, this)
}

override def convertToGpu(child: Expression): GpuExpression = GpuAverage(child)
override def convertToGpu(childExprs: Seq[Expression]): GpuExpression =
GpuAverage(childExprs.head)

// Average is not supported in ANSI mode right now, no matter the type
override val ansiTypeToCheck: Option[DataType] = None
}),
GpuOverrides.expr[Abs](
"Absolute value",
@@ -273,12 +273,16 @@ abstract class SparkBaseShims extends Spark30XShims {
TypeSig.DOUBLE, TypeSig.DOUBLE + TypeSig.DECIMAL_128_FULL,
Seq(ParamCheck("input", TypeSig.integral + TypeSig.fp, TypeSig.numeric))),
(a, conf, p, r) => new AggExprMeta[Average](a, conf, p, r) {
override def tagExprForGpu(): Unit = {
override def tagAggForGpu(): Unit = {
val dataType = a.child.dataType
GpuOverrides.checkAndTagFloatAgg(dataType, conf, this)
}

override def convertToGpu(child: Expression): GpuExpression = GpuAverage(child)
override def convertToGpu(childExprs: Seq[Expression]): GpuExpression =
GpuAverage(childExprs.head)

// Average is not supported in ANSI mode right now, no matter the type
override val ansiTypeToCheck: Option[DataType] = None
}),
GpuOverrides.expr[Abs](
"Absolute value",
@@ -210,12 +210,16 @@ abstract class SparkBaseShims extends Spark30XShims {
TypeSig.DOUBLE, TypeSig.DOUBLE + TypeSig.DECIMAL_128_FULL,
Seq(ParamCheck("input", TypeSig.integral + TypeSig.fp, TypeSig.numeric))),
(a, conf, p, r) => new AggExprMeta[Average](a, conf, p, r) {
override def tagExprForGpu(): Unit = {
override def tagAggForGpu(): Unit = {
val dataType = a.child.dataType
GpuOverrides.checkAndTagFloatAgg(dataType, conf, this)
}

override def convertToGpu(child: Expression): GpuExpression = GpuAverage(child)
override def convertToGpu(childExprs: Seq[Expression]): GpuExpression =
GpuAverage(childExprs.head)

// Average is not supported in ANSI mode right now, no matter the type
override val ansiTypeToCheck: Option[DataType] = None
}),
GpuOverrides.expr[Abs](
"Absolute value",
@@ -208,12 +208,16 @@ abstract class SparkBaseShims extends Spark30XShims {
TypeSig.DOUBLE, TypeSig.DOUBLE + TypeSig.DECIMAL_128_FULL,
Seq(ParamCheck("input", TypeSig.integral + TypeSig.fp, TypeSig.numeric))),
(a, conf, p, r) => new AggExprMeta[Average](a, conf, p, r) {
override def tagExprForGpu(): Unit = {
override def tagAggForGpu(): Unit = {
val dataType = a.child.dataType
GpuOverrides.checkAndTagFloatAgg(dataType, conf, this)
}

override def convertToGpu(child: Expression): GpuExpression = GpuAverage(child)
override def convertToGpu(childExprs: Seq[Expression]): GpuExpression =
GpuAverage(childExprs.head)

// Average is not supported in ANSI mode right now, no matter the type
override val ansiTypeToCheck: Option[DataType] = None
}),
GpuOverrides.expr[Abs](
"Absolute value",