Skip to content

Commit

Permalink
Merge pull request #3930 from neomaclin/fix/priority-queue-sink-sourc…
Browse files Browse the repository at this point in the history
…e-use-queue-sink-source

fix: Added priority queue sink and source extending queue
  • Loading branch information
djspiewak authored Jan 15, 2024
2 parents 1bc18fe + 416bf4f commit 0f34a2d
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 96 deletions.
95 changes: 8 additions & 87 deletions std/shared/src/main/scala/cats/effect/std/PQueue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -214,37 +214,7 @@ object PQueue {
else ()
}

trait PQueueSource[F[_], A] {

/**
* Dequeues the least element from the PQueue, possibly fiber blocking until an element
* becomes available.
*
* O(log(n))
*
* Note: If there are multiple elements with least priority, the order in which they are
* dequeued is undefined. If you want to break ties with FIFO order you will need an
* additional `Ref[F, Long]` to track insertion, and embed that information into your instance
* for `Order[A]`.
*/
def take: F[A]

/**
* Attempts to dequeue the least element from the PQueue, if one is available without fiber
* blocking.
*
* O(log(n))
*
* @return
* an effect that describes whether the dequeueing of an element from the PQueue succeeded
* without blocking, with `None` denoting that no element was available
*
* Note: If there are multiple elements with least priority, the order in which they are
* dequeued is undefined. If you want to break ties with FIFO order you will need an
* additional `Ref[F, Long]` to track insertion, and embed that information into your instance
* for `Order[A]`.
*/
def tryTake: F[Option[A]]
trait PQueueSource[F[_], A] extends QueueSource[F, A] {

/**
* Attempts to dequeue elements from the PQueue, if they are available without semantically
Expand All @@ -260,33 +230,12 @@ trait PQueueSource[F[_], A] {
* Note: If there are multiple elements with least priority, the order in which they are
* dequeued is undefined.
*/
def tryTakeN(maxN: Option[Int])(implicit F: Monad[F]): F[List[A]] = {
PQueueSource.assertMaxNPositive(maxN)

def loop(i: Int, limit: Int, acc: List[A]): F[List[A]] =
if (i >= limit)
F.pure(acc.reverse)
else
tryTake flatMap {
case Some(a) => loop(i + 1, limit, a :: acc)
case None => F.pure(acc.reverse)
}

maxN match {
case Some(limit) => loop(0, limit, Nil)
case None => loop(0, Int.MaxValue, Nil)
}
}
override def tryTakeN(maxN: Option[Int])(implicit F: Monad[F]): F[List[A]] =
QueueSource.tryTakeN[F, A](maxN, tryTake)

def size: F[Int]
}

object PQueueSource {
private def assertMaxNPositive(maxN: Option[Int]): Unit = maxN match {
case Some(n) if n <= 0 =>
throw new IllegalArgumentException(s"Provided maxN parameter must be positive, was $n")
case _ => ()
}

implicit def catsFunctorForPQueueSource[F[_]: Functor]: Functor[PQueueSource[F, *]] =
new Functor[PQueueSource[F, *]] {
Expand All @@ -302,31 +251,7 @@ object PQueueSource {
}
}

trait PQueueSink[F[_], A] {

/**
* Enqueues the given element, possibly fiber blocking until sufficient capacity becomes
* available.
*
* O(log(n))
*
* @param a
* the element to be put in the PQueue
*/
def offer(a: A): F[Unit]

/**
* Attempts to enqueue the given element without fiber blocking.
*
* O(log(n))
*
* @param a
* the element to be put in the PQueue
* @return
* an effect that describes whether the enqueuing of the given element succeeded without
* blocking
*/
def tryOffer(a: A): F[Boolean]
trait PQueueSink[F[_], A] extends QueueSink[F, A] {

/**
* Attempts to enqueue the given elements without semantically blocking. If an item in the
Expand All @@ -339,17 +264,13 @@ trait PQueueSink[F[_], A] {
* @return
* an effect that contains the remaining valus that could not be offered.
*/
def tryOfferN(list: List[A])(implicit F: Monad[F]): F[List[A]] = list match {
case Nil => F.pure(list)
case h :: t =>
tryOffer(h).ifM(
tryOfferN(t),
F.pure(list)
)
}
override def tryOfferN(list: List[A])(implicit F: Monad[F]): F[List[A]] =
QueueSink.tryOfferN(list, tryOffer)

}

object PQueueSink {

implicit def catsContravariantForPQueueSink[F[_]]: Contravariant[PQueueSink[F, *]] =
new Contravariant[PQueueSink[F, *]] {
override def contramap[A, B](fa: PQueueSink[F, A])(f: B => A): PQueueSink[F, B] =
Expand Down
29 changes: 20 additions & 9 deletions std/shared/src/main/scala/cats/effect/std/Queue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1135,7 +1135,16 @@ trait QueueSource[F[_], A] {
* @return
* an effect that contains the dequeued elements
*/
def tryTakeN(maxN: Option[Int])(implicit F: Monad[F]): F[List[A]] = {
def tryTakeN(maxN: Option[Int])(implicit F: Monad[F]): F[List[A]] =
QueueSource.tryTakeN[F, A](maxN, tryTake)

def size: F[Int]
}

object QueueSource {

private[std] def tryTakeN[F[_], A](maxN: Option[Int], tryTake: F[Option[A]])(
implicit F: Monad[F]): F[List[A]] = {
QueueSource.assertMaxNPositive(maxN)

def loop(i: Int, limit: Int, acc: List[A]): F[List[A]] =
Expand All @@ -1153,10 +1162,6 @@ trait QueueSource[F[_], A] {
}
}

def size: F[Int]
}

object QueueSource {
private[std] def assertMaxNPositive(maxN: Option[Int]): Unit = maxN match {
case Some(n) if n <= 0 =>
throw new IllegalArgumentException(s"Provided maxN parameter must be positive, was $n")
Expand Down Expand Up @@ -1212,17 +1217,23 @@ trait QueueSink[F[_], A] {
* @return
* an effect that contains the remaining valus that could not be offered.
*/
def tryOfferN(list: List[A])(implicit F: Monad[F]): F[List[A]] = list match {
def tryOfferN(list: List[A])(implicit F: Monad[F]): F[List[A]] =
QueueSink.tryOfferN[F, A](list, tryOffer)

}

object QueueSink {

private[std] def tryOfferN[F[_], A](list: List[A], tryOffer: A => F[Boolean])(
implicit F: Monad[F]): F[List[A]] = list match {
case Nil => F.pure(list)
case h :: t =>
tryOffer(h).ifM(
tryOfferN(t),
tryOfferN(t, tryOffer),
F.pure(list)
)
}
}

object QueueSink {
implicit def catsContravariantForQueueSink[F[_]]: Contravariant[QueueSink[F, *]] =
new Contravariant[QueueSink[F, *]] {
override def contramap[A, B](fa: QueueSink[F, A])(f: B => A): QueueSink[F, B] =
Expand Down

0 comments on commit 0f34a2d

Please sign in to comment.