From 1612477bbe7dd0d39152f977fc918096003a408d Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Fri, 28 Aug 2020 18:55:00 +0200 Subject: [PATCH] Add Json stream wrappers --- build.sbt | 1 + .../data/csv/generic/DerivedCellEncoder.scala | 1 - documentation/docs/json/index.md | 64 ++++++++++++++++++- documentation/docs/json/libraries.md | 2 +- .../src/fs2/data/json/circe/package.scala | 10 ++- json/src/fs2/data/json/ast/Tokenizer.scala | 8 +++ .../data/json/internal/ObjectWrapper.scala | 40 ++++++++++++ json/src/fs2/data/json/package.scala | 39 +++++++++++ 8 files changed, 159 insertions(+), 6 deletions(-) create mode 100644 json/src/fs2/data/json/internal/ObjectWrapper.scala diff --git a/build.sbt b/build.sbt index 38fd983ce..6ba18a1fe 100644 --- a/build.sbt +++ b/build.sbt @@ -194,6 +194,7 @@ lazy val documentation = project libraryDependencies ++= List( "com.beachape" %% "enumeratum" % "1.5.15", "org.gnieh" %% "diffson-circe" % "4.0.3", + "io.circe" %% "circe-generic-extras" % circeVersion, "co.fs2" %% "fs2-io" % fs2Version ) ) diff --git a/csv/generic/src/fs2/data/csv/generic/DerivedCellEncoder.scala b/csv/generic/src/fs2/data/csv/generic/DerivedCellEncoder.scala index a556db360..2b84c7813 100644 --- a/csv/generic/src/fs2/data/csv/generic/DerivedCellEncoder.scala +++ b/csv/generic/src/fs2/data/csv/generic/DerivedCellEncoder.scala @@ -16,7 +16,6 @@ package fs2.data.csv package generic -import cats.implicits._ import shapeless._ import shapeless.labelled._ import shapeless.ops.hlist.IsHCons diff --git a/documentation/docs/json/index.md b/documentation/docs/json/index.md index 475f2d1f8..b8cdde0d6 100644 --- a/documentation/docs/json/index.md +++ b/documentation/docs/json/index.md @@ -13,7 +13,7 @@ This page covers the following topics: * Contents {:toc} -### Basic usage +### JSON parsing To create a stream of JSON tokens from an input stream, use the `tokens` pipe in `fs2.data.json` package. This pipe accepts a stream of characters and returns a stream of JSON tokens. This produces a stream of structurally valid tokens forming the JSON documents. @@ -196,9 +196,71 @@ stream.compile.to(collector.pretty()) stream.compile.to(collector.pretty("\t")) ``` +### Generating JSON streams + +Another use case of the library can be to generate a JSON token stream. This comes in handy if you are developing a web service that returns some big JSON in chunks. + +To this end you can use the pipes in `wrap` which allow you to wrap a stream into an object structure. + +For instance imagine you have a store of events which can return a stream of events, and you have a way to serialize the events into JSON. +```scala mdoc +sealed trait Event +case class CreateCounter(name: String, initialValue: Int) extends Event +case class RemoveCounter(name: String) extends Event +case class IncreaseCounter(name: String) extends Event + +object Event { + import _root_.io.circe.Encoder + import _root_.io.circe.generic.extras.Configuration + import _root_.io.circe.generic.extras.semiauto._ + implicit val configuration = Configuration.default.withDiscriminator("type") + + implicit val encoder: Encoder[Event] = deriveConfiguredEncoder +} + +val events = Stream.emits( + List[Event]( + CreateCounter("counter1", 0), + IncreaseCounter("counter1"), + CreateCounter("counter2", 0), + RemoveCounter("counter2") + ) +) +``` + +You can generate a stream of JSON token wrapped in an object at a key named `events` like this: + +```scala mdoc +import fs2.data.json.circe._ + +val wrappedTokens = events.through(tokenize).through(wrap.asArrayInObject(at = "events")) +``` + +You can use the renderers described above to generate the rendered chunks to send to the client. + +```scala mdoc +wrappedTokens.through(render.compact).compile.toList +``` + +You can also add other fields to the the generated object stream. For instance, let's assume we can know how big the stream will be in advance from our event store, we can send this piece of data in the first chunks, so that the client can react accordingly. + +```scala mdoc +import _root_.io.circe.Json + +events + .through(tokenize) + .through(wrap.asArrayInObject(at = "events", in = Map("size" -> Json.fromInt(4)))) + .through(render.compact) + .compile + .toList +``` + +For more pipes and options, please refer to the [API documentation][wrap-api]. + [json-lib-doc]: /documentation/json/libraries [interpolator-doc]: /documentation/json/libraries [builder-api]: /api/fs2/data/json/ast/Builder.html [tokenizer-api]: /api/fs2/data/json/ast/Tokenizer.html +[wrap-api]: /api/fs2/data/json/package$$wrap$.html [monad-error]: https://typelevel.org/cats/api/cats/MonadError.html [collector-doc]: https://oss.sonatype.org/service/local/repositories/releases/archive/co/fs2/fs2-core_2.13/2.3.0/fs2-core_2.13-2.3.0-javadoc.jar/!/fs2/Collector.html diff --git a/documentation/docs/json/libraries.md b/documentation/docs/json/libraries.md index 88783fb00..a726a29bf 100644 --- a/documentation/docs/json/libraries.md +++ b/documentation/docs/json/libraries.md @@ -38,7 +38,7 @@ val filtered = stream.through(filter(sel)) Module: [![Maven Central](https://img.shields.io/maven-central/v/org.gnieh/fs2-data-json-circe_2.13.svg)](https://mvnrepository.com/artifact/org.gnieh/fs2-data-json-circe_2.13) -The `fs2-data-json-circe` module provides `Builder` and `Tokenizer` instances for the [circe][circe] `Json` type. +The `fs2-data-json-circe` module provides `Builder` and `Tokenizer` instances for the [circe][circe] `Json` type and a `Tokenizer` instance for each type `T` having an implicit `Encoder[T]` in scope. For instance both examples from the [core module documentation][json-doc] with circe can be written that way: ```scala mdoc:nest diff --git a/json/circe/src/fs2/data/json/circe/package.scala b/json/circe/src/fs2/data/json/circe/package.scala index 002fe02ee..013ac47d0 100644 --- a/json/circe/src/fs2/data/json/circe/package.scala +++ b/json/circe/src/fs2/data/json/circe/package.scala @@ -13,9 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package fs2 -package data -package json +package fs2.data.json import ast._ @@ -55,4 +53,10 @@ package object circe { ) } + implicit def tokenizerForEncoder[T](implicit encoder: Encoder[T]): Tokenizer[T] = + new Tokenizer[T] { + def tokenize(json: T): NonEmptyList[Token] = + CirceTokenizer.tokenize(json.asJson) + } + } diff --git a/json/src/fs2/data/json/ast/Tokenizer.scala b/json/src/fs2/data/json/ast/Tokenizer.scala index b25f4c1e7..7f2a90981 100644 --- a/json/src/fs2/data/json/ast/Tokenizer.scala +++ b/json/src/fs2/data/json/ast/Tokenizer.scala @@ -27,3 +27,11 @@ trait Tokenizer[Json] { def tokenize(json: Json): NonEmptyList[Token] } + +object Tokenizer { + + implicit object NELTokenizer extends Tokenizer[NonEmptyList[Token]] { + def tokenize(json: NonEmptyList[Token]): NonEmptyList[Token] = json + } + +} diff --git a/json/src/fs2/data/json/internal/ObjectWrapper.scala b/json/src/fs2/data/json/internal/ObjectWrapper.scala new file mode 100644 index 000000000..9d32c3ff5 --- /dev/null +++ b/json/src/fs2/data/json/internal/ObjectWrapper.scala @@ -0,0 +1,40 @@ +/* + * Copyright 2020 Lucas Satabin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package fs2 +package data +package json +package internals + +import ast.Tokenizer + +private[json] object ObjectWrapper { + + def pipe[F[_], Json](at: String, in: Map[String, Json], mapFirst: Boolean, single: Boolean)( + implicit tokenizer: Tokenizer[Json]): Pipe[F, Token, Token] = { s => + val mapStream = Stream.emits(in.flatMap { + case (key, value) => Token.Key(key) :: tokenizer.tokenize(value).toList + }.toSeq) + val wrappedStream = if (single) s else Stream.emit(Token.StartArray) ++ s ++ Stream.emit(Token.EndArray) + val kvStream = + if (mapFirst) { + mapStream ++ Stream.emit(Token.Key(at)) ++ wrappedStream + } else { + Stream.emit(Token.Key(at)) ++ wrappedStream ++ mapStream + } + Stream.emit(Token.StartObject) ++ kvStream ++ Stream.emit(Token.EndObject) + } + +} diff --git a/json/src/fs2/data/json/package.scala b/json/src/fs2/data/json/package.scala index 2eaf0c90b..223769065 100644 --- a/json/src/fs2/data/json/package.scala +++ b/json/src/fs2/data/json/package.scala @@ -94,6 +94,45 @@ package object json { def tokenize[F[_], Json](implicit tokenizer: Tokenizer[Json]): Pipe[F, Json, Token] = _.flatMap(value => Stream.emits(tokenizer.tokenize(value).toList)) + /** A collection of pipes to wrap streams inside objects. */ + object wrap { + + /** Wraps the stream elements as an array inside an object at the given `at` key. + * The object also contains the keys from the `in` map if any. + * If `mapFirst` is true, then the map elements are emitted first, then the stream, otherwise the stream is emitted first. + * + * The resulting token stream is a valid single JSON object stream, iff the original + * stream is a valid stream of JSON values. + */ + def asArrayInObject[F[_], Json]( + at: String, + in: Map[String, Json] = Map.empty[String, Json], + mapFirst: Boolean = true)(implicit tokenizer: Tokenizer[Json]): Pipe[F, Token, Token] = + ObjectWrapper.pipe[F, Json](at, in, mapFirst, false) + + /** Wraps the stream element as a single value inside an object at the given `at` key. + * The object also contains the keys from the `in` map if any. + * If `mapFirst` is true, then the map elements are emitted first, then the stream, otherwise the stream is emitted first. + * + * The resulting token stream is a valid single JSON object stream, iff the original + * stream is a valid stream of a **single** JSON value. + */ + def asValueInObject[F[_], Json]( + at: String, + in: Map[String, Json] = Map.empty[String, Json], + mapFirst: Boolean = true)(implicit tokenizer: Tokenizer[Json]): Pipe[F, Token, Token] = + ObjectWrapper.pipe[F, Json](at, in, mapFirst, true) + + /** Wraps the stream elements as an array at top-level. + * + * The resulting token stream is a valid single JSON array stream, iff the original + * stream is a valid stream of JSON values. + */ + def asTopLevelArray[F[_]]: Pipe[F, Token, Token] = + s => Stream.emit(Token.StartArray) ++ s ++ Stream.emit(Token.EndArray) + + } + /** Json Token stream pipes to render Json values. */ object render {