diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala index d0a8a3b44d299..33e430ed730f1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala @@ -169,38 +169,38 @@ case class SchemaOfJsonEvaluator(options: Map[String, String]) { /** * The expression `JsonTuple` will utilize it to support codegen. */ -case class JsonTupleEvaluator(foldableFields: IndexedSeq[Option[String]]) { +case class JsonTupleEvaluator(foldableFieldNames: Array[Option[String]]) { import SharedFactory._ // If processing fails this shared value will be returned. @transient private lazy val nullRow: Seq[InternalRow] = - new GenericInternalRow(Array.ofDim[Any](foldableFields.length)) :: Nil + new GenericInternalRow(Array.ofDim[Any](foldableFieldNames.length)) :: Nil // And count the number of foldable fields, we'll use this later to optimize evaluation. - @transient private lazy val constantFields: Int = foldableFields.count(_ != null) + @transient private lazy val constantFields: Int = foldableFieldNames.count(_ != null) - private def getCachedFields(fields: Seq[UTF8String]): Seq[String] = { + private def getFieldNameStrings(fields: Array[UTF8String]): Array[String] = { // Evaluate the field names as String rather than UTF8String to // optimize lookups from the json token, which is also a String. if (constantFields == fields.length) { // Typically the user will provide the field names as foldable expressions // so we can use the cached copy. - foldableFields.map(_.orNull) + foldableFieldNames.map(_.orNull) } else if (constantFields == 0) { // None are foldable so all field names need to be evaluated from the input row. 
fields.map { f => Option(f.toString).orNull } } else { // If there is a mix of constant and non-constant expressions // prefer the cached copy when available. - foldableFields.zip(fields).map { + foldableFieldNames.zip(fields).map { case (null, f) => Option(f.toString).orNull case (fieldName, _) => fieldName.orNull } } } - private def parseRow(parser: JsonParser, fieldNames: Seq[String]): Seq[InternalRow] = { + private def parseRow(parser: JsonParser, fieldNames: Array[String]): Seq[InternalRow] = { // Only objects are supported. if (parser.nextToken() != JsonToken.START_OBJECT) return nullRow @@ -265,13 +265,13 @@ case class JsonTupleEvaluator(foldableFields: IndexedSeq[Option[String]]) { } } - final def evaluate(json: UTF8String, fields: Seq[UTF8String]): Seq[InternalRow] = { + final def evaluate(json: UTF8String, fieldNames: Array[UTF8String]): Seq[InternalRow] = { if (json == null) return nullRow try { /* We know the bytes are UTF-8 encoded. Pass a Reader to avoid having Jackson detect character encoding which could fail for some malformed strings. 
*/ Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json)) { parser => - parseRow(parser, getCachedFields(fields)) + parseRow(parser, getFieldNameStrings(fieldNames)) } } catch { case _: JsonProcessingException => nullRow diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 36e5710e7b1e8..8728a7333181d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.expressions import java.io._ -import scala.collection.immutable.ArraySeq import scala.util.parsing.combinator.RegexParsers import com.fasterxml.jackson.core._ @@ -463,11 +462,11 @@ case class JsonTuple(children: Seq[Expression]) @transient private lazy val fieldExpressions: Seq[Expression] = children.tail // Eagerly evaluate any foldable the field names. 
- @transient private lazy val foldableFieldNames: IndexedSeq[Option[String]] = { + @transient private lazy val foldableFieldNames: Array[Option[String]] = { fieldExpressions.map { case expr if expr.foldable => Option(expr.eval()).map(_.asInstanceOf[UTF8String].toString) case _ => null - }.toIndexedSeq + }.toArray } override def elementSchema: StructType = StructType(fieldExpressions.zipWithIndex.map { @@ -498,17 +497,16 @@ case class JsonTuple(children: Seq[Expression]) override def eval(input: InternalRow): IterableOnce[InternalRow] = { val json = jsonExpr.eval(input).asInstanceOf[UTF8String] - val fields = fieldExpressions.map(_.eval(input).asInstanceOf[UTF8String]) - evaluator.evaluate(json, fields) + val fieldNames = fieldExpressions.map(_.eval(input).asInstanceOf[UTF8String]).toArray + evaluator.evaluate(json, fieldNames) } override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val refEvaluator = ctx.addReferenceObj("evaluator", evaluator) val jsonTerm = ctx.freshName("json") val jsonEval = jsonExpr.genCode(ctx) - val fieldsTerm = ctx.freshName("fields") - val fieldsEval = fieldExpressions.map(_.genCode(ctx)) - val arraySeqClass = classOf[ArraySeq[_]].getName + val fieldNamesTerm = ctx.freshName("fieldNames") + val fieldNamesEval = fieldExpressions.map(_.genCode(ctx)) val wrapperClass = classOf[IterableOnce[_]].getName val setJson = s""" @@ -518,27 +516,27 @@ case class JsonTuple(children: Seq[Expression]) | $jsonTerm = ${jsonEval.value}; |} |""".stripMargin - val setFields = fieldsEval.zipWithIndex.map { - case (fieldEval, idx) => + val setFieldNames = fieldNamesEval.zipWithIndex.map { + case (fieldNameEval, idx) => s""" - |if (${fieldEval.isNull}) { - | $fieldsTerm[$idx] = null; + |if (${fieldNameEval.isNull}) { + | $fieldNamesTerm[$idx] = null; |} else { - | $fieldsTerm[$idx] = ${fieldEval.value}; + | $fieldNamesTerm[$idx] = ${fieldNameEval.value}; |} |""".stripMargin } ev.copy(code = code""" |UTF8String $jsonTerm = null; - 
|UTF8String[] $fieldsTerm = new UTF8String[${fieldExpressions.length - 1}]; + |UTF8String[] $fieldNamesTerm = new UTF8String[${fieldExpressions.length}]; |${jsonEval.code} - |${fieldsEval.map(_.code).mkString("\n")} + |${fieldNamesEval.map(_.code).mkString("\n")} |$setJson - |${setFields.mkString("\n")} + |${setFieldNames.mkString("\n")} |boolean ${ev.isNull} = false; |$wrapperClass ${ev.value} = - | $refEvaluator.evaluate($jsonTerm, new $arraySeqClass.ofRef($fieldsTerm)); + | $refEvaluator.evaluate($jsonTerm, $fieldNamesTerm); |""".stripMargin) }