From ec66baab2cb8fe3a4389bd33a2e4145f90ff48eb Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 18 Feb 2022 12:58:54 -0700 Subject: [PATCH 01/23] Initial support for reading decimal types from JSON and CSV Signed-off-by: Andy Grove --- docs/compatibility.md | 6 --- integration_tests/src/main/python/csv_test.py | 6 +++ .../src/main/python/json_test.py | 10 ++-- .../src/test/resources/decimals.csv | 14 ++++++ .../src/test/resources/decimals.json | 12 +++++ .../nvidia/spark/rapids/GpuOverrides.scala | 4 +- .../rapids/GpuTextBasedPartitionReader.scala | 22 +++++---- .../catalyst/json/rapids/GpuJsonScan.scala | 46 +++++++++++++++++-- 8 files changed, 98 insertions(+), 22 deletions(-) create mode 100644 integration_tests/src/test/resources/decimals.csv create mode 100644 integration_tests/src/test/resources/decimals.json diff --git a/docs/compatibility.md b/docs/compatibility.md index d7ef0005508..becb81ccba1 100644 --- a/docs/compatibility.md +++ b/docs/compatibility.md @@ -283,14 +283,8 @@ will produce a different result compared to the plugin. Due to inconsistencies between how CSV data is parsed CSV parsing is off by default. Each data type can be enabled or disabled independently using the following configs. - * [spark.rapids.sql.csv.read.bool.enabled](configs.md#sql.csv.read.bool.enabled) - * [spark.rapids.sql.csv.read.byte.enabled](configs.md#sql.csv.read.byte.enabled) * [spark.rapids.sql.csv.read.date.enabled](configs.md#sql.csv.read.date.enabled) * [spark.rapids.sql.csv.read.double.enabled](configs.md#sql.csv.read.double.enabled) - * [spark.rapids.sql.csv.read.float.enabled](configs.md#sql.csv.read.float.enabled) - * [spark.rapids.sql.csv.read.integer.enabled](configs.md#sql.csv.read.integer.enabled) - * [spark.rapids.sql.csv.read.long.enabled](configs.md#sql.csv.read.long.enabled) - * [spark.rapids.sql.csv.read.short.enabled](configs.md#sql.csv.read.short.enabled) * [spark.rapids.sql.csvTimestamps.enabled](configs.md#sql.csvTimestamps.enabled) If you know that your particular data type will be parsed correctly enough, you may enable each diff --git a/integration_tests/src/main/python/csv_test.py b/integration_tests/src/main/python/csv_test.py index 67c5e7344b4..575446963af 100644 --- a/integration_tests/src/main/python/csv_test.py +++ b/integration_tests/src/main/python/csv_test.py @@ -127,6 +127,9 @@ _double_schema = StructType([ StructField('number', DoubleType())]) +_decimal_schema = StructType([ + StructField('number', DecimalType(10, 2))]) + _number_as_string_schema = StructType([ StructField('number', StringType())]) @@ -220,6 +223,8 @@ def read_impl(spark): pytest.param('simple_int_values.csv', _long_schema, {'header': 'true'}), ('simple_int_values.csv', _float_schema, {'header': 'true'}), ('simple_int_values.csv', _double_schema, {'header': 'true'}), + ('simple_int_values.csv', _decimal_schema, {'header': 'true'}), + ('decimals.csv', _decimal_schema, {'header': 'true'}), pytest.param('empty_int_values.csv', _empty_byte_schema, {'header': 'true'}), pytest.param('empty_int_values.csv', _empty_short_schema, {'header': 'true'}), pytest.param('empty_int_values.csv', _empty_int_schema, {'header': 'true'}), @@ -235,6 +240,7 @@ def read_impl(spark): pytest.param('simple_float_values.csv', _long_schema, {'header': 'true'}), pytest.param('simple_float_values.csv', _float_schema, {'header': 'true'}), pytest.param('simple_float_values.csv', _double_schema, {'header': 'true'}), + pytest.param('simple_float_values.csv', _decimal_schema, {'header': 'true'}), 
pytest.param('simple_boolean_values.csv', _bool_schema, {'header': 'true'}), pytest.param('ints_with_whitespace.csv', _number_as_string_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2069')), pytest.param('ints_with_whitespace.csv', _byte_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/130')) diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py index 3e50eb0d73a..12ad9360b50 100644 --- a/integration_tests/src/main/python/json_test.py +++ b/integration_tests/src/main/python/json_test.py @@ -59,6 +59,9 @@ _double_schema = StructType([ StructField('number', DoubleType())]) +_decimal_schema = StructType([ + StructField('number', DecimalType(10, 2))]) + _string_schema = StructType([ StructField('a', StringType())]) @@ -189,14 +192,15 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena pytest.param('boolean_invalid.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4779')), 'ints.json', pytest.param('ints_invalid.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4793')), - 'nan_and_inf.json', - pytest.param('nan_and_inf_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4646')), + pytest.param('nan_and_inf.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4615, https://github.com/NVIDIA/spark-rapids/issues/4646')), + pytest.param('nan_and_inf_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4615, https://github.com/NVIDIA/spark-rapids/issues/4646')), 'floats.json', 'floats_leading_zeros.json', 'floats_invalid.json', pytest.param('floats_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4647')), + 'decimals.json', ]) -@pytest.mark.parametrize('schema', [_bool_schema, _byte_schema, _short_schema, _int_schema, _long_schema, _float_schema, _double_schema]) +@pytest.mark.parametrize('schema', [_bool_schema, _byte_schema, _short_schema, _int_schema, _long_schema, _float_schema, _double_schema, _decimal_schema]) @pytest.mark.parametrize('read_func', [read_json_df, read_json_sql]) @pytest.mark.parametrize('allow_non_numeric_numbers', ["true", "false"]) @pytest.mark.parametrize('allow_numeric_leading_zeros', ["true"]) diff --git a/integration_tests/src/test/resources/decimals.csv b/integration_tests/src/test/resources/decimals.csv new file mode 100644 index 00000000000..a199a772d0a --- /dev/null +++ b/integration_tests/src/test/resources/decimals.csv @@ -0,0 +1,14 @@ +"number" +-1 +0 +1. +0.12 +.12 ++.12 +-.12 +1 +1.01 +12.34 +12.3456 +12345678.12 + diff --git a/integration_tests/src/test/resources/decimals.json b/integration_tests/src/test/resources/decimals.json new file mode 100644 index 00000000000..5a8fd685ff9 --- /dev/null +++ b/integration_tests/src/test/resources/decimals.json @@ -0,0 +1,12 @@ +{ "number": 0 } +{ "number": 12 } +{ "number": 12.0 } +{ "number": 12. 
} +{ "number": .34 } +{ "number": +.34 } +{ "number": -.34 } +{ "number": 0.34 } +{ "number": 12.34 } +{ "number": 12.3456 } +{ "number": 12.345678 } +{ "number": 123456.78 } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index d3a40991fd3..2ccc3d12bed 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -804,7 +804,7 @@ object GpuOverrides extends Logging { lazy val fileFormats: Map[FileFormatType, Map[FileFormatOp, FileFormatChecks]] = Map( (CsvFormatType, FileFormatChecks( - cudfRead = TypeSig.commonCudfTypes, + cudfRead = TypeSig.commonCudfTypes + TypeSig.DECIMAL_128, cudfWrite = TypeSig.none, sparkSig = TypeSig.cpuAtomics)), (ParquetFormatType, FileFormatChecks( @@ -823,7 +823,7 @@ object GpuOverrides extends Logging { sparkSig = (TypeSig.cpuAtomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP + TypeSig.UDT).nested())), (JsonFormatType, FileFormatChecks( - cudfRead = TypeSig.commonCudfTypes, + cudfRead = TypeSig.commonCudfTypes + TypeSig.DECIMAL_128, cudfWrite = TypeSig.none, sparkSig = (TypeSig.cpuAtomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP + TypeSig.UDT).nested()))) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala index 7b079ee0618..c7d1eee61a3 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala @@ -18,17 +18,15 @@ package com.nvidia.spark.rapids import scala.collection.mutable.ListBuffer import scala.math.max - import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, NvtxColor, NvtxRange, Scalar, Schema, Table} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io.compress.CompressionCodecFactory - import org.apache.spark.TaskContext import org.apache.spark.sql.connector.read.PartitionReader import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.execution.datasources.{HadoopFileLinesReader, PartitionedFile} -import org.apache.spark.sql.types.{DataTypes, StructField, StructType} +import org.apache.spark.sql.types.{DataTypes, DecimalType, StructField, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch /** @@ -172,7 +170,7 @@ abstract class GpuTextBasedPartitionReader( f.dataType match { case DataTypes.BooleanType | DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType | - DataTypes.DoubleType => + DataTypes.DoubleType | _: DecimalType => f.copy(dataType = DataTypes.StringType) case _ => f @@ -196,8 +194,6 @@ abstract class GpuTextBasedPartitionReader( // Table increases the ref counts on the columns so we have // to close them after creating the table withResource(columns) { _ => - // ansi mode does not apply to text inputs - val ansiEnabled = false for (i <- 0 until table.getNumberOfColumns) { val castColumn = newReadDataSchema.fields(i).dataType match { case DataTypes.BooleanType => @@ -211,9 +207,11 @@ abstract class GpuTextBasedPartitionReader( case DataTypes.LongType => castStringToInt(table.getColumn(i), DType.INT64) case DataTypes.FloatType => - GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT32) + 
castStringToFloat(table.getColumn(i), DType.FLOAT32) case DataTypes.DoubleType => - GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT64) + castStringToFloat(table.getColumn(i), DType.FLOAT64) + case dt: DecimalType => + castStringToDecimal(table.getColumn(i), dt) case _ => table.getColumn(i).incRefCount() } @@ -232,6 +230,14 @@ abstract class GpuTextBasedPartitionReader( def castStringToBool(input: ColumnVector): ColumnVector + def castStringToFloat(input: ColumnVector, dt: DType): ColumnVector = { + GpuCast.castStringToFloats(input, ansiEnabled = false, dt) + } + + def castStringToDecimal(input: ColumnVector, dt: DecimalType): ColumnVector = { + GpuCast.castStringToDecimal(input, ansiEnabled = false, dt) + } + def castStringToInt(input: ColumnVector, intType: DType): ColumnVector = { withResource(input.isInteger(intType)) { isInt => withResource(input.castTo(intType)) { asInt => diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala index 795a7d8eeb3..c9a8cf5e583 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala @@ -20,12 +20,10 @@ import java.nio.charset.StandardCharsets import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer - import ai.rapids.cudf import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, Scalar, Schema, Table} import com.nvidia.spark.rapids._ import org.apache.hadoop.conf.Configuration - import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow @@ -38,7 +36,7 @@ import org.apache.spark.sql.execution.datasources.{PartitionedFile, Partitioning import org.apache.spark.sql.execution.datasources.v2.{FilePartitionReaderFactory, FileScan, TextBasedFileScan} import org.apache.spark.sql.execution.datasources.v2.json.JsonScan import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{DateType, StringType, StructType, TimestampType} +import org.apache.spark.sql.types.{DateType, DecimalType, StringType, StructType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.SerializableConfiguration @@ -370,4 +368,46 @@ class JsonPartitionReader( } } + /** + * JSON has strict rules about valid numeric formats. See https://www.json.org/ for specification. + * + * Spark then has it's own rules for supporting NaN and Infinity, which are not + * valid numbers in JSON. + */ + private def sanitizeNumbers(input: ColumnVector): ColumnVector = { + // Note that this is not 100% consistent with Spark. 
See the following issues + // for more information: + // https://issues.apache.org/jira/browse/SPARK-38060 + // https://github.com/NVIDIA/spark-rapids/issues/4615 + val regex = if (parsedOptions.allowNonNumericNumbers) { + "^" + + "(?:" + + "(?:-?[0-9]+(?:\\.[0-9]+)?(?:[eE][\\-\\+]?[0-9]+)?)" + + "|NaN" + + "|(?:[\\+\\-]?INF)" + + "|(?:[\\-\\+]?Infinity)" + + ")" + + "$" + } else { + "^-?[0-9]+(?:\\.[0-9]+)?(?:[eE][\\-\\+]?[0-9]+)?$" + } + withResource(input.matchesRe(regex)) { validJsonDecimal => + withResource(Scalar.fromNull(DType.STRING)) { nullString => + validJsonDecimal.ifElse(input, nullString) + } + } + } + + override def castStringToFloat(input: ColumnVector, dt: DType): ColumnVector = { + withResource(sanitizeNumbers(input)) { sanitizedInput => + super.castStringToFloat(sanitizedInput, dt) + } + } + + override def castStringToDecimal(input: ColumnVector, dt: DecimalType): ColumnVector = { + withResource(sanitizeNumbers(input)) { sanitizedInput => + super.castStringToDecimal(sanitizedInput, dt) + } + } + } From 2b45083bc35963ab41834de5b4191308c33d88c7 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 18 Feb 2022 13:05:20 -0700 Subject: [PATCH 02/23] update docs --- docs/compatibility.md | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/docs/compatibility.md b/docs/compatibility.md index becb81ccba1..c643c76f198 100644 --- a/docs/compatibility.md +++ b/docs/compatibility.md @@ -284,7 +284,6 @@ Due to inconsistencies between how CSV data is parsed CSV parsing is off by defa Each data type can be enabled or disabled independently using the following configs. * [spark.rapids.sql.csv.read.date.enabled](configs.md#sql.csv.read.date.enabled) - * [spark.rapids.sql.csv.read.double.enabled](configs.md#sql.csv.read.double.enabled) * [spark.rapids.sql.csvTimestamps.enabled](configs.md#sql.csvTimestamps.enabled) If you know that your particular data type will be parsed correctly enough, you may enable each @@ -301,14 +300,6 @@ default. The plugin will strip leading and trailing space for all values except There are also discrepancies/issues with specific types that are detailed below. -### CSV Boolean - -Invalid values like `BAD` show up as `true` as described by this -[issue](https://github.com/NVIDIA/spark-rapids/issues/2071) - -This is the same for all other types, but because that is the only issue with boolean parsing -we have called it out specifically here. - ### CSV Strings Writing strings to a CSV file in general for Spark can be problematic unless you can ensure that your data does not have any line deliminators in it. The GPU accelerated CSV parser handles quoted @@ -377,10 +368,6 @@ Also parsing of some values will not produce bit for bit identical results to wh They are within round-off errors except when they are close enough to overflow to Inf or -Inf which then results in a number being returned when the CPU would have returned null. -### CSV Integer - -Any number that overflows will not be turned into a null value. - ## ORC The ORC format has fairly complete support for both reads and writes. There are only a few known @@ -480,7 +467,7 @@ these formats when unquoted, will produce `null` on the CPU and may produce vali { "number": "-Infinity" } ``` -Another limitation of the GPU JSON reader is that it will parse strings containing floating-point values where +Another limitation of the GPU JSON reader is that it will parse strings containing boolean or numeric values where Spark will treat them as invalid inputs and will just return `null`. 
### JSON Schema discovery From edc8c71e235779940d60628bae42685c8800e544 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 18 Feb 2022 13:22:10 -0700 Subject: [PATCH 03/23] Document behavior of JSON option allowNonNumericNumbers --- docs/compatibility.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/compatibility.md b/docs/compatibility.md index c643c76f198..1b68293cbbe 100644 --- a/docs/compatibility.md +++ b/docs/compatibility.md @@ -494,6 +494,13 @@ unquoted control characters but Spark reads these entries incorrectly as null. H and when the option is false, then RAPIDS Accelerator's behavior is same as Spark where an exception is thrown as discussed in `JSON Schema discovery` section. +- `allowNonNumericNumbers` - Allows `NaN` and `Infinity` values to be parsed (note that these are not valid numeric +values in the [JSON specification](https://json.org)). Spark has inconsistent behavior and will +parse some variants of `NaN` and `Infinity` even when this option is disabled +([SPARK-38060](https://issues.apache.org/jira/browse/SPARK-38060)). The RAPIDS Accelerator supports a wider range of +representations than Spark when this option is enabled and does not support any form of `NaN` or `Infinity` when the +option is disabled. + ## Regular Expressions The following Apache Spark regular expression functions and expressions are supported on the GPU: From ec31dc9f671b309bf5e91dc128c5380367d37246 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 22 Feb 2022 10:41:53 -0700 Subject: [PATCH 04/23] Improve JSON support for INF values --- docs/compatibility.md | 15 +++++---------- integration_tests/src/main/python/json_test.py | 7 ++++--- .../src/test/resources/nan_and_inf.json | 9 ++++++--- .../test/resources/nan_and_inf_edge_cases.json | 12 ------------ .../src/test/resources/nan_and_inf_invalid.json | 12 ++++++++++++ .../src/test/resources/nan_and_inf_strings.json | 6 ++++++ .../sql/catalyst/json/rapids/GpuJsonScan.scala | 8 +++----- 7 files changed, 36 insertions(+), 33 deletions(-) delete mode 100644 integration_tests/src/test/resources/nan_and_inf_edge_cases.json create mode 100644 integration_tests/src/test/resources/nan_and_inf_invalid.json create mode 100644 integration_tests/src/test/resources/nan_and_inf_strings.json diff --git a/docs/compatibility.md b/docs/compatibility.md index 1b68293cbbe..c485a4be1b2 100644 --- a/docs/compatibility.md +++ b/docs/compatibility.md @@ -456,16 +456,11 @@ The nested types(array, map and struct) are not supported yet in current version Parsing floating-point values has the same limitations as [casting from string to float](#String-to-Float). -The GPU JSON reader does not support `NaN` and `Inf` values with full compatibility with Spark. - -The following are the only formats that are parsed consistently between CPU and GPU. Any other variation, including -these formats when unquoted, will produce `null` on the CPU and may produce valid `NaN` and `Inf` results on the GPU. - -```json -{ "number": "NaN" } -{ "number": "Infinity" } -{ "number": "-Infinity" } -``` +Prior to Spark 3.3.0, reading JSON strings such as `"+Infinity"` when specifying that the data type is `FloatType` +or `DoubleType` caused these values to be parsed even when `allowNonNumericNumbers` is set to false. Also, Spark +versions prior to 3.3.0 only supported the `"Infinity"` and `"-Infinity"` representations of infinity and did not +support `"+INF"`, `"-INF"`, or `"+Infinity"`, which Spark considers valid when unquoted. 
The GPU JSON reader is +consistent with the behavior in Spark 3.3.0 and later. Another limitation of the GPU JSON reader is that it will parse strings containing boolean or numeric values where Spark will treat them as invalid inputs and will just return `null`. diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py index 12ad9360b50..26f8cd24da5 100644 --- a/integration_tests/src/main/python/json_test.py +++ b/integration_tests/src/main/python/json_test.py @@ -18,7 +18,7 @@ from data_gen import * from src.main.python.marks import approximate_float, allow_non_gpu -from src.main.python.spark_session import with_cpu_session +from src.main.python.spark_session import with_cpu_session, is_before_spark_330 json_supported_gens = [ # Spark does not escape '\r' or '\n' even though it uses it to mark end of record @@ -192,8 +192,9 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena pytest.param('boolean_invalid.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4779')), 'ints.json', pytest.param('ints_invalid.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4793')), - pytest.param('nan_and_inf.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4615, https://github.com/NVIDIA/spark-rapids/issues/4646')), - pytest.param('nan_and_inf_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4615, https://github.com/NVIDIA/spark-rapids/issues/4646')), + 'nan_and_inf.json', + pytest.param('nan_and_inf_strings.json', marks=pytest.mark.skipif(is_before_spark_330(), reason='https://issues.apache.org/jira/browse/SPARK-38060 fixed in Spark 3.3.0')), + 'nan_and_inf_invalid.json', 'floats.json', 'floats_leading_zeros.json', 'floats_invalid.json', diff --git a/integration_tests/src/test/resources/nan_and_inf.json b/integration_tests/src/test/resources/nan_and_inf.json index e4aab168de4..ff012b53bec 100644 --- a/integration_tests/src/test/resources/nan_and_inf.json +++ b/integration_tests/src/test/resources/nan_and_inf.json @@ -1,3 +1,6 @@ -{ "number": "NaN" } -{ "number": "Infinity" } -{ "number": "-Infinity" } \ No newline at end of file +{ "number": NaN } +{ "number": +INF } +{ "number": -INF } +{ "number": Infinity } +{ "number": +Infinity } +{ "number": -Infinity } \ No newline at end of file diff --git a/integration_tests/src/test/resources/nan_and_inf_edge_cases.json b/integration_tests/src/test/resources/nan_and_inf_edge_cases.json deleted file mode 100644 index c27a2291626..00000000000 --- a/integration_tests/src/test/resources/nan_and_inf_edge_cases.json +++ /dev/null @@ -1,12 +0,0 @@ -{ "number": "NAN" } -{ "number": "nan" } -{ "number": "INF" } -{ "number": "+INF" } -{ "number": "-INF" } -{ "number": INF } -{ "number": +INF } -{ "number": -INF } -{ "number": "Inf" } -{ "number": "+Inf" } -{ "number": "-Inf" } -{ "number": "+Infinity" } \ No newline at end of file diff --git a/integration_tests/src/test/resources/nan_and_inf_invalid.json b/integration_tests/src/test/resources/nan_and_inf_invalid.json new file mode 100644 index 00000000000..03dbb41d2f5 --- /dev/null +++ b/integration_tests/src/test/resources/nan_and_inf_invalid.json @@ -0,0 +1,12 @@ +{ "number": NAN } +{ "number": nan } +{ "number": INF } +{ "number": Inf } +{ "number": +Inf } +{ "number": -Inf } +{ "number": "NAN" } +{ "number": "nan" } +{ "number": "INF" } +{ "number": "Inf" } +{ "number": "+Inf" } +{ 
"number": "-Inf" } \ No newline at end of file diff --git a/integration_tests/src/test/resources/nan_and_inf_strings.json b/integration_tests/src/test/resources/nan_and_inf_strings.json new file mode 100644 index 00000000000..3080f5147a4 --- /dev/null +++ b/integration_tests/src/test/resources/nan_and_inf_strings.json @@ -0,0 +1,6 @@ +{ "number": "NaN" } +{ "number": "+INF" } +{ "number": "-INF" } +{ "number": "Infinity" } +{ "number": "+Infinity" } +{ "number": "-Infinity" } \ No newline at end of file diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala index c9a8cf5e583..abaf13d5728 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala @@ -375,16 +375,14 @@ class JsonPartitionReader( * valid numbers in JSON. */ private def sanitizeNumbers(input: ColumnVector): ColumnVector = { - // Note that this is not 100% consistent with Spark. See the following issues - // for more information: - // https://issues.apache.org/jira/browse/SPARK-38060 - // https://github.com/NVIDIA/spark-rapids/issues/4615 + // Note that this is not 100% consistent with Spark versions prior to Spark 3.3.0 + // due to https://issues.apache.org/jira/browse/SPARK-38060 val regex = if (parsedOptions.allowNonNumericNumbers) { "^" + "(?:" + "(?:-?[0-9]+(?:\\.[0-9]+)?(?:[eE][\\-\\+]?[0-9]+)?)" + "|NaN" + - "|(?:[\\+\\-]?INF)" + + "|(?:[\\+\\-]INF)" + "|(?:[\\-\\+]?Infinity)" + ")" + "$" From 3d121bb564ad30a89e9ae42507853b31f0b1e3ec Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 22 Feb 2022 10:59:06 -0700 Subject: [PATCH 05/23] improve decimal tests to cover different rounding cases --- integration_tests/src/main/python/csv_test.py | 13 +++++++++---- integration_tests/src/main/python/json_test.py | 7 +++++-- integration_tests/src/test/resources/decimals.csv | 2 ++ integration_tests/src/test/resources/decimals.json | 2 ++ .../sql/catalyst/json/rapids/GpuJsonScan.scala | 2 +- 5 files changed, 19 insertions(+), 7 deletions(-) diff --git a/integration_tests/src/main/python/csv_test.py b/integration_tests/src/main/python/csv_test.py index 575446963af..3067bb1ee5b 100644 --- a/integration_tests/src/main/python/csv_test.py +++ b/integration_tests/src/main/python/csv_test.py @@ -127,9 +127,12 @@ _double_schema = StructType([ StructField('number', DoubleType())]) -_decimal_schema = StructType([ +_decimal_10_2_schema = StructType([ StructField('number', DecimalType(10, 2))]) +_decimal_10_3_schema = StructType([ + StructField('number', DecimalType(10, 3))]) + _number_as_string_schema = StructType([ StructField('number', StringType())]) @@ -223,8 +226,9 @@ def read_impl(spark): pytest.param('simple_int_values.csv', _long_schema, {'header': 'true'}), ('simple_int_values.csv', _float_schema, {'header': 'true'}), ('simple_int_values.csv', _double_schema, {'header': 'true'}), - ('simple_int_values.csv', _decimal_schema, {'header': 'true'}), - ('decimals.csv', _decimal_schema, {'header': 'true'}), + ('simple_int_values.csv', _decimal_10_2_schema, {'header': 'true'}), + ('decimals.csv', _decimal_10_2_schema, {'header': 'true'}), + ('decimals.csv', _decimal_10_3_schema, {'header': 'true'}), pytest.param('empty_int_values.csv', _empty_byte_schema, {'header': 'true'}), pytest.param('empty_int_values.csv', _empty_short_schema, {'header': 'true'}), 
pytest.param('empty_int_values.csv', _empty_int_schema, {'header': 'true'}), @@ -240,7 +244,8 @@ def read_impl(spark): pytest.param('simple_float_values.csv', _long_schema, {'header': 'true'}), pytest.param('simple_float_values.csv', _float_schema, {'header': 'true'}), pytest.param('simple_float_values.csv', _double_schema, {'header': 'true'}), - pytest.param('simple_float_values.csv', _decimal_schema, {'header': 'true'}), + pytest.param('simple_float_values.csv', _decimal_10_2_schema, {'header': 'true'}), + pytest.param('simple_float_values.csv', _decimal_10_3_schema, {'header': 'true'}), pytest.param('simple_boolean_values.csv', _bool_schema, {'header': 'true'}), pytest.param('ints_with_whitespace.csv', _number_as_string_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2069')), pytest.param('ints_with_whitespace.csv', _byte_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/130')) diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py index 26f8cd24da5..6c27df15572 100644 --- a/integration_tests/src/main/python/json_test.py +++ b/integration_tests/src/main/python/json_test.py @@ -59,9 +59,12 @@ _double_schema = StructType([ StructField('number', DoubleType())]) -_decimal_schema = StructType([ +_decimal_10_2_schema = StructType([ StructField('number', DecimalType(10, 2))]) +_decimal_10_3_schema = StructType([ + StructField('number', DecimalType(10, 3))]) + _string_schema = StructType([ StructField('a', StringType())]) @@ -201,7 +204,7 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena pytest.param('floats_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4647')), 'decimals.json', ]) -@pytest.mark.parametrize('schema', [_bool_schema, _byte_schema, _short_schema, _int_schema, _long_schema, _float_schema, _double_schema, _decimal_schema]) +@pytest.mark.parametrize('schema', [_bool_schema, _byte_schema, _short_schema, _int_schema, _long_schema, _float_schema, _double_schema, _decimal_10_2_schema, _decimal_10_3_schema]) @pytest.mark.parametrize('read_func', [read_json_df, read_json_sql]) @pytest.mark.parametrize('allow_non_numeric_numbers', ["true", "false"]) @pytest.mark.parametrize('allow_numeric_leading_zeros', ["true"]) diff --git a/integration_tests/src/test/resources/decimals.csv b/integration_tests/src/test/resources/decimals.csv index a199a772d0a..b8e83af6fff 100644 --- a/integration_tests/src/test/resources/decimals.csv +++ b/integration_tests/src/test/resources/decimals.csv @@ -11,4 +11,6 @@ 12.34 12.3456 12345678.12 +33.545454 +33.454545 diff --git a/integration_tests/src/test/resources/decimals.json b/integration_tests/src/test/resources/decimals.json index 5a8fd685ff9..0a98fd05474 100644 --- a/integration_tests/src/test/resources/decimals.json +++ b/integration_tests/src/test/resources/decimals.json @@ -10,3 +10,5 @@ { "number": 12.3456 } { "number": 12.345678 } { "number": 123456.78 } +{ "number": 33.454545 } +{ "number": 33.545454 } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala index abaf13d5728..cbc561c8811 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala @@ 
-371,7 +371,7 @@ class JsonPartitionReader( /** * JSON has strict rules about valid numeric formats. See https://www.json.org/ for specification. * - * Spark then has it's own rules for supporting NaN and Infinity, which are not + * Spark then has its own rules for supporting NaN and Infinity, which are not * valid numbers in JSON. */ private def sanitizeNumbers(input: ColumnVector): ColumnVector = { From 55b350a0f8c98311b3972c3c1f547e1b70fd3e4b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 22 Feb 2022 12:14:01 -0700 Subject: [PATCH 06/23] reduce the amount of regexp --- .../catalyst/json/rapids/GpuJsonScan.scala | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala index cbc561c8811..2815bbbdd64 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala @@ -20,10 +20,12 @@ import java.nio.charset.StandardCharsets import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer + import ai.rapids.cudf import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, Scalar, Schema, Table} import com.nvidia.spark.rapids._ import org.apache.hadoop.conf.Configuration + import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow @@ -377,21 +379,24 @@ class JsonPartitionReader( private def sanitizeNumbers(input: ColumnVector): ColumnVector = { // Note that this is not 100% consistent with Spark versions prior to Spark 3.3.0 // due to https://issues.apache.org/jira/browse/SPARK-38060 - val regex = if (parsedOptions.allowNonNumericNumbers) { - "^" + - "(?:" + - "(?:-?[0-9]+(?:\\.[0-9]+)?(?:[eE][\\-\\+]?[0-9]+)?)" + - "|NaN" + - "|(?:[\\+\\-]INF)" + - "|(?:[\\-\\+]?Infinity)" + - ")" + - "$" + // cuDF `isFloat` supports some inputs that are not valid JSON numbers, such as `.1`, `1.`, + // and `+1` so we use a regular expression to match valid JSON numbers instead + val jsonNumberRegexp = "^-?[0-9]+(?:\\.[0-9]+)?(?:[eE][\\-\\+]?[0-9]+)?$" + val isValid = if (parsedOptions.allowNonNumericNumbers) { + withResource(ColumnVector.fromStrings("NaN", "+INF", "-INF", "+Infinity", + "Infinity", "-Infinity")) { nonNumeric => + withResource(input.matchesRe(jsonNumberRegexp)) { isJsonNumber => + withResource(input.contains(nonNumeric)) { nonNumeric => + isJsonNumber.or(nonNumeric) + } + } + } } else { - "^-?[0-9]+(?:\\.[0-9]+)?(?:[eE][\\-\\+]?[0-9]+)?$" + input.matchesRe(jsonNumberRegexp) } - withResource(input.matchesRe(regex)) { validJsonDecimal => + withResource(isValid) { _ => withResource(Scalar.fromNull(DType.STRING)) { nullString => - validJsonDecimal.ifElse(input, nullString) + isValid.ifElse(input, nullString) } } } From addbe403e29a7bcee9cfa615f5323715744135e9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 22 Feb 2022 12:32:19 -0700 Subject: [PATCH 07/23] scalastyle --- .../com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala index c7d1eee61a3..88e7fef3970 100644 --- 
a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala @@ -18,10 +18,12 @@ package com.nvidia.spark.rapids import scala.collection.mutable.ListBuffer import scala.math.max + import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, NvtxColor, NvtxRange, Scalar, Schema, Table} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io.compress.CompressionCodecFactory + import org.apache.spark.TaskContext import org.apache.spark.sql.connector.read.PartitionReader import org.apache.spark.sql.execution.QueryExecutionException From fcddeb1684ff591b3fcd2f79affdfa2652be23b8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 22 Feb 2022 13:09:46 -0700 Subject: [PATCH 08/23] fix regression and update generated docs --- docs/supported_ops.md | 4 ++-- .../com/nvidia/spark/rapids/unit/DecimalUnitTest.scala | 6 ++---- tools/src/main/resources/supportedDataSource.csv | 2 +- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/docs/supported_ops.md b/docs/supported_ops.md index 4f4292f97a6..a063f6ff02d 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -17698,7 +17698,7 @@ dates or timestamps, or for a lack of type coercion support. S PS
UTC is only supported TZ for TIMESTAMP
S -NS +S NS @@ -17741,7 +17741,7 @@ dates or timestamps, or for a lack of type coercion support. S PS
UTC is only supported TZ for TIMESTAMP
S -NS +S NS diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/unit/DecimalUnitTest.scala b/tests/src/test/scala/com/nvidia/spark/rapids/unit/DecimalUnitTest.scala index 79ded960a0a..98e145bec57 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/unit/DecimalUnitTest.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/unit/DecimalUnitTest.scala @@ -26,8 +26,6 @@ import com.nvidia.spark.rapids.{GpuAlias, GpuBatchScanExec, GpuColumnVector, Gpu import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Literal} -import org.apache.spark.sql.execution.FileSourceScanExec -import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.GpuFileSourceScanExec import org.apache.spark.sql.types.{Decimal, DecimalType, IntegerType, LongType, StructField, StructType} @@ -257,7 +255,7 @@ class DecimalUnitTest extends GpuUnitTests { var rootPlan = frameFromOrc("decimal-test.orc")(ss).queryExecution.executedPlan assert(rootPlan.map(p => p).exists(_.isInstanceOf[GpuFileSourceScanExec])) rootPlan = fromCsvDf("decimal-test.csv", decimalCsvStruct)(ss).queryExecution.executedPlan - assert(rootPlan.map(p => p).exists(_.isInstanceOf[FileSourceScanExec])) + assert(rootPlan.map(p => p).exists(_.isInstanceOf[GpuFileSourceScanExec])) rootPlan = frameFromParquet("decimal-test.parquet")(ss).queryExecution.executedPlan assert(rootPlan.map(p => p).exists(_.isInstanceOf[GpuFileSourceScanExec])) }, conf) @@ -266,7 +264,7 @@ class DecimalUnitTest extends GpuUnitTests { var rootPlan = frameFromOrc("decimal-test.orc")(ss).queryExecution.executedPlan assert(rootPlan.map(p => p).exists(_.isInstanceOf[GpuBatchScanExec])) rootPlan = fromCsvDf("decimal-test.csv", decimalCsvStruct)(ss).queryExecution.executedPlan - assert(rootPlan.map(p => p).exists(_.isInstanceOf[BatchScanExec])) + assert(rootPlan.map(p => p).exists(_.isInstanceOf[GpuBatchScanExec])) rootPlan = frameFromParquet("decimal-test.parquet")(ss).queryExecution.executedPlan assert(rootPlan.map(p => p).exists(_.isInstanceOf[GpuBatchScanExec])) }, conf.set(SQLConf.USE_V1_SOURCE_LIST.key, "")) diff --git a/tools/src/main/resources/supportedDataSource.csv b/tools/src/main/resources/supportedDataSource.csv index 4a42eede67c..81833809620 100644 --- a/tools/src/main/resources/supportedDataSource.csv +++ b/tools/src/main/resources/supportedDataSource.csv @@ -1,5 +1,5 @@ Format,Direction,BOOLEAN,BYTE,SHORT,INT,LONG,FLOAT,DOUBLE,DATE,TIMESTAMP,STRING,DECIMAL,NULL,BINARY,CALENDAR,ARRAY,MAP,STRUCT,UDT -CSV,read,S,S,S,S,S,S,S,CO,CO,S,NS,NA,NS,NA,NA,NA,NA,NA +CSV,read,S,S,S,S,S,S,S,CO,CO,S,S,NA,NS,NA,NA,NA,NA,NA JSON,read,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO ORC,read,S,S,S,S,S,S,S,S,PS,S,S,NA,NS,NA,PS,PS,PS,NS ORC,write,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA From 1a68c27fda73dd012508ed0b5aef0eb2c535783f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 22 Feb 2022 13:22:07 -0700 Subject: [PATCH 09/23] update compatibility guide --- docs/compatibility.md | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/docs/compatibility.md b/docs/compatibility.md index c485a4be1b2..b0716b0b3f8 100644 --- a/docs/compatibility.md +++ b/docs/compatibility.md @@ -490,11 +490,10 @@ and when the option is false, then RAPIDS Accelerator's behavior is same as Spar as discussed in `JSON Schema discovery` section. 
- `allowNonNumericNumbers` - Allows `NaN` and `Infinity` values to be parsed (note that these are not valid numeric -values in the [JSON specification](https://json.org)). Spark has inconsistent behavior and will +values in the [JSON specification](https://json.org)). Spark versions prior to 3.3.0 have inconsistent behavior and will parse some variants of `NaN` and `Infinity` even when this option is disabled -([SPARK-38060](https://issues.apache.org/jira/browse/SPARK-38060)). The RAPIDS Accelerator supports a wider range of -representations than Spark when this option is enabled and does not support any form of `NaN` or `Infinity` when the -option is disabled. +([SPARK-38060](https://issues.apache.org/jira/browse/SPARK-38060)). The RAPIDS Accelerator behavior is consistent with +Spark version 3.3.0 and later. ## Regular Expressions From ecff2d2dd83ff44288243d2c6ef7da6b28326d11 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 22 Feb 2022 13:22:16 -0700 Subject: [PATCH 10/23] update compatibility guide --- docs/compatibility.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/compatibility.md b/docs/compatibility.md index b0716b0b3f8..85946b3c4ad 100644 --- a/docs/compatibility.md +++ b/docs/compatibility.md @@ -492,8 +492,8 @@ as discussed in `JSON Schema discovery` section. - `allowNonNumericNumbers` - Allows `NaN` and `Infinity` values to be parsed (note that these are not valid numeric values in the [JSON specification](https://json.org)). Spark versions prior to 3.3.0 have inconsistent behavior and will parse some variants of `NaN` and `Infinity` even when this option is disabled -([SPARK-38060](https://issues.apache.org/jira/browse/SPARK-38060)). The RAPIDS Accelerator behavior is consistent with -Spark version 3.3.0 and later. +([SPARK-38060](https://issues.apache.org/jira/browse/SPARK-38060)). The RAPIDS Accelerator behavior is consistent with +Spark version 3.3.0 and later. 
## Regular Expressions From bb79bf4cdeffe108a09786f44dd66bec40a86434 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 23 Feb 2022 19:47:38 +0000 Subject: [PATCH 11/23] Initial support for reading dates with JSON and CSV readers Signed-off-by: Andy Grove --- integration_tests/src/main/python/csv_test.py | 6 +- .../src/main/python/json_test.py | 69 ++++++++++++++++++- .../src/test/resources/dates.json | 3 + .../src/test/resources/dates_invalid.json | 1 + .../com/nvidia/spark/rapids/DateUtils.scala | 55 ++++++++++++++- .../spark/rapids/GpuBatchScanExec.scala | 1 + .../rapids/GpuTextBasedPartitionReader.scala | 27 +++++++- .../catalyst/json/rapids/GpuJsonScan.scala | 10 ++- .../sql/rapids/datetimeExpressions.scala | 63 ++++------------- 9 files changed, 174 insertions(+), 61 deletions(-) create mode 100644 integration_tests/src/test/resources/dates.json create mode 100644 integration_tests/src/test/resources/dates_invalid.json diff --git a/integration_tests/src/main/python/csv_test.py b/integration_tests/src/main/python/csv_test.py index 3067bb1ee5b..624b385a87d 100644 --- a/integration_tests/src/main/python/csv_test.py +++ b/integration_tests/src/main/python/csv_test.py @@ -175,7 +175,7 @@ 'spark.rapids.sql.csv.read.long.enabled': 'true', 'spark.rapids.sql.csv.read.float.enabled': 'true', 'spark.rapids.sql.csv.read.double.enabled': 'true', - 'spark.sql.legacy.timeParserPolicy': 'Corrected'} + 'spark.sql.legacy.timeParserPolicy': 'CORRECTED'} def read_csv_df(data_path, schema, options = {}): def read_impl(spark): @@ -200,8 +200,8 @@ def read_impl(spark): @pytest.mark.parametrize('name,schema,options', [ ('Acquisition_2007Q3.txt', _acq_schema, {'sep': '|'}), ('Performance_2007Q3.txt_0', _perf_schema, {'sep': '|'}), - pytest.param('ts.csv', _date_schema, {}), - pytest.param('date.csv', _date_schema, {}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1111')), + ('ts.csv', _date_schema, {}), + ('date.csv', _date_schema, {}), ('ts.csv', _ts_schema, {}), ('str.csv', _bad_str_schema, {'header': 'true'}), ('str.csv', _good_str_schema, {'header': 'true'}), diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py index eab1c1e0cc2..baf7d856b71 100644 --- a/integration_tests/src/main/python/json_test.py +++ b/integration_tests/src/main/python/json_test.py @@ -14,7 +14,7 @@ import pytest -from asserts import assert_gpu_and_cpu_are_equal_collect +from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error, assert_gpu_fallback_collect from data_gen import * from conftest import is_databricks_runtime from marks import approximate_float, allow_non_gpu, ignore_order @@ -66,6 +66,9 @@ _decimal_10_3_schema = StructType([ StructField('number', DecimalType(10, 3))]) +_date_schema = StructType([ + StructField('number', DateType())]) + _string_schema = StructType([ StructField('a', StringType())]) @@ -204,14 +207,18 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena 'floats_invalid.json', pytest.param('floats_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4647')), 'decimals.json', + 'dates.json', + 'dates_invalid.json', ]) -@pytest.mark.parametrize('schema', [_bool_schema, _byte_schema, _short_schema, _int_schema, _long_schema, _float_schema, _double_schema, _decimal_10_2_schema, _decimal_10_3_schema]) +@pytest.mark.parametrize('schema', [_bool_schema, _byte_schema, _short_schema, _int_schema, _long_schema, _float_schema, 
_double_schema, _decimal_10_2_schema, _decimal_10_3_schema, _date_schema]) @pytest.mark.parametrize('read_func', [read_json_df, read_json_sql]) @pytest.mark.parametrize('allow_non_numeric_numbers', ["true", "false"]) @pytest.mark.parametrize('allow_numeric_leading_zeros', ["true"]) @pytest.mark.parametrize('ansi_enabled', ["true", "false"]) def test_basic_json_read(std_input_path, filename, schema, read_func, allow_non_numeric_numbers, allow_numeric_leading_zeros, ansi_enabled): - updated_conf = copy_and_update(_enable_all_types_conf, {'spark.sql.ansi.enabled': ansi_enabled}) + updated_conf = copy_and_update(_enable_all_types_conf, + {'spark.sql.ansi.enabled': ansi_enabled, + 'spark.sql.legacy.timeParserPolicy': 'CORRECTED'}) assert_gpu_and_cpu_are_equal_collect( read_func(std_input_path + '/' + filename, schema, @@ -219,6 +226,62 @@ def test_basic_json_read(std_input_path, filename, schema, read_func, allow_non_ "allowNumericLeadingZeros": allow_numeric_leading_zeros}), conf=updated_conf) +@approximate_float +@pytest.mark.parametrize('filename', [ + 'dates.json', +]) +@pytest.mark.parametrize('schema', [_date_schema]) +@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql]) +@pytest.mark.parametrize('ansi_enabled', ["true", "false"]) +@pytest.mark.parametrize('time_parser_policy', [ + pytest.param('LEGACY', marks=pytest.mark.allow_non_gpu('FileSourceScanExec')), + 'CORRECTED', + 'EXCEPTION' +]) +def test_json_read_valid_dates(std_input_path, filename, schema, read_func, ansi_enabled, time_parser_policy): + updated_conf = copy_and_update(_enable_all_types_conf, + {'spark.sql.ansi.enabled': ansi_enabled, + 'spark.sql.legacy.timeParserPolicy': time_parser_policy, + 'spark.rapids.sql.incompatibleDateFormats.enabled': True}) + f = read_func(std_input_path + '/' + filename, schema, {}) + if time_parser_policy == 'LEGACY' and ansi_enabled == 'true': + assert_gpu_fallback_collect( + f, + 'FileSourceScanExec', + conf=updated_conf) + else: + assert_gpu_and_cpu_are_equal_collect(f, conf=updated_conf) + +@approximate_float +@pytest.mark.parametrize('filename', [ + 'dates_invalid.json', +]) +@pytest.mark.parametrize('schema', [_date_schema]) +@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql]) +@pytest.mark.parametrize('ansi_enabled', ["true", "false"]) +@pytest.mark.parametrize('time_parser_policy', [ + pytest.param('LEGACY', marks=pytest.mark.allow_non_gpu('FileSourceScanExec')), + 'CORRECTED', + 'EXCEPTION' +]) +def test_json_read_invalid_dates(std_input_path, filename, schema, read_func, ansi_enabled, time_parser_policy): + updated_conf = copy_and_update(_enable_all_types_conf, + {'spark.sql.ansi.enabled': ansi_enabled, + 'spark.sql.legacy.timeParserPolicy': time_parser_policy }) + f = read_func(std_input_path + '/' + filename, schema, {}) + if time_parser_policy == 'EXCEPTION': + assert_gpu_and_cpu_error( + df_fun=lambda spark: f(spark).collect(), + conf=updated_conf, + error_message='DateTimeException') + elif time_parser_policy == 'LEGACY' and ansi_enabled == 'true': + assert_gpu_fallback_collect( + f, + 'FileSourceScanExec', + conf=updated_conf) + else: + assert_gpu_and_cpu_are_equal_collect(f, conf=updated_conf) + @pytest.mark.parametrize('schema', [_string_schema]) @pytest.mark.parametrize('read_func', [read_json_df, read_json_sql]) @pytest.mark.parametrize('allow_unquoted_chars', ["true"]) diff --git a/integration_tests/src/test/resources/dates.json b/integration_tests/src/test/resources/dates.json new file mode 100644 index 00000000000..b551b4edfa8 --- 
/dev/null +++ b/integration_tests/src/test/resources/dates.json @@ -0,0 +1,3 @@ +{ "number": "2020-09-16" } +{ "number": "1581-01-01" } +{ "number": "1583-01-01" } \ No newline at end of file diff --git a/integration_tests/src/test/resources/dates_invalid.json b/integration_tests/src/test/resources/dates_invalid.json new file mode 100644 index 00000000000..6ecfd8f04b9 --- /dev/null +++ b/integration_tests/src/test/resources/dates_invalid.json @@ -0,0 +1 @@ +{ "number": "2020-09-32" } \ No newline at end of file diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala index 7834bdeaed6..54da70121af 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,6 +25,8 @@ import com.nvidia.spark.rapids.VersionUtils.isSpark320OrLater import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.catalyst.util.DateTimeUtils.localDateToDays +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.rapids.{GpuToTimestamp, LegacyTimeParserPolicy} /** * Class for helper functions for Date @@ -211,4 +213,55 @@ object DateUtils { } case class TimestampFormatConversionException(reason: String) extends Exception + + def tagAndGetCudfFormat( + meta: RapidsMeta[_, _, _], + sparkFormat: String, + parseString: Boolean): String = { + var strfFormat = "" + if (GpuOverrides.getTimeParserPolicy == LegacyTimeParserPolicy) { + try { + // try and convert the format to cuDF format - this will throw an exception if + // the format contains unsupported characters or words + strfFormat = toStrf(sparkFormat, parseString) + // format parsed ok but we have no 100% compatible formats in LEGACY mode + if (GpuToTimestamp.LEGACY_COMPATIBLE_FORMATS.contains(sparkFormat)) { + // LEGACY support has a number of issues that mean we cannot guarantee + // compatibility with CPU + // - we can only support 4 digit years but Spark supports a wider range + // - we use a proleptic Gregorian calender but Spark uses a hybrid Julian+Gregorian + // calender in LEGACY mode + if (SQLConf.get.ansiEnabled) { + meta.willNotWorkOnGpu("LEGACY format in ANSI mode is not supported on the GPU") + } else if (!meta.conf.incompatDateFormats) { + meta.willNotWorkOnGpu(s"LEGACY format '$sparkFormat' on the GPU is not guaranteed " + + s"to produce the same results as Spark on CPU. 
Set " + + s"${RapidsConf.INCOMPATIBLE_DATE_FORMATS.key}=true to force onto GPU.") + } + } else { + meta.willNotWorkOnGpu(s"LEGACY format '$sparkFormat' is not supported on the GPU.") + } + } catch { + case e: TimestampFormatConversionException => + meta.willNotWorkOnGpu(s"Failed to convert ${e.reason} ${e.getMessage}") + } + } else { + try { + // try and convert the format to cuDF format - this will throw an exception if + // the format contains unsupported characters or words + strfFormat = toStrf(sparkFormat, parseString) + // format parsed ok, so it is either compatible (tested/certified) or incompatible + if (!GpuToTimestamp.CORRECTED_COMPATIBLE_FORMATS.contains(sparkFormat) && + !meta.conf.incompatDateFormats) { + meta.willNotWorkOnGpu(s"CORRECTED format '$sparkFormat' on the GPU is not guaranteed " + + s"to produce the same results as Spark on CPU. Set " + + s"${RapidsConf.INCOMPATIBLE_DATE_FORMATS.key}=true to force onto GPU.") + } + } catch { + case e: TimestampFormatConversionException => + meta.willNotWorkOnGpu(s"Failed to convert ${e.reason} ${e.getMessage}") + } + } + strfFormat + } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala index 21c9d0147ac..360400f8078 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala @@ -412,4 +412,5 @@ class CSVPartitionReader( } } + override def dateFormat: String = parsedOptions.dateFormat } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala index 88e7fef3970..3498c1b7e1c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala @@ -16,6 +16,8 @@ package com.nvidia.spark.rapids +import java.time.DateTimeException + import scala.collection.mutable.ListBuffer import scala.math.max @@ -28,6 +30,7 @@ import org.apache.spark.TaskContext import org.apache.spark.sql.connector.read.PartitionReader import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.execution.datasources.{HadoopFileLinesReader, PartitionedFile} +import org.apache.spark.sql.rapids.ExceptionTimeParserPolicy import org.apache.spark.sql.types.{DataTypes, DecimalType, StructField, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch @@ -172,7 +175,7 @@ abstract class GpuTextBasedPartitionReader( f.dataType match { case DataTypes.BooleanType | DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType | - DataTypes.DoubleType | _: DecimalType => + DataTypes.DoubleType | _: DecimalType | DataTypes.DateType => f.copy(dataType = DataTypes.StringType) case _ => f @@ -214,6 +217,8 @@ abstract class GpuTextBasedPartitionReader( castStringToFloat(table.getColumn(i), DType.FLOAT64) case dt: DecimalType => castStringToDecimal(table.getColumn(i), dt) + case DataTypes.DateType => + castStringToDate(table.getColumn(i), ansiEnabled = false) case _ => table.getColumn(i).incRefCount() } @@ -230,6 +235,26 @@ abstract class GpuTextBasedPartitionReader( } } + def dateFormat: String + + def castStringToDate(input: ColumnVector, ansiEnabled: Boolean): ColumnVector = { + val cudfFormat = DateUtils.toStrf(dateFormat, parseString = true) + 
withResource(input.isTimestamp(cudfFormat)) { isDate => + if (GpuOverrides.getTimeParserPolicy == ExceptionTimeParserPolicy) { + withResource(isDate.all()) { all => + if (all.isValid && !all.getBoolean) { + throw new DateTimeException("One or more values is not a valid date") + } + } + } + withResource(input.asTimestamp(DType.TIMESTAMP_DAYS, cudfFormat)) { asDate => + withResource(Scalar.fromNull(DType.TIMESTAMP_DAYS)) { nullScalar => + isDate.ifElse(asDate, nullScalar) + } + } + } + } + def castStringToBool(input: ColumnVector): ColumnVector def castStringToFloat(input: ColumnVector, dt: DType): ColumnVector = { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala index 2815bbbdd64..b8603ff7f7a 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala @@ -147,10 +147,12 @@ object GpuJsonScan { }) if (readSchema.map(_.dataType).contains(DateType)) { +// if (GpuOverrides.getTimeParserPolicy == LegacyTimeParserPolicy) { +// // https://github.com/NVIDIA/spark-rapids/issues/4849 +// meta.willNotWorkOnGpu("GpuJsonScan does not support LEGACY timeParserPolicy") +// } ShimLoader.getSparkShims.dateFormatInRead(parsedOptions).foreach { dateFormat => - if (!supportedDateFormats.contains(dateFormat)) { - meta.willNotWorkOnGpu(s"the date format '${dateFormat}' is not supported'") - } + DateUtils.tagAndGetCudfFormat(meta, dateFormat, parseString = true) } } @@ -413,4 +415,6 @@ class JsonPartitionReader( } } + override def dateFormat: String = parsedOptions.dateFormat + } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala index 0425f77f2c9..6b0e611368e 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -19,14 +19,12 @@ package org.apache.spark.sql.rapids import java.util.concurrent.TimeUnit import ai.rapids.cudf.{BinaryOp, ColumnVector, ColumnView, DType, Scalar} -import com.nvidia.spark.rapids.{Arm, BinaryExprMeta, DataFromReplacementRule, DateUtils, GpuBinaryExpression, GpuColumnVector, GpuExpression, GpuOverrides, GpuScalar, GpuUnaryExpression, RapidsConf, RapidsMeta} -import com.nvidia.spark.rapids.DateUtils.TimestampFormatConversionException +import com.nvidia.spark.rapids.{Arm, BinaryExprMeta, DataFromReplacementRule, DateUtils, GpuBinaryExpression, GpuColumnVector, GpuExpression, GpuScalar, GpuUnaryExpression, RapidsConf, RapidsMeta} import com.nvidia.spark.rapids.GpuOverrides.{extractStringLit, getTimeParserPolicy} import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.shims.v2.ShimBinaryExpression import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, ExpectsInputTypes, Expression, ImplicitCastInputTypes, NullIntolerant, TimeZoneAwareExpression} -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.unsafe.types.CalendarInterval @@ -395,51 +393,8 @@ abstract class UnixTimeExprMeta[A <: BinaryExpression with TimeZoneAwareExpressi extractStringLit(expr.right) match { case Some(rightLit) => sparkFormat = rightLit - if (GpuOverrides.getTimeParserPolicy == LegacyTimeParserPolicy) { - try { - // try and convert the format to cuDF format - this will throw an exception if - // the format contains unsupported characters or words - strfFormat = DateUtils.toStrf(sparkFormat, - expr.left.dataType == DataTypes.StringType) - // format parsed ok but we have no 100% compatible formats in LEGACY mode - if (GpuToTimestamp.LEGACY_COMPATIBLE_FORMATS.contains(sparkFormat)) { - // LEGACY support has a number of issues that mean we cannot guarantee - // compatibility with CPU - // - we can only support 4 digit years but Spark supports a wider range - // - we use a proleptic Gregorian calender but Spark uses a hybrid Julian+Gregorian - // calender in LEGACY mode - if (SQLConf.get.ansiEnabled) { - willNotWorkOnGpu("LEGACY format in ANSI mode is not supported on the GPU") - } else if (!conf.incompatDateFormats) { - willNotWorkOnGpu(s"LEGACY format '$sparkFormat' on the GPU is not guaranteed " + - s"to produce the same results as Spark on CPU. Set " + - s"${RapidsConf.INCOMPATIBLE_DATE_FORMATS.key}=true to force onto GPU.") - } - } else { - willNotWorkOnGpu(s"LEGACY format '$sparkFormat' is not supported on the GPU.") - } - } catch { - case e: TimestampFormatConversionException => - willNotWorkOnGpu(s"Failed to convert ${e.reason} ${e.getMessage}") - } - } else { - try { - // try and convert the format to cuDF format - this will throw an exception if - // the format contains unsupported characters or words - strfFormat = DateUtils.toStrf(sparkFormat, - expr.left.dataType == DataTypes.StringType) - // format parsed ok, so it is either compatible (tested/certified) or incompatible - if (!GpuToTimestamp.CORRECTED_COMPATIBLE_FORMATS.contains(sparkFormat) && - !conf.incompatDateFormats) { - willNotWorkOnGpu(s"CORRECTED format '$sparkFormat' on the GPU is not guaranteed " + - s"to produce the same results as Spark on CPU. 
Set " + - s"${RapidsConf.INCOMPATIBLE_DATE_FORMATS.key}=true to force onto GPU.") - } - } catch { - case e: TimestampFormatConversionException => - willNotWorkOnGpu(s"Failed to convert ${e.reason} ${e.getMessage}") - } - } + strfFormat = DateUtils.tagAndGetCudfFormat(this, + sparkFormat, expr.left.dataType == DataTypes.StringType) case None => willNotWorkOnGpu("format has to be a string literal") } @@ -476,7 +431,15 @@ object GpuToTimestamp extends Arm { "dd-MM" -> ParseFormatMeta('-', isTimestamp = false, raw"\A\d{2}-\d{2}\Z"), "dd/MM" -> ParseFormatMeta('/', isTimestamp = false, - raw"\A\d{2}/\d{2}\Z") + raw"\A\d{2}/\d{2}\Z"), + "MM/yyyy" -> ParseFormatMeta('/', isTimestamp = false, + raw"\A\d{2}/\d{4}\Z"), + "MM-yyyy" -> ParseFormatMeta('-', isTimestamp = false, + raw"\A\d{2}-\d{4}\Z"), + "MM/dd/yyyy" -> ParseFormatMeta('/', isTimestamp = false, + raw"\A\d{2}/\d{2}/\d{4}\Z"), + "MM-dd-yyyy" -> ParseFormatMeta('-', isTimestamp = false, + raw"\A\d{2}-\d{2}-\d{4}\Z") ) // We are compatible with Spark for these formats when the timeParserPolicy is LEGACY. It From 0c12507c921f380a6c663f9d0e6b7e1d8ac7dafc Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 23 Feb 2022 20:22:05 +0000 Subject: [PATCH 12/23] fix issues from self review --- docs/compatibility.md | 1 - docs/configs.md | 1 - integration_tests/src/main/python/csv_test.py | 8 -------- .../spark/rapids/tests/mortgage/MortgageSparkSuite.scala | 1 - .../main/scala/com/nvidia/spark/rapids/DateUtils.scala | 6 +++--- .../scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala | 4 ---- .../main/scala/com/nvidia/spark/rapids/RapidsConf.scala | 7 ------- .../main/scala/com/nvidia/spark/rapids/TypeChecks.scala | 1 - .../spark/sql/catalyst/json/rapids/GpuJsonScan.scala | 4 ---- .../nvidia/spark/rapids/SparkQueryCompareTestSuite.scala | 1 - tools/src/main/resources/supportedDataSource.csv | 2 +- 11 files changed, 4 insertions(+), 32 deletions(-) diff --git a/docs/compatibility.md b/docs/compatibility.md index 85946b3c4ad..2b43fbce610 100644 --- a/docs/compatibility.md +++ b/docs/compatibility.md @@ -283,7 +283,6 @@ will produce a different result compared to the plugin. Due to inconsistencies between how CSV data is parsed CSV parsing is off by default. Each data type can be enabled or disabled independently using the following configs. - * [spark.rapids.sql.csv.read.date.enabled](configs.md#sql.csv.read.date.enabled) * [spark.rapids.sql.csvTimestamps.enabled](configs.md#sql.csvTimestamps.enabled) If you know that your particular data type will be parsed correctly enough, you may enable each diff --git a/docs/configs.md b/docs/configs.md index a6e826a98aa..b93be64d275 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -67,7 +67,6 @@ Name | Description | Default Value spark.rapids.sql.castStringToFloat.enabled|When set to true, enables casting from strings to float types (float, double) on the GPU. Currently hex values aren't supported on the GPU. Also note that casting from string to float types on the GPU returns incorrect results when the string represents any number "1.7976931348623158E308" <= x < "1.7976931348623159E308" and "-1.7976931348623158E308" >= x > "-1.7976931348623159E308" in both these cases the GPU returns Double.MaxValue while CPU returns "+Infinity" and "-Infinity" respectively|false spark.rapids.sql.castStringToTimestamp.enabled|When set to true, casting from string to timestamp is supported on the GPU. The GPU only supports a subset of formats when casting strings to timestamps. 
Refer to the CAST documentation for more details.|false spark.rapids.sql.concurrentGpuTasks|Set the number of tasks that can execute concurrently per GPU. Tasks may temporarily block when the number of concurrent tasks in the executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors.|1 -spark.rapids.sql.csv.read.date.enabled|Parsing invalid CSV dates produces different results from Spark|false spark.rapids.sql.csvTimestamps.enabled|When set to true, enables the CSV parser to read timestamps. The default output format for Spark includes a timezone at the end. Anything except the UTC timezone is not supported. Timestamps after 2038 and before 1902 are also not supported.|false spark.rapids.sql.decimalOverflowGuarantees|FOR TESTING ONLY. DO NOT USE IN PRODUCTION. Please see the decimal section of the compatibility documents for more information on this config.|true spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true diff --git a/integration_tests/src/main/python/csv_test.py b/integration_tests/src/main/python/csv_test.py index 624b385a87d..77f37d3e39a 100644 --- a/integration_tests/src/main/python/csv_test.py +++ b/integration_tests/src/main/python/csv_test.py @@ -167,14 +167,6 @@ StructField('ignored_b', StringType())]) _enable_all_types_conf = {'spark.rapids.sql.csvTimestamps.enabled': 'true', - 'spark.rapids.sql.csv.read.bool.enabled': 'true', - 'spark.rapids.sql.csv.read.date.enabled': 'true', - 'spark.rapids.sql.csv.read.byte.enabled': 'true', - 'spark.rapids.sql.csv.read.short.enabled': 'true', - 'spark.rapids.sql.csv.read.integer.enabled': 'true', - 'spark.rapids.sql.csv.read.long.enabled': 'true', - 'spark.rapids.sql.csv.read.float.enabled': 'true', - 'spark.rapids.sql.csv.read.double.enabled': 'true', 'spark.sql.legacy.timeParserPolicy': 'CORRECTED'} def read_csv_df(data_path, schema, options = {}): diff --git a/integration_tests/src/test/scala/com/nvidia/spark/rapids/tests/mortgage/MortgageSparkSuite.scala b/integration_tests/src/test/scala/com/nvidia/spark/rapids/tests/mortgage/MortgageSparkSuite.scala index f1265e970d9..16663c3e19f 100644 --- a/integration_tests/src/test/scala/com/nvidia/spark/rapids/tests/mortgage/MortgageSparkSuite.scala +++ b/integration_tests/src/test/scala/com/nvidia/spark/rapids/tests/mortgage/MortgageSparkSuite.scala @@ -47,7 +47,6 @@ class MortgageSparkSuite extends FunSuite { .config("spark.rapids.sql.test.enabled", false) .config("spark.rapids.sql.incompatibleOps.enabled", true) .config("spark.rapids.sql.hasNans", false) - .config("spark.rapids.sql.csv.read.date.enabled", true) val rapidsShuffle = ShimLoader.getRapidsShuffleManagerClass val prop = System.getProperty("rapids.shuffle.manager.override", "false") if (prop.equalsIgnoreCase("true")) { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala index 54da70121af..1e4abee9893 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala @@ -215,9 +215,9 @@ object DateUtils { case class TimestampFormatConversionException(reason: String) extends Exception def tagAndGetCudfFormat( - meta: RapidsMeta[_, _, _], - sparkFormat: String, - parseString: Boolean): String = { + meta: RapidsMeta[_, _, _], + sparkFormat: String, + parseString: Boolean): String = { var strfFormat = "" if (GpuOverrides.getTimeParserPolicy == 
LegacyTimeParserPolicy) { try { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala index 360400f8078..fcee28ee15b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala @@ -211,10 +211,6 @@ object GpuCSVScan { // parsedOptions.maxColumns was originally a performance optimization but is not used any more if (readSchema.map(_.dataType).contains(DateType)) { - if (!meta.conf.isCsvDateReadEnabled) { - meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading dates. " + - s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_DATES} to true.") - } ShimLoader.getSparkShims.dateFormatInRead(parsedOptions).foreach { dateFormat => if (!supportedDateFormats.contains(dateFormat)) { meta.willNotWorkOnGpu(s"the date format '${dateFormat}' is not supported'") diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index d403dbf653d..f3dcab1a115 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -885,11 +885,6 @@ object RapidsConf { .booleanConf .createWithDefault(false) - val ENABLE_READ_CSV_DATES = conf("spark.rapids.sql.csv.read.date.enabled") - .doc("Parsing invalid CSV dates produces different results from Spark") - .booleanConf - .createWithDefault(false) - val ENABLE_JSON = conf("spark.rapids.sql.format.json.enabled") .doc("When set to true enables all json input and output acceleration. " + "(only input is currently supported anyways)") @@ -1578,8 +1573,6 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isCsvTimestampReadEnabled: Boolean = get(ENABLE_CSV_TIMESTAMPS) - lazy val isCsvDateReadEnabled: Boolean = get(ENABLE_READ_CSV_DATES) - lazy val isCastDecimalToStringEnabled: Boolean = get(ENABLE_CAST_DECIMAL_TO_STRING) lazy val isProjectAstEnabled: Boolean = get(ENABLE_PROJECT_AST) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala index 0ebfc072568..74fbc9ee21b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala @@ -2116,7 +2116,6 @@ object SupportedOpsForTools { val typeEnabled = if (format.toString.toLowerCase.equals("csv")) { t.toString match { case "TIMESTAMP" => conf.isCsvTimestampReadEnabled - case "DATE" => conf.isCsvDateReadEnabled case _ => true } } else { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala index b8603ff7f7a..802445aef95 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala @@ -147,10 +147,6 @@ object GpuJsonScan { }) if (readSchema.map(_.dataType).contains(DateType)) { -// if (GpuOverrides.getTimeParserPolicy == LegacyTimeParserPolicy) { -// // https://github.com/NVIDIA/spark-rapids/issues/4849 -// meta.willNotWorkOnGpu("GpuJsonScan does not support LEGACY timeParserPolicy") -// } ShimLoader.getSparkShims.dateFormatInRead(parsedOptions).foreach { dateFormat => 
DateUtils.tagAndGetCudfFormat(meta, dateFormat, parseString = true) } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala index ea0828a8d64..a6dd6124e06 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala @@ -150,7 +150,6 @@ trait SparkQueryCompareTestSuite extends FunSuite with Arm { def enableCsvConf(): SparkConf = { new SparkConf() - .set(RapidsConf.ENABLE_READ_CSV_DATES.key, "true") } // @see java.lang.Float#intBitsToFloat diff --git a/tools/src/main/resources/supportedDataSource.csv b/tools/src/main/resources/supportedDataSource.csv index 81833809620..bef3ceae4df 100644 --- a/tools/src/main/resources/supportedDataSource.csv +++ b/tools/src/main/resources/supportedDataSource.csv @@ -1,5 +1,5 @@ Format,Direction,BOOLEAN,BYTE,SHORT,INT,LONG,FLOAT,DOUBLE,DATE,TIMESTAMP,STRING,DECIMAL,NULL,BINARY,CALENDAR,ARRAY,MAP,STRUCT,UDT -CSV,read,S,S,S,S,S,S,S,CO,CO,S,S,NA,NS,NA,NA,NA,NA,NA +CSV,read,S,S,S,S,S,S,S,S,CO,S,S,NA,NS,NA,NA,NA,NA,NA JSON,read,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO ORC,read,S,S,S,S,S,S,S,S,PS,S,S,NA,NS,NA,PS,PS,PS,NS ORC,write,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA From 689364e47d84158cfb1226db48dde2129a9deae5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 23 Feb 2022 15:05:44 -0700 Subject: [PATCH 13/23] more tests for newly supported date formats --- .../spark/rapids/ParseDateTimeSuite.scala | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala index fdce30bd613..389b3e1c91b 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
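The hunk that follows adds Scala tests covering the newly supported `MM-dd-yyyy` and `MM/dd/yyyy` style formats. A rough PySpark equivalent, with invented input strings and an assumed active `spark` session, might look like this:

```python
from pyspark.sql.functions import col, to_date

# Invented input strings; the Scala tests in the following hunk are the
# authoritative coverage for these formats.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "CORRECTED")
df = spark.createDataFrame([("12-01-1999",), ("12/01/1999",)], ["c0"])
df.withColumn("c1", to_date(col("c0"), "MM-dd-yyyy")).show()
df.withColumn("c1", to_date(col("c0"), "MM/dd/yyyy")).show()
```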
@@ -88,6 +88,18 @@ class ParseDateTimeSuite extends SparkQueryCompareTestSuite with BeforeAndAfterE df => df.withColumn("c1", to_date(col("c0"), "dd/MM/yyyy")) } + testSparkResultsAreEqual("to_date MM/dd/yyyy", + datesAsStrings, + conf = CORRECTED_TIME_PARSER_POLICY) { + df => df.withColumn("c1", to_date(col("c0"), "MM/dd/yyyy")) + } + + testSparkResultsAreEqual("to_date yyyy/MM", + datesAsStrings, + conf = CORRECTED_TIME_PARSER_POLICY) { + df => df.withColumn("c1", to_date(col("c0"), "yyyy/MM")) + } + testSparkResultsAreEqual("to_date parse date", dates, conf = CORRECTED_TIME_PARSER_POLICY) { @@ -350,7 +362,10 @@ class ParseDateTimeSuite extends SparkQueryCompareTestSuite with BeforeAndAfterE "1999-1-1 11:12:3.", "1999-1-1 11:2:3.", "1999-1-1 11:2:13.", - "1999-1-1 1:2:3.4") + "1999-1-1 1:2:3.4", + "12-01-1999", + "01-12-1999", + "1999-12") private val timestampValues = Seq( "", From 6487d22608556f5780902a7f4b69dcf0db61b31e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 23 Feb 2022 16:59:28 -0700 Subject: [PATCH 14/23] Make CSV date parsing consistent with Spark and improve tests --- integration_tests/src/main/python/csv_test.py | 31 ++++++++++++++++--- .../spark/rapids/GpuBatchScanExec.scala | 4 +-- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/integration_tests/src/main/python/csv_test.py b/integration_tests/src/main/python/csv_test.py index 77f37d3e39a..ac55b8ff2ba 100644 --- a/integration_tests/src/main/python/csv_test.py +++ b/integration_tests/src/main/python/csv_test.py @@ -14,7 +14,8 @@ import pytest -from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect, assert_gpu_fallback_write, assert_cpu_and_gpu_are_equal_collect_with_capture +from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect, assert_gpu_fallback_write, \ + assert_cpu_and_gpu_are_equal_collect_with_capture, assert_gpu_fallback_collect from conftest import get_non_gpu_allowed from datetime import datetime, timezone from data_gen import * @@ -308,16 +309,38 @@ def test_csv_fallback(spark_tmp_path, read_func, disable_conf): 'MM-yyyy', 'MM/yyyy', 'MM-dd-yyyy', 'MM/dd/yyyy'] @pytest.mark.parametrize('date_format', csv_supported_date_formats, ids=idfn) @pytest.mark.parametrize('v1_enabled_list', ["", "csv"]) -def test_date_formats_round_trip(spark_tmp_path, date_format, v1_enabled_list): +@pytest.mark.parametrize('ansi_enabled', ["true", "false"]) +@pytest.mark.parametrize('time_parser_policy', [ + pytest.param('LEGACY', marks=pytest.mark.allow_non_gpu('BatchScanExec,FileSourceScanExec')), + 'CORRECTED', + 'EXCEPTION' +]) +def test_date_formats_round_trip(spark_tmp_path, date_format, v1_enabled_list, ansi_enabled, time_parser_policy): gen = StructGen([('a', DateGen())], nullable=False) data_path = spark_tmp_path + '/CSV_DATA' schema = gen.data_type - updated_conf = copy_and_update(_enable_all_types_conf, {'spark.sql.sources.useV1SourceList': v1_enabled_list}) + updated_conf = copy_and_update(_enable_all_types_conf, + {'spark.sql.sources.useV1SourceList': v1_enabled_list, + 'spark.sql.ansi.enabled': ansi_enabled, + 'spark.rapids.sql.incompatibleDateFormats.enabled': True, + 'spark.sql.legacy.timeParserPolicy': time_parser_policy}) with_cpu_session( lambda spark : gen_df(spark, gen).write\ .option('dateFormat', date_format)\ .csv(data_path)) - assert_gpu_and_cpu_are_equal_collect( + if time_parser_policy == 'LEGACY' and date_format not in ['yyyy-MM-dd', 'yyyy/MM/dd']: + expected_class = 'FileSourceScanExec' + if v1_enabled_list == '': + 
expected_class = 'BatchScanExec' + assert_gpu_fallback_collect( + lambda spark : spark.read \ + .schema(schema) \ + .option('dateFormat', date_format) \ + .csv(data_path), + expected_class, + conf=updated_conf) + else: + assert_gpu_and_cpu_are_equal_collect( lambda spark : spark.read\ .schema(schema)\ .option('dateFormat', date_format)\ diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala index fcee28ee15b..a4239043ba2 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala @@ -212,9 +212,7 @@ object GpuCSVScan { if (readSchema.map(_.dataType).contains(DateType)) { ShimLoader.getSparkShims.dateFormatInRead(parsedOptions).foreach { dateFormat => - if (!supportedDateFormats.contains(dateFormat)) { - meta.willNotWorkOnGpu(s"the date format '${dateFormat}' is not supported'") - } + DateUtils.tagAndGetCudfFormat(meta, dateFormat, parseString = true) } } From b7fb3660c5ff64e7c513c252057d2d7f70144a60 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 24 Feb 2022 14:48:04 -0700 Subject: [PATCH 15/23] fall back to CPU when reading CSV with timeParserPolicy=LEGACY --- integration_tests/src/main/python/csv_test.py | 34 +++++++++++++++++-- .../src/test/resources/dates_invalid.json | 3 +- .../spark/rapids/GpuBatchScanExec.scala | 7 ++++ 3 files changed, 41 insertions(+), 3 deletions(-) diff --git a/integration_tests/src/main/python/csv_test.py b/integration_tests/src/main/python/csv_test.py index ac55b8ff2ba..94cfc6c73d6 100644 --- a/integration_tests/src/main/python/csv_test.py +++ b/integration_tests/src/main/python/csv_test.py @@ -14,7 +14,7 @@ import pytest -from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect, assert_gpu_fallback_write, \ +from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error, assert_gpu_fallback_write, \ assert_cpu_and_gpu_are_equal_collect_with_capture, assert_gpu_fallback_collect from conftest import get_non_gpu_allowed from datetime import datetime, timezone @@ -328,7 +328,7 @@ def test_date_formats_round_trip(spark_tmp_path, date_format, v1_enabled_list, a lambda spark : gen_df(spark, gen).write\ .option('dateFormat', date_format)\ .csv(data_path)) - if time_parser_policy == 'LEGACY' and date_format not in ['yyyy-MM-dd', 'yyyy/MM/dd']: + if time_parser_policy == 'LEGACY': expected_class = 'FileSourceScanExec' if v1_enabled_list == '': expected_class = 'BatchScanExec' @@ -347,6 +347,36 @@ def test_date_formats_round_trip(spark_tmp_path, date_format, v1_enabled_list, a .csv(data_path), conf=updated_conf) +@pytest.mark.parametrize('filename', ["date.csv"]) +@pytest.mark.parametrize('v1_enabled_list', ["", "csv"]) +@pytest.mark.parametrize('ansi_enabled', ["true", "false"]) +@pytest.mark.parametrize('time_parser_policy', [ + pytest.param('LEGACY', marks=pytest.mark.allow_non_gpu('BatchScanExec,FileSourceScanExec')), + 'CORRECTED', + 'EXCEPTION' +]) +def test_read_valid_and_invalid_dates(std_input_path, filename, v1_enabled_list, ansi_enabled, time_parser_policy): + data_path = std_input_path + '/' + filename + updated_conf = copy_and_update(_enable_all_types_conf, + {'spark.sql.sources.useV1SourceList': v1_enabled_list, + 'spark.sql.ansi.enabled': ansi_enabled, + 'spark.rapids.sql.incompatibleDateFormats.enabled': True, + 'spark.sql.legacy.timeParserPolicy': time_parser_policy}) + if time_parser_policy 
== 'EXCEPTION': + assert_gpu_and_cpu_error( + lambda spark : spark.read \ + .schema(_date_schema) \ + .csv(data_path) + .collect(), + conf=updated_conf, + error_message='DateTimeException') + else: + assert_gpu_and_cpu_are_equal_collect( + lambda spark : spark.read \ + .schema(_date_schema) \ + .csv(data_path), + conf=updated_conf) + csv_supported_ts_parts = ['', # Just the date "'T'HH:mm:ss.SSSXXX", "'T'HH:mm:ss[.SSS][XXX]", diff --git a/integration_tests/src/test/resources/dates_invalid.json b/integration_tests/src/test/resources/dates_invalid.json index 6ecfd8f04b9..e62913a340a 100644 --- a/integration_tests/src/test/resources/dates_invalid.json +++ b/integration_tests/src/test/resources/dates_invalid.json @@ -1 +1,2 @@ -{ "number": "2020-09-32" } \ No newline at end of file +{ "number": "2020-09-32" } +{ "number": "2020-50-16" } \ No newline at end of file diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala index a4239043ba2..00d5f1cba06 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala @@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.datasources.csv.CSVDataSource import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.rapids.LegacyTimeParserPolicy import org.apache.spark.sql.types._ import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.sql.vectorized.ColumnarBatch @@ -211,6 +212,12 @@ object GpuCSVScan { // parsedOptions.maxColumns was originally a performance optimization but is not used any more if (readSchema.map(_.dataType).contains(DateType)) { + if (GpuOverrides.getTimeParserPolicy == LegacyTimeParserPolicy) { + // Spark's CSV parser will parse the string "2020-50-16" to the date 2024/02/16 when + // timeParserPolicy is set to LEGACY mode and we would reject this as an invalid date + // so we fall back to CPU + meta.willNotWorkOnGpu(s"GpuCSVScan does not support timeParserPolicy=LEGACY") + } ShimLoader.getSparkShims.dateFormatInRead(parsedOptions).foreach { dateFormat => DateUtils.tagAndGetCudfFormat(meta, dateFormat, parseString = true) } From c6e3af2aff4cd47485270f33feb18bd49d185241 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 24 Feb 2022 14:50:29 -0700 Subject: [PATCH 16/23] update docs --- docs/compatibility.md | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/docs/compatibility.md b/docs/compatibility.md index 2b43fbce610..d7c3f211974 100644 --- a/docs/compatibility.md +++ b/docs/compatibility.md @@ -322,17 +322,14 @@ Only a limited set of formats are supported when parsing dates. * `"MM/yyyy"` * `"MM-dd-yyyy"` * `"MM/dd/yyyy"` +* `"dd-MM-yyyy"` +* `"dd/MM/yyyy"` The reality is that all of these formats are supported at the same time. The plugin will only disable itself if you set a format that it does not support. As a workaround you can parse the column as a timestamp and then cast it to a date. -Invalid dates in Spark, values that have the correct format, but the numbers produce invalid dates, -can result in an exception by default, and how they are parsed can be controlled through a config. -The RAPIDS Accelerator does not support any of this and will produce an incorrect date. Typically, -one that overflowed. 
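A minimal PySpark sketch of reading one of the formats listed above with the plugin; the path, column name, and active `spark` session are placeholders, and `spark.rapids.sql.incompatibleDateFormats.enabled` may only be needed for formats the plugin does not treat as fully compatible with Spark's CPU parser:

```python
from pyspark.sql.types import DateType, StructField, StructType

# Placeholder path and column name; assumes the RAPIDS Accelerator is enabled
# in the session.
schema = StructType([StructField("d", DateType())])

spark.conf.set("spark.sql.legacy.timeParserPolicy", "CORRECTED")
# May be required for formats the plugin does not certify as fully compatible.
spark.conf.set("spark.rapids.sql.incompatibleDateFormats.enabled", "true")

df = (spark.read
      .schema(schema)
      .option("dateFormat", "MM/dd/yyyy")
      .csv("/data/us_dates.csv"))
df.show()
```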
- ### CSV Timestamps The CSV parser does not support time zones. It will ignore any trailing time zone information, despite the format asking for a `XXX` or `[XXX]`. As such it is off by default and you can enable it From 320fb56644a31dda62aae2e43b341054a7807068 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 7 Mar 2022 11:24:22 -0700 Subject: [PATCH 17/23] Address PR feedback --- integration_tests/src/main/python/json_test.py | 4 +++- .../main/scala/com/nvidia/spark/rapids/DateUtils.scala | 8 ++++---- .../nvidia/spark/rapids/GpuTextBasedPartitionReader.scala | 4 ++-- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py index baf7d856b71..8d32add2801 100644 --- a/integration_tests/src/main/python/json_test.py +++ b/integration_tests/src/main/python/json_test.py @@ -210,7 +210,9 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena 'dates.json', 'dates_invalid.json', ]) -@pytest.mark.parametrize('schema', [_bool_schema, _byte_schema, _short_schema, _int_schema, _long_schema, _float_schema, _double_schema, _decimal_10_2_schema, _decimal_10_3_schema, _date_schema]) +@pytest.mark.parametrize('schema', [_bool_schema, _byte_schema, _short_schema, _int_schema, _long_schema, \ + _float_schema, _double_schema, _decimal_10_2_schema, _decimal_10_3_schema, \ + _date_schema]) @pytest.mark.parametrize('read_func', [read_json_df, read_json_sql]) @pytest.mark.parametrize('allow_non_numeric_numbers', ["true", "false"]) @pytest.mark.parametrize('allow_numeric_leading_zeros', ["true"]) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala index 1e4abee9893..587410eee5e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala @@ -218,12 +218,12 @@ object DateUtils { meta: RapidsMeta[_, _, _], sparkFormat: String, parseString: Boolean): String = { - var strfFormat = "" + var strfFormat: Option[String] = None if (GpuOverrides.getTimeParserPolicy == LegacyTimeParserPolicy) { try { // try and convert the format to cuDF format - this will throw an exception if // the format contains unsupported characters or words - strfFormat = toStrf(sparkFormat, parseString) + strfFormat = Some(toStrf(sparkFormat, parseString)) // format parsed ok but we have no 100% compatible formats in LEGACY mode if (GpuToTimestamp.LEGACY_COMPATIBLE_FORMATS.contains(sparkFormat)) { // LEGACY support has a number of issues that mean we cannot guarantee @@ -249,7 +249,7 @@ object DateUtils { try { // try and convert the format to cuDF format - this will throw an exception if // the format contains unsupported characters or words - strfFormat = toStrf(sparkFormat, parseString) + strfFormat = Some(toStrf(sparkFormat, parseString)) // format parsed ok, so it is either compatible (tested/certified) or incompatible if (!GpuToTimestamp.CORRECTED_COMPATIBLE_FORMATS.contains(sparkFormat) && !meta.conf.incompatDateFormats) { @@ -262,6 +262,6 @@ object DateUtils { meta.willNotWorkOnGpu(s"Failed to convert ${e.reason} ${e.getMessage}") } } - strfFormat + strfFormat.get } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala index 3498c1b7e1c..d1317f14a6f 100644 --- 
a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala @@ -218,7 +218,7 @@ abstract class GpuTextBasedPartitionReader( case dt: DecimalType => castStringToDecimal(table.getColumn(i), dt) case DataTypes.DateType => - castStringToDate(table.getColumn(i), ansiEnabled = false) + castStringToDate(table.getColumn(i)) case _ => table.getColumn(i).incRefCount() } @@ -237,7 +237,7 @@ abstract class GpuTextBasedPartitionReader( def dateFormat: String - def castStringToDate(input: ColumnVector, ansiEnabled: Boolean): ColumnVector = { + def castStringToDate(input: ColumnVector): ColumnVector = { val cudfFormat = DateUtils.toStrf(dateFormat, parseString = true) withResource(input.isTimestamp(cudfFormat)) { isDate => if (GpuOverrides.getTimeParserPolicy == ExceptionTimeParserPolicy) { From 5621356ef4a6cdf1fcb7541c2ee591b500eaa3ef Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 7 Mar 2022 11:35:34 -0700 Subject: [PATCH 18/23] update docs and add tests for issue 1091 --- docs/compatibility.md | 7 ------- tests/src/test/resources/timestamps.csv | 6 ++++++ .../com/nvidia/spark/rapids/CsvScanSuite.scala | 16 +++++++++++++++- .../rapids/SparkQueryCompareTestSuite.scala | 7 +++++++ 4 files changed, 28 insertions(+), 8 deletions(-) create mode 100644 tests/src/test/resources/timestamps.csv diff --git a/docs/compatibility.md b/docs/compatibility.md index 04c91f84f63..da4311b2152 100644 --- a/docs/compatibility.md +++ b/docs/compatibility.md @@ -309,8 +309,6 @@ Escaped quote characters `'\"'` are not supported well as described by this [issue](https://github.com/NVIDIA/spark-rapids/issues/129). ### CSV Dates -Parsing a `timestamp` as a `date` does not work. The details are documented in this -[issue](https://github.com/NVIDIA/spark-rapids/issues/869). Only a limited set of formats are supported when parsing dates. @@ -325,11 +323,6 @@ Only a limited set of formats are supported when parsing dates. * `"dd-MM-yyyy"` * `"dd/MM/yyyy"` -The reality is that all of these formats are supported at the same time. The plugin will only -disable itself if you set a format that it does not support. - -As a workaround you can parse the column as a timestamp and then cast it to a date. - ### CSV Timestamps The CSV parser does not support time zones. It will ignore any trailing time zone information, despite the format asking for a `XXX` or `[XXX]`. 
As such it is off by default and you can enable it diff --git a/tests/src/test/resources/timestamps.csv b/tests/src/test/resources/timestamps.csv new file mode 100644 index 00000000000..d3093deb96d --- /dev/null +++ b/tests/src/test/resources/timestamps.csv @@ -0,0 +1,6 @@ +2019-01-03T12:34:56.123456,1 +2019-01-03T12:34:56.123456,1 +2019-01-03T12:34:56.123456,1 +2019-01-05T12:34:56.123456,2 +2019-01-05T12:34:56.123456,3 +2019-01-06T12:34:56.123456,6 \ No newline at end of file diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/CsvScanSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/CsvScanSuite.scala index e082e61354c..531bffc7d92 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/CsvScanSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/CsvScanSuite.scala @@ -17,7 +17,7 @@ package com.nvidia.spark.rapids import org.apache.spark.SparkConf -import org.apache.spark.sql.functions.col +import org.apache.spark.sql.functions.{col, date_add, lit} class CsvScanSuite extends SparkQueryCompareTestSuite { testExpectedException[IllegalArgumentException]("Test CSV projection including unsupported types", @@ -61,4 +61,18 @@ class CsvScanSuite extends SparkQueryCompareTestSuite { conf = new SparkConf()) { frame => frame.select(col("*")) } + + testSparkResultsAreEqual( + "Test CSV parse dates", + datesCsvDf, + conf=new SparkConf()) { + df => df.withColumn("next_day", date_add(col("dates"), lit(1))) + } + + testSparkResultsAreEqual( + "Test CSV parse timestamps as dates", + timestampsAsDatesCsvDf, + conf=new SparkConf()) { + df => df.withColumn("next_day", date_add(col("dates"), lit(1))) + } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala index a6dd6124e06..6d4923f9a20 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala @@ -1740,6 +1740,13 @@ trait SparkQueryCompareTestSuite extends FunSuite with Arm { )))(_) } + def timestampsAsDatesCsvDf= { + fromCsvDf("timestamps.csv", StructType(Array( + StructField("dates", DateType, false), + StructField("ints", IntegerType, false) + )))(_) + } + private def setNullableStateForAllColumns(df: DataFrame, nullable: Boolean) : DataFrame = { // get schema val schema = df.schema From 28fbe69b0c3fbd4173ad9e772aeda120e48ca729 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 7 Mar 2022 14:11:17 -0700 Subject: [PATCH 19/23] fix regression --- .../main/scala/com/nvidia/spark/rapids/DateUtils.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala index 587410eee5e..4ea21ef92aa 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala @@ -218,12 +218,12 @@ object DateUtils { meta: RapidsMeta[_, _, _], sparkFormat: String, parseString: Boolean): String = { - var strfFormat: Option[String] = None + var strfFormat: String = null if (GpuOverrides.getTimeParserPolicy == LegacyTimeParserPolicy) { try { // try and convert the format to cuDF format - this will throw an exception if // the format contains unsupported characters or words - strfFormat = Some(toStrf(sparkFormat, parseString)) + strfFormat = toStrf(sparkFormat, parseString) // format parsed ok 
but we have no 100% compatible formats in LEGACY mode if (GpuToTimestamp.LEGACY_COMPATIBLE_FORMATS.contains(sparkFormat)) { // LEGACY support has a number of issues that mean we cannot guarantee @@ -249,7 +249,7 @@ object DateUtils { try { // try and convert the format to cuDF format - this will throw an exception if // the format contains unsupported characters or words - strfFormat = Some(toStrf(sparkFormat, parseString)) + strfFormat = toStrf(sparkFormat, parseString) // format parsed ok, so it is either compatible (tested/certified) or incompatible if (!GpuToTimestamp.CORRECTED_COMPATIBLE_FORMATS.contains(sparkFormat) && !meta.conf.incompatDateFormats) { @@ -262,6 +262,6 @@ object DateUtils { meta.willNotWorkOnGpu(s"Failed to convert ${e.reason} ${e.getMessage}") } } - strfFormat.get + strfFormat } } From e0b4a44b062ff619cf194e69c988c420a73b599f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 8 Mar 2022 09:42:37 -0700 Subject: [PATCH 20/23] Add shim code for getting dateFormat from CSV and JSON read options --- .../spark/sql/catalyst/csv/GpuCsvUtils.scala | 21 ++++++++++++++++ .../sql/catalyst/json/GpuJsonUtils.scala | 21 ++++++++++++++++ .../spark/sql/catalyst/csv/GpuCsvUtils.scala | 24 +++++++++++++++++++ .../sql/catalyst/json/GpuJsonUtils.scala | 24 +++++++++++++++++++ .../spark/rapids/GpuBatchScanExec.scala | 4 ++-- .../catalyst/json/rapids/GpuJsonScan.scala | 4 ++-- 6 files changed, 94 insertions(+), 4 deletions(-) create mode 100644 sql-plugin/src/main/301until330-all/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala create mode 100644 sql-plugin/src/main/301until330-all/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala create mode 100644 sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala create mode 100644 sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala diff --git a/sql-plugin/src/main/301until330-all/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala b/sql-plugin/src/main/301until330-all/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala new file mode 100644 index 00000000000..b8736640a9f --- /dev/null +++ b/sql-plugin/src/main/301until330-all/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.csv + +object GpuCsvUtils { + def dateFormatInRead(options: CSVOptions): String = options.dateFormat +} diff --git a/sql-plugin/src/main/301until330-all/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala b/sql-plugin/src/main/301until330-all/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala new file mode 100644 index 00000000000..b22da8a4f71 --- /dev/null +++ b/sql-plugin/src/main/301until330-all/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.json + +object GpuJsonUtils { + def dateFormatInRead(options: JSONOptions): String = options.dateFormat +} diff --git a/sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala b/sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala new file mode 100644 index 00000000000..2b7e5b2193a --- /dev/null +++ b/sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.csv + +import org.apache.spark.sql.catalyst.util.DateFormatter + +object GpuCsvUtils { + def dateFormatInRead(options: CSVOptions): String = + options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern) +} diff --git a/sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala b/sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala new file mode 100644 index 00000000000..cd112da4e7a --- /dev/null +++ b/sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.json + +import org.apache.spark.sql.catalyst.util.DateFormatter + +object GpuJsonUtils { + def dateFormatInRead(options: JSONOptions): String = + options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern) +} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala index 00d5f1cba06..9471c4b8dee 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala @@ -29,7 +29,7 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.csv.CSVOptions +import org.apache.spark.sql.catalyst.csv.{CSVOptions, GpuCsvUtils} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.util.PermissiveMode @@ -413,5 +413,5 @@ class CSVPartitionReader( } } - override def dateFormat: String = parsedOptions.dateFormat + override def dateFormat: String = GpuCsvUtils.dateFormatInRead(parsedOptions) } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala index 802445aef95..64810bf9551 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala @@ -30,7 +30,7 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.catalyst.json.{JSONOptions, JSONOptionsInRead} +import org.apache.spark.sql.catalyst.json.{GpuJsonUtils, JSONOptions, JSONOptionsInRead} import org.apache.spark.sql.catalyst.util.PermissiveMode import org.apache.spark.sql.connector.read.{PartitionReader, PartitionReaderFactory} import org.apache.spark.sql.execution.QueryExecutionException @@ -411,6 +411,6 @@ class JsonPartitionReader( } } - override def dateFormat: String = parsedOptions.dateFormat + override def dateFormat: String = GpuJsonUtils.dateFormatInRead(parsedOptions) } From f0599ab5c9bd81a17656d35ded76adff6ef48856 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 8 Mar 2022 11:40:40 -0700 Subject: [PATCH 21/23] XFAIL some tests with Spark 3.3.0 with LEGACY timeParserPolicy --- integration_tests/src/main/python/json_test.py | 8 +++++--- integration_tests/src/main/python/spark_session.py | 3 +++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py index 8d32add2801..0a4fb0a9c6b 100644 --- a/integration_tests/src/main/python/json_test.py +++ b/integration_tests/src/main/python/json_test.py @@ -19,7 +19,7 @@ from conftest import is_databricks_runtime from marks import approximate_float, allow_non_gpu, ignore_order -from spark_session import with_cpu_session, with_gpu_session, is_before_spark_330 +from spark_session import with_cpu_session, with_gpu_session, is_before_spark_330, is_spark_330_or_later json_supported_gens = [ # Spark does not escape '\r' or '\n' even though it uses it to mark end of record @@ 
-236,7 +236,8 @@ def test_basic_json_read(std_input_path, filename, schema, read_func, allow_non_ @pytest.mark.parametrize('read_func', [read_json_df, read_json_sql]) @pytest.mark.parametrize('ansi_enabled', ["true", "false"]) @pytest.mark.parametrize('time_parser_policy', [ - pytest.param('LEGACY', marks=pytest.mark.allow_non_gpu('FileSourceScanExec')), + pytest.param('LEGACY', marks=[pytest.mark.allow_non_gpu('FileSourceScanExec'), \ + pytest.mark.xfail(not is_before_spark_330(), reason="https://github.com/NVIDIA/spark-rapids/issues/4912")]), 'CORRECTED', 'EXCEPTION' ]) @@ -262,7 +263,8 @@ def test_json_read_valid_dates(std_input_path, filename, schema, read_func, ansi @pytest.mark.parametrize('read_func', [read_json_df, read_json_sql]) @pytest.mark.parametrize('ansi_enabled', ["true", "false"]) @pytest.mark.parametrize('time_parser_policy', [ - pytest.param('LEGACY', marks=pytest.mark.allow_non_gpu('FileSourceScanExec')), + pytest.param('LEGACY', marks=[pytest.mark.allow_non_gpu('FileSourceScanExec'), \ + pytest.mark.xfail(is_spark_330_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/4912")]), 'CORRECTED', 'EXCEPTION' ]) diff --git a/integration_tests/src/main/python/spark_session.py b/integration_tests/src/main/python/spark_session.py index 099b7b9b9b5..dfbdd71c64f 100644 --- a/integration_tests/src/main/python/spark_session.py +++ b/integration_tests/src/main/python/spark_session.py @@ -117,6 +117,9 @@ def is_before_spark_320(): def is_before_spark_330(): return spark_version() < "3.3.0" +def is_spark_330_or_later(): + return spark_version() >= "3.3.0" + def is_databricks91_or_later(): spark = get_spark_i_know_what_i_am_doing() return spark.conf.get("spark.databricks.clusterUsageTags.sparkVersion", "") >= "9.1" From c5e6c5850a53c3081eba1980258d5a8b1873f875 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 8 Mar 2022 11:41:54 -0700 Subject: [PATCH 22/23] use is_spark_330_or_later --- integration_tests/src/main/python/json_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py index 0a4fb0a9c6b..44d2a56d413 100644 --- a/integration_tests/src/main/python/json_test.py +++ b/integration_tests/src/main/python/json_test.py @@ -237,7 +237,7 @@ def test_basic_json_read(std_input_path, filename, schema, read_func, allow_non_ @pytest.mark.parametrize('ansi_enabled', ["true", "false"]) @pytest.mark.parametrize('time_parser_policy', [ pytest.param('LEGACY', marks=[pytest.mark.allow_non_gpu('FileSourceScanExec'), \ - pytest.mark.xfail(not is_before_spark_330(), reason="https://github.com/NVIDIA/spark-rapids/issues/4912")]), + pytest.mark.xfail(is_spark_330_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/4912")]), 'CORRECTED', 'EXCEPTION' ]) From 8b6796f28522a1105b1abaaf74b2ffed823058cd Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 8 Mar 2022 14:52:35 -0700 Subject: [PATCH 23/23] Tests pass with 3.3.0 --- integration_tests/src/main/python/json_test.py | 6 ++---- .../shims/v2/Spark30Xuntil33XFileOptionsShims.scala | 8 -------- .../json/rapids/shims/v2/Spark33XFileOptionsShims.scala | 8 -------- .../scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala | 5 ++--- .../main/scala/com/nvidia/spark/rapids/SparkShims.scala | 3 --- .../spark/sql/catalyst/json/rapids/GpuJsonScan.scala | 5 ++--- 6 files changed, 6 insertions(+), 29 deletions(-) diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py 
index 44d2a56d413..3088bec714b 100644 --- a/integration_tests/src/main/python/json_test.py +++ b/integration_tests/src/main/python/json_test.py @@ -236,8 +236,7 @@ def test_basic_json_read(std_input_path, filename, schema, read_func, allow_non_ @pytest.mark.parametrize('read_func', [read_json_df, read_json_sql]) @pytest.mark.parametrize('ansi_enabled', ["true", "false"]) @pytest.mark.parametrize('time_parser_policy', [ - pytest.param('LEGACY', marks=[pytest.mark.allow_non_gpu('FileSourceScanExec'), \ - pytest.mark.xfail(is_spark_330_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/4912")]), + pytest.param('LEGACY', marks=pytest.mark.allow_non_gpu('FileSourceScanExec')), 'CORRECTED', 'EXCEPTION' ]) @@ -263,8 +262,7 @@ def test_json_read_valid_dates(std_input_path, filename, schema, read_func, ansi @pytest.mark.parametrize('read_func', [read_json_df, read_json_sql]) @pytest.mark.parametrize('ansi_enabled', ["true", "false"]) @pytest.mark.parametrize('time_parser_policy', [ - pytest.param('LEGACY', marks=[pytest.mark.allow_non_gpu('FileSourceScanExec'), \ - pytest.mark.xfail(is_spark_330_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/4912")]), + pytest.param('LEGACY', marks=pytest.mark.allow_non_gpu('FileSourceScanExec')), 'CORRECTED', 'EXCEPTION' ]) diff --git a/sql-plugin/src/main/301until330-all/scala/org/apache/spark/sql/catalyst/json/rapids/shims/v2/Spark30Xuntil33XFileOptionsShims.scala b/sql-plugin/src/main/301until330-all/scala/org/apache/spark/sql/catalyst/json/rapids/shims/v2/Spark30Xuntil33XFileOptionsShims.scala index b68404a5887..8789ceaf287 100644 --- a/sql-plugin/src/main/301until330-all/scala/org/apache/spark/sql/catalyst/json/rapids/shims/v2/Spark30Xuntil33XFileOptionsShims.scala +++ b/sql-plugin/src/main/301until330-all/scala/org/apache/spark/sql/catalyst/json/rapids/shims/v2/Spark30Xuntil33XFileOptionsShims.scala @@ -23,14 +23,6 @@ import org.apache.spark.sql.catalyst.json.JSONOptions trait Spark30Xuntil33XFileOptionsShims extends SparkShims { - def dateFormatInRead(fileOptions: Serializable): Option[String] = { - fileOptions match { - case csvOpts: CSVOptions => Option(csvOpts.dateFormat) - case jsonOpts: JSONOptions => Option(jsonOpts.dateFormat) - case _ => throw new RuntimeException("Wrong file options.") - } - } - def timestampFormatInRead(fileOptions: Serializable): Option[String] = { fileOptions match { case csvOpts: CSVOptions => Option(csvOpts.timestampFormat) diff --git a/sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/json/rapids/shims/v2/Spark33XFileOptionsShims.scala b/sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/json/rapids/shims/v2/Spark33XFileOptionsShims.scala index b762f5975a8..5c1636a994b 100644 --- a/sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/json/rapids/shims/v2/Spark33XFileOptionsShims.scala +++ b/sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/json/rapids/shims/v2/Spark33XFileOptionsShims.scala @@ -23,14 +23,6 @@ import org.apache.spark.sql.catalyst.json.JSONOptions trait Spark33XFileOptionsShims extends Spark321PlusShims { - def dateFormatInRead(fileOptions: Serializable): Option[String] = { - fileOptions match { - case csvOpts: CSVOptions => csvOpts.dateFormatInRead - case jsonOpts: JSONOptions => jsonOpts.dateFormatInRead - case _ => throw new RuntimeException("Wrong file options.") - } - } - def timestampFormatInRead(fileOptions: Serializable): Option[String] = { fileOptions match { case csvOpts: CSVOptions => csvOpts.dateFormatInRead diff --git 
a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala index 9471c4b8dee..5845c975838 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala @@ -218,9 +218,8 @@ object GpuCSVScan { // so we fall back to CPU meta.willNotWorkOnGpu(s"GpuCSVScan does not support timeParserPolicy=LEGACY") } - ShimLoader.getSparkShims.dateFormatInRead(parsedOptions).foreach { dateFormat => - DateUtils.tagAndGetCudfFormat(meta, dateFormat, parseString = true) - } + DateUtils.tagAndGetCudfFormat(meta, + GpuCsvUtils.dateFormatInRead(parsedOptions), parseString = true) } if (readSchema.map(_.dataType).contains(TimestampType)) { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala index 8b29932e014..eba6798b38b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala @@ -303,9 +303,6 @@ trait SparkShims { */ def getLegacyStatisticalAggregate(): Boolean - - def dateFormatInRead(fileOptions: Serializable): Option[String] - def timestampFormatInRead(fileOptions: Serializable): Option[String] def neverReplaceShowCurrentNamespaceCommand: ExecRule[_ <: SparkPlan] diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala index 64810bf9551..a6ebdb40ee1 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala @@ -147,9 +147,8 @@ object GpuJsonScan { }) if (readSchema.map(_.dataType).contains(DateType)) { - ShimLoader.getSparkShims.dateFormatInRead(parsedOptions).foreach { dateFormat => - DateUtils.tagAndGetCudfFormat(meta, dateFormat, parseString = true) - } + DateUtils.tagAndGetCudfFormat(meta, + GpuJsonUtils.dateFormatInRead(parsedOptions), parseString = true) } if (readSchema.map(_.dataType).contains(TimestampType)) {
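With these shims, the CSV and JSON readers resolve the effective `dateFormat` the same way on every supported Spark version, falling back to Spark's default date pattern (`yyyy-MM-dd`) on 3.3.0 when no format is set. A small PySpark sketch of the JSON side; the data path and the assumption that the plugin is active are placeholders:

```python
from pyspark.sql.types import DateType, StructField, StructType

# Placeholder path; the field name matches the dates*.json test resources.
schema = StructType([StructField("number", DateType())])

spark.conf.set("spark.sql.legacy.timeParserPolicy", "CORRECTED")

# Explicit dateFormat; omitting the option should fall back to Spark's
# default pattern via GpuJsonUtils.dateFormatInRead.
df = (spark.read
      .schema(schema)
      .option("dateFormat", "yyyy-MM-dd")
      .json("/data/dates.json"))
df.show()
```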