Skip to content

Commit

Permalink
Update JsonToStructs and ScanJson to have white space normalization (#…
Browse files Browse the repository at this point in the history
…10575)

Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
  • Loading branch information
revans2 authored Mar 15, 2024
1 parent 7177c3a commit a56da0c
Show file tree
Hide file tree
Showing 7 changed files with 461 additions and 85 deletions.
297 changes: 280 additions & 17 deletions integration_tests/src/main/python/json_matrix_test.py

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions integration_tests/src/main/python/json_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,14 +307,14 @@ def do_read(spark):
'boolean.json',
pytest.param('boolean_invalid.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4779')),
'ints.json',
pytest.param('ints_invalid.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4793')),
pytest.param('ints_invalid.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4940')), # This fails for dates, as not all are invalid
'nan_and_inf.json',
pytest.param('nan_and_inf_strings.json', marks=pytest.mark.skipif(is_before_spark_330(), reason='https://issues.apache.org/jira/browse/SPARK-38060 fixed in Spark 3.3.0')),
'nan_and_inf_invalid.json',
'floats.json',
'floats_leading_zeros.json',
'floats_invalid.json',
pytest.param('floats_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4647')),
'floats_edge_cases.json',
'decimals.json',
'dates.json',
'dates_invalid.json',
Expand Down
12 changes: 12 additions & 0 deletions integration_tests/src/test/resources/int_array_formatted.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{"data": [1,0]}
{"data": [-1,,100]}
{"data": [-0,5,6,7 ,8 , 9 ]}
{"data": [0]}
{"data": [127, -128]}
{"data": []}
{"data": [32767, -32768]}
{"data": [2147483647, -2147483648]}
{"data": [9223372036854775807,-9223372036854775808]}
{"data": [9223372036854775808, -9223372036854775809]}
{"data": [99999999999999999999999999999999999999, -99999999999999999999999999999999999999]}
{"data": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{"data": {"A": 0, "B": 1}}
{"data": [1,0]}
{"data": {"A": 1}}
{"data": [-1,,100]}
{"data": {"B": 50}}
{"data": [0]}
{"data": null}
{"data": []}
{"data": {"B": -128, "A": 127}}
{"data": [127, -128]}
{"data": {"A": 32767, "B": -32767}}
{"data": [32767, -32768]}
{"data": {"A": 214783647, "B": -2147483648}}
{"data": [2147483647, -2147483648]}
{"data": {"A": 9223372036854775807, "B": -9223372036854775808}}
{"data": [9223372036854775807,-9223372036854775808]}
{"data": {"A": 9223372036854775808,, "B": -9223372036854775809}}
{"data": [9223372036854775808, -9223372036854775809]}
{"data": {"B": 99999999999999999999999999999999999999, "A": -99999999999999999999999999999999999999}}
{"data": [99999999999999999999999999999999999999, -99999999999999999999999999999999999999]}
{"data": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{"data": {"A": 0, "B": 1}}
{"data": {"A": 1}}
{"data": {"B": 50}}
{"data": {"B": -128, "A": 127}}
{"data": {"B": 99999999999999999999, "A": -9999999999999999999}}
157 changes: 94 additions & 63 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnCastUtil.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2023, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -45,92 +45,117 @@ object ColumnCastUtil {
*
* @param cv the view to be updated
* @param dt the Spark's data type of the input view (if applicable)
* @param nestedMismatchHandler a function that can handle a mismatch between nesting. This can
* include things like when a STRING is found, but a nested type is
* needed, or when a nested value is returned by CUDF but a
* non-nested type is expected.
* @param convert the partial function used to convert the data. If this matches and returns
* an updated view this function takes ownership of that view.
* @return None if there were no changes to the view or the updated view along with anything else
* that needs to be closed.
*/
def deepTransformView(cv: ColumnView, dt: Option[DataType] = None)
def deepTransformView(cv: ColumnView, dt: Option[DataType] = None,
nestedMismatchHandler: Option[(ColumnView, DataType) =>
(Option[ColumnView], Seq[AutoCloseable])] = None)
(convert: PartialFunction[(ColumnView, Option[DataType]), ColumnView]):
(Option[ColumnView], ArrayBuffer[AutoCloseable]) = {
(Option[ColumnView], Seq[AutoCloseable]) = {
closeOnExcept(ArrayBuffer.empty[AutoCloseable]) { needsClosing =>
val updated = convert.lift((cv, dt))
needsClosing ++= updated

updated match {
case Some(newCv) =>
(Some(newCv), needsClosing)
(Some(newCv), needsClosing.toSeq)
case None =>
// Recurse down if needed and check children
cv.getType.getTypeId match {
case DType.DTypeEnum.STRUCT =>
withResource(ArrayBuffer.empty[ColumnView]) { tmpNeedsClosed =>
val structFields = dt match {
case None => Array.empty[StructField]
case Some(t: StructType) => t.fields
case Some(t) => /* this should never be reach out */
val (structFields, transformedCv) = dt match {
case None => (Array.empty[StructField], None)
case Some(t: StructType) => (t.fields, None)
case Some(t) =>
nestedMismatchHandler.map { handler =>
// The fields are ignored because the handler supplies the result for this column
(Array.empty[StructField], Some(handler(cv, t)))
}.getOrElse {
throw new IllegalStateException("Invalid input DataType: " +
s"Expect StructType but got ${t.toString}")
}
var childrenUpdated = false
val newChildren = ArrayBuffer.empty[ColumnView]
(0 until cv.getNumChildren).foreach { index =>
val child = cv.getChildColumnView(index)
tmpNeedsClosed += child
val childDt = if (structFields.nonEmpty) {
Some(structFields(index).dataType)
} else {
None
s"CUDF returned STRUCT Spark asked for ${t.toString}")
}
val (updatedChild, needsClosingChild) = deepTransformView(child, childDt)(convert)
needsClosing ++= needsClosingChild
updatedChild match {
case Some(newChild) =>
newChildren += newChild
childrenUpdated = true
case None =>
newChildren += child
}
transformedCv.map {
case (updatedData, needsClosingData) =>
needsClosing ++= needsClosingData
(updatedData, needsClosing.toSeq)
}.getOrElse {
withResource(ArrayBuffer.empty[ColumnView]) { tmpNeedsClosed =>
var childrenUpdated = false
val newChildren = ArrayBuffer.empty[ColumnView]
(0 until cv.getNumChildren).foreach { index =>
val child = cv.getChildColumnView(index)
tmpNeedsClosed += child
val childDt = if (structFields.nonEmpty) {
Some(structFields(index).dataType)
} else {
None
}
val (updatedChild, needsClosingChild) = deepTransformView(child, childDt,
nestedMismatchHandler)(convert)
needsClosing ++= needsClosingChild
updatedChild match {
case Some(newChild) =>
newChildren += newChild
childrenUpdated = true
case None =>
newChildren += child
}
}
}
if (childrenUpdated) {
withResource(cv.getValid) { valid =>
val ret = new ColumnView(DType.STRUCT, cv.getRowCount,
Optional.empty[java.lang.Long](), valid, null, newChildren.toArray)
(Some(ret), needsClosing)
if (childrenUpdated) {
withResource(cv.getValid) { valid =>
val ret = new ColumnView(DType.STRUCT, cv.getRowCount,
Optional.empty[java.lang.Long](), valid, null, newChildren.toArray)
(Some(ret), needsClosing.toSeq)
}
} else {
(None, needsClosing.toSeq)
}
} else {
(None, needsClosing)
}
}
case DType.DTypeEnum.LIST =>
withResource(cv.getChildColumnView(0)) { child =>
// A ColumnView of LIST type may have data type is ArrayType or MapType in Spark.
// If it is a MapType, its child will be a column of type struct<key, value>.
// In such cases, we need to generate the corresponding Spark's data type
// for the child column as a StructType.
val childDt = dt match {
case None => None
case Some(t: ArrayType) => Some(t.elementType)
case Some(_: BinaryType) => Some(ByteType)
case Some(t: MapType) => Some(StructType(Array(
StructField("key", t.keyType, nullable = false),
StructField("value", t.valueType, nullable = t.valueContainsNull))))
case Some(t) => /* this should never be reach out */
throw new IllegalStateException("Invalid input DataType: " +
s"Expect ArrayType/BinaryType/MapType but got ${t.toString}")
}
val (updatedData, needsClosingData) = deepTransformView(child, childDt)(convert)
needsClosing ++= needsClosingData
updatedData match {
case Some(updated) =>
(Some(GpuListUtils.replaceListDataColumnAsView(cv, updated)), needsClosing)
case None =>
(None, needsClosing)
// A ColumnView of LIST was found. There are some types that we can auto-transform,
// but, in some cases we need to fall back to other processing.
val (childDt, transformedResult) = dt match {
case None => (None, None)
case Some(t: ArrayType) => (Some(t.elementType), None)
case Some(_: BinaryType) => (Some(ByteType), None)
case Some(t: MapType) => (Some(StructType(Array(
StructField("key", t.keyType, nullable = false),
StructField("value", t.valueType, nullable = t.valueContainsNull)))), None)
case Some(t) =>
nestedMismatchHandler.map { handler =>
(None, Some(handler(cv, t)))
}.getOrElse {
withResource(cv.getChildColumnView(0)) { child =>
throw new IllegalStateException("Invalid input DataType: " +
s"CUDF returned LIST[${child.getType}] We expect Spark to want an " +
s"ArrayType/BinaryType/MapType but got ${t.toString}")
}
}
}
val (updatedData, needsClosingData) = transformedResult.getOrElse {
withResource(cv.getChildColumnView(0)) { child =>
val (ud, nc) = deepTransformView(child, childDt, nestedMismatchHandler)(convert)
ud match {
case Some(updated) =>
(Some(GpuListUtils.replaceListDataColumnAsView(cv, updated)), nc)
case None =>
(None, nc)
}
}
}

needsClosing ++= needsClosingData
(updatedData, needsClosing.toSeq)
case _ =>
(None, needsClosing)
(None, needsClosing.toSeq)
}
}
}
Expand All @@ -143,13 +168,19 @@ object ColumnCastUtil {
*
* @param cv the vector to be updated
* @param dt the Spark's data type of the input vector (if applicable)
* @param nestedMismatchHandler a function that can handle a mismatch between nesting. This can
* include things like when a STRING is found, but a nested type is
* needed, or when a nested value is returned by CUDF but a
* non-nested type is expected.
* @param convert the partial function used to convert the data. If this matches and returns
* an updated view this function takes ownership of that view.
* @return the updated vector
*/
def deepTransform(cv: ColumnVector, dt: Option[DataType] = None)
def deepTransform(cv: ColumnVector, dt: Option[DataType] = None,
nestedMismatchHandler: Option[(ColumnView, DataType) =>
(Option[ColumnView], Seq[AutoCloseable])] = None)
(convert: PartialFunction[(ColumnView, Option[DataType]), ColumnView]): ColumnVector = {
val (retView, needsClosed) = deepTransformView(cv, dt)(convert)
val (retView, needsClosed) = deepTransformView(cv, dt, nestedMismatchHandler)(convert)
withResource(needsClosed) { _ =>
retView match {
case Some(updated) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ package org.apache.spark.sql.rapids
import java.util.Locale

import ai.rapids.cudf.{BinaryOp, CaptureGroups, ColumnVector, ColumnView, DType, RegexProgram, Scalar, Schema, Table}
import com.nvidia.spark.rapids.{ColumnCastUtil, GpuCast, GpuColumnVector, GpuTextBasedPartitionReader}
import com.nvidia.spark.rapids.{ColumnCastUtil, GpuCast, GpuColumnVector, GpuScalar, GpuTextBasedPartitionReader}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingArray
import com.nvidia.spark.rapids.jni.CastStrings

import org.apache.spark.sql.catalyst.json.{GpuJsonUtils, JSONOptions}
import org.apache.spark.sql.rapids.shims.GpuJsonToStructsShim
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.{DataType, _}

/**
* This is a utility method intended to provide common functionality between JsonToStructs and
Expand Down Expand Up @@ -266,10 +266,53 @@ object GpuJsonReadCommon {
private def timestampFormat(options: JSONOptions): String =
GpuJsonUtils.timestampFormatInRead(options)

/**
 * Fails the conversion for a CUDF column / Spark type pairing that is not handled for JSON.
 *
 * This never returns normally. The declared result type only mirrors the shape produced by
 * the nested mismatch handling so it can be used as the fall-through case there.
 *
 * @param cv the column view that could not be transformed
 * @param dt the Spark data type that was requested
 * @throws IllegalStateException always
 */
private def throwMismatchException(cv: ColumnView,
    dt: DataType): (Option[ColumnView], Seq[AutoCloseable]) = {
  val message = s"Don't know how to transform $cv to $dt for JSON"
  throw new IllegalStateException(message)
}

/**
 * Handles a mismatch between the nesting CUDF returned and the type Spark asked for.
 *
 * A LIST column requested as a byte, short, int, long, boolean, float, double, decimal or
 * struct type, and a STRUCT column requested as an array type, are replaced with a column of
 * nulls of the requested type that has the same number of rows as the input. Every other
 * combination is passed to throwMismatchException.
 *
 * @param cv the column view returned by CUDF
 * @param dt the Spark data type that was requested
 * @return the replacement view, plus the resources that need to be closed for it
 */
private def nestedColumnViewMismatchTransform(cv: ColumnView,
    dt: DataType): (Option[ColumnView], Seq[AutoCloseable]) = {
  // Builds a column with one null of type `dt` per input row. The same column is returned
  // both as the replacement view and as the resource to be closed.
  def allNulls(): (Option[ColumnView], Seq[AutoCloseable]) = {
    val numRows = cv.getRowCount().toInt
    val nullColumn = withResource(GpuScalar.from(null, dt)) { nullScalar =>
      ColumnVector.fromScalar(nullScalar, numRows)
    }
    val asView: ColumnView = nullColumn
    (Some(asView), Seq(nullColumn))
  }

  // Converting strings to maps/etc. is future work. For now this only works around CUDF
  // not returning a STRING for nested types when asked for it.
  (cv.getType, dt) match {
    case (DType.LIST, ByteType | ShortType | IntegerType | LongType |
        BooleanType | FloatType | DoubleType) =>
      allNulls()
    case (DType.LIST, _: DecimalType | _: StructType) =>
      allNulls()
    case (DType.STRUCT, _: ArrayType) =>
      allNulls()
    case _ =>
      throwMismatchException(cv, dt)
  }
}

private def convertToDesiredType(inputCv: ColumnVector,
topLevelType: DataType,
options: JSONOptions): ColumnVector = {
ColumnCastUtil.deepTransform(inputCv, Some(topLevelType)) {
ColumnCastUtil.deepTransform(inputCv, Some(topLevelType),
Some(nestedColumnViewMismatchTransform)) {
case (cv, Some(BooleanType)) if cv.getType == DType.STRING =>
castJsonStringToBool(cv)
case (cv, Some(DateType)) if cv.getType == DType.STRING =>
Expand Down Expand Up @@ -324,6 +367,7 @@ object GpuJsonReadCommon {
ai.rapids.cudf.JSONOptions.builder()
.withRecoverWithNull(true)
.withMixedTypesAsStrings(enableMixedTypes)
.withNormalizeWhitespace(true)
.withKeepQuotes(true)
.withNormalizeSingleQuotes(options.allowSingleQuotes)
.build()
Expand Down

0 comments on commit a56da0c

Please sign in to comment.