Skip to content

Commit

Permalink
[FEA] support json to struct function (#8174)
Browse files Browse the repository at this point in the history
* experimenting with json to struct

Signed-off-by: Cindy Jiang <cindyj@nvidia.com>

* added json to struct function implementation and integration tests

Signed-off-by: Cindy Jiang <cindyj@nvidia.com>

* fixed memory leak in json to struct implementation and added documentation

Signed-off-by: Cindy Jiang <cindyj@nvidia.com>

* fixed Arm import

Signed-off-by: Cindy Jiang <cindyj@nvidia.com>

* updated json to struct implementation to cast return type and added more tests

Signed-off-by: Cindy Jiang <cindyj@nvidia.com>

* updated json to struct supported types and documentation

Signed-off-by: Cindy Jiang <cindyj@nvidia.com>

* updated implementation to use GpuCast and updated tests

Signed-off-by: Cindy Jiang <cindyj@nvidia.com>

* fixed code style and removed unused imports

Signed-off-by: Cindy Jiang <cindyj@nvidia.com>

* added some comments for getSparkType function

Signed-off-by: Cindy Jiang <cindyj@nvidia.com>

---------

Signed-off-by: Cindy Jiang <cindyj@nvidia.com>
  • Loading branch information
cindyyuanjiang authored May 1, 2023
1 parent 54f77b0 commit 2b2835e
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 10 deletions.
10 changes: 8 additions & 2 deletions docs/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -313,15 +313,21 @@ The following input is invalid and will cause error:
{"name": Justin", "age":19}
```

Reading input with duplicated json key names is also incompatible with CPU Spark.

### JSON supporting types

In the current version, nested types (array, struct, and map types) are not yet supported in regular JSON parsing.

### `from_json` function

This particular function supports to output a map type with limited functionalities. In particular, the output map is not resulted from a regular JSON parsing but instead it will just contain plain text of key-value pairs extracted directly from the input JSON string.
This particular function supports to output a map or struct type with limited functionalities.

For struct output type, the function only supports struct of struct, array, string and int types. The output is incompatible if duplicated json key names are present in the input strings. For schemas that include IntegerType,
if arbitrarily large numbers are specified in the JSON strings, the GPU implementation will cast the numbers to
IntegerType, whereas CPU Spark will return null.

Due to such limitations, the input JSON schema must be `MAP<STRING,STRING>` and nothing else. Furthermore, there is no validation, no error tolerance, no data conversion as well as string formatting is performed. This may lead to some minor differences in the output if compared to the result of Spark CPU's `from_json`, such as:
In particular, the output map is not resulted from a regular JSON parsing but instead it will just contain plain text of key-value pairs extracted directly from the input JSON string. Due to such limitations, the input JSON map type schema must be `MAP<STRING,STRING>` and nothing else. Furthermore, there is no validation, no error tolerance, no data conversion as well as string formatting is performed. This may lead to some minor differences in the output if compared to the result of Spark CPU's `from_json`, such as:
* Floating point numbers in the input JSON string such as `1.2000` will not be reformatted to `1.2`. Instead, the output will be the same as the input.
* If the input JSON is given as multiple rows, any row containing invalid JSON format will lead to an application crash. On the other hand, Spark CPU version just produces nulls for the invalid rows, as shown below:
```
Expand Down
4 changes: 2 additions & 2 deletions docs/supported_ops.md
Original file line number Diff line number Diff line change
Expand Up @@ -7890,8 +7890,8 @@ are limited.
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td><em>PS<br/>unsupported child types BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, DATE, TIMESTAMP, DECIMAL, NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT</em></td>
<td><b>NS</b></td>
<td><em>PS<br/>MAP only supports keys and values that are of STRING type;<br/>unsupported child types BOOLEAN, BYTE, SHORT, LONG, FLOAT, DOUBLE, DATE, TIMESTAMP, DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT</em></td>
<td><em>PS<br/>unsupported child types BOOLEAN, BYTE, SHORT, LONG, FLOAT, DOUBLE, DATE, TIMESTAMP, DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT</em></td>
<td> </td>
</tr>
<tr>
Expand Down
50 changes: 49 additions & 1 deletion integration_tests/src/main/python/json_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,4 +367,52 @@ def test_from_json_map():
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, json_string_gen) \
.select(f.from_json(f.col('a'), 'MAP<STRING,STRING>')),
conf={"spark.rapids.sql.expression.JsonToStructs": "true"})
conf={"spark.rapids.sql.expression.JsonToStructs": True})

