Skip to content

Commit

Permalink
improve
Browse files Browse the repository at this point in the history
  • Loading branch information
panbingkun committed Dec 19, 2024
1 parent 0c2fd7c commit be1ea18
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,38 +169,38 @@ case class SchemaOfJsonEvaluator(options: Map[String, String]) {
/**
* Evaluator that the `JsonTuple` expression delegates to from both its interpreted `eval` path and its generated code.
*/
case class JsonTupleEvaluator(foldableFields: IndexedSeq[Option[String]]) {
case class JsonTupleEvaluator(foldableFieldNames: Array[Option[String]]) {

import SharedFactory._

// Shared fallback result: a single row with one null column per requested field,
// returned whenever the input cannot be processed.
@transient private lazy val nullRow: Seq[InternalRow] =
  List(new GenericInternalRow(new Array[Any](foldableFieldNames.length)))

// Number of field names that were foldable (non-null entries in `foldableFieldNames`).
// Used later to choose the cheapest way of resolving the field names.
@transient private lazy val constantFields: Int =
  foldableFieldNames.count(name => name != null)

/**
 * Resolves the field names to look up in the JSON document as plain `String`s.
 *
 * Names are handled as `String` rather than `UTF8String` to optimize lookups against
 * the json token, which is also a `String`. Names cached from foldable expressions are
 * preferred over the values evaluated from the input row.
 *
 * @param fields field names evaluated for the current input row; an element may be null
 * @return one name per field, in the same order; null where the field name is null
 */
private def getFieldNameStrings(fields: Array[UTF8String]): Array[String] = {
  // Null-safe conversion: the null check must happen before `toString` is invoked,
  // otherwise a null field name raises a NullPointerException instead of yielding null.
  def asString(name: UTF8String): String = if (name == null) null else name.toString

  if (constantFields == fields.length) {
    // Typically the user will provide the field names as foldable expressions
    // so we can use the cached copy.
    foldableFieldNames.map(_.orNull)
  } else if (constantFields == 0) {
    // None are foldable so all field names need to be evaluated from the input row.
    fields.map(asString)
  } else {
    // If there is a mix of constant and non-constant expressions
    // prefer the cached copy when available.
    foldableFieldNames.zip(fields).map {
      case (null, f) => asString(f)
      case (fieldName, _) => fieldName.orNull
    }
  }
}

private def parseRow(parser: JsonParser, fieldNames: Seq[String]): Seq[InternalRow] = {
private def parseRow(parser: JsonParser, fieldNames: Array[String]): Seq[InternalRow] = {
// Only objects are supported.
if (parser.nextToken() != JsonToken.START_OBJECT) return nullRow

Expand Down Expand Up @@ -265,13 +265,13 @@ case class JsonTupleEvaluator(foldableFields: IndexedSeq[Option[String]]) {
}
}

final def evaluate(json: UTF8String, fields: Seq[UTF8String]): Seq[InternalRow] = {
final def evaluate(json: UTF8String, fieldNames: Array[UTF8String]): Seq[InternalRow] = {
if (json == null) return nullRow
try {
/* We know the bytes are UTF-8 encoded. Pass a Reader to avoid having Jackson
detect character encoding which could fail for some malformed strings. */
Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json)) { parser =>
parseRow(parser, getCachedFields(fields))
parseRow(parser, getFieldNameStrings(fieldNames))
}
} catch {
case _: JsonProcessingException => nullRow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.expressions

import java.io._

import scala.collection.immutable.ArraySeq
import scala.util.parsing.combinator.RegexParsers

import com.fasterxml.jackson.core._
Expand Down Expand Up @@ -463,11 +462,11 @@ case class JsonTuple(children: Seq[Expression])
// Every child expression after the first; each one evaluates to a field name.
@transient private lazy val fieldExpressions: Seq[Expression] = children.tail

// Eagerly evaluate any foldable field names. Each entry is one of:
//   - null:       the expression is not foldable and must be evaluated per input row
//   - None:       the expression is foldable and evaluates to null
//   - Some(name): the expression is foldable and evaluates to `name`
@transient private lazy val foldableFieldNames: Array[Option[String]] = {
  fieldExpressions.map { expr =>
    if (expr.foldable) Option(expr.eval()).map(_.asInstanceOf[UTF8String].toString)
    else null
  }.toArray
}

override def elementSchema: StructType = StructType(fieldExpressions.zipWithIndex.map {
Expand Down Expand Up @@ -498,17 +497,16 @@ case class JsonTuple(children: Seq[Expression])

/**
 * Interpreted evaluation: evaluates the JSON expression and every field-name expression
 * against `input`, then delegates the extraction to the shared evaluator.
 *
 * @param input the row the child expressions are evaluated against
 * @return the rows produced by the evaluator
 */
override def eval(input: InternalRow): IterableOnce[InternalRow] = {
  val json = jsonExpr.eval(input).asInstanceOf[UTF8String]
  // Field names are passed as an array to match the evaluator's signature.
  val fieldNames = fieldExpressions.map(_.eval(input).asInstanceOf[UTF8String]).toArray
  evaluator.evaluate(json, fieldNames)
}

override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val refEvaluator = ctx.addReferenceObj("evaluator", evaluator)
val jsonTerm = ctx.freshName("json")
val jsonEval = jsonExpr.genCode(ctx)
val fieldsTerm = ctx.freshName("fields")
val fieldsEval = fieldExpressions.map(_.genCode(ctx))
val arraySeqClass = classOf[ArraySeq[_]].getName
val filedNamesTerm = ctx.freshName("fieldNames")
val fieldNamesEval = fieldExpressions.map(_.genCode(ctx))
val wrapperClass = classOf[IterableOnce[_]].getName
val setJson =
s"""
Expand All @@ -518,27 +516,27 @@ case class JsonTuple(children: Seq[Expression])
| $jsonTerm = ${jsonEval.value};
|}
|""".stripMargin
val setFields = fieldsEval.zipWithIndex.map {
case (fieldEval, idx) =>
val setFieldNames = fieldNamesEval.zipWithIndex.map {
case (fieldNameEval, idx) =>
s"""
|if (${fieldEval.isNull}) {
| $fieldsTerm[$idx] = null;
|if (${fieldNameEval.isNull}) {
| $filedNamesTerm[$idx] = null;
|} else {
| $fieldsTerm[$idx] = ${fieldEval.value};
| $filedNamesTerm[$idx] = ${fieldNameEval.value};
|}
|""".stripMargin
}
ev.copy(code =
code"""
|UTF8String $jsonTerm = null;
|UTF8String[] $fieldsTerm = new UTF8String[${fieldExpressions.length - 1}];
|UTF8String[] $filedNamesTerm = new UTF8String[${fieldExpressions.length}];
|${jsonEval.code}
|${fieldsEval.map(_.code).mkString("\n")}
|${fieldNamesEval.map(_.code).mkString("\n")}
|$setJson
|${setFields.mkString("\n")}
|${setFieldNames.mkString("\n")}
|boolean ${ev.isNull} = false;
|$wrapperClass<InternalRow> ${ev.value} =
| $refEvaluator.evaluate($jsonTerm, new $arraySeqClass.ofRef($fieldsTerm));
| $refEvaluator.evaluate($jsonTerm, $filedNamesTerm);
|""".stripMargin)
}

Expand Down

0 comments on commit be1ea18

Please sign in to comment.