Skip to content

Commit

Permalink
Faster groupWithin implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
DGolubets committed Mar 14, 2021
1 parent 8711623 commit 46a3ebe
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 46 deletions.
28 changes: 28 additions & 0 deletions core/shared/src/main/scala/fs2/JunctionBuffer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2013 Functional Streams for Scala
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package fs2

private case class JunctionBuffer[T](
data: Vector[T],
endOfSupply: Option[Either[Throwable, Unit]],
endOfDemand: Option[Either[Throwable, Unit]]
)
144 changes: 98 additions & 46 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,18 @@ import scala.annotation.tailrec
import scala.concurrent.TimeoutException
import scala.concurrent.duration._
import java.io.PrintStream

import cats.{Eval => _, _}
import cats.data.Ior
import cats.effect.SyncIO
import cats.effect.{Concurrent, SyncIO}
import cats.effect.kernel._
import cats.effect.std.{CountDownLatch, Queue, Semaphore}
import cats.effect.kernel.implicits._
import cats.effect.Resource.ExitCase
import cats.implicits.{catsSyntaxEither => _, _}

import fs2.compat._
import fs2.concurrent._
import fs2.internal._

import scala.collection.mutable.ArrayBuffer

/** A stream producing output of type `O` and which may evaluate `F` effects.
Expand Down Expand Up @@ -1413,55 +1412,108 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
n: Int,
timeout: FiniteDuration
)(implicit F: Temporal[F2]): Stream[F2, Chunk[O]] =
this
.covary[F2]
.pull
.timed { timedPull =>
def resize(c: Chunk[O], s: Pull[F2, Chunk[O], Unit]): (Pull[F2, Chunk[O], Unit], Chunk[O]) =
if (c.size < n) s -> c
else {
val (unit, rest) = c.splitAt(n)
resize(rest, s >> Pull.output1(unit))
}
fs2.Stream.eval(Semaphore[F2](n.toLong)).flatMap { demand =>
fs2.Stream.eval(Semaphore[F2](0L)).flatMap { supply =>
fs2.Stream
.eval(
Ref[F2].of(JunctionBuffer[O](Vector.empty[O], endOfSupply = None, endOfDemand = None))
)
.flatMap { buffer =>
def enqueue(t: O): F2[Boolean] =
demand.acquire.flatMap { _ =>
buffer
.modify { buf =>
(buf.copy(buf.data :+ t), buf)
}
.flatMap { buf =>
supply.release.map { _ =>
buf.endOfDemand.isEmpty
}
}
}

// Invariants:
// acc.size < n, always
// hasTimedOut == true iff a timeout has been received, and acc.isEmpty
def go(acc: Chunk[O], timedPull: Pull.Timed[F2, O], hasTimedOut: Boolean = false)
: Pull[F2, Chunk[O], Unit] =
timedPull.uncons.flatMap {
case None =>
Pull.output1(acc).whenA(acc.nonEmpty)
case Some((e, next)) =>
def resetTimerAndGo(q: Chunk[O]) =
timedPull.timeout(timeout) >> go(q, next)

e match {
def acquireSupplyUpToNWithin(n: Long, timeout: FiniteDuration): F2[Long] =
F.race(
F.sleep(timeout),
supply.acquireN(n)
).flatMap {
case Left(_) =>
if (acc.nonEmpty)
Pull.output1(acc) >> resetTimerAndGo(Chunk.empty)
else
go(Chunk.empty, next, hasTimedOut = true)
case Right(c) if hasTimedOut =>
// it has timed out without reset, so acc is empty
val (toEmit, rest) =
if (c.size < n) Pull.output1(c) -> Chunk.empty
else resize(c, Pull.done)
toEmit >> resetTimerAndGo(rest)
case Right(c) =>
val newAcc = acc ++ c
if (newAcc.size < n)
go(newAcc, next)
else {
val (toEmit, rest) = resize(newAcc, Pull.done)
toEmit >> resetTimerAndGo(rest)
supply.acquire.flatMap { _ =>
supply.available.flatMap { m =>
val k = m.min(n - 1)
supply.tryAcquireN(k).map {
case true => k + 1
case false => 1
}
}
}
case Right(_) =>
F.pure(n)
}
}

timedPull.timeout(timeout) >> go(Chunk.empty, timedPull)
def dequeueN(n: Int): F2[Option[Vector[O]]] =
acquireSupplyUpToNWithin(n.toLong, timeout).flatMap { n =>
buffer
.modify { buf =>
if (buf.data.size >= n) {
val (head, tail) = buf.data.splitAt(n.toInt)
(buf.copy(tail), buf.copy(head))
} else {
(buf.copy(Vector.empty), buf)
}
}
.flatMap { buf =>
demand.releaseN(buf.data.size.toLong).flatMap { _ =>
buf.endOfSupply match {
case Some(Left(error)) =>
F.raiseError(error)
case Some(Right(_)) if buf.data.isEmpty =>
F.pure(None)
case _ =>
F.pure(Some(buf.data))
}
}
}
}

def endSupply(result: Either[Throwable, Unit]): F2[Unit] =
buffer.update(_.copy(endOfSupply = Some(result))) *> supply.releaseN(Int.MaxValue)

def endDemand(result: Either[Throwable, Unit]): F2[Unit] =
buffer.update(_.copy(endOfDemand = Some(result))) *> demand.releaseN(Int.MaxValue)

val enqueueAsync = F.start {
this
.evalMap(enqueue)
.takeWhile(_ == true)
.onFinalizeCase {
case ExitCase.Succeeded => endSupply(Right(()))
case ExitCase.Errored(e) => endSupply(Left(e))
case ExitCase.Canceled => endSupply(Right(()))
}
.compile
.drain
}

fs2.Stream
.eval(enqueueAsync)
.flatMap { upstream =>
fs2.Stream
.eval(dequeueN(n))
.repeat
.collectWhile { case Some(data) => Chunk.vector(data) }
.onFinalizeCase {
case ExitCase.Succeeded =>
endDemand(Right(())).flatMap(_ => upstream.cancel)
case ExitCase.Errored(e) =>
endDemand(Left(e)).flatMap(_ => upstream.cancel)
case ExitCase.Canceled =>
endDemand(Right(())).flatMap(_ => upstream.cancel)
}
}
}
}
.stream
}

/** If `this` terminates with `Stream.raiseError(e)`, invoke `h(e)`.
*
Expand Down

0 comments on commit 46a3ebe

Please sign in to comment.