Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Relax queue-related signatures to use QueueSink / QueueSource #2466

Merged
merged 2 commits into from
Jul 8, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 70 additions & 9 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import cats.data.Ior
import cats.effect.{Concurrent, SyncIO}
import cats.effect.kernel._
import cats.effect.kernel.implicits._
import cats.effect.std.{Console, Queue, Semaphore}
import cats.effect.std.{Console, Queue, QueueSink, QueueSource, Semaphore}
import cats.effect.Resource.ExitCase
import cats.syntax.all._

Expand Down Expand Up @@ -876,30 +876,47 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,

/** Enqueues the elements of this stream to the supplied queue.
*/
def enqueueUnterminated[F2[x] >: F[x], O2 >: O](queue: Queue[F2, O2]): Stream[F2, Nothing] =
def enqueueUnterminated[F2[x] >: F[x], O2 >: O](queue: QueueSink[F2, O2]): Stream[F2, Nothing] =
this.foreach(queue.offer)

private[fs2] def enqueueUnterminated[F2[x] >: F[x], O2 >: O](
queue: Queue[F2, O2]
): Stream[F2, Nothing] =
enqueueUnterminated(queue: QueueSink[F2, O2])

/** Enqueues the chunks of this stream to the supplied queue.
*/
def enqueueUnterminatedChunks[F2[x] >: F[x], O2 >: O](
queue: Queue[F2, Chunk[O2]]
queue: QueueSink[F2, Chunk[O2]]
): Stream[F2, Nothing] =
this.chunks.foreach(queue.offer)

private[fs2] def enqueueUnterminatedChunks[F2[x] >: F[x], O2 >: O](
queue: Queue[F2, Chunk[O2]]
): Stream[F2, Nothing] = enqueueUnterminatedChunks(queue: QueueSink[F2, Chunk[O2]])

/** Enqueues the elements of this stream to the supplied queue and enqueues `None` when this stream terminates.
*/
def enqueueNoneTerminated[F2[x] >: F[x], O2 >: O](
queue: Queue[F2, Option[O2]]
queue: QueueSink[F2, Option[O2]]
): Stream[F2, Nothing] =
this.noneTerminate.foreach(queue.offer)

private[fs2] def enqueueNoneTerminated[F2[x] >: F[x], O2 >: O](
queue: Queue[F2, Option[O2]]
): Stream[F2, Nothing] = enqueueNoneTerminated(queue: QueueSink[F2, Option[O2]])

/** Enqueues the chunks of this stream to the supplied queue and enqueues `None` when this stream terminates.
*/
def enqueueNoneTerminatedChunks[F2[x] >: F[x], O2 >: O](
queue: Queue[F2, Option[Chunk[O2]]]
queue: QueueSink[F2, Option[Chunk[O2]]]
): Stream[F2, Nothing] =
this.chunks.noneTerminate.foreach(queue.offer)

private[fs2] def enqueueNoneTerminatedChunks[F2[x] >: F[x], O2 >: O](
queue: Queue[F2, Option[Chunk[O2]]]
): Stream[F2, Nothing] = enqueueNoneTerminatedChunks(queue: QueueSink[F2, Option[Chunk[O2]]])

/** Alias for `flatMap(o => Stream.eval(f(o)))`.
*
* @example {{{
Expand Down Expand Up @@ -3208,7 +3225,7 @@ object Stream extends StreamLowPriority {
* are dequeued and emitted as a single chunk.
*/
def fromQueueUnterminated[F[_]: Functor, A](
queue: Queue[F, A],
queue: QueueSource[F, A],
limit: Int = Int.MaxValue
): Stream[F, A] =
fromQueueNoneTerminatedChunk_[F, A](
Expand All @@ -3217,13 +3234,23 @@ object Stream extends StreamLowPriority {
limit
)

/** Returns a stream of elements from the supplied queue.
*
* All elements that are available, up to the specified limit,
* are dequeued and emitted as a single chunk.
*/
def fromQueueUnterminated[F[_]: Functor, A](
queue: Queue[F, A],
limit: Int
): Stream[F, A] = fromQueueUnterminated(queue: QueueSource[F, A], limit)

/** Returns a stream of elements from the supplied queue.
*
* All elements that are available, up to the specified limit,
* are dequeued and emitted as a single chunk.
*/
def fromQueueUnterminatedChunk[F[_]: Functor, A](
queue: Queue[F, Chunk[A]],
queue: QueueSource[F, Chunk[A]],
limit: Int = Int.MaxValue
): Stream[F, A] =
fromQueueNoneTerminatedChunk_[F, A](
Expand All @@ -3232,6 +3259,16 @@ object Stream extends StreamLowPriority {
limit
)

/** Returns a stream of elements from the supplied queue.
*
* All elements that are available, up to the specified limit,
* are dequeued and emitted as a single chunk.
*/
def fromQueueUnterminatedChunk[F[_]: Functor, A](
queue: Queue[F, Chunk[A]],
limit: Int
): Stream[F, A] = fromQueueUnterminatedChunk(queue: QueueSource[F, Chunk[A]], limit)

/** Returns a stream of elements from the supplied queue.
*
* The stream terminates upon dequeuing a `None`.
Expand All @@ -3240,7 +3277,7 @@ object Stream extends StreamLowPriority {
* are dequeued and emitted as a single chunk.
*/
def fromQueueNoneTerminated[F[_]: Functor, A](
queue: Queue[F, Option[A]],
queue: QueueSource[F, Option[A]],
limit: Int = Int.MaxValue
): Stream[F, A] =
fromQueueNoneTerminatedChunk_(
Expand All @@ -3249,6 +3286,18 @@ object Stream extends StreamLowPriority {
limit
)

/** Returns a stream of elements from the supplied queue.
*
* The stream terminates upon dequeuing a `None`.
*
* All elements that are available, up to the specified limit,
* are dequeued and emitted as a single chunk.
*/
def fromQueueNoneTerminated[F[_]: Functor, A](
queue: Queue[F, Option[A]],
limit: Int
): Stream[F, A] = fromQueueNoneTerminated(queue: QueueSource[F, Option[A]], limit)

/** Returns a stream of elements from the supplied queue.
*
* The stream terminates upon dequeuing a `None`.
Expand All @@ -3257,11 +3306,23 @@ object Stream extends StreamLowPriority {
* are dequeued and emitted as a single chunk.
*/
def fromQueueNoneTerminatedChunk[F[_], A](
queue: Queue[F, Option[Chunk[A]]],
queue: QueueSource[F, Option[Chunk[A]]],
limit: Int = Int.MaxValue
): Stream[F, A] =
fromQueueNoneTerminatedChunk_(queue.take, queue.tryTake, limit)

/** Returns a stream of elements from the supplied queue.
*
* The stream terminates upon dequeuing a `None`.
*
* All elements that are available, up to the specified limit,
* are dequeued and emitted as a single chunk.
*/
def fromQueueNoneTerminatedChunk[F[_], A](
queue: Queue[F, Option[Chunk[A]]],
limit: Int
): Stream[F, A] = fromQueueNoneTerminatedChunk(queue: QueueSource[F, Option[Chunk[A]]], limit)

private def fromQueueNoneTerminatedChunk_[F[_], A](
take: F[Option[Chunk[A]]],
tryTake: F[Option[Option[Chunk[A]]]],
Expand Down