Skip to content

Commit

Permalink
Add more Json transformation pipes
Browse files Browse the repository at this point in the history
One of them allows for filtering values out of the stream, which
includes to be cautious to be sure the emitted stream is still valid. We
have to make sure that the key was not emitted if the value is going to
be dropped. This introduces a way to delay the emission of a key until
we know for sure the value is kept.

Fixes #47
  • Loading branch information
satabin committed Aug 7, 2020
1 parent e5267fb commit 168a11c
Show file tree
Hide file tree
Showing 6 changed files with 278 additions and 60 deletions.
2 changes: 2 additions & 0 deletions documentation/docs/json/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ val transformed = stream.through(transform[Fallible, Json](selector, json => Som
```
For concrete examples of provided `Builder`s and `Tokenizer`s, please refer to [the JSON library binding modules documentation][json-lib-doc]

Sometimes you would like to delete some Json values from the input stream, based o some predicate at a given path, and keep the rest untouched. In this case, you can use the `transformOpt` pipe, and return `None` for values you want to remove from the stream.

### JSON Renderers

Once you got a JSON token stream, selected and transformed what you needed in it, you can then write the resulting token stream to some storage. This can be achieved using renderers.
Expand Down
34 changes: 23 additions & 11 deletions documentation/docs/json/libraries.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ This page covers the following libraries:

Examples on this page use the following input:

```scala mdoc:silent
import cats.effect._

import fs2.Stream
```scala mdoc
import fs2.{Stream, Fallible}
import fs2.data.json._
import fs2.data.json.selector._

val input = """{
| "field1": 0,
Expand All @@ -28,11 +27,11 @@ val input = """{
| "field3": []
|}""".stripMargin

val stream = Stream.emits(input).through(tokens[IO])
val stream = Stream.emits(input).through(tokens[Fallible])

val selector = ".field3.[]".parseSelector[IO].unsafeRunSync()
val sel = root.field("field3").iterate.compile

val filtered = stream.through(filter(selector))
val filtered = stream.through(filter(sel))
```

### Circe
Expand All @@ -46,11 +45,24 @@ For instance both examples from the [core module documentation][json-doc] with c
import fs2.data.json.circe._
import io.circe._

val asts = stream.through(values[IO, Json])
asts.compile.toList.unsafeRunSync()
val asts = stream.through(values[Fallible, Json])
asts.compile.toList

val transformed = stream.through(transform[Fallible, Json](sel, json => Json.obj("test" -> json)))
transformed.compile.to(collector.pretty())
```

If you want to only keep `field1` if it is greater than `1`, you can use the `transformOpt` pipe for this.

```scala mdoc:nest
import fs2.data.json.circe._
import io.circe._
import cats.implicits._

val f1 = root.field("field1").compile

val transformed = stream.through(transform[IO, Json](selector, json => Json.obj("test" -> json)))
transformed.through(values[IO, Json]).compile.toList.unsafeRunSync()
val transformed = stream.through(transformOpt[Fallible, Json](f1, json => json.as[Int].toOption.filter(_ > 1).as(json)))
transformed.compile.to(collector.pretty())
```

[json-doc]: /documentation/json/
Expand Down
183 changes: 138 additions & 45 deletions json/src/fs2/data/json/internal/TokenSelector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,37 @@ private[json] object TokenSelector {
private def transformValue[F[_], Json](chunk: Chunk[Token],
idx: Int,
rest: Stream[F, Token],
f: Json => Json,
chunkAcc: List[Token])(
f: Json => Option[Json],
chunkAcc: List[Token],
key: Option[String])(
implicit F: RaiseThrowable[F],
builder: Builder[Json],
tokenizer: Tokenizer[Json]): Pull[F, Token, Result[F, Token, List[Token]]] =
ValueParser.pullValue(chunk, idx, rest).flatMap {
case Some((chunk, idx, rest, json)) =>
Pull.pure(Some((chunk, idx, rest, tokenizer.tokenize(f(json)).toList reverse_::: chunkAcc)))
val chunkAcc1 = f(json) match {
case Some(json) => tokenizer.tokenize(json).toList reverse_::: key.map(Token.Key(_)).toList ::: chunkAcc
case None => chunkAcc
}
Pull.pure(Some((chunk, idx, rest, chunkAcc1)))
case None => Pull.pure(None)
}

private def transformValueF[F[_], Json](chunk: Chunk[Token],
idx: Int,
rest: Stream[F, Token],
f: Json => F[Json],
chunkAcc: List[Token],
key: Option[String])(
implicit F: RaiseThrowable[F],
builder: Builder[Json],
tokenizer: Tokenizer[Json]): Pull[F, Token, Result[F, Token, List[Token]]] =
ValueParser.pullValue(chunk, idx, rest).flatMap {
case Some((chunk, idx, rest, json)) =>
Pull
.eval(f(json))
.handleErrorWith(t => emitChunk(chunkAcc) >> Pull.raiseError(t))
.map(transformed => Some((chunk, idx, rest, tokenizer.tokenize(transformed).toList reverse_::: chunkAcc)))
case None => Pull.pure(None)
}

Expand All @@ -42,23 +65,29 @@ private[json] object TokenSelector {
rest: Stream[F, Token],
emitNonSelected: Boolean,
wrap: Boolean,
emitEarly: Boolean,
toSelect: String => Boolean,
mandatories: Set[String],
onSelect: (Chunk[Token], Int, Stream[F, Token], List[Token]) => Pull[F, Token, Result[F, Token, List[Token]]],
onSelect: (Chunk[Token],
Int,
Stream[F, Token],
List[Token],
Option[String]) => Pull[F, Token, Result[F, Token, List[Token]]],
chunkAcc: List[Token])(implicit F: RaiseThrowable[F]): Pull[F, Token, Result[F, Token, List[Token]]] =
if (idx >= chunk.size) {
emitChunk(chunkAcc) >> rest.pull.uncons.flatMap {
case Some((hd, tl)) => selectName(hd, 0, tl, emitNonSelected, wrap, toSelect, mandatories, onSelect, Nil)
case None => Pull.raiseError[F](new JsonException("unexpected end of input"))
case Some((hd, tl)) =>
selectName(hd, 0, tl, emitNonSelected, wrap, emitEarly, toSelect, mandatories, onSelect, Nil)
case None => Pull.raiseError[F](new JsonException("unexpected end of input"))
}
} else
chunk(idx) match {
case key @ Token.Key(name) =>
val action =
if (toSelect(name)) {
// name is to be selected, then continue
val chunkAcc1 = if (wrap) key :: chunkAcc else chunkAcc
onSelect(chunk, idx + 1, rest, chunkAcc1)
val chunkAcc1 = if (wrap && emitEarly) key :: chunkAcc else chunkAcc
onSelect(chunk, idx + 1, rest, chunkAcc1, Some(name))
} else if (emitNonSelected) {
val chunkAcc1 = if (wrap) key :: chunkAcc else chunkAcc
emitValue(chunk, idx + 1, rest, 0, chunkAcc1)
Expand All @@ -68,7 +97,16 @@ private[json] object TokenSelector {
}
action.flatMap {
case Some((chunk, idx, rest, chunkAcc)) =>
selectName(chunk, idx, rest, emitNonSelected, wrap, toSelect, mandatories - name, onSelect, chunkAcc)
selectName(chunk,
idx,
rest,
emitNonSelected,
wrap,
emitEarly,
toSelect,
mandatories - name,
onSelect,
chunkAcc)
case None =>
Pull.raiseError[F](new JsonException("unexpected end of input"))
}
Expand All @@ -95,7 +133,11 @@ private[json] object TokenSelector {
wrap: Boolean,
arrIdx: Int,
toSelect: Int => Boolean,
onSelect: (Chunk[Token], Int, Stream[F, Token], List[Token]) => Pull[F, Token, Result[F, Token, List[Token]]],
onSelect: (Chunk[Token],
Int,
Stream[F, Token],
List[Token],
Option[String]) => Pull[F, Token, Result[F, Token, List[Token]]],
chunkAcc: List[Token])(implicit F: RaiseThrowable[F]): Pull[F, Token, Result[F, Token, List[Token]]] =
if (idx >= chunk.size) {
emitChunk(chunkAcc) >> rest.pull.uncons.flatMap {
Expand All @@ -112,7 +154,7 @@ private[json] object TokenSelector {
val action =
if (toSelect(arrIdx))
// index is to be selected, then continue
onSelect(chunk, idx, rest, chunkAcc)
onSelect(chunk, idx, rest, chunkAcc, None)
else if (emitNonSelected)
emitValue(chunk, idx, rest, 0, chunkAcc)
else
Expand All @@ -132,23 +174,38 @@ private[json] object TokenSelector {
selector: Selector,
emitNonSelected: Boolean,
wrap: Boolean,
onSelect: (Chunk[Token], Int, Stream[F, Token], List[Token]) => Pull[F, Token, Result[F, Token, List[Token]]],
chunkAcc: List[Token])(implicit F: RaiseThrowable[F]): Pull[F, Token, Result[F, Token, List[Token]]] =
emitEarly: Boolean,
onSelect: (Chunk[Token],
Int,
Stream[F, Token],
List[Token],
Option[String]) => Pull[F, Token, Result[F, Token, List[Token]]],
chunkAcc: List[Token],
key: Option[String])(implicit F: RaiseThrowable[F]): Pull[F, Token, Result[F, Token, List[Token]]] =
if (idx >= chunk.size) {
emitChunk(chunkAcc) >> rest.pull.uncons.flatMap {
case Some((hd, tl)) => filterChunk(hd, 0, tl, selector, emitNonSelected, wrap, onSelect, Nil)
case Some((hd, tl)) => filterChunk(hd, 0, tl, selector, emitNonSelected, wrap, emitEarly, onSelect, Nil, key)
case None => Pull.pure(None)
}
} else
selector match {
case Selector.ThisSelector =>
onSelect(chunk, idx, rest, chunkAcc)
onSelect(chunk, idx, rest, chunkAcc, key)
case Selector.NameSelector(pred, strict, mandatory) =>
chunk(idx) match {
case Token.StartObject =>
// enter the object context and go down to the name
val chunkAcc1 = if (wrap) Token.StartObject :: chunkAcc else chunkAcc
selectName(chunk, idx + 1, rest, emitNonSelected, wrap, pred, pred.values, onSelect, chunkAcc1)
selectName(chunk,
idx + 1,
rest,
emitNonSelected,
wrap,
emitEarly,
pred,
if (mandatory) pred.values else Set.empty,
onSelect,
chunkAcc1)
case token =>
if (strict)
emitChunk(chunkAcc) >> Pull.raiseError[F](new JsonException(s"cannot ${token.kind} number with string"))
Expand Down Expand Up @@ -182,7 +239,16 @@ private[json] object TokenSelector {
case Token.StartObject =>
// enter the object context and go down to the name
val chunkAcc1 = if (wrap) Token.StartObject :: chunkAcc else chunkAcc
selectName(chunk, idx + 1, rest, emitNonSelected, wrap, NamePredicate.All, Set.empty, onSelect, chunkAcc1)
selectName(chunk,
idx + 1,
rest,
emitNonSelected,
wrap,
emitEarly,
NamePredicate.All,
Set.empty,
onSelect,
chunkAcc1)
case token =>
if (strict)
emitChunk(chunkAcc) >> Pull.raiseError[F](new JsonException(s"cannot iterate over ${token.kind}"))
Expand All @@ -193,46 +259,73 @@ private[json] object TokenSelector {
skipValue(chunk, idx, rest, 0, chunkAcc)
}
case Selector.PipeSelector(left, right) =>
filterChunk(chunk,
idx,
rest,
left,
emitNonSelected,
wrap,
filterChunk(_, _, _, right, emitNonSelected, wrap, onSelect, _),
chunkAcc)
filterChunk(
chunk,
idx,
rest,
left,
emitNonSelected,
wrap,
true,
filterChunk(_, _, _, right, emitNonSelected, wrap, emitEarly, onSelect, _, _),
chunkAcc,
key
)
}

private def go[F[_]](
chunk: Chunk[Token],
idx: Int,
rest: Stream[F, Token],
selector: Selector,
emitNonSelected: Boolean,
wrap: Boolean,
onSelect: (Chunk[Token], Int, Stream[F, Token], List[Token]) => Pull[F, Token, Result[F, Token, List[Token]]],
chunkAcc: List[Token])(implicit F: RaiseThrowable[F]): Pull[F, Token, Unit] =
filterChunk(chunk, idx, rest, selector, emitNonSelected, wrap, onSelect, chunkAcc).flatMap {
private def go[F[_]](chunk: Chunk[Token],
idx: Int,
rest: Stream[F, Token],
selector: Selector,
emitNonSelected: Boolean,
wrap: Boolean,
emitEarly: Boolean,
onSelect: (Chunk[Token],
Int,
Stream[F, Token],
List[Token],
Option[String]) => Pull[F, Token, Result[F, Token, List[Token]]],
chunkAcc: List[Token])(implicit F: RaiseThrowable[F]): Pull[F, Token, Unit] =
filterChunk(chunk, idx, rest, selector, emitNonSelected, wrap, emitEarly, onSelect, chunkAcc, None).flatMap {
case Some((chunk, idx, rest, chunkAcc)) =>
go(chunk, idx, rest, selector, emitNonSelected, wrap, onSelect, chunkAcc)
go(chunk, idx, rest, selector, emitNonSelected, wrap, emitEarly, onSelect, chunkAcc)
case None =>
Pull.done
}

def pipe[F[_]](selector: Selector, wrap: Boolean)(implicit F: RaiseThrowable[F]): Pipe[F, Token, Token] =
s => go(Chunk.empty, 0, s, selector, false, wrap, emit[F], Nil).stream
s => go(Chunk.empty, 0, s, selector, false, wrap, true, emit[F], Nil).stream

def transformPipe[F[_], Json: Builder: Tokenizer](selector: Selector, f: Json => Option[Json])(
implicit F: RaiseThrowable[F]): Pipe[F, Token, Token] =
s => go(Chunk.empty, 0, s, selector, true, true, false, transform[F, Json](f), Nil).stream

def transformPipe[F[_], Json: Builder: Tokenizer](selector: Selector, f: Json => Json)(
def transformPipeF[F[_], Json: Builder: Tokenizer](selector: Selector, f: Json => F[Json])(
implicit F: RaiseThrowable[F]): Pipe[F, Token, Token] =
s => go(Chunk.empty, 0, s, selector, true, true, transform[F, Json](f), Nil).stream
s => go(Chunk.empty, 0, s, selector, true, true, true, transformF[F, Json](f), Nil).stream

private def emit[F[_]](chunk: Chunk[Token], idx: Int, rest: Stream[F, Token], chunkAcc: List[Token])(
implicit F: RaiseThrowable[F]): Pull[F, Token, Result[F, Token, List[Token]]] =
private def emit[F[_]](
chunk: Chunk[Token],
idx: Int,
rest: Stream[F, Token],
chunkAcc: List[Token],
key: Option[String])(implicit F: RaiseThrowable[F]): Pull[F, Token, Result[F, Token, List[Token]]] =
emitValue[F](chunk, idx, rest, 0, chunkAcc)

private def transform[F[_], Json: Builder: Tokenizer](
f: Json => Json)(chunk: Chunk[Token], idx: Int, rest: Stream[F, Token], chunkAcc: List[Token])(
implicit F: RaiseThrowable[F]): Pull[F, Token, Result[F, Token, List[Token]]] =
transformValue(chunk, idx, rest, f, chunkAcc)
private def transform[F[_], Json: Builder: Tokenizer](f: Json => Option[Json])(
chunk: Chunk[Token],
idx: Int,
rest: Stream[F, Token],
chunkAcc: List[Token],
key: Option[String])(implicit F: RaiseThrowable[F]): Pull[F, Token, Result[F, Token, List[Token]]] =
transformValue(chunk, idx, rest, f, chunkAcc, key)

private def transformF[F[_], Json: Builder: Tokenizer](f: Json => F[Json])(
chunk: Chunk[Token],
idx: Int,
rest: Stream[F, Token],
chunkAcc: List[Token],
key: Option[String])(implicit F: RaiseThrowable[F]): Pull[F, Token, Result[F, Token, List[Token]]] =
transformValueF(chunk, idx, rest, f, chunkAcc, key)

}
28 changes: 28 additions & 0 deletions json/src/fs2/data/json/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,36 @@ package object json {
def transform[F[_], Json](selector: Selector, f: Json => Json)(implicit F: RaiseThrowable[F],
builder: Builder[Json],
tokenizer: Tokenizer[Json]): Pipe[F, Token, Token] =
TokenSelector.transformPipe[F, Json](selector, f.andThen(Some(_)))

/** Transforms a stream of token into another one. The transformation function `f` is
* called on every selected value from upstream, and the resulting value replaces it.
* If the function returns `None`, then the entire value is dropped (and the object key it
* is located at, if any).
* The rest of the stream is left unchanged.
*
* This operator locally creates Json AST values using the [[Builder]], and
* returns tokens as emitted by the [[Tokenizer]] on the resulting value.
*/
def transformOpt[F[_], Json](selector: Selector, f: Json => Option[Json])(implicit F: RaiseThrowable[F],
builder: Builder[Json],
tokenizer: Tokenizer[Json]): Pipe[F, Token, Token] =
TokenSelector.transformPipe[F, Json](selector, f)

/** Transforms a stream of token into another one. The transformation function `f` is
* called on every selected value from upstream, and the resulting value replaces it.
* The rest of the stream is left unchanged. The operation can fail, in case the returned
* `F` is failed at one step.
*
* This operator locally creates Json AST values using the [[Builder]], and
* returns tokens as emitted by the [[Tokenizer]] on the resulting value.
*/
def transformF[F[_], Json](selector: Selector, f: Json => F[Json])(
implicit F: RaiseThrowable[F],
builder: Builder[Json],
tokenizer: Tokenizer[Json]): Pipe[F, Token, Token] =
TokenSelector.transformPipeF[F, Json](selector, f)

/** Transforms a stream of Json tokens into a stream of json values.
*/
def values[F[_], Json](implicit F: RaiseThrowable[F], builder: Builder[Json]): Pipe[F, Token, Json] =
Expand Down
Loading

0 comments on commit 168a11c

Please sign in to comment.