From 3220a115d1588c0258801a57222a91135fff3012 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Thu, 29 Jul 2021 14:07:33 -0500 Subject: [PATCH] Partial support for time windows (#3074) Signed-off-by: Robert (Bobby) Evans --- docs/configs.md | 1 + docs/supported_ops.md | 654 +++++++++++------- integration_tests/src/main/python/data_gen.py | 4 +- .../src/main/python/time_window_test.py | 96 +++ .../nvidia/spark/rapids/GpuOverrides.scala | 17 +- .../apache/spark/sql/rapids/TimeWindow.scala | 43 ++ 6 files changed, 543 insertions(+), 272 deletions(-) create mode 100644 integration_tests/src/main/python/time_window_test.py create mode 100644 sql-plugin/src/main/scala/org/apache/spark/sql/rapids/TimeWindow.scala diff --git a/docs/configs.md b/docs/configs.md index b4373905562..99d21061c28 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -239,6 +239,7 @@ Name | SQL Function(s) | Description | Default Value | Notes spark.rapids.sql.expression.Pmod|`pmod`|Pmod|true|None| spark.rapids.sql.expression.PosExplode|`posexplode_outer`, `posexplode`|Given an input array produces a sequence of rows for each value in the array.|true|None| spark.rapids.sql.expression.Pow|`pow`, `power`|lhs ^ rhs|true|None| +spark.rapids.sql.expression.PreciseTimestampConversion| |Expression used internally to convert the TimestampType to Long and back without losing precision, i.e. in microseconds. Used in time windowing.|true|None| spark.rapids.sql.expression.PromotePrecision| |PromotePrecision before arithmetic operations between DecimalType data|true|None| spark.rapids.sql.expression.PythonUDF| |UDF run in an external python process. Does not actually run on the GPU, but the transfer of data to/from it can be accelerated.|true|None| spark.rapids.sql.expression.Quarter|`quarter`|Returns the quarter of the year for date, in the range 1 to 4|true|None| diff --git a/docs/supported_ops.md b/docs/supported_ops.md index 50a38ac61ed..7c22e577fb9 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -190,9 +190,9 @@ Accelerator supports are described below. S NS NS -NS -NS -NS +PS* (missing nested BINARY, CALENDAR, UDT) +PS* (missing nested BINARY, CALENDAR, UDT) +PS* (missing nested BINARY, CALENDAR, UDT) NS @@ -11833,6 +11833,96 @@ Accelerator support is described below. +PreciseTimestampConversion + +Expression used internally to convert the TimestampType to Long and back without losing precision, i.e. in microseconds. Used in time windowing. +None +project +input + + + + +S + + + +S* + + + + + + + + + + + +result + + + + +S + + + +S* + + + + + + + + + + + +lambda +input + + + + +NS + + + +NS + + + + + + + + + + + +result + + + + +NS + + + +NS + + + + + + + + + + + PromotePrecision PromotePrecision before arithmetic operations between DecimalType data @@ -11923,6 +12013,32 @@ Accelerator support is described below. +Expression +SQL Functions(s) +Description +Notes +Context +Param/Output +BOOLEAN +BYTE +SHORT +INT +LONG +FLOAT +DOUBLE +DATE +TIMESTAMP +STRING +DECIMAL +NULL +BINARY +CALENDAR +ARRAY +MAP +STRUCT +UDT + + PythonUDF UDF run in an external python process. Does not actually run on the GPU, but the transfer of data to/from it can be accelerated. @@ -12099,32 +12215,6 @@ Accelerator support is described below. 
-Expression -SQL Functions(s) -Description -Notes -Context -Param/Output -BOOLEAN -BYTE -SHORT -INT -LONG -FLOAT -DOUBLE -DATE -TIMESTAMP -STRING -DECIMAL -NULL -BINARY -CALENDAR -ARRAY -MAP -STRUCT -UDT - - Quarter `quarter` Returns the quarter of the year for date, in the range 1 to 4 @@ -12305,6 +12395,32 @@ Accelerator support is described below. +Expression +SQL Functions(s) +Description +Notes +Context +Param/Output +BOOLEAN +BYTE +SHORT +INT +LONG +FLOAT +DOUBLE +DATE +TIMESTAMP +STRING +DECIMAL +NULL +BINARY +CALENDAR +ARRAY +MAP +STRUCT +UDT + + Rank `rank` Window function that returns the rank value within the aggregation window @@ -12526,32 +12642,6 @@ Accelerator support is described below. -Expression -SQL Functions(s) -Description -Notes -Context -Param/Output -BOOLEAN -BYTE -SHORT -INT -LONG -FLOAT -DOUBLE -DATE -TIMESTAMP -STRING -DECIMAL -NULL -BINARY -CALENDAR -ARRAY -MAP -STRUCT -UDT - - Remainder `%`, `mod` Remainder or modulo @@ -12684,6 +12774,32 @@ Accelerator support is described below. +Expression +SQL Functions(s) +Description +Notes +Context +Param/Output +BOOLEAN +BYTE +SHORT +INT +LONG +FLOAT +DOUBLE +DATE +TIMESTAMP +STRING +DECIMAL +NULL +BINARY +CALENDAR +ARRAY +MAP +STRUCT +UDT + + Rint `rint` Rounds up a double value to the nearest double equal to an integer @@ -12906,32 +13022,6 @@ Accelerator support is described below. -Expression -SQL Functions(s) -Description -Notes -Context -Param/Output -BOOLEAN -BYTE -SHORT -INT -LONG -FLOAT -DOUBLE -DATE -TIMESTAMP -STRING -DECIMAL -NULL -BINARY -CALENDAR -ARRAY -MAP -STRUCT -UDT - - RowNumber `row_number` Window function that returns the index for the row within the aggregation window @@ -13048,12 +13138,38 @@ Accelerator support is described below. NS -Second -`second` -Returns the second component of the string/timestamp -None -project -input +Expression +SQL Functions(s) +Description +Notes +Context +Param/Output +BOOLEAN +BYTE +SHORT +INT +LONG +FLOAT +DOUBLE +DATE +TIMESTAMP +STRING +DECIMAL +NULL +BINARY +CALENDAR +ARRAY +MAP +STRUCT +UDT + + +Second +`second` +Returns the second component of the string/timestamp +None +project +input @@ -13270,32 +13386,6 @@ Accelerator support is described below. -Expression -SQL Functions(s) -Description -Notes -Context -Param/Output -BOOLEAN -BYTE -SHORT -INT -LONG -FLOAT -DOUBLE -DATE -TIMESTAMP -STRING -DECIMAL -NULL -BINARY -CALENDAR -ARRAY -MAP -STRUCT -UDT - - ShiftRight `shiftright` Bitwise shift right (>>) @@ -13428,6 +13518,32 @@ Accelerator support is described below. +Expression +SQL Functions(s) +Description +Notes +Context +Param/Output +BOOLEAN +BYTE +SHORT +INT +LONG +FLOAT +DOUBLE +DATE +TIMESTAMP +STRING +DECIMAL +NULL +BINARY +CALENDAR +ARRAY +MAP +STRUCT +UDT + + ShiftRightUnsigned `shiftrightunsigned` Bitwise unsigned shift right (>>>) @@ -13650,32 +13766,6 @@ Accelerator support is described below. -Expression -SQL Functions(s) -Description -Notes -Context -Param/Output -BOOLEAN -BYTE -SHORT -INT -LONG -FLOAT -DOUBLE -DATE -TIMESTAMP -STRING -DECIMAL -NULL -BINARY -CALENDAR -ARRAY -MAP -STRUCT -UDT - - Sin `sin` Sine @@ -13856,6 +13946,32 @@ Accelerator support is described below. +Expression +SQL Functions(s) +Description +Notes +Context +Param/Output +BOOLEAN +BYTE +SHORT +INT +LONG +FLOAT +DOUBLE +DATE +TIMESTAMP +STRING +DECIMAL +NULL +BINARY +CALENDAR +ARRAY +MAP +STRUCT +UDT + + Size `size`, `cardinality` The size of an array or a map @@ -14078,32 +14194,6 @@ Accelerator support is described below. 
-Expression -SQL Functions(s) -Description -Notes -Context -Param/Output -BOOLEAN -BYTE -SHORT -INT -LONG -FLOAT -DOUBLE -DATE -TIMESTAMP -STRING -DECIMAL -NULL -BINARY -CALENDAR -ARRAY -MAP -STRUCT -UDT - - SortOrder Sort order @@ -14267,6 +14357,32 @@ Accelerator support is described below. +Expression +SQL Functions(s) +Description +Notes +Context +Param/Output +BOOLEAN +BYTE +SHORT +INT +LONG +FLOAT +DOUBLE +DATE +TIMESTAMP +STRING +DECIMAL +NULL +BINARY +CALENDAR +ARRAY +MAP +STRUCT +UDT + + Sqrt `sqrt` Square root @@ -14489,32 +14605,6 @@ Accelerator support is described below. -Expression -SQL Functions(s) -Description -Notes -Context -Param/Output -BOOLEAN -BYTE -SHORT -INT -LONG -FLOAT -DOUBLE -DATE -TIMESTAMP -STRING -DECIMAL -NULL -BINARY -CALENDAR -ARRAY -MAP -STRUCT -UDT - - StringLPad `lpad` Pad a string on the left @@ -14689,6 +14779,32 @@ Accelerator support is described below. +Expression +SQL Functions(s) +Description +Notes +Context +Param/Output +BOOLEAN +BYTE +SHORT +INT +LONG +FLOAT +DOUBLE +DATE +TIMESTAMP +STRING +DECIMAL +NULL +BINARY +CALENDAR +ARRAY +MAP +STRUCT +UDT + + StringLocate `position`, `locate` Substring search operator @@ -14863,32 +14979,6 @@ Accelerator support is described below. -Expression -SQL Functions(s) -Description -Notes -Context -Param/Output -BOOLEAN -BYTE -SHORT -INT -LONG -FLOAT -DOUBLE -DATE -TIMESTAMP -STRING -DECIMAL -NULL -BINARY -CALENDAR -ARRAY -MAP -STRUCT -UDT - - StringRPad `rpad` Pad a string on the right @@ -15063,6 +15153,32 @@ Accelerator support is described below. +Expression +SQL Functions(s) +Description +Notes +Context +Param/Output +BOOLEAN +BYTE +SHORT +INT +LONG +FLOAT +DOUBLE +DATE +TIMESTAMP +STRING +DECIMAL +NULL +BINARY +CALENDAR +ARRAY +MAP +STRUCT +UDT + + StringReplace `replace` StringReplace operator @@ -15237,32 +15353,6 @@ Accelerator support is described below. -Expression -SQL Functions(s) -Description -Notes -Context -Param/Output -BOOLEAN -BYTE -SHORT -INT -LONG -FLOAT -DOUBLE -DATE -TIMESTAMP -STRING -DECIMAL -NULL -BINARY -CALENDAR -ARRAY -MAP -STRUCT -UDT - - StringSplit `split` Splits `str` around occurrences that match `regex` @@ -15437,6 +15527,32 @@ Accelerator support is described below. +Expression +SQL Functions(s) +Description +Notes +Context +Param/Output +BOOLEAN +BYTE +SHORT +INT +LONG +FLOAT +DOUBLE +DATE +TIMESTAMP +STRING +DECIMAL +NULL +BINARY +CALENDAR +ARRAY +MAP +STRUCT +UDT + + StringTrim `trim` StringTrim operator @@ -15701,32 +15817,6 @@ Accelerator support is described below. -Expression -SQL Functions(s) -Description -Notes -Context -Param/Output -BOOLEAN -BYTE -SHORT -INT -LONG -FLOAT -DOUBLE -DATE -TIMESTAMP -STRING -DECIMAL -NULL -BINARY -CALENDAR -ARRAY -MAP -STRUCT -UDT - - StringTrimRight `rtrim` StringTrimRight operator @@ -15859,6 +15949,32 @@ Accelerator support is described below. 
+Expression
+SQL Functions(s)
+Description
+Notes
+Context
+Param/Output
+BOOLEAN
+BYTE
+SHORT
+INT
+LONG
+FLOAT
+DOUBLE
+DATE
+TIMESTAMP
+STRING
+DECIMAL
+NULL
+BINARY
+CALENDAR
+ARRAY
+MAP
+STRUCT
+UDT
+
+
 Substring
 `substr`, `substring`
 Substring operator
diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py
index f5ffe30d72f..703ccdaf236 100644
--- a/integration_tests/src/main/python/data_gen.py
+++ b/integration_tests/src/main/python/data_gen.py
@@ -868,9 +868,9 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):
 
 boolean_gens = [boolean_gen]
 
-single_level_array_gens = [ArrayGen(sub_gen) for sub_gen in all_basic_gens + decimal_gens + [null_gen]]
+single_level_array_gens = [ArrayGen(sub_gen) for sub_gen in all_basic_gens + decimal_gens]
 
-single_level_array_gens_no_decimal = [ArrayGen(sub_gen) for sub_gen in all_basic_gens + [null_gen]]
+single_level_array_gens_no_decimal = [ArrayGen(sub_gen) for sub_gen in all_basic_gens]
 
 map_string_string_gen = [MapGen(StringGen(pattern='key_[0-9]', nullable=False), StringGen())]
 
diff --git a/integration_tests/src/main/python/time_window_test.py b/integration_tests/src/main/python/time_window_test.py
new file mode 100644
index 00000000000..05005bef5b2
--- /dev/null
+++ b/integration_tests/src/main/python/time_window_test.py
@@ -0,0 +1,96 @@
+# Copyright (c) 2021, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from asserts import assert_gpu_and_cpu_are_equal_collect
+from data_gen import *
+from datetime import datetime, timezone
+from marks import ignore_order, allow_non_gpu
+from pyspark.sql.types import *
+import pyspark.sql.functions as f
+from pyspark.sql.window import Window
+
+# do it over a day so we have more chance of overlapping values
+_restricted_start = datetime(2020, 1, 1, tzinfo=timezone.utc)
+_restricted_end = datetime(2020, 1, 2, tzinfo=timezone.utc)
+_restricted_ts_gen = TimestampGen(start=_restricted_start, end=_restricted_end)
+
+# Once we support grouping by a struct (even single level) this should go away
+# https://github.com/NVIDIA/spark-rapids/issues/2877
+# Shuffle falls back to CPU because it is in between two CPU hash/sort aggregates
+@allow_non_gpu('HashAggregateExec', 'SortAggregateExec', 'AggregateExpression', 'Max', 'Alias', 'ShuffleExchangeExec', 'HashPartitioning')
+@pytest.mark.parametrize('data_gen', integral_gens + [string_gen], ids=idfn)
+@ignore_order
+def test_grouped_tumbling_window(data_gen):
+    row_gen = StructGen([['ts', _restricted_ts_gen],['data', data_gen]], nullable=False)
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark : gen_df(spark, row_gen).groupBy(f.window('ts', '5 hour')).agg(f.max("data").alias("max_data")))
+
+# Warning: with sliding windows it is easy to make lots of overlapping windows. This can give the Spark code
+# generation some real problems, and can even crash it sometimes when trying to JIT the generated code. This
+# problem only happens on the CPU, so be careful.
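+
+# An illustrative sketch (not used by the tests below): Spark's TimeWindowing rule expands
+# each input row into roughly ceil(windowDuration / slideDuration) overlapping window
+# assignments, so that ratio is the row multiplier behind the codegen blowup described
+# above. `_overlap_factor` is a hypothetical helper, here only to make that concrete.
+def _overlap_factor(window_hours, slide_hours):
+    # ceiling division, e.g. window('ts', '5 hour', '1 hour') puts each row in 5 windows
+    return -(-window_hours // slide_hours)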
+ +# Once we support grouping by a struct (even single level) this should go away +# https://github.com/NVIDIA/spark-rapids/issues/2877 +# Shuffle falls back to CPU because it is in between two CPU hash/sort aggregates +@allow_non_gpu('HashAggregateExec', 'SortAggregateExec', 'AggregateExpression', 'Max', 'Alias', 'ShuffleExchangeExec', 'HashPartitioning') +@pytest.mark.parametrize('data_gen', integral_gens + [string_gen], ids=idfn) +@ignore_order +def test_grouped_sliding_window(data_gen): + row_gen = StructGen([['ts', _restricted_ts_gen],['data', data_gen]], nullable=False) + assert_gpu_and_cpu_are_equal_collect( + lambda spark : gen_df(spark, row_gen).groupBy(f.window('ts', '5 hour', '1 hour')).agg(f.max("data").alias("max_data"))) + +# Having arrays allows us to verify that expand exec in this case works with arrays too +# Once we support grouping by a struct (even single level) this should go away +# https://github.com/NVIDIA/spark-rapids/issues/2877 +# Shuffle falls back to CPU because it is in between two CPU hash/sort aggregates +@allow_non_gpu('HashAggregateExec', 'SortAggregateExec', 'AggregateExpression', 'GetArrayItem', 'Literal', 'Max', 'Alias', 'ShuffleExchangeExec', 'HashPartitioning') +@pytest.mark.parametrize('data_gen', integral_gens + [string_gen], ids=idfn) +@ignore_order +def test_grouped_sliding_window_array(data_gen): + row_gen = StructGen([['ts', _restricted_ts_gen],['data', ArrayGen(data_gen)]], nullable=False) + assert_gpu_and_cpu_are_equal_collect( + lambda spark : gen_df(spark, row_gen).groupBy(f.window('ts', '5 hour', '1 hour')).agg(f.max(f.col("data")[3]).alias("max_data"))) + +@pytest.mark.parametrize('data_gen', integral_gens + [string_gen], ids=idfn) +@allow_non_gpu('WindowExec', 'WindowExpression', 'WindowSpecDefinition', 'SpecifiedWindowFrame', 'UnboundedPreceding$', 'UnboundedFollowing$', 'AggregateExpression', 'Max', 'Alias') +@ignore_order +def test_tumbling_window(data_gen): + row_gen = StructGen([['ts', _restricted_ts_gen],['data', data_gen]], nullable=False) + w = Window.partitionBy(f.window('ts', '5 hour')) + assert_gpu_and_cpu_are_equal_collect( + lambda spark : gen_df(spark, row_gen).withColumn('rolling_max', f.max("data").over(w))) + +@pytest.mark.parametrize('data_gen', integral_gens + [string_gen], ids=idfn) +@allow_non_gpu('WindowExec', 'WindowExpression', 'WindowSpecDefinition', 'SpecifiedWindowFrame', 'UnboundedPreceding$', 'UnboundedFollowing$', 'AggregateExpression', 'Max', 'Alias') +@ignore_order +def test_sliding_window(data_gen): + row_gen = StructGen([['ts', _restricted_ts_gen],['data', data_gen]], nullable=False) + w = Window.partitionBy(f.window('ts', '5 hour', '1 hour')) + assert_gpu_and_cpu_are_equal_collect( + lambda spark : gen_df(spark, row_gen).withColumn('rolling_max', f.max("data").over(w))) + +# This allows us to verify that GpuExpandExec works with all of the various types. +@pytest.mark.parametrize('data_gen', all_basic_gens + decimal_gens + array_gens_sample + map_gens_sample, ids=idfn) +# This includes an expand and we produce a different order than the CPU does. 
Sort locally to allow sorting of all types +@ignore_order(local=True) +def test_just_window(data_gen): + row_gen = StructGen([['ts', timestamp_gen],['data', data_gen]], nullable=False) + assert_gpu_and_cpu_are_equal_collect( + lambda spark : gen_df(spark, row_gen).withColumn('time_bucket', f.window('ts', '5 hour', '1 hour')), + conf = allow_negative_scale_of_decimal_conf) + + diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 8a1e44d1c6d..d3dc29e1782 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -964,6 +964,18 @@ object GpuOverrides { override def convertToGpu(): GpuExpression = GpuLag(input.convertToGpu(), offset.convertToGpu(), default.convertToGpu()) }), + expr[PreciseTimestampConversion]( + "Expression used internally to convert the TimestampType to Long and back without losing " + + "precision, i.e. in microseconds. Used in time windowing.", + ExprChecks.unaryProjectNotLambda( + TypeSig.TIMESTAMP + TypeSig.LONG, + TypeSig.TIMESTAMP + TypeSig.LONG, + TypeSig.TIMESTAMP + TypeSig.LONG, + TypeSig.TIMESTAMP + TypeSig.LONG), + (a, conf, p, r) => new UnaryExprMeta[PreciseTimestampConversion](a, conf, p, r) { + override def convertToGpu(child: Expression): GpuExpression = + GpuPreciseTimestampConversion(child, a.fromType, a.toType) + }), expr[UnaryMinus]( "Negate a numeric value", ExprChecks.unaryProjectNotLambdaInputMatchesOutput( @@ -3046,7 +3058,10 @@ object GpuOverrides { (sort, conf, p, r) => new GpuSortMeta(sort, conf, p, r)), exec[ExpandExec]( "The backend for the expand operator", - ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all), + ExecChecks( + (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested(), + TypeSig.all), (expand, conf, p, r) => new GpuExpandExecMeta(expand, conf, p, r)), exec[WindowExec]( "Window-operator backend", diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/TimeWindow.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/TimeWindow.scala new file mode 100644 index 00000000000..52d1e8d7972 --- /dev/null +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/TimeWindow.scala @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.spark.sql.rapids + +import ai.rapids.cudf.ColumnVector +import com.nvidia.spark.rapids.{GpuColumnVector, GpuUnaryExpression} + +import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression} +import org.apache.spark.sql.types.{AbstractDataType, DataType} + +/** + * Expression used internally to convert the TimestampType to Long and back without losing + * precision, i.e. in microseconds. Used in time windowing. 
+ */ +case class GpuPreciseTimestampConversion( + child: Expression, + fromType: DataType, + toType: DataType) extends GpuUnaryExpression with ExpectsInputTypes { + override def inputTypes: Seq[AbstractDataType] = Seq(fromType) + override def dataType: DataType = toType + + override protected def doColumnar(input: GpuColumnVector): ColumnVector = { + val outDType = GpuColumnVector.getNonNestedRapidsType(toType) + withResource(input.getBase.bitCastTo(outDType)) { bitCast => + bitCast.copyToColumnVector() + } + } +} \ No newline at end of file