From 492626ce2a0c047e8ec3723954a311796a9a7a37 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 4 Jul 2021 22:52:11 +0000 Subject: [PATCH 1/2] Relax Queue-related signatures --- core/shared/src/main/scala/fs2/Stream.scala | 55 +++++++++++++++++---- 1 file changed, 46 insertions(+), 9 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 7fcf3b50e7..a2eae61508 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -30,7 +30,7 @@ import cats.data.Ior import cats.effect.{Concurrent, SyncIO} import cats.effect.kernel._ import cats.effect.kernel.implicits._ -import cats.effect.std.{Console, Queue, Semaphore} +import cats.effect.std.{Console, Queue, QueueSink, QueueSource, Semaphore} import cats.effect.Resource.ExitCase import cats.syntax.all._ @@ -876,30 +876,47 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, /** Enqueues the elements of this stream to the supplied queue. */ - def enqueueUnterminated[F2[x] >: F[x], O2 >: O](queue: Queue[F2, O2]): Stream[F2, Nothing] = + def enqueueUnterminated[F2[x] >: F[x], O2 >: O](queue: QueueSink[F2, O2]): Stream[F2, Nothing] = this.foreach(queue.offer) + private[fs2] def enqueueUnterminated[F2[x] >: F[x], O2 >: O]( + queue: Queue[F2, O2] + ): Stream[F2, Nothing] = + enqueueUnterminated(queue: QueueSink[F2, O2]) + /** Enqueues the chunks of this stream to the supplied queue. */ def enqueueUnterminatedChunks[F2[x] >: F[x], O2 >: O]( - queue: Queue[F2, Chunk[O2]] + queue: QueueSink[F2, Chunk[O2]] ): Stream[F2, Nothing] = this.chunks.foreach(queue.offer) + private[fs2] def enqueueUnterminatedChunks[F2[x] >: F[x], O2 >: O]( + queue: Queue[F2, Chunk[O2]] + ): Stream[F2, Nothing] = enqueueUnterminatedChunks(queue: QueueSink[F2, Chunk[O2]]) + /** Enqueues the elements of this stream to the supplied queue and enqueues `None` when this stream terminates. */ def enqueueNoneTerminated[F2[x] >: F[x], O2 >: O]( - queue: Queue[F2, Option[O2]] + queue: QueueSink[F2, Option[O2]] ): Stream[F2, Nothing] = this.noneTerminate.foreach(queue.offer) + private[fs2] def enqueueNoneTerminated[F2[x] >: F[x], O2 >: O]( + queue: Queue[F2, Option[O2]] + ): Stream[F2, Nothing] = enqueueNoneTerminated(queue: QueueSink[F2, Option[O2]]) + /** Enqueues the chunks of this stream to the supplied queue and enqueues `None` when this stream terminates. */ def enqueueNoneTerminatedChunks[F2[x] >: F[x], O2 >: O]( - queue: Queue[F2, Option[Chunk[O2]]] + queue: QueueSink[F2, Option[Chunk[O2]]] ): Stream[F2, Nothing] = this.chunks.noneTerminate.foreach(queue.offer) + private[fs2] def enqueueNoneTerminatedChunks[F2[x] >: F[x], O2 >: O]( + queue: Queue[F2, Option[Chunk[O2]]] + ): Stream[F2, Nothing] = enqueueNoneTerminatedChunks(queue: QueueSink[F2, Option[Chunk[O2]]]) + /** Alias for `flatMap(o => Stream.eval(f(o)))`. * * @example {{{ @@ -3208,7 +3225,7 @@ object Stream extends StreamLowPriority { * are dequeued and emitted as a single chunk. */ def fromQueueUnterminated[F[_]: Functor, A]( - queue: Queue[F, A], + queue: QueueSource[F, A], limit: Int = Int.MaxValue ): Stream[F, A] = fromQueueNoneTerminatedChunk_[F, A]( @@ -3217,13 +3234,18 @@ object Stream extends StreamLowPriority { limit ) + private[fs2] def fromQueueUnterminated[F[_]: Functor, A]( + queue: Queue[F, A], + limit: Int + ): Stream[F, A] = fromQueueUnterminated(queue: QueueSource[F, A], limit) + /** Returns a stream of elements from the supplied queue. * * All elements that are available, up to the specified limit, * are dequeued and emitted as a single chunk. */ def fromQueueUnterminatedChunk[F[_]: Functor, A]( - queue: Queue[F, Chunk[A]], + queue: QueueSource[F, Chunk[A]], limit: Int = Int.MaxValue ): Stream[F, A] = fromQueueNoneTerminatedChunk_[F, A]( @@ -3232,6 +3254,11 @@ object Stream extends StreamLowPriority { limit ) + private[fs2] def fromQueueUnterminatedChunk[F[_]: Functor, A]( + queue: Queue[F, Chunk[A]], + limit: Int + ): Stream[F, A] = fromQueueUnterminatedChunk(queue: QueueSource[F, Chunk[A]], limit) + /** Returns a stream of elements from the supplied queue. * * The stream terminates upon dequeuing a `None`. @@ -3240,7 +3267,7 @@ object Stream extends StreamLowPriority { * are dequeued and emitted as a single chunk. */ def fromQueueNoneTerminated[F[_]: Functor, A]( - queue: Queue[F, Option[A]], + queue: QueueSource[F, Option[A]], limit: Int = Int.MaxValue ): Stream[F, A] = fromQueueNoneTerminatedChunk_( @@ -3249,6 +3276,11 @@ object Stream extends StreamLowPriority { limit ) + private[fs2] def fromQueueNoneTerminated[F[_]: Functor, A]( + queue: Queue[F, Option[A]], + limit: Int + ): Stream[F, A] = fromQueueNoneTerminated(queue: QueueSource[F, Option[A]], limit) + /** Returns a stream of elements from the supplied queue. * * The stream terminates upon dequeuing a `None`. @@ -3257,11 +3289,16 @@ object Stream extends StreamLowPriority { * are dequeued and emitted as a single chunk. */ def fromQueueNoneTerminatedChunk[F[_], A]( - queue: Queue[F, Option[Chunk[A]]], + queue: QueueSource[F, Option[Chunk[A]]], limit: Int = Int.MaxValue ): Stream[F, A] = fromQueueNoneTerminatedChunk_(queue.take, queue.tryTake, limit) + private[fs2] def fromQueueNoneTerminatedChunk[F[_], A]( + queue: Queue[F, Option[Chunk[A]]], + limit: Int + ): Stream[F, A] = fromQueueNoneTerminatedChunk(queue: QueueSource[F, Option[Chunk[A]]], limit) + private def fromQueueNoneTerminatedChunk_[F[_], A]( take: F[Option[Chunk[A]]], tryTake: F[Option[Option[Chunk[A]]]], From baeeda30739ed537b20e27f2d101f6c891c4b542 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 5 Jul 2021 14:52:26 +0000 Subject: [PATCH 2/2] Fix bincompat --- core/shared/src/main/scala/fs2/Stream.scala | 32 ++++++++++++++++++--- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index a2eae61508..4b59d0a1f7 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -3234,7 +3234,12 @@ object Stream extends StreamLowPriority { limit ) - private[fs2] def fromQueueUnterminated[F[_]: Functor, A]( + /** Returns a stream of elements from the supplied queue. + * + * All elements that are available, up to the specified limit, + * are dequeued and emitted as a single chunk. + */ + def fromQueueUnterminated[F[_]: Functor, A]( queue: Queue[F, A], limit: Int ): Stream[F, A] = fromQueueUnterminated(queue: QueueSource[F, A], limit) @@ -3254,7 +3259,12 @@ object Stream extends StreamLowPriority { limit ) - private[fs2] def fromQueueUnterminatedChunk[F[_]: Functor, A]( + /** Returns a stream of elements from the supplied queue. + * + * All elements that are available, up to the specified limit, + * are dequeued and emitted as a single chunk. + */ + def fromQueueUnterminatedChunk[F[_]: Functor, A]( queue: Queue[F, Chunk[A]], limit: Int ): Stream[F, A] = fromQueueUnterminatedChunk(queue: QueueSource[F, Chunk[A]], limit) @@ -3276,7 +3286,14 @@ object Stream extends StreamLowPriority { limit ) - private[fs2] def fromQueueNoneTerminated[F[_]: Functor, A]( + /** Returns a stream of elements from the supplied queue. + * + * The stream terminates upon dequeuing a `None`. + * + * All elements that are available, up to the specified limit, + * are dequeued and emitted as a single chunk. + */ + def fromQueueNoneTerminated[F[_]: Functor, A]( queue: Queue[F, Option[A]], limit: Int ): Stream[F, A] = fromQueueNoneTerminated(queue: QueueSource[F, Option[A]], limit) @@ -3294,7 +3311,14 @@ object Stream extends StreamLowPriority { ): Stream[F, A] = fromQueueNoneTerminatedChunk_(queue.take, queue.tryTake, limit) - private[fs2] def fromQueueNoneTerminatedChunk[F[_], A]( + /** Returns a stream of elements from the supplied queue. + * + * The stream terminates upon dequeuing a `None`. + * + * All elements that are available, up to the specified limit, + * are dequeued and emitted as a single chunk. + */ + def fromQueueNoneTerminatedChunk[F[_], A]( queue: Queue[F, Option[Chunk[A]]], limit: Int ): Stream[F, A] = fromQueueNoneTerminatedChunk(queue: QueueSource[F, Option[Chunk[A]]], limit)