@allow_non_gpu('ProjectExec', 'JsonToStructs')
def test_from_json_map_fallback():
# The test here is working around some inconsistencies in how the keys are parsed for maps
# on the GPU the keys are dense, but on the CPU they are sparse
json_string_gen = StringGen(r'{"a": \d\d}')
assert_gpu_fallback_collect(
lambda spark : unary_op_df(spark, json_string_gen) \
.select(f.from_json(f.col('a'), 'MAP<STRING,INT>')),
'JsonToStructs',
conf={"spark.rapids.sql.expression.JsonToStructs": True})

@pytest.mark.parametrize('data_gen', [StringGen(r'{"a": "[0-9]{0,5}", "b": "[A-Z]{0,5}", "c": 1234}')])
@pytest.mark.parametrize('schema', [StructType([StructField("a", StringType())]),
StructType([StructField("d", StringType())]),
StructType([StructField("a", StringType()), StructField("b", StringType())]),
StructType([StructField("c", IntegerType()), StructField("a", StringType())]),
StructType([StructField("a", StringType()), StructField("a", StringType())])
])
def test_from_json_struct(data_gen, schema):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen) \
.select(f.from_json(f.col('a'), schema)),
conf={"spark.rapids.sql.expression.JsonToStructs": True})

@pytest.mark.parametrize('data_gen', [StringGen(r'{"teacher": "Alice", "student": {"name": "Bob", "age": 20}}')])
@pytest.mark.parametrize('schema', [StructType([StructField("teacher", StringType())]),
StructType([StructField("student", StructType([StructField("name", StringType()), \
StructField("age", IntegerType())]))])])
def test_from_json_struct_of_struct(data_gen, schema):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen) \
.select(f.from_json(f.col('a'), schema)),
conf={"spark.rapids.sql.expression.JsonToStructs": True})

@pytest.mark.parametrize('data_gen', [StringGen(r'{"teacher": "Alice", "student": \[{"name": "Bob", "class": "junior"},' \
r'{"name": "Charlie", "class": "freshman"}\]}')])
@pytest.mark.parametrize('schema', [StructType([StructField("teacher", StringType())]),
StructType([StructField("student", ArrayType(StructType([StructField("name", StringType()), \
StructField("class", StringType())])))]),
StructType([StructField("teacher", StringType()), \
StructField("student", ArrayType(StructType([StructField("name", StringType()), \
StructField("class", StringType())])))])])
def test_from_json_struct_of_list(data_gen, schema):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen) \
.select(f.from_json(f.col('a'), schema)),
conf={"spark.rapids.sql.expression.JsonToStructs": True})
Original file line number Diff line number Diff line change
Expand Up @@ -3373,14 +3373,25 @@ object GpuOverrides extends Logging {
expr[JsonToStructs](
"Returns a struct value with the given `jsonStr` and `schema`",
ExprChecks.projectOnly(
TypeSig.MAP.nested(TypeSig.STRING),
TypeSig.MAP.nested(TypeSig.STRING).withPsNote(TypeEnum.MAP,
"MAP only supports keys and values that are of STRING type") +
TypeSig.STRUCT.nested(TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.STRING + TypeSig.INT),
(TypeSig.STRUCT + TypeSig.MAP + TypeSig.ARRAY).nested(TypeSig.all),
Seq(ParamCheck("jsonStr", TypeSig.STRING, TypeSig.STRING))),
(a, conf, p, r) => new UnaryExprMeta[JsonToStructs](a, conf, p, r) {
override def tagExprForGpu(): Unit =
a.schema match {
case MapType(_: StringType, _: StringType, _) => ()
case MapType(kt, vt, _) => {
willNotWorkOnGpu("JsonToStructs only supports MapType<StringType, StringType> for " +
s"input MapType schema, but received MapType<$kt, $vt>")
}
case _ => ()
}
GpuJsonScan.tagJsonToStructsSupport(a.options, this)

override def convertToGpu(child: Expression): GpuExpression =
// GPU implementation currently does not support duplicated json key names in input
GpuJsonToStructs(a.schema, a.options, child, a.timeZoneId)
}).disabledByDefault("parsing JSON from a column has a large number of issues and " +
"should be considered beta quality right now."),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
package org.apache.spark.sql.rapids

import ai.rapids.cudf
import com.nvidia.spark.rapids.{GpuColumnVector, GpuUnaryExpression}
import com.nvidia.spark.rapids.{GpuColumnVector, GpuScalar, GpuUnaryExpression}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.GpuCast.doCast
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq
import com.nvidia.spark.rapids.jni.MapUtils

import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, NullIntolerant, TimeZoneAwareExpression}
import org.apache.spark.sql.types.{AbstractDataType, DataType, StringType}
import org.apache.spark.sql.types._

