Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update JsonToStructs and ScanJson to have white space normalization #10575

Merged
merged 6 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
297 changes: 280 additions & 17 deletions integration_tests/src/main/python/json_matrix_test.py

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions integration_tests/src/main/python/json_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,14 +307,14 @@ def do_read(spark):
'boolean.json',
pytest.param('boolean_invalid.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4779')),
'ints.json',
pytest.param('ints_invalid.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4793')),
pytest.param('ints_invalid.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4940')), # This fails for dates, as not all are invalid
'nan_and_inf.json',
pytest.param('nan_and_inf_strings.json', marks=pytest.mark.skipif(is_before_spark_330(), reason='https://issues.apache.org/jira/browse/SPARK-38060 fixed in Spark 3.3.0')),
'nan_and_inf_invalid.json',
'floats.json',
'floats_leading_zeros.json',
'floats_invalid.json',
pytest.param('floats_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4647')),
'floats_edge_cases.json',
'decimals.json',
'dates.json',
'dates_invalid.json',
Expand Down
12 changes: 12 additions & 0 deletions integration_tests/src/test/resources/int_array_formatted.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{"data": [1,0]}
{"data": [-1,,100]}
{"data": [-0,5,6,7 ,8 , 9 ]}
{"data": [0]}
{"data": [127, -128]}
{"data": []}
{"data": [32767, -32768]}
{"data": [2147483647, -2147483648]}
{"data": [9223372036854775807,-9223372036854775808]}
{"data": [9223372036854775808, -9223372036854775809]}
{"data": [99999999999999999999999999999999999999, -99999999999999999999999999999999999999]}
{"data": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{"data": {"A": 0, "B": 1}}
{"data": [1,0]}
{"data": {"A": 1}}
{"data": [-1,,100]}
{"data": {"B": 50}}
{"data": [0]}
{"data": null}
{"data": []}
{"data": {"B": -128, "A": 127}}
{"data": [127, -128]}
{"data": {"A": 32767, "B": -32767}}
{"data": [32767, -32768]}
{"data": {"A": 214783647, "B": -2147483648}}
{"data": [2147483647, -2147483648]}
{"data": {"A": 9223372036854775807, "B": -9223372036854775808}}
{"data": [9223372036854775807,-9223372036854775808]}
{"data": {"A": 9223372036854775808,, "B": -9223372036854775809}}
{"data": [9223372036854775808, -9223372036854775809]}
{"data": {"B": 99999999999999999999999999999999999999, "A": -99999999999999999999999999999999999999}}
{"data": [99999999999999999999999999999999999999, -99999999999999999999999999999999999999]}
{"data": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{"data": {"A": 0, "B": 1}}
{"data": {"A": 1}}
{"data": {"B": 50}}
{"data": {"B": -128, "A": 127}}
{"data": {"B": 99999999999999999999, "A": -9999999999999999999}}
149 changes: 90 additions & 59 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnCastUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,18 @@ object ColumnCastUtil {
*
* @param cv the view to be updated
* @param dt the Spark's data type of the input view (if applicable)
* @param nestedMismatchHandler a function that can handle a mismatch between nesting. This can
* include things like when a STRING is found, but a nested type is
* needed, or when a nested value is returned by CUDF but a
* non-nested type is expected.
* @param convert the partial function used to convert the data. If this matches and returns
* a updated view this function takes ownership of that view.
* @return None if there were no changes to the view or the updated view along with anything else
* that needs to be closed.
*/
def deepTransformView(cv: ColumnView, dt: Option[DataType] = None)
def deepTransformView(cv: ColumnView, dt: Option[DataType] = None,
nestedMismatchHandler: Option[(ColumnView, DataType) =>
(Option[ColumnView], ArrayBuffer[AutoCloseable])] = None)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the handler need to return a mutable ArrayBuffer? I think the handler could return an immutable Seq given how it's being used, and that seems more flexible and less error-prone than forcing an ArrayBuffer here.

(convert: PartialFunction[(ColumnView, Option[DataType]), ColumnView]):
(Option[ColumnView], ArrayBuffer[AutoCloseable]) = {
closeOnExcept(ArrayBuffer.empty[AutoCloseable]) { needsClosing =>
Expand All @@ -64,71 +70,90 @@ object ColumnCastUtil {
// Recurse down if needed and check children
cv.getType.getTypeId match {
case DType.DTypeEnum.STRUCT =>
withResource(ArrayBuffer.empty[ColumnView]) { tmpNeedsClosed =>
val structFields = dt match {
case None => Array.empty[StructField]
case Some(t: StructType) => t.fields
case Some(t) => /* this should never be reach out */
val (structFields, transformedCv) = dt match {
case None => (Array.empty[StructField], None)
case Some(t: StructType) => (t.fields, None)
case Some(t) =>
nestedMismatchHandler.map { handler =>
// The fields is ignored
(Array.empty[StructField], Some(handler(cv, t)))
}.getOrElse {
throw new IllegalStateException("Invalid input DataType: " +
s"Expect StructType but got ${t.toString}")
}
var childrenUpdated = false
val newChildren = ArrayBuffer.empty[ColumnView]
(0 until cv.getNumChildren).foreach { index =>
val child = cv.getChildColumnView(index)
tmpNeedsClosed += child
val childDt = if (structFields.nonEmpty) {
Some(structFields(index).dataType)
} else {
None
s"CUDF returned STRUCT Spark asked for ${t.toString}")
}
val (updatedChild, needsClosingChild) = deepTransformView(child, childDt)(convert)
needsClosing ++= needsClosingChild
updatedChild match {
case Some(newChild) =>
newChildren += newChild
childrenUpdated = true
case None =>
newChildren += child
}
transformedCv.map {
case (updatedData, needsClosingData) =>
needsClosing ++= needsClosingData
(updatedData, needsClosing)
}.getOrElse {
withResource(ArrayBuffer.empty[ColumnView]) { tmpNeedsClosed =>
var childrenUpdated = false
val newChildren = ArrayBuffer.empty[ColumnView]
(0 until cv.getNumChildren).foreach { index =>
val child = cv.getChildColumnView(index)
tmpNeedsClosed += child
val childDt = if (structFields.nonEmpty) {
Some(structFields(index).dataType)
} else {
None
}
val (updatedChild, needsClosingChild) = deepTransformView(child, childDt,
nestedMismatchHandler)(convert)
needsClosing ++= needsClosingChild
updatedChild match {
case Some(newChild) =>
newChildren += newChild
childrenUpdated = true
case None =>
newChildren += child
}
}
}
if (childrenUpdated) {
withResource(cv.getValid) { valid =>
val ret = new ColumnView(DType.STRUCT, cv.getRowCount,
Optional.empty[java.lang.Long](), valid, null, newChildren.toArray)
(Some(ret), needsClosing)
if (childrenUpdated) {
withResource(cv.getValid) { valid =>
val ret = new ColumnView(DType.STRUCT, cv.getRowCount,
Optional.empty[java.lang.Long](), valid, null, newChildren.toArray)
(Some(ret), needsClosing)
}
} else {
(None, needsClosing)
}
} else {
(None, needsClosing)
}
}
case DType.DTypeEnum.LIST =>
withResource(cv.getChildColumnView(0)) { child =>
// A ColumnView of LIST type may have data type is ArrayType or MapType in Spark.
// If it is a MapType, its child will be a column of type struct<key, value>.
// In such cases, we need to generate the corresponding Spark's data type
// for the child column as a StructType.
val childDt = dt match {
case None => None
case Some(t: ArrayType) => Some(t.elementType)
case Some(_: BinaryType) => Some(ByteType)
case Some(t: MapType) => Some(StructType(Array(
StructField("key", t.keyType, nullable = false),
StructField("value", t.valueType, nullable = t.valueContainsNull))))
case Some(t) => /* this should never be reach out */
throw new IllegalStateException("Invalid input DataType: " +
s"Expect ArrayType/BinaryType/MapType but got ${t.toString}")
}
val (updatedData, needsClosingData) = deepTransformView(child, childDt)(convert)
needsClosing ++= needsClosingData
updatedData match {
case Some(updated) =>
(Some(GpuListUtils.replaceListDataColumnAsView(cv, updated)), needsClosing)
case None =>
(None, needsClosing)
// A ColumnView of LIST was found. There are some types that we can auto-transform,
// but, in some cases we need to fall back to other processing.
val (childDt, transformedResult) = dt match {
case None => (None, None)
case Some(t: ArrayType) => (Some(t.elementType), None)
case Some(_: BinaryType) => (Some(ByteType), None)
case Some(t: MapType) => (Some(StructType(Array(
StructField("key", t.keyType, nullable = false),
StructField("value", t.valueType, nullable = t.valueContainsNull)))), None)
case Some(t) =>
nestedMismatchHandler.map { handler =>
(None, Some(handler(cv, t)))
}.getOrElse {
withResource(cv.getChildColumnView(0)) { child =>
throw new IllegalStateException("Invalid input DataType: " +
s"CUDF returned LIST[${child.getType}] We expect Spark to want an " +
s"ArrayType/BinaryType/MapType but got ${t.toString}")
}
}
}
val (updatedData, needsClosingData) = transformedResult.getOrElse {
withResource(cv.getChildColumnView(0)) { child =>
val (ud, nc) = deepTransformView(child, childDt, nestedMismatchHandler)(convert)
ud match {
case Some(updated) =>
(Some(GpuListUtils.replaceListDataColumnAsView(cv, updated)), nc)
case None =>
(None, nc)
}
}
}

needsClosing ++= needsClosingData
(updatedData, needsClosing)
case _ =>
(None, needsClosing)
}
Expand All @@ -143,13 +168,19 @@ object ColumnCastUtil {
*
* @param cv the vector to be updated
* @param dt the Spark's data type of the input vector (if applicable)
* @param nestedMismatchHandler a function that can handle a mismatch between nesting. This can
* include things like when a STRING is found, but a nested type is
* needed, or when a nested value is returned by CUDF but a
* non-nested type is expected.
* @param convert the partial function used to convert the data. If this matches and returns
* a updated view this function takes ownership of that view.
* @return the updated vector
*/
def deepTransform(cv: ColumnVector, dt: Option[DataType] = None)
def deepTransform(cv: ColumnVector, dt: Option[DataType] = None,
nestedMismatchHandler: Option[(ColumnView, DataType) =>
(Option[ColumnView], ArrayBuffer[AutoCloseable])] = None)
jlowe marked this conversation as resolved.
Show resolved Hide resolved
(convert: PartialFunction[(ColumnView, Option[DataType]), ColumnView]): ColumnVector = {
val (retView, needsClosed) = deepTransformView(cv, dt)(convert)
val (retView, needsClosed) = deepTransformView(cv, dt, nestedMismatchHandler)(convert)
withResource(needsClosed) { _ =>
retView match {
case Some(updated) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ package org.apache.spark.sql.rapids

import java.util.Locale

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{BinaryOp, CaptureGroups, ColumnVector, ColumnView, DType, RegexProgram, Scalar, Schema, Table}
import com.nvidia.spark.rapids.{ColumnCastUtil, GpuCast, GpuColumnVector, GpuTextBasedPartitionReader}
import com.nvidia.spark.rapids.{ColumnCastUtil, GpuCast, GpuColumnVector, GpuScalar, GpuTextBasedPartitionReader}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingArray
import com.nvidia.spark.rapids.jni.CastStrings

import org.apache.spark.sql.catalyst.json.{GpuJsonUtils, JSONOptions}
import org.apache.spark.sql.rapids.shims.GpuJsonToStructsShim
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.{DataType, _}

/**
* This is a utility method intended to provide common functionality between JsonToStructs and
Expand Down Expand Up @@ -266,10 +268,53 @@ object GpuJsonReadCommon {
private def timestampFormat(options: JSONOptions): String =
GpuJsonUtils.timestampFormatInRead(options)

private def throwMismatchException(cv: ColumnView,
dt: DataType): (Option[ColumnView], ArrayBuffer[AutoCloseable]) = {
jlowe marked this conversation as resolved.
Show resolved Hide resolved
throw new IllegalStateException(s"Don't know how to transform $cv to $dt for JSON")
}

private def nestedColumnViewMismatchTransform(cv: ColumnView,
dt: DataType): (Option[ColumnView], ArrayBuffer[AutoCloseable]) = {
jlowe marked this conversation as resolved.
Show resolved Hide resolved
// In the future we should be able to convert strings to maps/etc, but for
// now we are working around issues where CUDF is not returning a STRING for nested
// types when asked for it.
cv.getType match {
case DType.LIST =>
dt match {
case ByteType | ShortType | IntegerType | LongType |
BooleanType | FloatType | DoubleType |
_: DecimalType | _: StructType =>
// This is all nulls
val rows = cv.getRowCount().toInt
val ret = withResource(GpuScalar.from(null, dt)) { nullScalar =>
ColumnVector.fromScalar(nullScalar, rows)
}
(Some(ret.asInstanceOf[ColumnView]), ArrayBuffer(ret))
jlowe marked this conversation as resolved.
Show resolved Hide resolved
case _ =>
throwMismatchException(cv, dt)
}
case DType.STRUCT =>
dt match {
case _: ArrayType =>
// This is all nulls
val rows = cv.getRowCount().toInt
val ret = withResource(GpuScalar.from(null, dt)) { nullScalar =>
ColumnVector.fromScalar(nullScalar, rows)
}
(Some(ret.asInstanceOf[ColumnView]), ArrayBuffer(ret))
jlowe marked this conversation as resolved.
Show resolved Hide resolved
case _ =>
throwMismatchException(cv, dt)
}
case _ =>
throwMismatchException(cv, dt)
}
}

private def convertToDesiredType(inputCv: ColumnVector,
topLevelType: DataType,
options: JSONOptions): ColumnVector = {
ColumnCastUtil.deepTransform(inputCv, Some(topLevelType)) {
ColumnCastUtil.deepTransform(inputCv, Some(topLevelType),
Some(nestedColumnViewMismatchTransform)) {
case (cv, Some(BooleanType)) if cv.getType == DType.STRING =>
castJsonStringToBool(cv)
case (cv, Some(DateType)) if cv.getType == DType.STRING =>
Expand Down Expand Up @@ -324,6 +369,7 @@ object GpuJsonReadCommon {
ai.rapids.cudf.JSONOptions.builder()
.withRecoverWithNull(true)
.withMixedTypesAsStrings(enableMixedTypes)
.withNormalizeWhitespace(true)
.withKeepQuotes(true)
.withNormalizeSingleQuotes(options.allowSingleQuotes)
.build()
Expand Down
Loading