From 2589976d2e3888bfe94a3561789647400e988a8e Mon Sep 17 00:00:00 2001
From: "Robert (Bobby) Evans"
Date: Tue, 17 Sep 2024 16:46:07 -0500
Subject: [PATCH] Use improved CUDF JSON validation (#11464)

Signed-off-by: Robert (Bobby) Evans
---
 .../src/main/python/json_matrix_test.py       |  94 ++++++++++----
 .../src/main/python/json_test.py              |  49 +++++++
 .../src/test/resources/bad_whitespace.json    |  10 ++
 .../src/test/resources/float_formatted.json   |   3 -
 .../resources/invalid_ridealong_columns.json  |   1 -
 .../src/test/resources/scan_emtpy_lines.json  |  23 ++++
 .../src/test/resources/sci_formatted.json     |   1 -
 .../rapids/GpuTextBasedPartitionReader.scala  |  60 ++++++---
 .../catalyst/json/rapids/GpuJsonScan.scala    |   7 +-
 .../spark/sql/rapids/GpuJsonReadCommon.scala  | 121 +++++++-----------
 .../spark/sql/rapids/GpuJsonToStructs.scala   |   2 +-
 .../spark/rapids/JsonScanRetrySuite.scala     |   2 +-
 12 files changed, 242 insertions(+), 131 deletions(-)
 create mode 100644 integration_tests/src/test/resources/bad_whitespace.json
 create mode 100644 integration_tests/src/test/resources/scan_emtpy_lines.json

diff --git a/integration_tests/src/main/python/json_matrix_test.py b/integration_tests/src/main/python/json_matrix_test.py
index c9dec8afac9..361854586c3 100644
--- a/integration_tests/src/main/python/json_matrix_test.py
+++ b/integration_tests/src/main/python/json_matrix_test.py
@@ -406,7 +406,6 @@ def test_json_tuple_allow_backslash_escape_any_off(std_input_path):
 
 # Off is the default for scan so it really needs to work
 @pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
-@pytest.mark.xfail(reason = 'https://github.com/NVIDIA/spark-rapids/issues/10457')
 def test_scan_json_allow_unquoted_control_chars_off(std_input_path, read_func, spark_tmp_table_factory):
     assert_gpu_and_cpu_are_equal_collect(
         read_func(std_input_path + '/' + WITH_UNQUOTED_CONTROL_FILE,
@@ -417,7 +416,6 @@ def test_scan_json_allow_unquoted_control_chars_off(std_input_path, read_func, s
 
 # Off is the default for from_json so it really needs to work
 @allow_non_gpu(TEXT_INPUT_EXEC, *non_utc_allow) # https://github.com/NVIDIA/spark-rapids/issues/10453
-@pytest.mark.xfail(reason = 'https://github.com/NVIDIA/spark-rapids/issues/10457')
 def test_from_json_allow_unquoted_control_chars_off(std_input_path):
     schema = WITH_UNQUOTED_CONTROL_SCHEMA
     assert_gpu_and_cpu_are_equal_collect(
@@ -583,6 +581,7 @@ def test_json_tuple_dec_locale_non_aribic(std_input_path):
     "int_array_formatted.json",
     "int_struct_formatted.json",
     "int_mixed_array_struct_formatted.json",
+    pytest.param("bad_whitespace.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15280')),
     "escaped_strings.json",
     "nested_escaped_strings.json",
     pytest.param("repeated_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/11361')),
@@ -590,7 +589,10 @@
     "mixed_objects.json",
     "timestamp_formatted_strings.json",
     "timestamp_tz_formatted_strings.json"]
 
-@pytest.mark.parametrize('input_file', COMMON_TEST_FILES)
+COMMON_SCAN_TEST_FILES = COMMON_TEST_FILES + [
+    "scan_emtpy_lines.json"]
+
+@pytest.mark.parametrize('input_file', COMMON_SCAN_TEST_FILES)
 @pytest.mark.parametrize('read_func', [read_json_df]) # we have done so many tests already that we don't need both read func. They are the same
 def test_scan_json_bytes(std_input_path, read_func, spark_tmp_table_factory, input_file):
     assert_gpu_and_cpu_are_equal_collect(
@@ -607,7 +609,7 @@ def test_from_json_bytes(std_input_path, input_file):
         lambda spark : read_json_as_text(spark, std_input_path + '/' + input_file, "json").select(f.col('json'), f.from_json(f.col('json'), schema)),
         conf =_enable_json_to_structs_conf)
 
-@pytest.mark.parametrize('input_file', COMMON_TEST_FILES)
+@pytest.mark.parametrize('input_file', COMMON_SCAN_TEST_FILES)
 @pytest.mark.parametrize('read_func', [read_json_df]) # we have done so many tests already that we don't need both read func. They are the same
 def test_scan_json_shorts(std_input_path, read_func, spark_tmp_table_factory, input_file):
     assert_gpu_and_cpu_are_equal_collect(
@@ -624,7 +626,7 @@ def test_from_json_shorts(std_input_path, input_file):
         lambda spark : read_json_as_text(spark, std_input_path + '/' + input_file, "json").select(f.col('json'), f.from_json(f.col('json'), schema)),
         conf =_enable_json_to_structs_conf)
 
-@pytest.mark.parametrize('input_file', COMMON_TEST_FILES)
+@pytest.mark.parametrize('input_file', COMMON_SCAN_TEST_FILES)
 @pytest.mark.parametrize('read_func', [read_json_df]) # we have done so many tests already that we don't need both read func. They are the same
 def test_scan_json_ints(std_input_path, read_func, spark_tmp_table_factory, input_file):
     assert_gpu_and_cpu_are_equal_collect(
@@ -641,7 +643,7 @@ def test_from_json_ints(std_input_path, input_file):
         lambda spark : read_json_as_text(spark, std_input_path + '/' + input_file, "json").select(f.col('json'), f.from_json(f.col('json'), schema)),
         conf =_enable_json_to_structs_conf)
 
-@pytest.mark.parametrize('input_file', COMMON_TEST_FILES)
+@pytest.mark.parametrize('input_file', COMMON_SCAN_TEST_FILES)
 @pytest.mark.parametrize('read_func', [read_json_df]) # we have done so many tests already that we don't need both read func. They are the same
 def test_scan_json_longs(std_input_path, read_func, spark_tmp_table_factory, input_file):
     assert_gpu_and_cpu_are_equal_collect(
@@ -673,12 +675,14 @@ def test_from_json_longs(std_input_path, input_file):
     "int_array_formatted.json",
     "int_struct_formatted.json",
     "int_mixed_array_struct_formatted.json",
+    pytest.param("bad_whitespace.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15280')),
     "escaped_strings.json",
     "nested_escaped_strings.json",
     pytest.param("repeated_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/11361')),
     "mixed_objects.json",
     "timestamp_formatted_strings.json",
-    "timestamp_tz_formatted_strings.json"])
+    "timestamp_tz_formatted_strings.json",
+    "scan_emtpy_lines.json"])
 @pytest.mark.parametrize('read_func', [read_json_df]) # we have done so many tests already that we don't need both read func. They are the same
 def test_scan_json_decs(std_input_path, read_func, spark_tmp_table_factory, input_file, dt):
     assert_gpu_and_cpu_are_equal_collect(
@@ -702,6 +706,7 @@ def test_scan_json_decs(std_input_path, read_func, spark_tmp_table_factory, inpu
     "int_array_formatted.json",
     "int_struct_formatted.json",
     "int_mixed_array_struct_formatted.json",
+    pytest.param("bad_whitespace.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15280')),
     "escaped_strings.json",
     "nested_escaped_strings.json",
     pytest.param("repeated_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/11361')),
@@ -725,17 +730,19 @@ def test_from_json_decs(std_input_path, input_file, dt):
     "sci_formatted_strings.json",
     "decimal_locale_formatted_strings.json",
     pytest.param("single_quoted_strings.json", marks=pytest.mark.xfail(condition=is_before_spark_330(), reason='https://github.com/NVIDIA/spark-rapids/issues/10495')),
-    pytest.param("boolean_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10479')),
-    pytest.param("invalid_ridealong_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10534')),
+    "boolean_formatted.json",
+    "invalid_ridealong_columns.json",
     pytest.param("int_array_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15318')),
     "int_struct_formatted.json",
     pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(condition=is_spark_400_or_later(), reason='https://github.com/NVIDIA/spark-rapids/issues/11154')),
+    pytest.param("bad_whitespace.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15280')),
     "escaped_strings.json",
     pytest.param("nested_escaped_strings.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10534')),
     pytest.param("repeated_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/11361')),
     "mixed_objects.json",
     "timestamp_formatted_strings.json",
-    "timestamp_tz_formatted_strings.json"])
+    "timestamp_tz_formatted_strings.json",
+    "scan_emtpy_lines.json"])
 @pytest.mark.parametrize('read_func', [read_json_df])
 def test_scan_json_strings(std_input_path, read_func, spark_tmp_table_factory, input_file):
     assert_gpu_and_cpu_are_equal_collect(
@@ -753,11 +760,12 @@ def test_scan_json_strings(std_input_path, read_func, spark_tmp_table_factory, i
     "sci_formatted_strings.json",
     "decimal_locale_formatted_strings.json",
     "single_quoted_strings.json",
-    pytest.param("boolean_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10479')),
-    pytest.param("invalid_ridealong_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10534')),
+    "boolean_formatted.json",
+    "invalid_ridealong_columns.json",
     pytest.param("int_array_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15318')),
     "int_struct_formatted.json",
     "int_mixed_array_struct_formatted.json",
+    pytest.param("bad_whitespace.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15280')),
     "escaped_strings.json",
     pytest.param("nested_escaped_strings.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10534')),
     pytest.param("repeated_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/11361')),
@@ -785,6 +793,7 @@ def test_from_json_strings(std_input_path, input_file):
     "int_array_formatted.json",
     "int_struct_formatted.json",
     "int_mixed_array_struct_formatted.json",
+    "bad_whitespace.json",
     pytest.param("escaped_strings.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/11386')),
     pytest.param("nested_escaped_strings.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/11387')),
     pytest.param("repeated_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/11361')),
@@ -813,6 +822,7 @@ def test_get_json_object_formats(std_input_path, input_file):
     "int_array_formatted.json",
     "int_struct_formatted.json",
     "int_mixed_array_struct_formatted.json",
+    "bad_whitespace.json",
     "escaped_strings.json",
     "nested_escaped_strings.json",
     pytest.param("repeated_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/11361')),
@@ -851,6 +861,7 @@ def test_get_json_object_child_formats(std_input_path, input_file):
     "int_array_formatted.json",
     "int_struct_formatted.json",
     "int_mixed_array_struct_formatted.json",
+    "bad_whitespace.json",
     pytest.param("escaped_strings.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/11386')),
     pytest.param("nested_escaped_strings.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/11387')),
     pytest.param("repeated_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/11361')),
@@ -867,7 +878,7 @@ def test_json_tuple_formats(std_input_path, input_file):
             '''json_tuple(json, 'user.profile.username', 'user.skills[0]', 'user.projects[1].name') AS (username, first_skill, second_project_name)'''),
         conf =_enable_json_tuple_conf)
 
-@pytest.mark.parametrize('input_file', COMMON_TEST_FILES)
+@pytest.mark.parametrize('input_file', COMMON_SCAN_TEST_FILES)
 @pytest.mark.parametrize('read_func', [read_json_df])
 def test_scan_json_bools(std_input_path, read_func, spark_tmp_table_factory, input_file):
     assert_gpu_and_cpu_are_equal_collect(
@@ -888,7 +899,7 @@ def test_from_json_bools(std_input_path, input_file):
 @pytest.mark.parametrize('input_file', [
     "int_formatted.json",
     pytest.param("float_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10481')),
-    pytest.param("sci_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15280')),
+    "sci_formatted.json",
     "int_formatted_strings.json",
     pytest.param("float_formatted_strings.json", marks=pytest.mark.xfail(condition=is_before_spark_330(), reason='https://issues.apache.org/jira/browse/SPARK-38060')),
     "sci_formatted_strings.json",
@@ -898,12 +909,14 @@ def test_from_json_bools(std_input_path, input_file):
     "int_array_formatted.json",
     "int_struct_formatted.json",
     "int_mixed_array_struct_formatted.json",
+    pytest.param("bad_whitespace.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15280')),
     "escaped_strings.json",
     "nested_escaped_strings.json",
     pytest.param("repeated_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/11361')),
     "mixed_objects.json",
     "timestamp_formatted_strings.json",
-    "timestamp_tz_formatted_strings.json"])
+    "timestamp_tz_formatted_strings.json",
+    "scan_emtpy_lines.json"])
 @pytest.mark.parametrize('read_func', [read_json_df])
 def test_scan_json_floats(std_input_path, read_func, spark_tmp_table_factory, input_file):
     assert_gpu_and_cpu_are_equal_collect(
@@ -916,7 +929,7 @@ def test_scan_json_floats(std_input_path, read_func, spark_tmp_table_factory, in
 @pytest.mark.parametrize('input_file', [
     "int_formatted.json",
     pytest.param("float_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10481')),
-    pytest.param("sci_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15280')),
+    "sci_formatted.json",
     "int_formatted_strings.json",
     pytest.param("float_formatted_strings.json", marks=pytest.mark.xfail(condition=is_before_spark_330(), reason='https://issues.apache.org/jira/browse/SPARK-38060')),
     "sci_formatted_strings.json",
@@ -926,6 +939,7 @@ def test_scan_json_floats(std_input_path, read_func, spark_tmp_table_factory, in
     "int_array_formatted.json",
     "int_struct_formatted.json",
     "int_mixed_array_struct_formatted.json",
+    pytest.param("bad_whitespace.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15280')),
     "escaped_strings.json",
     "nested_escaped_strings.json",
     pytest.param("repeated_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/11361')),
@@ -943,7 +957,7 @@ def test_from_json_floats(std_input_path, input_file):
 @pytest.mark.parametrize('input_file', [
     "int_formatted.json",
     pytest.param("float_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10481')),
-    pytest.param("sci_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15280')),
+    "sci_formatted.json",
     "int_formatted_strings.json",
     pytest.param("float_formatted_strings.json", marks=pytest.mark.xfail(condition=is_before_spark_330(), reason='https://issues.apache.org/jira/browse/SPARK-38060')),
     "sci_formatted_strings.json",
@@ -953,12 +967,14 @@ def test_from_json_floats(std_input_path, input_file):
     "int_array_formatted.json",
     "int_struct_formatted.json",
     "int_mixed_array_struct_formatted.json",
+    pytest.param("bad_whitespace.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15280')),
     "escaped_strings.json",
     "nested_escaped_strings.json",
     pytest.param("repeated_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/11361')),
     "mixed_objects.json",
     "timestamp_formatted_strings.json",
-    "timestamp_tz_formatted_strings.json"])
+    "timestamp_tz_formatted_strings.json",
+    "scan_emtpy_lines.json"])
 @pytest.mark.parametrize('read_func', [read_json_df])
 def test_scan_json_doubles(std_input_path, read_func, spark_tmp_table_factory, input_file):
     assert_gpu_and_cpu_are_equal_collect(
@@ -971,7 +987,7 @@ def test_scan_json_doubles(std_input_path, read_func, spark_tmp_table_factory, i
 @pytest.mark.parametrize('input_file', [
     "int_formatted.json",
     pytest.param("float_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10481')),
-    pytest.param("sci_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15280')),
+    "sci_formatted.json",
    "int_formatted_strings.json",
     pytest.param("float_formatted_strings.json", marks=pytest.mark.xfail(condition=is_before_spark_330(), reason='https://issues.apache.org/jira/browse/SPARK-38060')),
     "sci_formatted_strings.json",
@@ -981,6 +997,7 @@ def test_scan_json_doubles(std_input_path, read_func, spark_tmp_table_factory, i
     "int_array_formatted.json",
     "int_struct_formatted.json",
     "int_mixed_array_struct_formatted.json",
+    pytest.param("bad_whitespace.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15280')),
     "escaped_strings.json",
     "nested_escaped_strings.json",
     pytest.param("repeated_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/11361')),
@@ -1007,12 +1024,14 @@ def test_from_json_doubles(std_input_path, input_file):
     "int_array_formatted.json",
     "int_struct_formatted.json",
     "int_mixed_array_struct_formatted.json",
+    "bad_whitespace.json",
     pytest.param("escaped_strings.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/9664')),
     "nested_escaped_strings.json",
     pytest.param("repeated_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/11361')),
     "mixed_objects.json",
     pytest.param("timestamp_formatted_strings.json", marks=pytest.mark.xfail(condition=is_before_spark_330(), reason='https://github.com/NVIDIA/spark-rapids/issues/11391')),
-    pytest.param("timestamp_tz_formatted_strings.json", marks=pytest.mark.xfail(condition=is_before_spark_330(), reason='https://github.com/NVIDIA/spark-rapids/issues/11391'))])
+    pytest.param("timestamp_tz_formatted_strings.json", marks=pytest.mark.xfail(condition=is_before_spark_330(), reason='https://github.com/NVIDIA/spark-rapids/issues/11391')),
+    "scan_emtpy_lines.json"])
 @pytest.mark.parametrize('read_func', [read_json_df])
 @allow_non_gpu(*non_utc_allow) # https://github.com/NVIDIA/spark-rapids/issues/10453
 def test_scan_json_corrected_dates(std_input_path, read_func, spark_tmp_table_factory, input_file):
@@ -1036,6 +1055,7 @@ def test_scan_json_corrected_dates(std_input_path, read_func, spark_tmp_table_fa
     "int_array_formatted.json",
     "int_struct_formatted.json",
     "int_mixed_array_struct_formatted.json",
+    "bad_whitespace.json",
     pytest.param("escaped_strings.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/9664')),
     "nested_escaped_strings.json",
     pytest.param("repeated_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/11361')),
@@ -1063,12 +1083,14 @@ def test_from_json_corrected_dates(std_input_path, input_file):
     "int_array_formatted.json",
     "int_struct_formatted.json",
     "int_mixed_array_struct_formatted.json",
+    "bad_whitespace.json",
     "escaped_strings.json",
     "nested_escaped_strings.json",
     pytest.param("repeated_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/11361')),
     "mixed_objects.json",
     "timestamp_formatted_strings.json",
-    pytest.param("timestamp_tz_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/6846'))])
+    pytest.param("timestamp_tz_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/6846')),
+    "scan_emtpy_lines.json"])
 @pytest.mark.parametrize('read_func', [read_json_df])
 @allow_non_gpu(*non_utc_allow)
 def test_scan_json_corrected_timestamps(std_input_path, read_func, spark_tmp_table_factory, input_file):
@@ -1092,6 +1114,7 @@ def test_scan_json_corrected_timestamps(std_input_path, read_func, spark_tmp_tab
     "int_array_formatted.json",
     "int_struct_formatted.json",
     "int_mixed_array_struct_formatted.json",
+    "bad_whitespace.json",
     "escaped_strings.json",
     "nested_escaped_strings.json",
     pytest.param("repeated_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/11361')),
@@ -1119,12 +1142,14 @@ def test_from_json_corrected_timestamps(std_input_path, input_file):
pytest.param("int_array_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10573')), "int_struct_formatted.json", pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("bad_whitespace.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), pytest.param("escaped_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), "nested_escaped_strings.json", pytest.param("repeated_columns.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), "mixed_objects.json", pytest.param("timestamp_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), - pytest.param("timestamp_tz_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260'))]) + pytest.param("timestamp_tz_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + "scan_emtpy_lines.json"]) @pytest.mark.parametrize('read_func', [read_json_df]) # we have done so many tests already that we don't need both read func. They are the same def test_scan_json_long_arrays(std_input_path, read_func, spark_tmp_table_factory, input_file): assert_gpu_and_cpu_are_equal_collect( @@ -1146,6 +1171,7 @@ def test_scan_json_long_arrays(std_input_path, read_func, spark_tmp_table_factor pytest.param("int_array_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10573')), "int_struct_formatted.json", pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("bad_whitespace.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), pytest.param("escaped_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), "nested_escaped_strings.json", pytest.param("repeated_columns.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), @@ -1172,12 +1198,14 @@ def test_from_json_long_arrays(std_input_path, input_file): pytest.param("int_array_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10574')), "int_struct_formatted.json", pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("bad_whitespace.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), pytest.param("escaped_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), "nested_escaped_strings.json", pytest.param("repeated_columns.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), "mixed_objects.json", pytest.param("timestamp_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), - pytest.param("timestamp_tz_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260'))]) + pytest.param("timestamp_tz_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + "scan_emtpy_lines.json"]) @pytest.mark.parametrize('read_func', [read_json_df]) # we have done so many tests already that we don't need both read func. 
They are the same def test_scan_json_string_arrays(std_input_path, read_func, spark_tmp_table_factory, input_file): assert_gpu_and_cpu_are_equal_collect( @@ -1199,6 +1227,7 @@ def test_scan_json_string_arrays(std_input_path, read_func, spark_tmp_table_fact pytest.param("int_array_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10574')), "int_struct_formatted.json", pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("bad_whitespace.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), pytest.param("escaped_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), "nested_escaped_strings.json", pytest.param("repeated_columns.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), @@ -1225,12 +1254,14 @@ def test_from_json_string_arrays(std_input_path, input_file): "int_array_formatted.json", pytest.param("int_struct_formatted.json", marks=pytest.mark.xfail(condition=is_before_spark_342(),reason='https://github.com/NVIDIA/spark-rapids/issues/10588')), pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("bad_whitespace.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), pytest.param("escaped_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), pytest.param("nested_escaped_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), pytest.param("repeated_columns.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), "mixed_objects.json", pytest.param("timestamp_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), - pytest.param("timestamp_tz_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260'))]) + pytest.param("timestamp_tz_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + "scan_emtpy_lines.json"]) @pytest.mark.parametrize('read_func', [read_json_df]) # we have done so many tests already that we don't need both read func. 
They are the same def test_scan_json_long_structs(std_input_path, read_func, spark_tmp_table_factory, input_file): assert_gpu_and_cpu_are_equal_collect( @@ -1252,6 +1283,7 @@ def test_scan_json_long_structs(std_input_path, read_func, spark_tmp_table_facto "int_array_formatted.json", pytest.param("int_struct_formatted.json", marks=pytest.mark.xfail(condition=is_before_spark_342(),reason='https://github.com/NVIDIA/spark-rapids/issues/10588')), pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("bad_whitespace.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), pytest.param("escaped_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), pytest.param("nested_escaped_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), pytest.param("repeated_columns.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), @@ -1278,12 +1310,14 @@ def test_from_json_long_structs(std_input_path, input_file): "int_array_formatted.json", "int_struct_formatted.json", pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("bad_whitespace.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), pytest.param("escaped_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), pytest.param("nested_escaped_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), pytest.param("repeated_columns.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), "mixed_objects.json", pytest.param("timestamp_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), - pytest.param("timestamp_tz_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260'))]) + pytest.param("timestamp_tz_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + "scan_emtpy_lines.json"]) @pytest.mark.parametrize('read_func', [read_json_df]) # we have done so many tests already that we don't need both read func. 
They are the same def test_scan_json_string_structs(std_input_path, read_func, spark_tmp_table_factory, input_file): assert_gpu_and_cpu_are_equal_collect( @@ -1305,6 +1339,7 @@ def test_scan_json_string_structs(std_input_path, read_func, spark_tmp_table_fac "int_array_formatted.json", "int_struct_formatted.json", pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("bad_whitespace.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), pytest.param("escaped_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), pytest.param("nested_escaped_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), pytest.param("repeated_columns.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), @@ -1332,11 +1367,13 @@ def test_from_json_string_structs(std_input_path, input_file): pytest.param("int_array_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10573')), # This does not fail on 38,0 "int_struct_formatted.json", pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("bad_whitespace.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), pytest.param("escaped_stringted_.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), pytest.param("repeated_columns.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), "mixed_objects.json", pytest.param("timestamp_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), - pytest.param("timestamp_tz_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260'))]) + pytest.param("timestamp_tz_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + "scan_emtpy_lines.json"]) @pytest.mark.parametrize('read_func', [read_json_df]) # we have done so many tests already that we don't need both read func. 
They are the same def test_scan_json_dec_arrays(std_input_path, read_func, spark_tmp_table_factory, input_file, dt): assert_gpu_and_cpu_are_equal_collect( @@ -1359,6 +1396,7 @@ def test_scan_json_dec_arrays(std_input_path, read_func, spark_tmp_table_factory pytest.param("int_array_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10573')), # This does not fail on 38,0 "int_struct_formatted.json", pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("bad_whitespace.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), pytest.param("escaped_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), "nested_escaped_strings.json", pytest.param("repeated_columns.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), @@ -1385,12 +1423,14 @@ def test_from_json_dec_arrays(std_input_path, input_file, dt): "int_array_formatted.json", "int_struct_formatted.json", pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("bad_whitespace.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), pytest.param("escaped_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), pytest.param("nested_escaped_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), pytest.param("repeated_columns.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), "mixed_objects.json", pytest.param("timestamp_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), - pytest.param("timestamp_tz_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260'))]) + pytest.param("timestamp_tz_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + "scan_emtpy_lines.json"]) @pytest.mark.parametrize('read_func', [read_json_df]) # we have done so many tests already that we don't need both read func. 
They are the same def test_scan_json_mixed_struct(std_input_path, read_func, spark_tmp_table_factory, input_file): assert_gpu_and_cpu_are_equal_collect( diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py index ca5eb135715..b87f1ceabf0 100644 --- a/integration_tests/src/main/python/json_test.py +++ b/integration_tests/src/main/python/json_test.py @@ -23,6 +23,8 @@ from marks import approximate_float, allow_non_gpu, ignore_order, datagen_overrides from spark_session import * +TEXT_INPUT_EXEC='FileSourceScanExec' + json_supported_gens = [ # Spark does not escape '\r' or '\n' even though it uses it to mark end of record # This would require multiLine reads to work correctly, so we avoid these chars @@ -350,6 +352,53 @@ def test_basic_json_read(std_input_path, filename, schema, read_func, allow_non_ options), conf=updated_conf) +@approximate_float +@pytest.mark.parametrize('filename', [ + 'boolean.json', + 'boolean_invalid.json', + 'ints.json', + pytest.param('ints_invalid.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4940')), # This fails for dates, as not all are invalid + 'nan_and_inf.json', + pytest.param('nan_and_inf_strings.json', marks=pytest.mark.skipif(is_before_spark_330(), reason='https://issues.apache.org/jira/browse/SPARK-38060 fixed in Spark 3.3.0')), + 'nan_and_inf_invalid.json', + 'floats.json', + 'floats_leading_zeros.json', + 'floats_invalid.json', + 'floats_edge_cases.json', + 'decimals.json', + 'dates.json', + 'dates_invalid.json', +]) +@pytest.mark.parametrize('schema', [_bool_schema, _byte_schema, _short_schema, _int_schema, _long_schema, \ + _float_schema, _double_schema, _decimal_10_2_schema, _decimal_10_3_schema, \ + _date_schema], ids=idfn) +@pytest.mark.parametrize('allow_non_numeric_numbers', ['true', 'false']) +@pytest.mark.parametrize('allow_numeric_leading_zeros', [ + 'true', + 'false' +]) +@pytest.mark.parametrize('ansi_enabled', ["true", "false"]) +@allow_non_gpu(TEXT_INPUT_EXEC, *not_utc_allow_for_test_json_scan) +@pytest.mark.parametrize('date_format', [None, 'yyyy-MM-dd']) +def test_basic_from_json(std_input_path, filename, schema, allow_non_numeric_numbers, \ + allow_numeric_leading_zeros, ansi_enabled, date_format): + updated_conf = copy_and_update(_enable_all_types_conf, + {'spark.sql.ansi.enabled': ansi_enabled, + 'spark.sql.legacy.timeParserPolicy': 'CORRECTED'}) + options = {"allowNonNumericNumbers": allow_non_numeric_numbers, + "allowNumericLeadingZeros": allow_numeric_leading_zeros, + } + + if date_format: + options['dateFormat'] = date_format + + assert_gpu_and_cpu_are_equal_collect( + lambda spark: spark.read.text(std_input_path + '/' + filename). + selectExpr("value as json"). + select(f.col("json"), f.from_json(f.col("json"), schema, options)), + conf=updated_conf) + + @ignore_order @pytest.mark.parametrize('filename', [ 'malformed1.ndjson', diff --git a/integration_tests/src/test/resources/bad_whitespace.json b/integration_tests/src/test/resources/bad_whitespace.json new file mode 100644 index 00000000000..0f3edebe336 --- /dev/null +++ b/integration_tests/src/test/resources/bad_whitespace.json @@ -0,0 +1,10 @@ +{"data": 1 . 0} +{"data": - 1 . 0} +{"data": + 1 . 
0} +{"data": 1 E 1} +{"data": n u l l} +{"data": t r u e} +{"data": f a l s e} +{"data": 1 0} +{"data": 1, "other": 1 0} +{"data": "BAD NUM 1 000", "ride-along-num": 1 000} diff --git a/integration_tests/src/test/resources/float_formatted.json b/integration_tests/src/test/resources/float_formatted.json index 8f305c3dbed..c0b3dacdce8 100644 --- a/integration_tests/src/test/resources/float_formatted.json +++ b/integration_tests/src/test/resources/float_formatted.json @@ -20,6 +20,3 @@ {"data": 0.9999} {"data": +1.0} {"data": -1.0} -{"data": 1 . 0} -{"data": - 1 . 0} -{"data": + 1 . 0} diff --git a/integration_tests/src/test/resources/invalid_ridealong_columns.json b/integration_tests/src/test/resources/invalid_ridealong_columns.json index e45013747d5..00092f2e436 100644 --- a/integration_tests/src/test/resources/invalid_ridealong_columns.json +++ b/integration_tests/src/test/resources/invalid_ridealong_columns.json @@ -14,7 +14,6 @@ {"data": "BAD NUM +1", "ride-along-num": +1} {"data": "BAD NUM 01", "ride-along-num": 01} {"data": "BAD NUM 00.1", "ride-along-num": 00.1} -{"data": "BAD NUM 1 000", "ride-along-num": 1 000} {"data": "BAD NUM 1,000", "ride-along-num": 1,000} {"data": "BAD NUM 1e", "ride-along-num": 1e} {"data": "BAD NUM 1ee2", "ride-along-num": 1ee2} diff --git a/integration_tests/src/test/resources/scan_emtpy_lines.json b/integration_tests/src/test/resources/scan_emtpy_lines.json new file mode 100644 index 00000000000..4845cf918b8 --- /dev/null +++ b/integration_tests/src/test/resources/scan_emtpy_lines.json @@ -0,0 +1,23 @@ + + + + +{"BAD"} + + + + +{"BAD"} + + + + +{"BAD"} + + + + + + + + diff --git a/integration_tests/src/test/resources/sci_formatted.json b/integration_tests/src/test/resources/sci_formatted.json index 2cc39c84308..d42056d8914 100644 --- a/integration_tests/src/test/resources/sci_formatted.json +++ b/integration_tests/src/test/resources/sci_formatted.json @@ -13,4 +13,3 @@ {"data": 1E-1} {"data": 1E+1} {"data": 1e1} -{"data": 1 E 1} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala index c12b8c4d5d6..344a1ae21fd 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala @@ -50,10 +50,25 @@ trait LineBufferer extends AutoCloseable { */ def getLength: Long + /** + * Get the numnber of lines currently added to this that were not filtered out. + */ + def getNumLines: Int + /** * Add a new line of bytes to the data to process. 
   */
  def add(line: Array[Byte], offset: Int, len: Int): Unit
+
+  def isWhiteSpace(b: Byte): Boolean = {
+    b == ' ' || b == '\t' || b == '\r' || b == '\n'
+  }
+
+  def isEmpty(line: Array[Byte], lineOffset: Int, lineLen: Int): Boolean = {
+    (0 until lineLen).forall { idx =>
+      isWhiteSpace(line(lineOffset + idx))
+    }
+  }
 }
 
 /**
@@ -64,18 +79,27 @@ trait LineBuffererFactory[BUFF <: LineBufferer] {
 }
 
 object HostLineBuffererFactory extends LineBuffererFactory[HostLineBufferer] {
+  override def createBufferer(estimatedSize: Long,
+      lineSeparatorInRead: Array[Byte]): HostLineBufferer =
+    new HostLineBufferer(estimatedSize, lineSeparatorInRead, false)
+}
+
+object FilterEmptyHostLineBuffererFactory extends LineBuffererFactory[HostLineBufferer] {
   override def createBufferer(estimatedSize: Long,
       lineSeparatorInRead: Array[Byte]): HostLineBufferer =
-    new HostLineBufferer(estimatedSize, lineSeparatorInRead)
+    new HostLineBufferer(estimatedSize, lineSeparatorInRead, true)
 }
 
 /**
 * Buffer the lines in a single HostMemoryBuffer with the separator inserted inbetween each of
 * the lines.
 */
-class HostLineBufferer(size: Long, separator: Array[Byte]) extends LineBufferer {
+class HostLineBufferer(size: Long,
+    separator: Array[Byte],
+    filterEmpty: Boolean) extends LineBufferer {
  private var buffer = HostMemoryBuffer.allocate(size)
  private var location: Long = 0
+  private var numLines: Int = 0
 
  def grow(needed: Long): Unit = {
    val newSize = math.max(buffer.getLength * 2, needed)
@@ -88,20 +112,21 @@ class HostLineBufferer(size: Long, separator: Array[Byte]) extends LineBufferer
 
  override def getLength: Long = location
 
-  override def add(line: Array[Byte], lineOffset: Int, lineLen: Int): Unit = {
-    val newTotal = location + lineLen + separator.length
-    if (newTotal > buffer.getLength) {
-      grow(newTotal)
-    }
+  override def getNumLines: Int = numLines
 
-    // Can have an empty line, do not write this to buffer but add the separator
-    // and totalRows
-    if (lineLen != 0) {
+  override def add(line: Array[Byte], lineOffset: Int, lineLen: Int): Unit = {
+    // Empty lines are filtered out
+    if (!filterEmpty || !isEmpty(line, lineOffset, lineLen)) {
+      numLines += 1
+      val newTotal = location + lineLen + separator.length
+      if (newTotal > buffer.getLength) {
+        grow(newTotal)
+      }
      buffer.setBytes(location, line, lineOffset, lineLen)
      location = location + lineLen
+      buffer.setBytes(location, separator, 0, separator.length)
+      location = location + separator.length
    }
-    buffer.setBytes(location, separator, 0, separator.length)
-    location = location + separator.length
  }
 
  def getBufferAndRelease: HostMemoryBuffer = {
@@ -139,10 +164,13 @@ class HostStringColBufferer(size: Long, separator: Array[Byte]) extends LineBuff
 
  override def getLength: Long = dataLocation
 
+  override def getNumLines: Int = numRows
+
  override def add(line: Array[Byte], lineOffset: Int, lineLen: Int): Unit = {
    if (numRows + 1 > rowsAllocated) {
      val newRowsAllocated = math.min(rowsAllocated * 2, Int.MaxValue - 1)
-      val tmpBuffer = HostMemoryBuffer.allocate((newRowsAllocated + 1) * DType.INT32.getSizeInBytes)
+      val tmpBuffer =
+        HostMemoryBuffer.allocate((newRowsAllocated + 1) * DType.INT32.getSizeInBytes)
      tmpBuffer.copyFromHostBuffer(0, offsetsBuffer, 0, offsetsBuffer.getLength)
      offsetsBuffer.close()
      offsetsBuffer = tmpBuffer
@@ -157,9 +185,7 @@ class HostStringColBufferer(size: Long, separator: Array[Byte]) extends LineBuff
      dataBuffer = newBuff
    }
-    if (lineLen != 0) {
-      dataBuffer.setBytes(dataLocation, line, lineOffset, lineLen)
-    }
+    dataBuffer.setBytes(dataLocation, line, lineOffset, lineLen)
    offsetsBuffer.setInt(numRows * DType.INT32.getSizeInBytes, dataLocation.toInt)
    dataLocation += lineLen
    numRows += 1
@@ -372,7 +398,7 @@ abstract class GpuTextBasedPartitionReader[BUFF <: LineBufferer, FACT <: LineBuf
        && totalSize <= maxBytesPerChunk /* soft limit and returns at least one row */) {
      val line = lineReader.next()
      hmb.add(line.getBytes, 0, line.getLength)
-      totalRows += 1
+      totalRows = hmb.getNumLines
      totalSize = hmb.getLength
    }
    //Indicate this is the last chunk
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala
index 2024fb5891d..506b22a22ab 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala
@@ -319,7 +319,7 @@ object JsonPartitionReader {
    withResource(new NvtxWithMetrics(formatName + " decode", NvtxColor.DARK_GREEN,
        decodeTime)) { _ =>
      try {
-        Table.readJSON(cudfSchema, jsonOpts, dataBuffer, 0, dataSize)
+        Table.readJSON(cudfSchema, jsonOpts, dataBuffer, 0, dataSize, dataBufferer.getNumLines)
      } catch {
        case e: AssertionError if e.getMessage == "CudfColumns can't be null or empty" =>
          // this happens when every row in a JSON file is invalid (or we are
@@ -344,9 +344,10 @@ class JsonPartitionReader(
    maxRowsPerChunk: Integer,
    maxBytesPerChunk: Long,
    execMetrics: Map[String, GpuMetric])
-  extends GpuTextBasedPartitionReader[HostLineBufferer, HostLineBuffererFactory.type](conf,
+  extends GpuTextBasedPartitionReader[HostLineBufferer,
+    FilterEmptyHostLineBuffererFactory.type](conf,
    partFile, dataSchema, readDataSchema, parsedOptions.lineSeparatorInRead, maxRowsPerChunk,
-    maxBytesPerChunk, execMetrics, HostLineBuffererFactory) {
+    maxBytesPerChunk, execMetrics, FilterEmptyHostLineBuffererFactory) {
 
  def buildJsonOptions(parsedOptions: JSONOptions): cudf.JSONOptions =
    GpuJsonReadCommon.cudfJsonOptions(parsedOptions)
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonReadCommon.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonReadCommon.scala
index 9acc9063750..8ac3feeee53 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonReadCommon.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonReadCommon.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql.rapids
 
 import java.util.Locale
 
-import ai.rapids.cudf.{BinaryOp, CaptureGroups, ColumnVector, ColumnView, DType, RegexProgram, Scalar, Schema, Table}
+import ai.rapids.cudf.{BinaryOp, ColumnVector, ColumnView, DType, Scalar, Schema, Table}
+import com.fasterxml.jackson.core.JsonParser
 import com.nvidia.spark.rapids.{ColumnCastUtil, GpuCast, GpuColumnVector, GpuScalar, GpuTextBasedPartitionReader}
 import com.nvidia.spark.rapids.Arm.withResource
 import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingArray
@@ -109,7 +110,6 @@ object GpuJsonReadCommon {
  private lazy val specialUnquotedFloats =
    Seq("NaN", "+INF", "-INF", "+Infinity", "Infinity", "-Infinity")
  private lazy val specialQuotedFloats = specialUnquotedFloats.map(s => '"'+s+'"')
-  private lazy val allSpecialFloats = specialUnquotedFloats ++ specialQuotedFloats
 
  /**
   * JSON has strict rules about valid numeric formats. See https://www.json.org/ for specification.
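
Note on the GpuTextBasedPartitionReader hunks above: HostLineBufferer can now drop
whitespace-only lines before they ever reach cuDF, and getNumLines reports only the
lines that were kept, so the row count handed to Table.readJSON matches the buffered
data. A minimal, self-contained sketch of just that filter (illustrative only: the
names below are not from the patch, and the real code writes into a HostMemoryBuffer
rather than a Scala collection):

    object LineFilterSketch {
      // Mirrors LineBufferer.isWhiteSpace in the patch: the JSON whitespace bytes
      private def isWhiteSpace(b: Byte): Boolean =
        b == ' ' || b == '\t' || b == '\r' || b == '\n'

      // A line is kept only if at least one byte is not whitespace;
      // a zero-length line is therefore dropped as well
      def keep(line: Array[Byte], offset: Int, len: Int): Boolean =
        !(0 until len).forall(idx => isWhiteSpace(line(offset + idx)))

      def main(args: Array[String]): Unit = {
        val lines = Seq("{\"a\": 1}", "   ", "", "\t\r", "{\"b\": 2}")
        val kept = lines.filter { s =>
          val bytes = s.getBytes("UTF-8")
          keep(bytes, 0, bytes.length)
        }
        println(kept.size) // 2 -- the count getNumLines would report
      }
    }
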
@@ -120,64 +120,41 @@ private def sanitizeFloats(input: ColumnView, options: JSONOptions): ColumnVector = {
    // Note that this is not 100% consistent with Spark versions prior to Spark 3.3.0
    // due to https://issues.apache.org/jira/browse/SPARK-38060
-    // cuDF `isFloat` supports some inputs that are not valid JSON numbers, such as `.1`, `1.`,
-    // and `+1` so we use a regular expression to match valid JSON numbers instead
-    // TODO The majority of this validation needs to move to CUDF so that we can invalidate
-    // an entire line/row instead of a single field.
-    // https://github.com/NVIDIA/spark-rapids/issues/10534
-    val jsonNumberRegexp = if (options.allowNumericLeadingZeros) {
-      "^-?[0-9]+(?:\\.[0-9]+)?(?:[eE][\\-\\+]?[0-9]+)?$"
-    } else {
-      "^-?(?:(?:[1-9][0-9]*)|0)(?:\\.[0-9]+)?(?:[eE][\\-\\+]?[0-9]+)?$"
-    }
-    val prog = new RegexProgram(jsonNumberRegexp, CaptureGroups.NON_CAPTURE)
-    val isValid = if (options.allowNonNumericNumbers) {
-      withResource(ColumnVector.fromStrings(allSpecialFloats: _*)) { nonNumeric =>
-        withResource(input.matchesRe(prog)) { isJsonNumber =>
-          withResource(input.contains(nonNumeric)) { nonNumeric =>
-            isJsonNumber.or(nonNumeric)
-          }
+    if (options.allowNonNumericNumbers) {
+      // Need to normalize the quotes to non-quoted to parse properly
+      withResource(ColumnVector.fromStrings(specialQuotedFloats: _*)) { quoted =>
+        withResource(ColumnVector.fromStrings(specialUnquotedFloats: _*)) { unquoted =>
+          input.findAndReplaceAll(quoted, unquoted)
        }
      }
    } else {
-      input.matchesRe(prog)
-    }
-    val cleaned = withResource(isValid) { _ =>
-      withResource(Scalar.fromNull(DType.STRING)) { nullString =>
-        isValid.ifElse(input, nullString)
-      }
+      input.copyToColumnVector()
    }
+  }
+
+  private def sanitizeInts(input: ColumnView): ColumnVector = {
+    // Integer numbers cannot look like a float, so no `.` or `e`. The rest of the parsing
+    // should handle this correctly; the remaining validation is done in CUDF itself.
-    withResource(cleaned) { _ =>
-      if (options.allowNonNumericNumbers) {
-        // Need to normalize the quotes to non-quoted to parse properly
-        withResource(ColumnVector.fromStrings(specialQuotedFloats: _*)) { quoted =>
-          withResource(ColumnVector.fromStrings(specialUnquotedFloats: _*)) { unquoted =>
-            cleaned.findAndReplaceAll(quoted, unquoted)
+    val tmp = withResource(Scalar.fromString(".")) { dot =>
+      withResource(input.stringContains(dot)) { hasDot =>
+        withResource(Scalar.fromString("e")) { e =>
+          withResource(input.stringContains(e)) { hase =>
+            hasDot.or(hase)
          }
        }
-      } else {
-        cleaned.incRefCount()
      }
    }
-  }
-
-  private def sanitizeInts(input: ColumnView, options: JSONOptions): ColumnVector = {
-    // Integer numbers cannot look like a float, so no `.` The rest of the parsing should
-    // handle this correctly.
-    // TODO The majority of this validation needs to move to CUDF so that we can invalidate
-    // an entire line/row instead of a single field.
-    // https://github.com/NVIDIA/spark-rapids/issues/10534
-    val jsonNumberRegexp = if (options.allowNumericLeadingZeros) {
-      "^-?[0-9]+$"
-    } else {
-      "^-?(?:(?:[1-9][0-9]*)|0)$"
+    val invalid = withResource(tmp) { _ =>
+      withResource(Scalar.fromString("E")) { E =>
+        withResource(input.stringContains(E)) { hasE =>
+          tmp.or(hasE)
+        }
+      }
    }
-
-    val prog = new RegexProgram(jsonNumberRegexp, CaptureGroups.NON_CAPTURE)
-    withResource(input.matchesRe(prog)) { isValid =>
+    withResource(invalid) { _ =>
      withResource(Scalar.fromNull(DType.STRING)) { nullString =>
-        isValid.ifElse(input, nullString)
+        invalid.ifElse(nullString, input)
      }
    }
  }
@@ -194,32 +171,11 @@ object GpuJsonReadCommon {
    }
  }
 
-  private def sanitizeUnquotedDecimal(input: ColumnView, options: JSONOptions): ColumnVector = {
-    // For unquoted decimal values the number has to look like it is floating point before it is
-    // parsed, so this follows that, but without the special cases for INF/NaN
-    // TODO The majority of this validation needs to move to CUDF so that we can invalidate
-    // an entire line/row instead of a single field.
-    // https://github.com/NVIDIA/spark-rapids/issues/10534
-    val jsonNumberRegexp = if (options.allowNumericLeadingZeros) {
-      "^-?[0-9]+(?:\\.[0-9]+)?(?:[eE][\\-\\+]?[0-9]+)?$"
-    } else {
-      "^-?(?:(?:[1-9][0-9]*)|0)(?:\\.[0-9]+)?(?:[eE][\\-\\+]?[0-9]+)?$"
-    }
-    val prog = new RegexProgram(jsonNumberRegexp, CaptureGroups.NON_CAPTURE)
-    withResource(input.matchesRe(prog)) { isValid =>
-      withResource(Scalar.fromNull(DType.STRING)) { nullString =>
-        isValid.ifElse(input, nullString)
-      }
-    }
-  }
-
  private def sanitizeDecimal(input: ColumnView, options: JSONOptions): ColumnVector = {
    assert(options.locale == Locale.US)
    withResource(isQuotedString(input)) { isQuoted =>
-      withResource(sanitizeUnquotedDecimal(input, options)) { unquoted =>
-        withResource(sanitizeQuotedDecimalInUSLocale(input)) { quoted =>
-          isQuoted.ifElse(quoted, unquoted)
-        }
+      withResource(sanitizeQuotedDecimalInUSLocale(input)) { quoted =>
+        isQuoted.ifElse(quoted, input)
      }
    }
  }
@@ -231,13 +187,13 @@ object GpuJsonReadCommon {
    }
  }
 
-  private def castStringToDecimal(input: ColumnVector, dt: DecimalType): ColumnVector =
+  private def castStringToDecimal(input: ColumnVector, dt: DecimalType): ColumnVector = {
+    // TODO there is a bug here around 0 https://github.com/NVIDIA/spark-rapids/issues/10898
    CastStrings.toDecimal(input, false, false, dt.precision, -dt.scale)
+  }
 
  private def castJsonStringToBool(input: ColumnView): ColumnVector = {
-    // TODO This validation needs to move to CUDF so that we can invalidate
-    // an entire line/row instead of a single field.
-    // https://github.com/NVIDIA/spark-rapids/issues/10534
+    // Sadly there is no good kernel right now to do just this check/conversion
    val isTrue = withResource(Scalar.fromString("true")) { trueStr =>
      input.equalTo(trueStr)
    }
@@ -336,7 +292,7 @@ object GpuJsonReadCommon {
      case (cv, Some(dt)) if (dt == ByteType || dt == ShortType || dt == IntegerType ||
          dt == LongType ) && cv.getType == DType.STRING =>
-        withResource(sanitizeInts(cv, options)) { tmp =>
+        withResource(sanitizeInts(cv)) { tmp =>
          CastStrings.toInteger(tmp, false, GpuColumnVector.getNonNestedRapidsType(dt))
        }
      case (cv, Some(dt)) if cv.getType == DType.STRING =>
@@ -363,12 +319,23 @@ object GpuJsonReadCommon {
  }
 
  def cudfJsonOptions(options: JSONOptions): ai.rapids.cudf.JSONOptions = {
+    // This is really ugly, but options.allowUnquotedControlChars is marked as private
+    // and this is the only way I know to get it without even uglier tricks
+    @scala.annotation.nowarn("msg=Java enum ALLOW_UNQUOTED_CONTROL_CHARS in " +
+      "Java enum Feature is deprecated")
+    val allowUnquotedControlChars =
+      options.buildJsonFactory()
+        .isEnabled(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS)
    ai.rapids.cudf.JSONOptions.builder()
      .withRecoverWithNull(true)
      .withMixedTypesAsStrings(true)
      .withNormalizeWhitespace(true)
      .withKeepQuotes(true)
      .withNormalizeSingleQuotes(options.allowSingleQuotes)
+      .withStrictValidation(true)
+      .withLeadingZeros(options.allowNumericLeadingZeros)
+      .withNonNumericNumbers(options.allowNonNumericNumbers)
+      .withUnquotedControlChars(allowUnquotedControlChars)
      .build()
  }
 }
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala
index 7b49a8f3351..e60aefb8d59 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala
@@ -183,7 +183,7 @@ case class GpuJsonToStructs(
      val table = withResource(new JsonDeviceDataSource(combined)) { ds =>
        // Step 4: Have cudf parse the JSON data
        try {
-          cudf.Table.readJSON(cudfSchema, jsonOptions, ds)
+          cudf.Table.readJSON(cudfSchema, jsonOptions, ds, numRows)
        } catch {
          case e : RuntimeException =>
            throw new JsonParsingException("Currently some Json to Struct cases " +
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/JsonScanRetrySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/JsonScanRetrySuite.scala
index 1db21ca4f58..47546f25513 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/JsonScanRetrySuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/JsonScanRetrySuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.types._
 
 class JsonScanRetrySuite extends RmmSparkRetrySuiteBase {
  test("test simple retry") {
-    val bufferer = HostLineBuffererFactory.createBufferer(100, Array('\n'.toByte))
+    val bufferer = FilterEmptyHostLineBuffererFactory.createBufferer(100, Array('\n'.toByte))
    bufferer.add("{\"a\": 1, \"b\": 2".getBytes, 0, 14)
 
    val cudfSchema = GpuColumnVector.from(StructType(Seq(StructField("a", IntegerType),
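
Note on cudfJsonOptions above: Spark's JSONOptions does not expose
allowUnquotedControlChars publicly, so the patch builds the Jackson JsonFactory
that Spark itself configures and reads the feature flag back off of it. A
standalone sketch of that round-trip (illustrative only; the factory setup here
stands in for whatever options.buildJsonFactory() would return):

    import com.fasterxml.jackson.core.{JsonFactory, JsonParser}

    object JacksonFeatureSketch {
      def main(args: Array[String]): Unit = {
        // Stand-in for options.buildJsonFactory(): a factory with the
        // (deprecated but still present) parser feature switched on
        val factory = new JsonFactory()
        factory.enable(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS)

        // The factory remembers its configuration, so the otherwise-private
        // option can be recovered without touching Spark internals
        val allowUnquotedControlChars =
          factory.isEnabled(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS)
        println(allowUnquotedControlChars) // true
      }
    }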