From f8b6dc617760fc2faa4d5b4549bbd32dc52757ba Mon Sep 17 00:00:00 2001 From: sperlingxx Date: Tue, 31 Aug 2021 13:41:21 +0800 Subject: [PATCH 1/2] support HashAgg with StructType Keys Signed-off-by: sperlingxx --- docs/supported_ops.md | 6 +- .../src/main/python/hash_aggregate_test.py | 74 +++++++++++++++++++ .../nvidia/spark/rapids/GpuOverrides.scala | 9 ++- .../com/nvidia/spark/rapids/aggregate.scala | 23 ++++-- 4 files changed, 98 insertions(+), 14 deletions(-) diff --git a/docs/supported_ops.md b/docs/supported_ops.md index a1ca763ca6b..208b0d64a26 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -488,7 +488,7 @@ Accelerator supports are described below. NS PS
not allowed for grouping expressions;
max nested DECIMAL precision of 18;
UTC is only supported TZ for nested TIMESTAMP;
missing nested BINARY, CALENDAR, UDT
PS
not allowed for grouping expressions;
max nested DECIMAL precision of 18;
UTC is only supported TZ for nested TIMESTAMP;
missing nested BINARY, CALENDAR, UDT
-PS
not allowed for grouping expressions;
max nested DECIMAL precision of 18;
UTC is only supported TZ for nested TIMESTAMP;
missing nested BINARY, CALENDAR, UDT
+PS
not allowed for grouping expressions if containing Array or Map as child;
max nested DECIMAL precision of 18;
UTC is only supported TZ for nested TIMESTAMP;
missing nested BINARY, CALENDAR, UDT
NS @@ -536,7 +536,7 @@ Accelerator supports are described below. NS PS
not allowed for grouping expressions;
max nested DECIMAL precision of 18;
UTC is only supported TZ for nested TIMESTAMP;
missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT
PS
not allowed for grouping expressions;
max nested DECIMAL precision of 18;
UTC is only supported TZ for nested TIMESTAMP;
missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT
-PS
not allowed for grouping expressions;
max nested DECIMAL precision of 18;
UTC is only supported TZ for nested TIMESTAMP;
missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT
+PS
not allowed for grouping expressions if containing Array or Map as child;
max nested DECIMAL precision of 18;
UTC is only supported TZ for nested TIMESTAMP;
missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT
NS @@ -560,7 +560,7 @@ Accelerator supports are described below. NS PS
not allowed for grouping expressions;
max nested DECIMAL precision of 18;
UTC is only supported TZ for nested TIMESTAMP;
missing nested BINARY, CALENDAR, UDT
PS
not allowed for grouping expressions;
max nested DECIMAL precision of 18;
UTC is only supported TZ for nested TIMESTAMP;
missing nested BINARY, CALENDAR, UDT
-PS
not allowed for grouping expressions;
max nested DECIMAL precision of 18;
UTC is only supported TZ for nested TIMESTAMP;
missing nested BINARY, CALENDAR, UDT
+PS
not allowed for grouping expressions if containing Array or Map as child;
max nested DECIMAL precision of 18;
UTC is only supported TZ for nested TIMESTAMP;
missing nested BINARY, CALENDAR, UDT
NS diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index 20e13ab55d1..195e23b8fd1 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -86,6 +86,35 @@ ('a', RepeatSeqGen(StringGen(pattern='[0-9]{0,30}'), length= 20)), ('b', IntegerGen()), ('c', NullGen())] +# grouping single-level structs +_grpkey_structs_with_non_nested_children = [ + ('a', RepeatSeqGen(StructGen([ + ['aa', IntegerGen()], + ['ab', StringGen(pattern='[0-9]{0,30}')], + ['ac', DecimalGen()]]), length=20)), + ('b', IntegerGen()), + ('c', NullGen())] +# grouping multiple-level structs +_grpkey_nested_structs = [ + ('a', RepeatSeqGen(StructGen([ + ['aa', IntegerGen()], + ['ab', StringGen(pattern='[0-9]{0,30}')], + ['ac', StructGen([['aca', LongGen()], + ['acb', BooleanGen()], + ['acc', StructGen([['acca', StringGen()]])]])]]), + length=20)), + ('b', IntegerGen()), + ('c', NullGen())] +# grouping multiple-level structs with arrays in children +_grpkey_nested_structs_with_array_child = [ + ('a', RepeatSeqGen(StructGen([ + ['aa', IntegerGen()], + ['ab', ArrayGen(IntegerGen())], + ['ac', ArrayGen(StructGen([['aca', LongGen()]]))]]), + length=20)), + ('b', IntegerGen()), + ('c', NullGen())] + # grouping NullType _grpkey_nulls = [ ('a', NullGen()), @@ -687,6 +716,51 @@ def test_hash_agg_with_nan_keys(data_gen, parameterless): 'from hash_agg_table group by a', _no_nans_float_conf) +@ignore_order +@pytest.mark.parametrize('data_gen', [_grpkey_structs_with_non_nested_children, + _grpkey_nested_structs], ids=idfn) +def test_hash_agg_with_struct_keys(data_gen): + conf = _no_nans_float_conf.copy() + conf.update({'spark.sql.legacy.allowParameterlessCount': 'true'}) + assert_gpu_and_cpu_are_equal_sql( + lambda spark : gen_df(spark, data_gen, length=1024), + "hash_agg_table", + 'select a, ' + 'count(*) as count_stars, ' + 'count() as count_parameterless, ' + 'count(b) as count_bees, ' + 'sum(b) as sum_of_bees, ' + 'max(c) as max_seas, ' + 'min(c) as min_seas, ' + 'count(distinct c) as count_distinct_cees, ' + 'avg(c) as average_seas ' + 'from hash_agg_table group by a', + conf) + +@ignore_order(local=True) +@allow_non_gpu('HashAggregateExec', 'Avg', 'Count', 'Max', 'Min', 'Sum', 'Average', + 'Cast', 'Literal', 'Alias', 'AggregateExpression', + 'ShuffleExchangeExec', 'HashPartitioning') +@pytest.mark.parametrize('data_gen', [_grpkey_nested_structs_with_array_child], ids=idfn) +def test_hash_agg_with_struct_of_array_fallback(data_gen): + conf = _no_nans_float_conf.copy() + conf.update({'spark.sql.legacy.allowParameterlessCount': 'true'}) + assert_cpu_and_gpu_are_equal_sql_with_capture( + lambda spark : gen_df(spark, data_gen, length=100), + 'select a, ' + 'count(*) as count_stars, ' + 'count() as count_parameterless, ' + 'count(b) as count_bees, ' + 'sum(b) as sum_of_bees, ' + 'max(c) as max_seas, ' + 'min(c) as min_seas, ' + 'avg(c) as average_seas ' + 'from hash_agg_table group by a', + "hash_agg_table", + exist_classes='HashAggregateExec', + non_exist_classes='GpuHashAggregateExec', + conf=conf) + @approximate_float @ignore_order diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 7e0490d2889..954efdba61b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -3208,7 +3208,8 @@ object GpuOverrides { .nested() .withPsNote(TypeEnum.ARRAY, "not allowed for grouping expressions") .withPsNote(TypeEnum.MAP, "not allowed for grouping expressions") - .withPsNote(TypeEnum.STRUCT, "not allowed for grouping expressions"), + .withPsNote(TypeEnum.STRUCT, + "not allowed for grouping expressions if containing Array or Map as child"), TypeSig.all), (agg, conf, p, r) => new GpuHashAggregateMeta(agg, conf, p, r)), exec[ObjectHashAggregateExec]( @@ -3219,7 +3220,8 @@ object GpuOverrides { .nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64) .withPsNote(TypeEnum.ARRAY, "not allowed for grouping expressions") .withPsNote(TypeEnum.MAP, "not allowed for grouping expressions") - .withPsNote(TypeEnum.STRUCT, "not allowed for grouping expressions"), + .withPsNote(TypeEnum.STRUCT, + "not allowed for grouping expressions if containing Array or Map as child"), TypeSig.all), (agg, conf, p, r) => new GpuObjectHashAggregateExecMeta(agg, conf, p, r)), exec[SortAggregateExec]( @@ -3230,7 +3232,8 @@ object GpuOverrides { .nested() .withPsNote(TypeEnum.ARRAY, "not allowed for grouping expressions") .withPsNote(TypeEnum.MAP, "not allowed for grouping expressions") - .withPsNote(TypeEnum.STRUCT, "not allowed for grouping expressions"), + .withPsNote(TypeEnum.STRUCT, + "not allowed for grouping expressions if containing Array or Map as child"), TypeSig.all), (agg, conf, p, r) => new GpuSortAggregateExecMeta(agg, conf, p, r)), exec[SortExec]( diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index a51c96ed587..7cf38df7e9d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -42,7 +42,7 @@ import org.apache.spark.sql.execution.{ExplainUtils, SortExec, SparkPlan} import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} import org.apache.spark.sql.rapids.{CpuToGpuAggregateBufferConverter, CudfAggregate, GpuAggregateExpression, GpuToCpuAggregateBufferConverter} import org.apache.spark.sql.rapids.execution.{GpuShuffleMeta, TrampolineUtil} -import org.apache.spark.sql.types.{ArrayType, DataType, LongType, MapType, StructType} +import org.apache.spark.sql.types.{ArrayType, DataType, LongType, MapType, StructField, StructType} import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} object AggregateUtils { @@ -896,16 +896,23 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan]( resultExpressions override def tagPlanForGpu(): Unit = { - agg.groupingExpressions - .find(_.dataType match { - case _@(ArrayType(_, _) | MapType(_, _, _)) | _@StructType(_) => true - case _ => false - }) - .foreach(_ => - willNotWorkOnGpu("Nested types in grouping expressions are not supported")) if (agg.resultExpressions.isEmpty) { willNotWorkOnGpu("result expressions is empty") } + // We don't support Arrays and Maps as GroupBy keys yet, even they are nested in Structs. So, + // we need to traverse through all child types of structs via DFS. + val stack = mutable.Stack(agg.groupingExpressions.map(_.dataType): _*) + var canWorkOnGPU = true + while (stack.nonEmpty && canWorkOnGPU) { + stack.pop() match { + case _@(ArrayType(_, _) | MapType(_, _, _)) => canWorkOnGPU = false + case _@StructType(children: Array[StructField]) => stack.pushAll(children.map(_.dataType)) + case _ => + } + } + if (!canWorkOnGPU) { + willNotWorkOnGpu("ArrayTypes or MayTypes in grouping expressions are not supported") + } tagForReplaceMode() From 7955395aa69108afb6ecd18478702e39a029561d Mon Sep 17 00:00:00 2001 From: sperlingxx Date: Wed, 1 Sep 2021 10:26:48 +0800 Subject: [PATCH 2/2] update Signed-off-by: sperlingxx --- docs/supported_ops.md | 6 +++--- .../com/nvidia/spark/rapids/aggregate.scala | 18 ++++++------------ 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/docs/supported_ops.md b/docs/supported_ops.md index e741ef5a460..b6155bd125b 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -488,7 +488,7 @@ Accelerator supports are described below. NS PS
not allowed for grouping expressions;
max child DECIMAL precision of 18;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, UDT
PS
not allowed for grouping expressions;
max child DECIMAL precision of 18;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, UDT
-PS
not allowed for grouping expressions;
max child DECIMAL precision of 18;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, UDT
+PS
not allowed for grouping expressions if containing Array or Map as child;
max child DECIMAL precision of 18;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, UDT
NS @@ -536,7 +536,7 @@ Accelerator supports are described below. NS PS
not allowed for grouping expressions;
max child DECIMAL precision of 18;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT
PS
not allowed for grouping expressions;
max child DECIMAL precision of 18;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT
-PS
not allowed for grouping expressions;
max child DECIMAL precision of 18;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT
+PS
not allowed for grouping expressions if containing Array or Map as child;
max child DECIMAL precision of 18;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT
NS @@ -560,7 +560,7 @@ Accelerator supports are described below. NS PS
not allowed for grouping expressions;
max child DECIMAL precision of 18;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, UDT
PS
not allowed for grouping expressions;
max child DECIMAL precision of 18;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, UDT
-PS
not allowed for grouping expressions;
max child DECIMAL precision of 18;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, UDT
+PS
not allowed for grouping expressions if containing Array or Map as child;
max child DECIMAL precision of 18;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, UDT
NS diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index 7cf38df7e9d..83f9c6ad0f0 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -42,7 +42,7 @@ import org.apache.spark.sql.execution.{ExplainUtils, SortExec, SparkPlan} import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} import org.apache.spark.sql.rapids.{CpuToGpuAggregateBufferConverter, CudfAggregate, GpuAggregateExpression, GpuToCpuAggregateBufferConverter} import org.apache.spark.sql.rapids.execution.{GpuShuffleMeta, TrampolineUtil} -import org.apache.spark.sql.types.{ArrayType, DataType, LongType, MapType, StructField, StructType} +import org.apache.spark.sql.types.{ArrayType, DataType, LongType, MapType, StructType} import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} object AggregateUtils { @@ -900,17 +900,11 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan]( willNotWorkOnGpu("result expressions is empty") } // We don't support Arrays and Maps as GroupBy keys yet, even they are nested in Structs. So, - // we need to traverse through all child types of structs via DFS. - val stack = mutable.Stack(agg.groupingExpressions.map(_.dataType): _*) - var canWorkOnGPU = true - while (stack.nonEmpty && canWorkOnGPU) { - stack.pop() match { - case _@(ArrayType(_, _) | MapType(_, _, _)) => canWorkOnGPU = false - case _@StructType(children: Array[StructField]) => stack.pushAll(children.map(_.dataType)) - case _ => - } - } - if (!canWorkOnGPU) { + // we need to run recursive type check on the structs. + val allTypesAreSupported = agg.groupingExpressions.forall(e => + !TrampolineUtil.dataTypeExistsRecursively(e.dataType, + dt => dt.isInstanceOf[ArrayType] || dt.isInstanceOf[MapType])) + if (!allTypesAreSupported) { willNotWorkOnGpu("ArrayTypes or MayTypes in grouping expressions are not supported") }