[SPARK-50641][SQL] Move GetJsonObjectEvaluator to `JsonExpressionEvalUtils`

### What changes were proposed in this pull request?
This PR moves `GetJsonObjectEvaluator` to `JsonExpressionEvalUtils`.

### Why are the changes needed?
To make the code clearer by keeping the JSON expression evaluators together.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
- Passed GA.
- Manually tested.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#49259 from panbingkun/SPARK-50641.

Authored-by: panbingkun <panbingkun@apache.org>
Signed-off-by: panbingkun <panbingkun@apache.org>
panbingkun committed Dec 23, 2024
1 parent 876450c commit 7cd5c4a
Showing 3 changed files with 301 additions and 308 deletions.
@@ -24,7 +24,6 @@
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;

-import org.apache.spark.sql.catalyst.expressions.SharedFactory;
import org.apache.spark.sql.catalyst.json.CreateJacksonParser;
import org.apache.spark.sql.catalyst.util.GenericArrayData;
import org.apache.spark.unsafe.types.UTF8String;
@@ -16,13 +16,16 @@
*/
package org.apache.spark.sql.catalyst.expressions.json

-import java.io.{ByteArrayOutputStream, CharArrayWriter}
+import java.io.{ByteArrayOutputStream, CharArrayWriter, StringWriter}

+import scala.util.parsing.combinator.RegexParsers

import com.fasterxml.jackson.core._
+import com.fasterxml.jackson.core.json.JsonReadFeature

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow, SharedFactory}
+import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow}
import org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils
import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonGenerator, JacksonParser, JsonInferSchema, JSONOptions}
import org.apache.spark.sql.catalyst.util.{ArrayData, FailFastMode, FailureSafeParser, MapData, PermissiveMode}
@@ -32,6 +35,79 @@ import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, St
import org.apache.spark.unsafe.types.{UTF8String, VariantVal}
import org.apache.spark.util.Utils

private[this] sealed trait PathInstruction
private[this] object PathInstruction {
  private[expressions] case object Subscript extends PathInstruction
  private[expressions] case object Wildcard extends PathInstruction
  private[expressions] case object Key extends PathInstruction
  private[expressions] case class Index(index: Long) extends PathInstruction
  private[expressions] case class Named(name: String) extends PathInstruction
}

private[this] sealed trait WriteStyle
private[this] object WriteStyle {
  private[expressions] case object RawStyle extends WriteStyle
  private[expressions] case object QuotedStyle extends WriteStyle
  private[expressions] case object FlattenStyle extends WriteStyle
}

private[this] object JsonPathParser extends RegexParsers {
  import PathInstruction._

  def root: Parser[Char] = '$'

  def long: Parser[Long] = "\\d+".r ^? {
    case x => x.toLong
  }

  // parse `[*]` and `[123]` subscripts
  def subscript: Parser[List[PathInstruction]] =
    for {
      operand <- '[' ~> ('*' ^^^ Wildcard | long ^^ Index) <~ ']'
    } yield {
      Subscript :: operand :: Nil
    }

  // parse `.name` or `['name']` child expressions
  def named: Parser[List[PathInstruction]] =
    for {
      name <- '.' ~> "[^\\.\\[]+".r | "['" ~> "[^\\']+".r <~ "']"
    } yield {
      Key :: Named(name) :: Nil
    }

  // child wildcards: `..`, `.*` or `['*']`
  def wildcard: Parser[List[PathInstruction]] =
    (".*" | "['*']") ^^^ List(Wildcard)

  def node: Parser[List[PathInstruction]] =
    wildcard |
      named |
      subscript

  val expression: Parser[List[PathInstruction]] = {
    phrase(root ~> rep(node) ^^ (x => x.flatten))
  }

  def parse(str: String): Option[List[PathInstruction]] = {
    this.parseAll(expression, str) match {
      case Success(result, _) =>
        Some(result)

      case _ =>
        None
    }
  }
}
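
// Illustrative only (not part of the committed source): each path node flattens
// into a short instruction sequence, so parsing a typical path yields, e.g.
//   JsonPathParser.parse("$.store.book[0].*")
//     == Some(List(Key, Named("store"), Key, Named("book"),
//                  Subscript, Index(0), Wildcard))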

private[this] object SharedFactory {
  val jsonFactory: JsonFactory = new JsonFactoryBuilder()
    // The two options below enabled for Hive compatibility
    .enable(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS)
    .enable(JsonReadFeature.ALLOW_SINGLE_QUOTES)
    .build()
}
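
// Illustrative only (not part of the committed source): with these Hive-compat
// features enabled, parsers built from this factory tolerate inputs that strict
// JSON parsers reject, e.g. single-quoted documents such as {'key': 'value'}.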

case class JsonToStructsEvaluator(
    options: Map[String, String],
    nullableSchema: DataType,
@@ -278,3 +354,225 @@ case class JsonTupleEvaluator(foldableFieldNames: Array[Option[String]]) {
}
}
}

/**
 * The expression `GetJsonObject` will utilize it to support codegen.
 */
case class GetJsonObjectEvaluator(cachedPath: UTF8String) {
  import com.fasterxml.jackson.core.JsonToken._
  import PathInstruction._
  import SharedFactory._
  import WriteStyle._

  def this() = this(null)

  @transient
  private lazy val parsedPath: Option[List[PathInstruction]] = parsePath(cachedPath)

  @transient
  private var jsonStr: UTF8String = _

  @transient
  private var pathStr: UTF8String = _

  def setJson(arg: UTF8String): Unit = {
    jsonStr = arg
  }

  def setPath(arg: UTF8String): Unit = {
    pathStr = arg
  }

  def evaluate(): Any = {
    if (jsonStr == null) return null

    val parsed = if (cachedPath != null) {
      parsedPath
    } else {
      parsePath(pathStr)
    }

    if (parsed.isDefined) {
      try {
        /* We know the bytes are UTF-8 encoded. Pass a Reader to avoid having Jackson
          detect character encoding which could fail for some malformed strings */
        Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, jsonStr)) { parser =>
          val output = new ByteArrayOutputStream()
          val matched = Utils.tryWithResource(
            jsonFactory.createGenerator(output, JsonEncoding.UTF8)) { generator =>
            parser.nextToken()
            evaluatePath(parser, generator, RawStyle, parsed.get)
          }
          if (matched) {
            UTF8String.fromBytes(output.toByteArray)
          } else {
            null
          }
        }
      } catch {
        case _: JsonProcessingException => null
      }
    } else {
      null
    }
  }

  private def parsePath(path: UTF8String): Option[List[PathInstruction]] = {
    if (path != null) {
      JsonPathParser.parse(path.toString)
    } else {
      None
    }
  }

  // advance to the desired array index, assumes to start at the START_ARRAY token
  private def arrayIndex(p: JsonParser, f: () => Boolean): Long => Boolean = {
    case _ if p.getCurrentToken == END_ARRAY =>
      // terminate, nothing has been written
      false

    case 0 =>
      // we've reached the desired index
      val dirty = f()

      while (p.nextToken() != END_ARRAY) {
        // advance the token stream to the end of the array
        p.skipChildren()
      }

      dirty

    case i if i > 0 =>
      // skip this token and evaluate the next
      p.skipChildren()
      p.nextToken()
      arrayIndex(p, f)(i - 1)
  }
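
  // Illustrative trace (not part of the committed source): for input `[10, 20, 30]`
  // the caller first advances onto the first element, then arrayIndex(p, f)(1)
  // skips the scalar 10, recurses as arrayIndex(p, f)(0), and invokes `f` with the
  // parser positioned on 20 before draining the rest of the array.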

  /**
   * Evaluate a list of JsonPath instructions, returning a bool that indicates if any leaf nodes
   * have been written to the generator
   */
  private def evaluatePath(
      p: JsonParser,
      g: JsonGenerator,
      style: WriteStyle,
      path: List[PathInstruction]): Boolean = {
    (p.getCurrentToken, path) match {
      case (VALUE_STRING, Nil) if style == RawStyle =>
        // there is no array wildcard or slice parent, emit this string without quotes
        if (p.hasTextCharacters) {
          g.writeRaw(p.getTextCharacters, p.getTextOffset, p.getTextLength)
        } else {
          g.writeRaw(p.getText)
        }
        true

      case (START_ARRAY, Nil) if style == FlattenStyle =>
        // flatten this array into the parent
        var dirty = false
        while (p.nextToken() != END_ARRAY) {
          dirty |= evaluatePath(p, g, style, Nil)
        }
        dirty

      case (_, Nil) =>
        // general case: just copy the child tree verbatim
        g.copyCurrentStructure(p)
        true

      case (START_OBJECT, Key :: xs) =>
        var dirty = false
        while (p.nextToken() != END_OBJECT) {
          if (dirty) {
            // once a match has been found we can skip other fields
            p.skipChildren()
          } else {
            dirty = evaluatePath(p, g, style, xs)
          }
        }
        dirty

      case (START_ARRAY, Subscript :: Wildcard :: Subscript :: Wildcard :: xs) =>
        // special handling for the non-structure preserving double wildcard behavior in Hive
        var dirty = false
        g.writeStartArray()
        while (p.nextToken() != END_ARRAY) {
          dirty |= evaluatePath(p, g, FlattenStyle, xs)
        }
        g.writeEndArray()
        dirty

      case (START_ARRAY, Subscript :: Wildcard :: xs) if style != QuotedStyle =>
        // retain Flatten, otherwise use Quoted... cannot use Raw within an array
        val nextStyle = style match {
          case RawStyle => QuotedStyle
          case FlattenStyle => FlattenStyle
          case QuotedStyle => throw SparkException.internalError("Unexpected the quoted style.")
        }

        // temporarily buffer child matches, the emitted json will need to be
        // modified slightly if there is only a single element written
        val buffer = new StringWriter()

        var dirty = 0
        Utils.tryWithResource(jsonFactory.createGenerator(buffer)) { flattenGenerator =>
          flattenGenerator.writeStartArray()

          while (p.nextToken() != END_ARRAY) {
            // track the number of array elements and only emit an outer array if
            // we've written more than one element, this matches Hive's behavior
            dirty += (if (evaluatePath(p, flattenGenerator, nextStyle, xs)) 1 else 0)
          }
          flattenGenerator.writeEndArray()
        }

        val buf = buffer.getBuffer
        if (dirty > 1) {
          g.writeRawValue(buf.toString)
        } else if (dirty == 1) {
          // remove outer array tokens
          g.writeRawValue(buf.substring(1, buf.length() - 1))
        } // else do not write anything

        dirty > 0

      case (START_ARRAY, Subscript :: Wildcard :: xs) =>
        var dirty = false
        g.writeStartArray()
        while (p.nextToken() != END_ARRAY) {
          // wildcards can have multiple matches, continually update the dirty count
          dirty |= evaluatePath(p, g, QuotedStyle, xs)
        }
        g.writeEndArray()

        dirty

      case (START_ARRAY, Subscript :: Index(idx) :: (xs@Subscript :: Wildcard :: _)) =>
        p.nextToken()
        // we're going to have 1 or more results, switch to QuotedStyle
        arrayIndex(p, () => evaluatePath(p, g, QuotedStyle, xs))(idx)

      case (START_ARRAY, Subscript :: Index(idx) :: xs) =>
        p.nextToken()
        arrayIndex(p, () => evaluatePath(p, g, style, xs))(idx)

      case (FIELD_NAME, Named(name) :: xs) if p.currentName == name =>
        // exact field match
        if (p.nextToken() != JsonToken.VALUE_NULL) {
          evaluatePath(p, g, style, xs)
        } else {
          false
        }

      case (FIELD_NAME, Wildcard :: xs) =>
        // wildcard field match
        p.nextToken()
        evaluatePath(p, g, style, xs)

      case _ =>
        p.skipChildren()
        false
    }
  }
}
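
For context, a minimal sketch of how the moved evaluator is driven (a hypothetical test-style snippet, not part of this commit; the `GetJsonObject` expression wires up the same calls through codegen, and the names `eval`/`eval2` are illustrative):

```scala
import org.apache.spark.unsafe.types.UTF8String

// Foldable path: passed to the constructor, parsed once, and cached in parsedPath.
val eval = GetJsonObjectEvaluator(UTF8String.fromString("$.store.book[0].title"))
eval.setJson(UTF8String.fromString("""{"store": {"book": [{"title": "Spark"}]}}"""))
// RawStyle emits the matched string without quotes.
assert(eval.evaluate() == UTF8String.fromString("Spark"))

// Non-foldable path: set per row via setPath and re-parsed on each evaluate() call.
val eval2 = new GetJsonObjectEvaluator()
eval2.setPath(UTF8String.fromString("$.store.book[*].title"))
eval2.setJson(UTF8String.fromString(
  """{"store": {"book": [{"title": "a"}, {"title": "b"}]}}"""))
// The array wildcard matches two leaves, so an outer array is emitted.
assert(eval2.evaluate() == UTF8String.fromString("""["a","b"]"""))
```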
