Skip to content

Commit

Permalink
Merge pull request #918 from RaasAhsan/revise-deferred-mvarconc
Browse files Browse the repository at this point in the history
Revise Deferred, MVarConcurrent, LinkedMap
  • Loading branch information
djspiewak authored Jul 11, 2020
2 parents ee0ba3b + 4d9d907 commit 59676de
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 96 deletions.
9 changes: 8 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,14 @@ val mimaSettings = Seq(
exclude[IncompatibleSignatureProblem]("cats.effect.Resource.evalTap"),
// change in encoding of value classes in generic methods https://github.com/lightbend/mima/issues/423
exclude[IncompatibleSignatureProblem]("cats.effect.Blocker.apply"),
exclude[IncompatibleSignatureProblem]("cats.effect.Blocker.fromExecutorService")
exclude[IncompatibleSignatureProblem]("cats.effect.Blocker.fromExecutorService"),
// revise Deferred, MVarConcurrent, LinkedLongMap - https://github.com/typelevel/cats-effect/pull/918
exclude[IncompatibleResultTypeProblem]("cats.effect.concurrent.Deferred#State#Unset.waiting"),
exclude[DirectMissingMethodProblem]("cats.effect.concurrent.Deferred#State#Unset.copy"),
exclude[IncompatibleResultTypeProblem]("cats.effect.concurrent.Deferred#State#Unset.copy$default$1"),
exclude[DirectMissingMethodProblem]("cats.effect.concurrent.Deferred#State#Unset.this"),
exclude[MissingClassProblem]("cats.effect.concurrent.Deferred$Id"),
exclude[DirectMissingMethodProblem]("cats.effect.concurrent.Deferred#State#Unset.apply")
)
}
)
Expand Down
44 changes: 23 additions & 21 deletions core/shared/src/main/scala/cats/effect/concurrent/Deferred.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ package cats
package effect
package concurrent

import cats.effect.internals.{Callback, LinkedMap, TrampolineEC}
import cats.implicits._
import cats.effect.internals.{Callback, LinkedLongMap, TrampolineEC}
import java.util.concurrent.atomic.AtomicReference