case class GpuJsonToStructs(
schema: DataType,
Expand All @@ -30,8 +33,162 @@ case class GpuJsonToStructs(
timeZoneId: Option[String] = None)
extends GpuUnaryExpression with TimeZoneAwareExpression with ExpectsInputTypes
with NullIntolerant {

private def cleanAndConcat(input: cudf.ColumnVector): (cudf.ColumnVector, cudf.ColumnVector) ={
withResource(cudf.Scalar.fromString("{}")) { emptyRow =>
val stripped = withResource(cudf.Scalar.fromString(" ")) { space =>
input.strip(space)
}
withResource(stripped) { stripped =>
val isNullOrEmptyInput = withResource(input.isNull) { isNull =>
val isEmpty = withResource(stripped.getCharLengths) { lengths =>
withResource(cudf.Scalar.fromInt(0)) { zero =>
lengths.lessOrEqualTo(zero)
}
}
withResource(isEmpty) { isEmpty =>
isNull.binaryOp(cudf.BinaryOp.NULL_LOGICAL_OR, isEmpty, cudf.DType.BOOL8)
}
}
closeOnExcept(isNullOrEmptyInput) { _ =>
withResource(isNullOrEmptyInput.ifElse(emptyRow, stripped)) { cleaned =>
withResource(cudf.Scalar.fromString("\n")) { lineSep =>
withResource(cudf.Scalar.fromString("\r")) { returnSep =>
withResource(cleaned.stringContains(lineSep)) { inputHas =>
withResource(inputHas.any()) { anyLineSep =>
if (anyLineSep.isValid && anyLineSep.getBoolean) {
throw new IllegalArgumentException("We cannot currently support parsing " +
"JSON that contains a line separator in it")
}
}
}
withResource(cleaned.stringContains(returnSep)) { inputHas =>
withResource(inputHas.any()) { anyReturnSep =>
if (anyReturnSep.isValid && anyReturnSep.getBoolean) {
throw new IllegalArgumentException("We cannot currently support parsing " +
"JSON that contains a carriage return in it")
}
}
}
}
(isNullOrEmptyInput, cleaned.joinStrings(lineSep, emptyRow))
}
}
}
}
}
}

// Process a sequence of field names. If there are duplicated field names, we only keep the field
// name with the largest index in the sequence, for others, replace the field names with null.
// Example:
// Input = [("a", StringType), ("b", StringType), ("a", IntegerType)]
// Output = [(null, StringType), ("b", StringType), ("a", IntegerType)]
private def processFieldNames(names: Seq[(String, DataType)]): Seq[(String, DataType)] = {
val zero = (Set.empty[String], Seq.empty[(String, DataType)])
val (_, res) = names.foldRight(zero) { case ((name, dtype), (existingNames, acc)) =>
if (existingNames(name)) {
(existingNames, (null, dtype) +: acc)
} else {
(existingNames + name, (name, dtype) +: acc)
}
}
res
}

// Given a cudf column, return its Spark type
private def getSparkType(col: cudf.ColumnView): DataType = {
col.getType match {
case cudf.DType.INT8 | cudf.DType.UINT8 => ByteType
case cudf.DType.INT16 | cudf.DType.UINT16 => ShortType
case cudf.DType.INT32 | cudf.DType.UINT32 => IntegerType
case cudf.DType.INT64 | cudf.DType.UINT64 => LongType
case cudf.DType.FLOAT32 => FloatType
case cudf.DType.FLOAT64 => DoubleType
case cudf.DType.BOOL8 => BooleanType
case cudf.DType.STRING => StringType
case cudf.DType.LIST => ArrayType(getSparkType(col.getChildColumnView(0)))
case cudf.DType.STRUCT =>
val structFields = (0 until col.getNumChildren).map { i =>
val child = col.getChildColumnView(i)
StructField("", getSparkType(child))
}
StructType(structFields)
case t => throw new IllegalArgumentException(
s"GpuJsonToStructs currently cannot process CUDF column of type $t.")
}
}

override protected def doColumnar(input: GpuColumnVector): cudf.ColumnVector = {
MapUtils.extractRawMapFromJsonString(input.getBase)
schema match {
case _: MapType =>
MapUtils.extractRawMapFromJsonString(input.getBase)
case struct: StructType => {
// We cannot handle all corner cases with this right now. The parser just isn't
// good enough, but we will try to handle a few common ones.
val numRows = input.getRowCount.toInt

// Step 1: verify and preprocess the data to clean it up and normalize a few things
// Step 2: Concat the data into a single buffer
val (isNullOrEmpty, combined) = cleanAndConcat(input.getBase)
withResource(isNullOrEmpty) { isNullOrEmpty =>
// Step 3: copy the data back to the host so we can parse it.
val combinedHost = withResource(combined) { combined =>
combined.copyToHost()
}
// Step 4: Have cudf parse the JSON data
val (names, rawTable) = withResource(combinedHost) { combinedHost =>
val data = combinedHost.getData
val start = combinedHost.getStartListOffset(0)
val end = combinedHost.getEndListOffset(0)
val length = end - start

withResource(cudf.Table.readJSON(cudf.JSONOptions.DEFAULT, data, start,
length)) { tableWithMeta =>
val names = tableWithMeta.getColumnNames
(names, tableWithMeta.releaseTable())
}
}

// process duplicated field names in input struct schema
val fieldNames = processFieldNames(struct.fields.map { field =>
(field.name, field.dataType)})

withResource(rawTable) { rawTable =>
// Step 5: verify that the data looks correct
if (rawTable.getRowCount != numRows) {
throw new IllegalStateException("The input data didn't parse correctly and we read " +
s"a different number of rows than was expected. Expected $numRows, " +
s"but got ${rawTable.getRowCount}")
}

// Step 6: get the data based on input struct schema
val columns = fieldNames.safeMap { case (name, dtype) =>
val i = names.indexOf(name)
if (i == -1) {
GpuColumnVector.columnVectorFromNull(numRows, dtype)
} else {
val col = rawTable.getColumn(i)
// getSparkType is only used to get the from type for cast
doCast(col, getSparkType(col), dtype, false, false, false)
}
}

// Step 7: turn the data into a Struct
withResource(columns) { columns =>
withResource(cudf.ColumnVector.makeStruct(columns: _*)) { structData =>
// Step 8: put nulls back in for nulls and empty strings
withResource(GpuScalar.from(null, struct)) { nullVal =>
isNullOrEmpty.ifElse(nullVal, structData)
}
}
}
}
}
}
case _ => throw new IllegalArgumentException(
s"GpuJsonToStructs currently does not support schema of type $schema.")
}
}

override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
Expand Down
2 changes: 1 addition & 1 deletion tools/generated_files/supportedExprs.csv
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ IsNotNull,S,`isnotnull`,None,project,result,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,N
IsNull,S,`isnull`,None,project,input,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS
IsNull,S,`isnull`,None,project,result,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
JsonToStructs,NS,`from_json`,This is disabled by default because parsing JSON from a column has a large number of issues and should be considered beta quality right now.,project,jsonStr,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
JsonToStructs,NS,`from_json`,This is disabled by default because parsing JSON from a column has a large number of issues and should be considered beta quality right now.,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NS,PS,NS,NA
JsonToStructs,NS,`from_json`,This is disabled by default because parsing JSON from a column has a large number of issues and should be considered beta quality right now.,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NS,PS,PS,NA
JsonTuple,S,`json_tuple`,None,project,json,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
JsonTuple,S,`json_tuple`,None,project,field,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA
JsonTuple,S,`json_tuple`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA
Expand Down

0 comments on commit 2b2835e

Please sign in to comment.