Skip to content

Commit

Permalink
Add Json stream wrappers
Browse files Browse the repository at this point in the history
  • Loading branch information
satabin committed Sep 30, 2020
1 parent e085953 commit a3d4d6c
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 5 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ lazy val documentation = project
libraryDependencies ++= List(
"com.beachape" %% "enumeratum" % "1.5.15",
"org.gnieh" %% "diffson-circe" % "4.0.3",
"io.circe" %% "circe-generic-extras" % circeVersion,
"co.fs2" %% "fs2-io" % fs2Version
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package fs2.data.csv
package generic

import cats.implicits._
import shapeless._
import shapeless.labelled._
import shapeless.ops.hlist.IsHCons
Expand Down
64 changes: 63 additions & 1 deletion documentation/docs/json/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ This page covers the following topics:
* Contents
{:toc}

### Basic usage
### JSON parsing

To create a stream of JSON tokens from an input stream, use the `tokens` pipe in `fs2.data.json` package. This pipe accepts a stream of characters and returns a stream of JSON tokens. This produces a stream of structurally valid tokens forming the JSON documents.

Expand Down Expand Up @@ -196,9 +196,71 @@ stream.compile.to(collector.pretty())
stream.compile.to(collector.pretty("\t"))
```

### Generating JSON streams

Another use case of the library can be to generate a JSON token stream. This comes in handy if you are developing a web service that returns some big JSON in chunks.

To this end you can use the pipes in `wrap` which allow you to wrap a stream into an object structure.

For instance imagine you have a store of events which can return a stream of events, and you have a way to serialize the events into JSON.
```scala mdoc
sealed trait Event
case class CreateCounter(name: String, initialValue: Int) extends Event
case class RemoveCounter(name: String) extends Event
case class IncreaseCounter(name: String) extends Event

object Event {
import _root_.io.circe.Encoder
import _root_.io.circe.generic.extras.Configuration
import _root_.io.circe.generic.extras.semiauto._
implicit val configuration = Configuration.default.withDiscriminator("type")

implicit val encoder: Encoder[Event] = deriveConfiguredEncoder
}

val events = Stream.emits(
List[Event](
CreateCounter("counter1", 0),
IncreaseCounter("counter1"),
CreateCounter("counter2", 0),
RemoveCounter("counter2")
)
)
```

You can generate a stream of JSON token wrapped in an object at a key named `events` like this:

```scala mdoc
import fs2.data.json.circe._

val wrappedTokens = events.through(tokenize).through(wrap.asArray(at = "events"))
```

You can use the renderers described above to generate the rendered chunks to send to the client.

```scala mdoc
wrappedTokens.through(render.compact).compile.toList
```

You can also add other fields to the the generated object stream. For instance, let's assume we can know how big the stream will be in advance from our event store, we can send this piece of data in the first chunks, so that the client can react accordingly.

```scala mdoc
import _root_.io.circe.Json

events
.through(tokenize)
.through(wrap.asArray(at = "events", in = Map("size" -> Json.fromInt(4))))
.through(render.compact)
.compile
.toList
```

For more pipes and options, please refer to the [API documentation][wrap-api].

[json-lib-doc]: /documentation/json/libraries
[interpolator-doc]: /documentation/json/libraries
[builder-api]: /api/fs2/data/json/ast/Builder.html
[tokenizer-api]: /api/fs2/data/json/ast/Tokenizer.html
[wrap-api]: /api/fs2/data/json/package$$wrap$.html
[monad-error]: https://typelevel.org/cats/api/cats/MonadError.html
[collector-doc]: https://oss.sonatype.org/service/local/repositories/releases/archive/co/fs2/fs2-core_2.13/2.3.0/fs2-core_2.13-2.3.0-javadoc.jar/!/fs2/Collector.html
10 changes: 7 additions & 3 deletions json/circe/src/fs2/data/json/circe/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fs2
package data
package json
package fs2.data.json

import ast._

Expand Down Expand Up @@ -55,4 +53,10 @@ package object circe {
)
}

implicit def tokenizerForEncoder[T](implicit encoder: Encoder[T]): Tokenizer[T] =
new Tokenizer[T] {
def tokenize(json: T): NonEmptyList[Token] =
CirceTokenizer.tokenize(json.asJson)
}

}
8 changes: 8 additions & 0 deletions json/src/fs2/data/json/ast/Tokenizer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,11 @@ trait Tokenizer[Json] {
def tokenize(json: Json): NonEmptyList[Token]

}

object Tokenizer {

implicit object NELTokenizer extends Tokenizer[NonEmptyList[Token]] {
def tokenize(json: NonEmptyList[Token]): NonEmptyList[Token] = json
}

}
40 changes: 40 additions & 0 deletions json/src/fs2/data/json/internal/Wrapper.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2020 Lucas Satabin
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fs2
package data
package json
package internals

import ast.Tokenizer

private[json] object Wrapper {

def pipe[F[_], Json](at: String, in: Map[String, Json], mapFirst: Boolean, single: Boolean)(
implicit tokenizer: Tokenizer[Json]): Pipe[F, Token, Token] = { s =>
val mapStream = Stream.emits(in.flatMap {
case (key, value) => Token.Key(key) :: tokenizer.tokenize(value).toList
}.toSeq)
val wrappedStream = if (single) s else Stream.emit(Token.StartArray) ++ s ++ Stream.emit(Token.EndArray)
val kvStream =
if (mapFirst) {
mapStream ++ Stream.emit(Token.Key(at)) ++ wrappedStream
} else {
Stream.emit(Token.Key(at)) ++ wrappedStream ++ mapStream
}
Stream.emit(Token.StartObject) ++ kvStream ++ Stream.emit(Token.EndObject)
}

}
27 changes: 27 additions & 0 deletions json/src/fs2/data/json/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,33 @@ package object json {
def tokenize[F[_], Json](implicit tokenizer: Tokenizer[Json]): Pipe[F, Json, Token] =
_.flatMap(value => Stream.emits(tokenizer.tokenize(value).toList))

/** A collection of pipes to wrap streams inside objects. */
object wrap {

/** Wraps the stream elements as an array inside an object at the given `at` key.
* The object also contains the keys from the `in` map if any.
* If `mapFirst` is true, then the map elements are emitted first, then the stream, otherwise the stream is emitted first.
*
* The resulting token stream is a valid single JSON object stream, iff the original
* stream is a valid stream of JSON values.
*/
def asArray[F[_], Json](at: String, in: Map[String, Json] = Map.empty[String, Json], mapFirst: Boolean = true)(
implicit tokenizer: Tokenizer[Json]): Pipe[F, Token, Token] =
Wrapper.pipe[F, Json](at, in, mapFirst, false)

/** Wraps the stream element as a single value inside an object at the given `at` key.
* The object also contains the keys from the `in` map if any.
* If `mapFirst` is true, then the map elements are emitted first, then the stream, otherwise the stream is emitted first.
*
* The resulting token stream is a valid single JSON object stream, iff the original
* stream is a valid stream of a **single** JSON value.
*/
def asValue[F[_], Json](at: String, in: Map[String, Json] = Map.empty[String, Json], mapFirst: Boolean = true)(
implicit tokenizer: Tokenizer[Json]): Pipe[F, Token, Token] =
Wrapper.pipe[F, Json](at, in, mapFirst, true)

}

/** Json Token stream pipes to render Json values. */
object render {

Expand Down

0 comments on commit a3d4d6c

Please sign in to comment.