Skip to content

Commit

Permalink
Add an accumulator that builds an AST directly
Browse files Browse the repository at this point in the history
Inspired by the `Facade` abstraction from jawn, we can build the AST
directly, without emitting intermediate tokens, which makes it faster.
  • Loading branch information
satabin committed Jun 26, 2023
1 parent 2a5e85f commit 6df4578
Show file tree
Hide file tree
Showing 8 changed files with 255 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,15 @@ class JsonParserBenchmarks {
.unsafeRunSync()

@Benchmark
def parseJsonFs2DataValues() =
def parseJsonFs2DataParse() =
jsonStream
.through(ast.parse)
.compile
.drain
.unsafeRunSync()

@Benchmark
def parseJsonFs2DataTokensValues() =
jsonStream
.through(tokens)
.through(ast.values)
Expand Down
24 changes: 21 additions & 3 deletions json/src/main/scala/fs2/data/json/ast/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
*/

package fs2
package data.json

import internals.{TokenSelector, ValueParser}
package data
package json

import cats.syntax.all._

import text.{AsCharBuffer, CharLikeChunks}
import internals.{TokenSelector, ValueParser, JsonTokenParser, LegacyTokenParser, BuilderChunkAccumulator, State}

package object ast {

/** Transforms a stream of token into another one. The transformation function `f` is
Expand Down Expand Up @@ -84,6 +86,22 @@ package object ast {
def values[F[_], Json](implicit F: RaiseThrowable[F], builder: Builder[Json]): Pipe[F, Token, Json] =
ValueParser.pipe[F, Json]

/** Parses a stream of characters into a stream of Json values. */
def parse[F[_], T, Json](implicit
F: RaiseThrowable[F],
T: CharLikeChunks[F, T],
builder: Builder[Json]): Pipe[F, T, Json] = { s =>
T match {
case asCharBuffer: AsCharBuffer[F, T] =>
Stream.suspend(
new JsonTokenParser[F, T, Json](s, new BuilderChunkAccumulator(builder))(F, asCharBuffer)
.go_(State.BeforeValue)
.stream)
case _ =>
Stream.suspend(new LegacyTokenParser[F, T, Json](s).parse(new BuilderChunkAccumulator(builder)).stream)
}
}

/** Transforms a stream of Json values into a stream of Json tokens.
*
* This operation is the opposite of `values`.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright 2023 Lucas Satabin
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fs2
package data
package json
package internals

import scala.collection.immutable.{TreeMap, VectorBuilder}
import scala.collection.mutable.ListBuffer

import ast.Builder

/** A chunk accumulator that allows for building a stream of AST values
* for the provided [[ast.Builder Builder]].
*
* The design is inspired by the jawn `Facade` and `FContext`.
*/
private[json] final class BuilderChunkAccumulator[Json](builder: Builder[Json]) extends ChunkAccumulator[Json] {

private[this] final val chunkAcc: VectorBuilder[Json] = new VectorBuilder

private trait Context {
def string(s: String): Unit
def value(v: Json): Unit
def finish(): Json
}

private def toplevelContext(): Context =
new Context {
override def string(s: String): Unit = chunkAcc.addOne(builder.makeString(s))
override def value(v: Json): Unit = chunkAcc.addOne(v)
override def finish(): Json = builder.makeNull
}
private def arrayContext(): Context =
new Context {
private[this] val vs = ListBuffer.empty[Json]
override def string(s: String): Unit = vs.addOne(builder.makeString(s))
override def value(v: Json): Unit = vs.addOne(v)
override def finish(): Json = builder.makeArray(vs)

}
private def objectContext(): Context =
new Context {
private[this] var key: String = null
private[this] var vs = TreeMap.empty[String, Json]
override def string(s: String): Unit = key = s
override def value(v: Json): Unit = {
vs = vs.updated(key, v)
key = null
}
override def finish(): Json = builder.makeObject(vs)
}

private[this] var context: Context = toplevelContext()
private[this] var stack: List[Context] = Nil

override def startObject(): this.type = {
stack = context :: stack
context = objectContext()
this
}

override def key(key: String): this.type = {
context.string(key)
this
}

override def endObject(): this.type =
if (stack.isEmpty) {
chunkAcc.addOne(context.finish())
context = toplevelContext()
this
} else {
val v = context.finish()
context = stack.head
context.value(v)
stack = stack.tail
this
}

override def startArray(): this.type = {
stack = context :: stack
context = arrayContext()
this
}

override def endArray(): this.type =
if (stack.isEmpty) {
chunkAcc.addOne(context.finish())
context = toplevelContext()
this
} else {
val v = context.finish()
context = stack.head
context.value(v)
stack = stack.tail
this
}

override def nullValue(): this.type = {
context.value(builder.makeNull)
this
}

override def trueValue(): this.type = {
context.value(builder.makeTrue)
this
}

override def falseValue(): this.type = {
context.value(builder.makeFalse)
this
}

override def numberValue(value: String): this.type = {
context.value(builder.makeNumber(value))
this
}

override def stringValue(value: String): this.type = {
context.value(builder.makeString(value))
this
}

override def chunk(): Chunk[Json] =
Chunk.vector(chunkAcc.result())

override def flush(): this.type = {
chunkAcc.clear()
this
}

}
16 changes: 16 additions & 0 deletions json/src/main/scala/fs2/data/json/internal/ChunkAccumulator.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright 2023 Lucas Satabin
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fs2
package data
package json
Expand Down
34 changes: 20 additions & 14 deletions json/src/main/scala/fs2/data/json/internal/JsonTokenParser.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ import scala.annotation.switch

import TokenParser._

private class JsonTokenParser[F[_], T, Res](s: Stream[F, T], private[this] final val chunkAcc: ChunkAccumulator[Res])(
implicit
F: RaiseThrowable[F],
T: AsCharBuffer[F, T]) {
private[json] class JsonTokenParser[F[_], T, Res](
s: Stream[F, T],
private[this] final val chunkAcc: ChunkAccumulator[Res])(implicit F: RaiseThrowable[F], T: AsCharBuffer[F, T]) {
private[this] var context = T.create(s)

private[this] def emitChunk[T]() =
Expand Down Expand Up @@ -224,7 +223,10 @@ private class JsonTokenParser[F[_], T, Res](s: Stream[F, T], private[this] final
}
}

private final def keyword_(expected: String, eidx: Int, elen: Int, accumulate: () => ChunkAccumulator[Res]): Pull[F, Res, Unit] = {
private final def keyword_(expected: String,
eidx: Int,
elen: Int,
accumulate: () => ChunkAccumulator[Res]): Pull[F, Res, Unit] = {
if (T.needsPull(context)) {
emitChunk() >> T.pullNext(context).flatMap {
case Some(context) =>
Expand All @@ -250,13 +252,13 @@ private class JsonTokenParser[F[_], T, Res](s: Stream[F, T], private[this] final
}
}

private final def value_(state: Int)(implicit F: RaiseThrowable[F]): Pull[F, Res, Unit] =
private final def value_()(implicit F: RaiseThrowable[F]): Pull[F, Res, Unit] =
if (T.needsPull(context)) {
emitChunk() >> T.pullNext(context).flatMap {
case Some(context) =>
this.context = context
chunkAcc.flush()
value_(state)
value_()
case None => Pull.raiseError[F](new JsonException("unexpected end of input"))
}
} else {
Expand Down Expand Up @@ -292,9 +294,13 @@ private class JsonTokenParser[F[_], T, Res](s: Stream[F, T], private[this] final
chunkAcc.flush()
go_(state)
case None =>
this.context = T.create(Stream.empty)
chunkAcc.flush()
Pull.done
if (state == State.BeforeValue) {
this.context = T.create(Stream.empty)
chunkAcc.flush()
Pull.done
} else {
Pull.raiseError(JsonException("unexpected end of input"))
}
}
} else {
val c = T.current(context)
Expand All @@ -305,7 +311,7 @@ private class JsonTokenParser[F[_], T, Res](s: Stream[F, T], private[this] final
case _ =>
(state: @switch) match {
case State.BeforeValue =>
value_(state) >> go_(State.BeforeValue)
value_() >> go_(State.BeforeValue)
case State.BeforeObjectKey =>
(c: @switch) match {
case '"' =>
Expand Down Expand Up @@ -337,7 +343,7 @@ private class JsonTokenParser[F[_], T, Res](s: Stream[F, T], private[this] final
emitChunk() >> Pull.raiseError[F](new JsonException(s"unexpected '$c' after object key"))
}
case State.BeforeObjectValue =>
value_(State.AfterObjectValue) >> go_(State.AfterObjectValue)
value_() >> go_(State.AfterObjectValue)
case State.AfterObjectValue =>
(c: @switch) match {
case ',' =>
Expand All @@ -351,15 +357,15 @@ private class JsonTokenParser[F[_], T, Res](s: Stream[F, T], private[this] final
emitChunk() >> Pull.raiseError[F](new JsonException(s"unexpected '$c' after object value"))
}
case State.ExpectArrayValue =>
value_(State.AfterArrayValue) >> go_(State.AfterArrayValue)
value_() >> go_(State.AfterArrayValue)
case State.BeforeArrayValue =>
(c: @switch) match {
case ']' =>
T.advance(context)
chunkAcc.endArray()
Pull.done
case _ =>
value_(State.AfterArrayValue) >> go_(State.AfterArrayValue)
value_() >> go_(State.AfterArrayValue)
}
case State.AfterArrayValue =>
(c: @switch) match {
Expand Down
2 changes: 1 addition & 1 deletion json/src/main/scala/fs2/data/json/internal/State.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package fs2.data.json.internals

private[internals] object State {
private[json] object State {
final val BeforeValue = 0
final val BeforeObjectKey = 1
final val ExpectObjectKey = 2
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,29 @@
/*
* Copyright 2023 Lucas Satabin
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fs2
package data
package json
package internals

import scala.collection.mutable.ListBuffer
import scala.collection.immutable.VectorBuilder

private[json] final class TokenChunkAccumulator extends ChunkAccumulator[Token] {

private[this] final val chunkAcc: ListBuffer[Token] = new ListBuffer
private[this] final val chunkAcc: VectorBuilder[Token] = new VectorBuilder

override def startObject(): this.type = {
chunkAcc.addOne(Token.StartObject)
Expand Down Expand Up @@ -60,7 +76,7 @@ private[json] final class TokenChunkAccumulator extends ChunkAccumulator[Token]
}

override def chunk(): Chunk[Token] =
Chunk.seq(chunkAcc.result())
Chunk.vector(chunkAcc.result())

override def flush(): this.type = {
chunkAcc.clear()
Expand Down
Loading

0 comments on commit 6df4578

Please sign in to comment.