Skip to content

Commit

Permalink
Put unconsumed chunk back into stream
Browse files Browse the repository at this point in the history
  • Loading branch information
satabin committed Jan 2, 2022
1 parent e27d2ba commit dbbcb90
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 15 deletions.
5 changes: 3 additions & 2 deletions transducers/shared/src/main/scala/fs2/data/stt/STT.scala
Original file line number Diff line number Diff line change
Expand Up @@ -232,13 +232,14 @@ class STT[F[_], T[_, _], In, Out, StackElem](initial: Int,
lastKnownFinal match {
case Some((state, env)) =>
// we reached a final state before, let's emit what should have been emitted
// there, and push the input buffer back to the stream
// there, and push the input buffer back to the stream,
// as well as unconsumed current chunk
Pull
.eval(eval0(env, finalStates(state)))
.flatMap(outs =>
go(Chunk.chain(accSinceLastFinal),
0,
rest,
Stream.chunk(chunk.drop(idx)) ++ rest,
initial,
Nil,
Env.create(variables),
Expand Down
16 changes: 16 additions & 0 deletions transducers/shared/src/main/scala/fs2/data/stt/package.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright 2021 Lucas Satabin
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fs2.data

package object stt {
Expand Down
58 changes: 45 additions & 13 deletions transducers/shared/src/test/scala/fs2/data/stt/STTSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package stt

import weaver._
import cats.Show
import cats.effect.IO

sealed trait Tree
object Tree {
Expand Down Expand Up @@ -75,12 +76,12 @@ object STTSpec extends SimpleIOSuite {
)
val finalStates = Map(0 -> Expr0.Var[Tree](x))
val reverse =
new STT(0,
internalTransition,
callTransition,
returnTransition,
finalStates,
Map("x" -> Type.Type0, "y" -> Type.Type0, "z" -> Type.Type1))
new STT[IO, Map, Tree, Tree, Tree](0,
internalTransition,
callTransition,
returnTransition,
finalStates,
Map("x" -> Type.Type0, "y" -> Type.Type0, "z" -> Type.Type1))

Stream(openA, openB, leaf0, closeB, leaf1, closeA, openB, closeB)
.rechunkRandomly()
Expand All @@ -105,12 +106,12 @@ object STTSpec extends SimpleIOSuite {
}
val finalStates = Map(0 -> Expr0.Var[Tree](x))
val reverse =
new STT(0,
internalTransition,
callTransition,
returnTransition,
finalStates,
Map("x" -> Type.Type0, "y" -> Type.Type0, "z" -> Type.Type1))
new STT[IO, PartialFunction, Tree, Tree, Tree](0,
internalTransition,
callTransition,
returnTransition,
finalStates,
Map("x" -> Type.Type0, "y" -> Type.Type0, "z" -> Type.Type1))

Stream(openA, openB, leaf0, closeB, leaf1, closeA, openB, closeB)
.rechunkRandomly()
Expand All @@ -120,6 +121,37 @@ object STTSpec extends SimpleIOSuite {
.map(result => expect(result == List(openB, closeB, openA, leaf1, openB, leaf0, closeB, closeA)))
}

test("")
test("emit until error") {
import Assignment._
val internalTransition =
Map[(Int, Tree), InternalTransition[Tree]]()
val callTransition = Map[(Int, Tree), CallTransition[Tree, Unit]](
(0, openA) ->
CallTransition(0, (), List(Char(y, openA), Append(x, y))))
val returnTransition = Map[(Int, Unit, Tree), ReturnTransition[Tree]](
(0, (), closeA) ->
ReturnTransition(0, List(Char(y, closeA), Append(x, y), Prepend(x, xp)))
)
val finalStates = Map(0 -> Expr0.Var[Tree](x))
val onlyA = new STT[IO, Map, Tree, Tree, Unit](0,
internalTransition,
callTransition,
returnTransition,
finalStates,
Map("x" -> Type.Type0, "y" -> Type.Type0))
Stream(openA, openA, closeA, closeA, openB, closeB)
.through(onlyA)
.attempt
.compile
.toList
.map { result =>
expect(
result == List(Right(openA),
Right(openA),
Right(closeA),
Right(closeA),
Left(STTException("malformed input, prefix <b is not accepted"))))
}
}

}

0 comments on commit dbbcb90

Please sign in to comment.