From 46a3ebe77b00119d0ac7027301da9071d654a0e7 Mon Sep 17 00:00:00 2001 From: Dmitry Golubets Date: Sun, 14 Mar 2021 19:32:52 +0300 Subject: [PATCH] Faster groupWithin implementation --- .../src/main/scala/fs2/JunctionBuffer.scala | 28 ++++ core/shared/src/main/scala/fs2/Stream.scala | 144 ++++++++++++------ 2 files changed, 126 insertions(+), 46 deletions(-) create mode 100644 core/shared/src/main/scala/fs2/JunctionBuffer.scala diff --git a/core/shared/src/main/scala/fs2/JunctionBuffer.scala b/core/shared/src/main/scala/fs2/JunctionBuffer.scala new file mode 100644 index 0000000000..38ea247f73 --- /dev/null +++ b/core/shared/src/main/scala/fs2/JunctionBuffer.scala @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 + +private case class JunctionBuffer[T]( + data: Vector[T], + endOfSupply: Option[Either[Throwable, Unit]], + endOfDemand: Option[Either[Throwable, Unit]] +) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index b77838d22b..e7735243f8 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -25,19 +25,18 @@ import scala.annotation.tailrec import scala.concurrent.TimeoutException import scala.concurrent.duration._ import java.io.PrintStream - import cats.{Eval => _, _} import cats.data.Ior -import cats.effect.SyncIO +import cats.effect.{Concurrent, SyncIO} import cats.effect.kernel._ import cats.effect.std.{CountDownLatch, Queue, Semaphore} import cats.effect.kernel.implicits._ import cats.effect.Resource.ExitCase import cats.implicits.{catsSyntaxEither => _, _} - import fs2.compat._ import fs2.concurrent._ import fs2.internal._ + import scala.collection.mutable.ArrayBuffer /** A stream producing output of type `O` and which may evaluate `F` effects. @@ -1413,55 +1412,108 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, n: Int, timeout: FiniteDuration )(implicit F: Temporal[F2]): Stream[F2, Chunk[O]] = - this - .covary[F2] - .pull - .timed { timedPull => - def resize(c: Chunk[O], s: Pull[F2, Chunk[O], Unit]): (Pull[F2, Chunk[O], Unit], Chunk[O]) = - if (c.size < n) s -> c - else { - val (unit, rest) = c.splitAt(n) - resize(rest, s >> Pull.output1(unit)) - } + fs2.Stream.eval(Semaphore[F2](n.toLong)).flatMap { demand => + fs2.Stream.eval(Semaphore[F2](0L)).flatMap { supply => + fs2.Stream + .eval( + Ref[F2].of(JunctionBuffer[O](Vector.empty[O], endOfSupply = None, endOfDemand = None)) + ) + .flatMap { buffer => + def enqueue(t: O): F2[Boolean] = + demand.acquire.flatMap { _ => + buffer + .modify { buf => + (buf.copy(buf.data :+ t), buf) + } + .flatMap { buf => + supply.release.map { _ => + buf.endOfDemand.isEmpty + } + } + } - // Invariants: - // acc.size < n, always - // hasTimedOut == true iff a timeout has been received, and acc.isEmpty - def go(acc: Chunk[O], timedPull: Pull.Timed[F2, O], hasTimedOut: Boolean = false) - : Pull[F2, Chunk[O], Unit] = - timedPull.uncons.flatMap { - case None => - Pull.output1(acc).whenA(acc.nonEmpty) - case Some((e, next)) => - def resetTimerAndGo(q: Chunk[O]) = - timedPull.timeout(timeout) >> go(q, next) - - e match { + def acquireSupplyUpToNWithin(n: Long, timeout: FiniteDuration): F2[Long] = + F.race( + F.sleep(timeout), + supply.acquireN(n) + ).flatMap { case Left(_) => - if (acc.nonEmpty) - Pull.output1(acc) >> resetTimerAndGo(Chunk.empty) - else - go(Chunk.empty, next, hasTimedOut = true) - case Right(c) if hasTimedOut => - // it has timed out without reset, so acc is empty - val (toEmit, rest) = - if (c.size < n) Pull.output1(c) -> Chunk.empty - else resize(c, Pull.done) - toEmit >> resetTimerAndGo(rest) - case Right(c) => - val newAcc = acc ++ c - if (newAcc.size < n) - go(newAcc, next) - else { - val (toEmit, rest) = resize(newAcc, Pull.done) - toEmit >> resetTimerAndGo(rest) + supply.acquire.flatMap { _ => + supply.available.flatMap { m => + val k = m.min(n - 1) + supply.tryAcquireN(k).map { + case true => k + 1 + case false => 1 + } + } } + case Right(_) => + F.pure(n) } - } - timedPull.timeout(timeout) >> go(Chunk.empty, timedPull) + def dequeueN(n: Int): F2[Option[Vector[O]]] = + acquireSupplyUpToNWithin(n.toLong, timeout).flatMap { n => + buffer + .modify { buf => + if (buf.data.size >= n) { + val (head, tail) = buf.data.splitAt(n.toInt) + (buf.copy(tail), buf.copy(head)) + } else { + (buf.copy(Vector.empty), buf) + } + } + .flatMap { buf => + demand.releaseN(buf.data.size.toLong).flatMap { _ => + buf.endOfSupply match { + case Some(Left(error)) => + F.raiseError(error) + case Some(Right(_)) if buf.data.isEmpty => + F.pure(None) + case _ => + F.pure(Some(buf.data)) + } + } + } + } + + def endSupply(result: Either[Throwable, Unit]): F2[Unit] = + buffer.update(_.copy(endOfSupply = Some(result))) *> supply.releaseN(Int.MaxValue) + + def endDemand(result: Either[Throwable, Unit]): F2[Unit] = + buffer.update(_.copy(endOfDemand = Some(result))) *> demand.releaseN(Int.MaxValue) + + val enqueueAsync = F.start { + this + .evalMap(enqueue) + .takeWhile(_ == true) + .onFinalizeCase { + case ExitCase.Succeeded => endSupply(Right(())) + case ExitCase.Errored(e) => endSupply(Left(e)) + case ExitCase.Canceled => endSupply(Right(())) + } + .compile + .drain + } + + fs2.Stream + .eval(enqueueAsync) + .flatMap { upstream => + fs2.Stream + .eval(dequeueN(n)) + .repeat + .collectWhile { case Some(data) => Chunk.vector(data) } + .onFinalizeCase { + case ExitCase.Succeeded => + endDemand(Right(())).flatMap(_ => upstream.cancel) + case ExitCase.Errored(e) => + endDemand(Left(e)).flatMap(_ => upstream.cancel) + case ExitCase.Canceled => + endDemand(Right(())).flatMap(_ => upstream.cancel) + } + } + } } - .stream + } /** If `this` terminates with `Stream.raiseError(e)`, invoke `h(e)`. *