diff --git a/std/shared/src/main/scala/cats/effect/std/PQueue.scala b/std/shared/src/main/scala/cats/effect/std/PQueue.scala index a8f1864950..4ef532a570 100644 --- a/std/shared/src/main/scala/cats/effect/std/PQueue.scala +++ b/std/shared/src/main/scala/cats/effect/std/PQueue.scala @@ -214,37 +214,7 @@ object PQueue { else () } -trait PQueueSource[F[_], A] { - - /** - * Dequeues the least element from the PQueue, possibly fiber blocking until an element - * becomes available. - * - * O(log(n)) - * - * Note: If there are multiple elements with least priority, the order in which they are - * dequeued is undefined. If you want to break ties with FIFO order you will need an - * additional `Ref[F, Long]` to track insertion, and embed that information into your instance - * for `Order[A]`. - */ - def take: F[A] - - /** - * Attempts to dequeue the least element from the PQueue, if one is available without fiber - * blocking. - * - * O(log(n)) - * - * @return - * an effect that describes whether the dequeueing of an element from the PQueue succeeded - * without blocking, with `None` denoting that no element was available - * - * Note: If there are multiple elements with least priority, the order in which they are - * dequeued is undefined. If you want to break ties with FIFO order you will need an - * additional `Ref[F, Long]` to track insertion, and embed that information into your instance - * for `Order[A]`. - */ - def tryTake: F[Option[A]] +trait PQueueSource[F[_], A] extends QueueSource[F, A] { /** * Attempts to dequeue elements from the PQueue, if they are available without semantically @@ -260,33 +230,12 @@ trait PQueueSource[F[_], A] { * Note: If there are multiple elements with least priority, the order in which they are * dequeued is undefined. */ - def tryTakeN(maxN: Option[Int])(implicit F: Monad[F]): F[List[A]] = { - PQueueSource.assertMaxNPositive(maxN) - - def loop(i: Int, limit: Int, acc: List[A]): F[List[A]] = - if (i >= limit) - F.pure(acc.reverse) - else - tryTake flatMap { - case Some(a) => loop(i + 1, limit, a :: acc) - case None => F.pure(acc.reverse) - } - - maxN match { - case Some(limit) => loop(0, limit, Nil) - case None => loop(0, Int.MaxValue, Nil) - } - } + override def tryTakeN(maxN: Option[Int])(implicit F: Monad[F]): F[List[A]] = + QueueSource.tryTakeN[F, A](maxN, tryTake) - def size: F[Int] } object PQueueSource { - private def assertMaxNPositive(maxN: Option[Int]): Unit = maxN match { - case Some(n) if n <= 0 => - throw new IllegalArgumentException(s"Provided maxN parameter must be positive, was $n") - case _ => () - } implicit def catsFunctorForPQueueSource[F[_]: Functor]: Functor[PQueueSource[F, *]] = new Functor[PQueueSource[F, *]] { @@ -302,31 +251,7 @@ object PQueueSource { } } -trait PQueueSink[F[_], A] { - - /** - * Enqueues the given element, possibly fiber blocking until sufficient capacity becomes - * available. - * - * O(log(n)) - * - * @param a - * the element to be put in the PQueue - */ - def offer(a: A): F[Unit] - - /** - * Attempts to enqueue the given element without fiber blocking. - * - * O(log(n)) - * - * @param a - * the element to be put in the PQueue - * @return - * an effect that describes whether the enqueuing of the given element succeeded without - * blocking - */ - def tryOffer(a: A): F[Boolean] +trait PQueueSink[F[_], A] extends QueueSink[F, A] { /** * Attempts to enqueue the given elements without semantically blocking. If an item in the @@ -339,17 +264,13 @@ trait PQueueSink[F[_], A] { * @return * an effect that contains the remaining valus that could not be offered. */ - def tryOfferN(list: List[A])(implicit F: Monad[F]): F[List[A]] = list match { - case Nil => F.pure(list) - case h :: t => - tryOffer(h).ifM( - tryOfferN(t), - F.pure(list) - ) - } + override def tryOfferN(list: List[A])(implicit F: Monad[F]): F[List[A]] = + QueueSink.tryOfferN(list, tryOffer) + } object PQueueSink { + implicit def catsContravariantForPQueueSink[F[_]]: Contravariant[PQueueSink[F, *]] = new Contravariant[PQueueSink[F, *]] { override def contramap[A, B](fa: PQueueSink[F, A])(f: B => A): PQueueSink[F, B] = diff --git a/std/shared/src/main/scala/cats/effect/std/Queue.scala b/std/shared/src/main/scala/cats/effect/std/Queue.scala index 635e626113..81e5c827eb 100644 --- a/std/shared/src/main/scala/cats/effect/std/Queue.scala +++ b/std/shared/src/main/scala/cats/effect/std/Queue.scala @@ -1135,7 +1135,16 @@ trait QueueSource[F[_], A] { * @return * an effect that contains the dequeued elements */ - def tryTakeN(maxN: Option[Int])(implicit F: Monad[F]): F[List[A]] = { + def tryTakeN(maxN: Option[Int])(implicit F: Monad[F]): F[List[A]] = + QueueSource.tryTakeN[F, A](maxN, tryTake) + + def size: F[Int] +} + +object QueueSource { + + private[std] def tryTakeN[F[_], A](maxN: Option[Int], tryTake: F[Option[A]])( + implicit F: Monad[F]): F[List[A]] = { QueueSource.assertMaxNPositive(maxN) def loop(i: Int, limit: Int, acc: List[A]): F[List[A]] = @@ -1153,10 +1162,6 @@ trait QueueSource[F[_], A] { } } - def size: F[Int] -} - -object QueueSource { private[std] def assertMaxNPositive(maxN: Option[Int]): Unit = maxN match { case Some(n) if n <= 0 => throw new IllegalArgumentException(s"Provided maxN parameter must be positive, was $n") @@ -1212,17 +1217,23 @@ trait QueueSink[F[_], A] { * @return * an effect that contains the remaining valus that could not be offered. */ - def tryOfferN(list: List[A])(implicit F: Monad[F]): F[List[A]] = list match { + def tryOfferN(list: List[A])(implicit F: Monad[F]): F[List[A]] = + QueueSink.tryOfferN[F, A](list, tryOffer) + +} + +object QueueSink { + + private[std] def tryOfferN[F[_], A](list: List[A], tryOffer: A => F[Boolean])( + implicit F: Monad[F]): F[List[A]] = list match { case Nil => F.pure(list) case h :: t => tryOffer(h).ifM( - tryOfferN(t), + tryOfferN(t, tryOffer), F.pure(list) ) } -} -object QueueSink { implicit def catsContravariantForQueueSink[F[_]]: Contravariant[QueueSink[F, *]] = new Contravariant[QueueSink[F, *]] { override def contramap[A, B](fa: QueueSink[F, A])(f: B => A): QueueSink[F, B] =