diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 7fcf3b50e7..4b59d0a1f7 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -30,7 +30,7 @@ import cats.data.Ior import cats.effect.{Concurrent, SyncIO} import cats.effect.kernel._ import cats.effect.kernel.implicits._ -import cats.effect.std.{Console, Queue, Semaphore} +import cats.effect.std.{Console, Queue, QueueSink, QueueSource, Semaphore} import cats.effect.Resource.ExitCase import cats.syntax.all._ @@ -876,30 +876,47 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, /** Enqueues the elements of this stream to the supplied queue. */ - def enqueueUnterminated[F2[x] >: F[x], O2 >: O](queue: Queue[F2, O2]): Stream[F2, Nothing] = + def enqueueUnterminated[F2[x] >: F[x], O2 >: O](queue: QueueSink[F2, O2]): Stream[F2, Nothing] = this.foreach(queue.offer) + private[fs2] def enqueueUnterminated[F2[x] >: F[x], O2 >: O]( + queue: Queue[F2, O2] + ): Stream[F2, Nothing] = + enqueueUnterminated(queue: QueueSink[F2, O2]) + /** Enqueues the chunks of this stream to the supplied queue. */ def enqueueUnterminatedChunks[F2[x] >: F[x], O2 >: O]( - queue: Queue[F2, Chunk[O2]] + queue: QueueSink[F2, Chunk[O2]] ): Stream[F2, Nothing] = this.chunks.foreach(queue.offer) + private[fs2] def enqueueUnterminatedChunks[F2[x] >: F[x], O2 >: O]( + queue: Queue[F2, Chunk[O2]] + ): Stream[F2, Nothing] = enqueueUnterminatedChunks(queue: QueueSink[F2, Chunk[O2]]) + /** Enqueues the elements of this stream to the supplied queue and enqueues `None` when this stream terminates. */ def enqueueNoneTerminated[F2[x] >: F[x], O2 >: O]( - queue: Queue[F2, Option[O2]] + queue: QueueSink[F2, Option[O2]] ): Stream[F2, Nothing] = this.noneTerminate.foreach(queue.offer) + private[fs2] def enqueueNoneTerminated[F2[x] >: F[x], O2 >: O]( + queue: Queue[F2, Option[O2]] + ): Stream[F2, Nothing] = enqueueNoneTerminated(queue: QueueSink[F2, Option[O2]]) + /** Enqueues the chunks of this stream to the supplied queue and enqueues `None` when this stream terminates. */ def enqueueNoneTerminatedChunks[F2[x] >: F[x], O2 >: O]( - queue: Queue[F2, Option[Chunk[O2]]] + queue: QueueSink[F2, Option[Chunk[O2]]] ): Stream[F2, Nothing] = this.chunks.noneTerminate.foreach(queue.offer) + private[fs2] def enqueueNoneTerminatedChunks[F2[x] >: F[x], O2 >: O]( + queue: Queue[F2, Option[Chunk[O2]]] + ): Stream[F2, Nothing] = enqueueNoneTerminatedChunks(queue: QueueSink[F2, Option[Chunk[O2]]]) + /** Alias for `flatMap(o => Stream.eval(f(o)))`. * * @example {{{ @@ -3208,7 +3225,7 @@ object Stream extends StreamLowPriority { * are dequeued and emitted as a single chunk. */ def fromQueueUnterminated[F[_]: Functor, A]( - queue: Queue[F, A], + queue: QueueSource[F, A], limit: Int = Int.MaxValue ): Stream[F, A] = fromQueueNoneTerminatedChunk_[F, A]( @@ -3217,13 +3234,23 @@ object Stream extends StreamLowPriority { limit ) + /** Returns a stream of elements from the supplied queue. + * + * All elements that are available, up to the specified limit, + * are dequeued and emitted as a single chunk. + */ + def fromQueueUnterminated[F[_]: Functor, A]( + queue: Queue[F, A], + limit: Int + ): Stream[F, A] = fromQueueUnterminated(queue: QueueSource[F, A], limit) + /** Returns a stream of elements from the supplied queue. * * All elements that are available, up to the specified limit, * are dequeued and emitted as a single chunk. */ def fromQueueUnterminatedChunk[F[_]: Functor, A]( - queue: Queue[F, Chunk[A]], + queue: QueueSource[F, Chunk[A]], limit: Int = Int.MaxValue ): Stream[F, A] = fromQueueNoneTerminatedChunk_[F, A]( @@ -3232,6 +3259,16 @@ object Stream extends StreamLowPriority { limit ) + /** Returns a stream of elements from the supplied queue. + * + * All elements that are available, up to the specified limit, + * are dequeued and emitted as a single chunk. + */ + def fromQueueUnterminatedChunk[F[_]: Functor, A]( + queue: Queue[F, Chunk[A]], + limit: Int + ): Stream[F, A] = fromQueueUnterminatedChunk(queue: QueueSource[F, Chunk[A]], limit) + /** Returns a stream of elements from the supplied queue. * * The stream terminates upon dequeuing a `None`. @@ -3240,7 +3277,7 @@ object Stream extends StreamLowPriority { * are dequeued and emitted as a single chunk. */ def fromQueueNoneTerminated[F[_]: Functor, A]( - queue: Queue[F, Option[A]], + queue: QueueSource[F, Option[A]], limit: Int = Int.MaxValue ): Stream[F, A] = fromQueueNoneTerminatedChunk_( @@ -3249,6 +3286,18 @@ object Stream extends StreamLowPriority { limit ) + /** Returns a stream of elements from the supplied queue. + * + * The stream terminates upon dequeuing a `None`. + * + * All elements that are available, up to the specified limit, + * are dequeued and emitted as a single chunk. + */ + def fromQueueNoneTerminated[F[_]: Functor, A]( + queue: Queue[F, Option[A]], + limit: Int + ): Stream[F, A] = fromQueueNoneTerminated(queue: QueueSource[F, Option[A]], limit) + /** Returns a stream of elements from the supplied queue. * * The stream terminates upon dequeuing a `None`. @@ -3257,11 +3306,23 @@ object Stream extends StreamLowPriority { * are dequeued and emitted as a single chunk. */ def fromQueueNoneTerminatedChunk[F[_], A]( - queue: Queue[F, Option[Chunk[A]]], + queue: QueueSource[F, Option[Chunk[A]]], limit: Int = Int.MaxValue ): Stream[F, A] = fromQueueNoneTerminatedChunk_(queue.take, queue.tryTake, limit) + /** Returns a stream of elements from the supplied queue. + * + * The stream terminates upon dequeuing a `None`. + * + * All elements that are available, up to the specified limit, + * are dequeued and emitted as a single chunk. + */ + def fromQueueNoneTerminatedChunk[F[_], A]( + queue: Queue[F, Option[Chunk[A]]], + limit: Int + ): Stream[F, A] = fromQueueNoneTerminatedChunk(queue: QueueSource[F, Option[Chunk[A]]], limit) + private def fromQueueNoneTerminatedChunk_[F[_], A]( take: F[Option[Chunk[A]]], tryTake: F[Option[Option[Chunk[A]]]],