Use improved CUDF JSON validation (#11464)
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
revans2 authored Sep 17, 2024
1 parent 00cd422 commit 2589976
Showing 12 changed files with 242 additions and 131 deletions.
94 changes: 67 additions & 27 deletions integration_tests/src/main/python/json_matrix_test.py

Large diffs are not rendered by default.

49 changes: 49 additions & 0 deletions integration_tests/src/main/python/json_test.py
@@ -23,6 +23,8 @@
from marks import approximate_float, allow_non_gpu, ignore_order, datagen_overrides
from spark_session import *

TEXT_INPUT_EXEC='FileSourceScanExec'

json_supported_gens = [
# Spark does not escape '\r' or '\n' even though it uses it to mark end of record
# This would require multiLine reads to work correctly, so we avoid these chars
@@ -350,6 +352,53 @@ def test_basic_json_read(std_input_path, filename, schema, read_func, allow_non_
options),
conf=updated_conf)

@approximate_float
@pytest.mark.parametrize('filename', [
'boolean.json',
'boolean_invalid.json',
'ints.json',
pytest.param('ints_invalid.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4940')), # This fails for dates, as not all are invalid
'nan_and_inf.json',
pytest.param('nan_and_inf_strings.json', marks=pytest.mark.skipif(is_before_spark_330(), reason='https://issues.apache.org/jira/browse/SPARK-38060 fixed in Spark 3.3.0')),
'nan_and_inf_invalid.json',
'floats.json',
'floats_leading_zeros.json',
'floats_invalid.json',
'floats_edge_cases.json',
'decimals.json',
'dates.json',
'dates_invalid.json',
])
@pytest.mark.parametrize('schema', [_bool_schema, _byte_schema, _short_schema, _int_schema, _long_schema, \
_float_schema, _double_schema, _decimal_10_2_schema, _decimal_10_3_schema, \
_date_schema], ids=idfn)
@pytest.mark.parametrize('allow_non_numeric_numbers', ['true', 'false'])
@pytest.mark.parametrize('allow_numeric_leading_zeros', [
'true',
'false'
])
@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
@allow_non_gpu(TEXT_INPUT_EXEC, *not_utc_allow_for_test_json_scan)
@pytest.mark.parametrize('date_format', [None, 'yyyy-MM-dd'])
def test_basic_from_json(std_input_path, filename, schema, allow_non_numeric_numbers, \
allow_numeric_leading_zeros, ansi_enabled, date_format):
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.ansi.enabled': ansi_enabled,
'spark.sql.legacy.timeParserPolicy': 'CORRECTED'})
options = {"allowNonNumericNumbers": allow_non_numeric_numbers,
"allowNumericLeadingZeros": allow_numeric_leading_zeros,
}

if date_format:
options['dateFormat'] = date_format

assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.read.text(std_input_path + '/' + filename).
selectExpr("value as json").
select(f.col("json"), f.from_json(f.col("json"), schema, options)),
conf=updated_conf)
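Not part of the commit: for readers who want to try the same pattern outside pytest, the read-text-then-parse flow that test_basic_from_json exercises looks roughly like this in Spark's Scala API (the path, schema, and option values below are illustrative, not taken from the test):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

object FromJsonSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    // Read each line of the file as a raw string, then parse it with from_json.
    val schema = StructType(Seq(StructField("data", DoubleType)))
    val options = Map(
      "allowNonNumericNumbers" -> "true",    // accept NaN / Infinity literals
      "allowNumericLeadingZeros" -> "false") // reject numbers such as 01
    val df = spark.read.text("/path/to/floats.json") // illustrative path
      .select(col("value").as("json"))
      .select(col("json"), from_json(col("json"), schema, options))
    df.show(truncate = false)
    spark.stop()
  }
}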


@ignore_order
@pytest.mark.parametrize('filename', [
'malformed1.ndjson',
10 changes: 10 additions & 0 deletions integration_tests/src/test/resources/bad_whitespace.json
@@ -0,0 +1,10 @@
{"data": 1 . 0}
{"data": - 1 . 0}
{"data": + 1 . 0}
{"data": 1 E 1}
{"data": n u l l}
{"data": t r u e}
{"data": f a l s e}
{"data": 1 0}
{"data": 1, "other": 1 0}
{"data": "BAD NUM 1 000", "ride-along-num": 1 000}
3 changes: 0 additions & 3 deletions integration_tests/src/test/resources/float_formatted.json
@@ -20,6 +20,3 @@
{"data": 0.9999}
{"data": +1.0}
{"data": -1.0}
{"data": 1 . 0}
{"data": - 1 . 0}
{"data": + 1 . 0}
@@ -14,7 +14,6 @@
{"data": "BAD NUM +1", "ride-along-num": +1}
{"data": "BAD NUM 01", "ride-along-num": 01}
{"data": "BAD NUM 00.1", "ride-along-num": 00.1}
{"data": "BAD NUM 1 000", "ride-along-num": 1 000}
{"data": "BAD NUM 1,000", "ride-along-num": 1,000}
{"data": "BAD NUM 1e", "ride-along-num": 1e}
{"data": "BAD NUM 1ee2", "ride-along-num": 1ee2}
23 changes: 23 additions & 0 deletions integration_tests/src/test/resources/scan_emtpy_lines.json
@@ -0,0 +1,23 @@




{"BAD"}




{"BAD"}




{"BAD"}








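This resource is twenty whitespace-only lines wrapped around three invalid documents, so with the new host-side filtering only the three {"BAD"} lines should ever reach cuDF. A quick standalone count that applies the same whitespace rule (illustrative only; the path simply points at the resource above):

import scala.io.Source

object ScanEmptyLinesCount {
  def main(args: Array[String]): Unit = {
    val source = Source.fromFile(
      "integration_tests/src/test/resources/scan_emtpy_lines.json")
    // A line survives filtering if it holds at least one non-whitespace char.
    val kept = source.getLines().count(_.exists(c => !c.isWhitespace))
    source.close()
    println(kept) // 3: only the {"BAD"} documents are non-blank
  }
}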
1 change: 0 additions & 1 deletion integration_tests/src/test/resources/sci_formatted.json
@@ -13,4 +13,3 @@
{"data": 1E-1}
{"data": 1E+1}
{"data": 1e1}
{"data": 1 E 1}
@@ -50,10 +50,25 @@ trait LineBufferer extends AutoCloseable {
*/
def getLength: Long

/**
* Get the number of lines currently added to this bufferer that were not filtered out.
*/
def getNumLines: Int

/**
* Add a new line of bytes to the data to process.
*/
def add(line: Array[Byte], offset: Int, len: Int): Unit

def isWhiteSpace(b: Byte): Boolean = {
b == ' ' || b == '\t' || b == '\r' || b == '\n'
}

def isEmpty(line: Array[Byte], lineOffset: Int, lineLen: Int): Boolean = {
(0 until lineLen).forall { idx =>
isWhiteSpace(line(lineOffset + idx))
}
}
}
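The new helpers classify a line as empty when every byte is ASCII whitespace. A standalone sketch of that check, with the two methods copied from the trait and a small driver added for illustration:

object EmptyLineCheck {
  // Copied from the LineBufferer trait: ASCII space, tab, CR, and LF count.
  def isWhiteSpace(b: Byte): Boolean =
    b == ' ' || b == '\t' || b == '\r' || b == '\n'

  // Every byte in [lineOffset, lineOffset + lineLen) must be whitespace;
  // note that a zero-length line is vacuously empty.
  def isEmpty(line: Array[Byte], lineOffset: Int, lineLen: Int): Boolean =
    (0 until lineLen).forall(idx => isWhiteSpace(line(lineOffset + idx)))

  def main(args: Array[String]): Unit = {
    val blank = " \t\r\n".getBytes("UTF-8")
    val doc = """{"data": 1}""".getBytes("UTF-8")
    println(isEmpty(blank, 0, blank.length)) // true: a filtering bufferer drops it
    println(isEmpty(doc, 0, doc.length))     // false: the line is kept
  }
}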

/**
@@ -64,18 +79,27 @@ trait LineBuffererFactory[BUFF <: LineBufferer] {
}

object HostLineBuffererFactory extends LineBuffererFactory[HostLineBufferer] {
override def createBufferer(estimatedSize: Long,
lineSeparatorInRead: Array[Byte]): HostLineBufferer =
new HostLineBufferer(estimatedSize, lineSeparatorInRead, false)
}

object FilterEmptyHostLineBuffererFactory extends LineBuffererFactory[HostLineBufferer] {
override def createBufferer(estimatedSize: Long,
lineSeparatorInRead: Array[Byte]): HostLineBufferer =
-    new HostLineBufferer(estimatedSize, lineSeparatorInRead)
+    new HostLineBufferer(estimatedSize, lineSeparatorInRead, true)
}

/**
* Buffer the lines in a single HostMemoryBuffer with the separator inserted in between each of
* the lines.
*/
-class HostLineBufferer(size: Long, separator: Array[Byte]) extends LineBufferer {
+class HostLineBufferer(size: Long,
+    separator: Array[Byte],
+    filterEmpty: Boolean) extends LineBufferer {
private var buffer = HostMemoryBuffer.allocate(size)
private var location: Long = 0
private var numLines: Int = 0

def grow(needed: Long): Unit = {
val newSize = math.max(buffer.getLength * 2, needed)
@@ -88,20 +112,21 @@ class HostLineBufferer(size: Long, separator: Array[Byte]) extends LineBufferer

override def getLength: Long = location

-  override def add(line: Array[Byte], lineOffset: Int, lineLen: Int): Unit = {
-    val newTotal = location + lineLen + separator.length
-    if (newTotal > buffer.getLength) {
-      grow(newTotal)
-    }
+  override def getNumLines: Int = numLines

-    // Can have an empty line, do not write this to buffer but add the separator
-    // and totalRows
-    if (lineLen != 0) {
+  override def add(line: Array[Byte], lineOffset: Int, lineLen: Int): Unit = {
+    // Empty lines are filtered out
+    if (!filterEmpty || !isEmpty(line, lineOffset, lineLen)) {
+      numLines += 1
+      val newTotal = location + lineLen + separator.length
+      if (newTotal > buffer.getLength) {
+        grow(newTotal)
+      }
buffer.setBytes(location, line, lineOffset, lineLen)
location = location + lineLen
+      buffer.setBytes(location, separator, 0, separator.length)
+      location = location + separator.length
}
-    buffer.setBytes(location, separator, 0, separator.length)
-    location = location + separator.length
}

def getBufferAndRelease: HostMemoryBuffer = {
@@ -139,10 +164,13 @@ class HostStringColBufferer(size: Long, separator: Array[Byte]) extends LineBuff

override def getLength: Long = dataLocation

override def getNumLines: Int = numRows

override def add(line: Array[Byte], lineOffset: Int, lineLen: Int): Unit = {
if (numRows + 1 > rowsAllocated) {
val newRowsAllocated = math.min(rowsAllocated * 2, Int.MaxValue - 1)
-      val tmpBuffer = HostMemoryBuffer.allocate((newRowsAllocated + 1) * DType.INT32.getSizeInBytes)
+      val tmpBuffer =
+        HostMemoryBuffer.allocate((newRowsAllocated + 1) * DType.INT32.getSizeInBytes)
tmpBuffer.copyFromHostBuffer(0, offsetsBuffer, 0, offsetsBuffer.getLength)
offsetsBuffer.close()
offsetsBuffer = tmpBuffer
@@ -157,9 +185,7 @@ class HostStringColBufferer(size: Long, separator: Array[Byte]) extends LineBuff
dataBuffer = newBuff
}
}
-    if (lineLen != 0) {
-      dataBuffer.setBytes(dataLocation, line, lineOffset, lineLen)
-    }
+    dataBuffer.setBytes(dataLocation, line, lineOffset, lineLen)
offsetsBuffer.setInt(numRows * DType.INT32.getSizeInBytes, dataLocation.toInt)
dataLocation += lineLen
numRows += 1
@@ -372,7 +398,7 @@ abstract class GpuTextBasedPartitionReader[BUFF <: LineBufferer, FACT <: LineBuf
&& totalSize <= maxBytesPerChunk /* soft limit and returns at least one row */) {
val line = lineReader.next()
hmb.add(line.getBytes, 0, line.getLength)
-      totalRows += 1
+      totalRows = hmb.getNumLines
totalSize = hmb.getLength
}
//Indicate this is the last chunk
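Because whitespace-only lines are now dropped inside the bufferer, the read loop above takes its row count from hmb.getNumLines instead of incrementing a local counter. A simplified model of the filtering bufferer's behavior, with a ByteArrayOutputStream standing in for HostMemoryBuffer (a sketch, not the real class):

import java.io.ByteArrayOutputStream

// Kept lines are written back-to-back, each followed by the separator,
// and only kept lines advance the line count.
class LineBuffererModel(separator: Array[Byte], filterEmpty: Boolean) {
  private val out = new ByteArrayOutputStream()
  private var numLines = 0

  private def isWhiteSpace(b: Byte): Boolean =
    b == ' ' || b == '\t' || b == '\r' || b == '\n'

  def add(line: Array[Byte]): Unit = {
    if (!filterEmpty || !line.forall(isWhiteSpace)) {
      numLines += 1
      out.write(line)
      out.write(separator) // the separator is only written for kept lines
    }
  }

  def getNumLines: Int = numLines
  def getLength: Long = out.size().toLong
}

object LineBuffererModelDemo {
  def main(args: Array[String]): Unit = {
    val buf = new LineBuffererModel("\n".getBytes("UTF-8"), filterEmpty = true)
    Seq("""{"data": 1}""", "   ", """{"data": 2}""")
      .foreach(s => buf.add(s.getBytes("UTF-8")))
    println(buf.getNumLines) // 2: the whitespace-only line never hits the buffer
  }
}

Because only kept lines advance the count and the write position, the chunking loop can read totalRows straight off the bufferer.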
@@ -319,7 +319,7 @@ object JsonPartitionReader {
withResource(new NvtxWithMetrics(formatName + " decode",
NvtxColor.DARK_GREEN, decodeTime)) { _ =>
try {
-          Table.readJSON(cudfSchema, jsonOpts, dataBuffer, 0, dataSize)
+          Table.readJSON(cudfSchema, jsonOpts, dataBuffer, 0, dataSize, dataBufferer.getNumLines)
} catch {
case e: AssertionError if e.getMessage == "CudfColumns can't be null or empty" =>
// this happens when every row in a JSON file is invalid (or we are
@@ -344,9 +344,10 @@ class JsonPartitionReader(
maxRowsPerChunk: Integer,
maxBytesPerChunk: Long,
execMetrics: Map[String, GpuMetric])
-  extends GpuTextBasedPartitionReader[HostLineBufferer, HostLineBuffererFactory.type](conf,
+  extends GpuTextBasedPartitionReader[HostLineBufferer,
+      FilterEmptyHostLineBuffererFactory.type](conf,
    partFile, dataSchema, readDataSchema, parsedOptions.lineSeparatorInRead, maxRowsPerChunk,
-    maxBytesPerChunk, execMetrics, HostLineBuffererFactory) {
+    maxBytesPerChunk, execMetrics, FilterEmptyHostLineBuffererFactory) {

def buildJsonOptions(parsedOptions: JSONOptions): cudf.JSONOptions =
GpuJsonReadCommon.cudfJsonOptions(parsedOptions)
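JsonPartitionReader now also hands the bufferer's line count to Table.readJSON; the precise semantics of that extra argument belong to the cuDF Java API, but the invariant the plugin leans on is one output row per retained input line, even when some of those lines are invalid JSON documents. An illustrative statement of that invariant (assumed names, not plugin code):

object RowCountInvariant {
  // Illustrative only: after host-side filtering, the row count cuDF returns
  // should match the number of lines the bufferer kept.
  def checkRowCount(tableRows: Long, bufferedLines: Int): Unit =
    require(tableRows == bufferedLines.toLong,
      s"cuDF returned $tableRows rows but $bufferedLines lines were buffered")

  def main(args: Array[String]): Unit =
    checkRowCount(3L, 3) // counts agree, so require passes silently
}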