import cats.effect.concurrent.Deferred.TransformedDeferred
Expand Down Expand Up @@ -148,17 +149,15 @@ object Deferred {
def unsafeUncancelable[F[_]: Async, A]: Deferred[F, A] = unsafeTryableUncancelable[F, A]

private def unsafeTryable[F[_]: Concurrent, A]: TryableDeferred[F, A] =
new ConcurrentDeferred[F, A](new AtomicReference(Deferred.State.Unset(LinkedMap.empty)))
new ConcurrentDeferred[F, A](new AtomicReference(Deferred.State.Unset(LinkedLongMap.empty, 1)))

private def unsafeTryableUncancelable[F[_]: Async, A]: TryableDeferred[F, A] =
new UncancelableDeferred[F, A](Promise[A]())

final private class Id

sealed abstract private class State[A]
private object State {
final case class Set[A](a: A) extends State[A]
final case class Unset[A](waiting: LinkedMap[Id, A => Unit]) extends State[A]
final case class Unset[A](waiting: LinkedLongMap[A => Unit], nextId: Long) extends State[A]
}

final private class ConcurrentDeferred[F[_], A](ref: AtomicReference[State[A]])(implicit F: Concurrent[F])
Expand All @@ -168,15 +167,15 @@ object Deferred {
ref.get match {
case State.Set(a) =>
F.pure(a)
case State.Unset(_) =>
case State.Unset(_, _) =>
F.cancelable[A] { cb =>
val id = unsafeRegister(cb)
@tailrec
def unregister(): Unit =
ref.get match {
case State.Set(_) => ()
case s @ State.Unset(waiting) =>
val updated = State.Unset(waiting - id)
case s @ State.Unset(waiting, _) =>
val updated = s.copy(waiting = waiting - id)
if (ref.compareAndSet(s, updated)) ()
else unregister()
}
Expand All @@ -188,26 +187,29 @@ object Deferred {
def tryGet: F[Option[A]] =
F.delay {
ref.get match {
case State.Set(a) => Some(a)
case State.Unset(_) => None
case State.Set(a) => Some(a)
case State.Unset(_, _) => None
}
}

private[this] def unsafeRegister(cb: Either[Throwable, A] => Unit): Id = {
val id = new Id

private[this] def unsafeRegister(cb: Either[Throwable, A] => Unit): Long = {
@tailrec
def register(): Option[A] =
def register(): Either[Long, A] =
ref.get match {
case State.Set(a) => Some(a)
case s @ State.Unset(waiting) =>
val updated = State.Unset(waiting.updated(id, (a: A) => cb(Right(a))))
if (ref.compareAndSet(s, updated)) None
case State.Set(a) => Right(a)
case s @ State.Unset(waiting, nextId) =>
val updated = State.Unset(waiting.updated(nextId, (a: A) => cb(Right(a))), nextId + 1)
if (ref.compareAndSet(s, updated)) Left(nextId)
else register()
}

register().foreach(a => cb(Right(a)))
id
register() match {
case Left(id) => id
case r @ Right(_) => {
cb(r.leftCast[Throwable])
0L
}
}
}

def complete(a: A): F[Unit] =
Expand All @@ -219,7 +221,7 @@ object Deferred {
case State.Set(_) =>
throw new IllegalStateException("Attempting to complete a Deferred that has already been completed")

case s @ State.Unset(_) =>
case s @ State.Unset(_, _) =>
if (ref.compareAndSet(s, State.Set(a))) {
val list = s.waiting.values
if (list.nonEmpty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import scala.collection.immutable.LongMap
* traversed in the order they were inserted. Alternative to `ListMap` that
* has better asymptotic performance at the cost of more memory usage.
*/
private[effect] class LinkedMap[K, +V](
val entries: Map[K, (V, Long)],
private[this] val insertionOrder: LongMap[K],
private[effect] class LinkedLongMap[+V](
val entries: LongMap[(V, Long)],
private[this] val insertionOrder: LongMap[Long],
private[this] val nextId: Long
) {

Expand All @@ -36,28 +36,28 @@ private[effect] class LinkedMap[K, +V](
entries.isEmpty

/** Returns a new map with the supplied key/value added. */
def updated[V2 >: V](k: K, v: V2): LinkedMap[K, V2] = {
def updated[V2 >: V](k: Long, v: V2): LinkedLongMap[V2] = {
val insertionOrderOldRemoved = entries.get(k).fold(insertionOrder) { case (_, id) => insertionOrder - id }
new LinkedMap(entries.updated(k, (v, nextId)), insertionOrderOldRemoved.updated(nextId, k), nextId + 1)
new LinkedLongMap(entries.updated(k, (v, nextId)), insertionOrderOldRemoved.updated(nextId, k), nextId + 1)
}

/** Removes the element at the specified key. */
def -(k: K): LinkedMap[K, V] =
new LinkedMap(entries - k,
entries
.get(k)
.map { case (_, id) => insertionOrder - id }
.getOrElse(insertionOrder),
nextId)
def -(k: Long): LinkedLongMap[V] =
new LinkedLongMap(entries - k,
entries
.get(k)
.map { case (_, id) => insertionOrder - id }
.getOrElse(insertionOrder),
nextId)

/** The keys in this map, in the order they were added. */
def keys: Iterable[K] = insertionOrder.values
def keys: Iterable[Long] = insertionOrder.values

/** The values in this map, in the order they were added. */
def values: Iterable[V] = keys.flatMap(k => entries.get(k).toList.map(_._1))

/** Pulls the first value from this `LinkedMap`, in FIFO order. */
def dequeue: (V, LinkedMap[K, V]) = {
def dequeue: (V, LinkedLongMap[V]) = {
val k = insertionOrder.head._2
(entries(k)._1, this - k)
}
Expand All @@ -66,10 +66,10 @@ private[effect] class LinkedMap[K, +V](
keys.zip(values).mkString("LinkedMap(", ", ", ")")
}

private[effect] object LinkedMap {
def empty[K, V]: LinkedMap[K, V] =
emptyRef.asInstanceOf[LinkedMap[K, V]]
private[effect] object LinkedLongMap {
def empty[V]: LinkedLongMap[V] =
emptyRef.asInstanceOf[LinkedLongMap[V]]

private val emptyRef =
new LinkedMap[Nothing, Nothing](Map.empty, LongMap.empty, 0)
new LinkedLongMap[Nothing](LongMap.empty, LongMap.empty, 0)
}
Loading

0 comments on commit 59676de

Please sign in to comment.