Skip to content

Commit

Permalink
MVar: add "tryRead" and "swap" fixes #732
Browse files Browse the repository at this point in the history
Modification:

- add `tryRead: F[Option[A]]` similar to Control-Concurrent-MVar#tryRead
- add `swap(newValue: A): F[A]` similar to Control-Concurrent-MVar#swapMVar
- add a test and a line into the docs
- add MVar2 to keep binary compatibility with MVar
  • Loading branch information
kubum committed Mar 22, 2020
1 parent 49dd0e9 commit 9399999
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 69 deletions.
77 changes: 62 additions & 15 deletions core/shared/src/main/scala/cats/effect/concurrent/MVar.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
package cats.effect
package concurrent

import cats.effect.concurrent.MVar.TransformedMVar
import cats.effect.concurrent.MVar.{TransformedMVar, TransformedMVar2}
import cats.effect.internals.{MVarAsync, MVarConcurrent}
import cats.~>

/**
* A mutable location, that is either empty or contains
* a value of type `A`.
* @define mVarDescription A mutable location, that is either empty or contains a value of type `A`.
*
* It has the following fundamental atomic operations:
*
Expand All @@ -37,6 +36,8 @@ import cats.~>
* - [[read]] which reads the current value without touching it,
* assuming there is one, or otherwise it waits until a value
* is made available via `put`
* - `tryRead` returns a variable if it exists. Implemented in the successor [[MVar2]]
* - `swap` takes a value, replaces it and returns the taken value. Implemented in the successor [[MVar2]]
* - [[isEmpty]] returns true if currently empty
*
* The `MVar` is appropriate for building synchronization
Expand All @@ -52,8 +53,12 @@ import cats.~>
* Inspired by `Control.Concurrent.MVar` from Haskell and
* by `scalaz.concurrent.MVar`.
*/
abstract class MVar[F[_], A] {
sealed private[concurrent] trait MVarDocumentation extends Any {}

/**
* $mVarDescription
*/
abstract class MVar[F[_], A] extends MVarDocumentation {
/**
* Returns `true` if the `MVar` is empty and can receive a `put`, or
* `false` otherwise.
Expand Down Expand Up @@ -120,9 +125,39 @@ abstract class MVar[F[_], A] {
new TransformedMVar(this, f)
}

/**
* $mVarDescription
*
* The `MVar2` is the successor of `MVar` with [[tryRead]] and [[swap]]. It was implemented separately only to maintain
* binary compatibility with `MVar`.
*/
abstract class MVar2[F[_], A] extends MVar[F, A] {
/**
* Replaces a value in MVar and returns the old value.
* @param newValue is a new value
* @return the value taken
*/
def swap(newValue: A): F[A]

/**
* Returns the value without waiting or modifying.
*
* This operation is atomic.
*
* @return an Option holding the current value, None means it was empty
*/
def tryRead: F[Option[A]]

/**
* Modify the context `F` using transformation `f`.
*/
override def mapK[G[_]](f: F ~> G): MVar2[G, A] =
new TransformedMVar2(this, f)
}

/** Builders for [[MVar]]. */
object MVar {

/**
* Builds an [[MVar]] value for `F` data types that are [[Concurrent]].
*
Expand Down Expand Up @@ -156,7 +191,7 @@ object MVar {
* @param F is a [[Concurrent]] constraint, needed in order to
* describe cancelable operations
*/
def empty[F[_], A](implicit F: Concurrent[F]): F[MVar[F, A]] =
def empty[F[_], A](implicit F: Concurrent[F]): F[MVar2[F, A]] =
F.delay(MVarConcurrent.empty)

/**
Expand All @@ -172,7 +207,7 @@ object MVar {
*
* @see [[empty]] for creating cancelable MVars
*/
def uncancelableEmpty[F[_], A](implicit F: Async[F]): F[MVar[F, A]] =
def uncancelableEmpty[F[_], A](implicit F: Async[F]): F[MVar2[F, A]] =
F.delay(MVarAsync.empty)

/**
Expand All @@ -187,7 +222,7 @@ object MVar {
* @param F is a [[Concurrent]] constraint, needed in order to
* describe cancelable operations
*/
def of[F[_], A](initial: A)(implicit F: Concurrent[F]): F[MVar[F, A]] =
def of[F[_], A](initial: A)(implicit F: Concurrent[F]): F[MVar2[F, A]] =
F.delay(MVarConcurrent(initial))

/**
Expand All @@ -204,31 +239,31 @@ object MVar {
*
* @see [[of]] for creating cancelable MVars
*/
def uncancelableOf[F[_], A](initial: A)(implicit F: Async[F]): F[MVar[F, A]] =
def uncancelableOf[F[_], A](initial: A)(implicit F: Async[F]): F[MVar2[F, A]] =
F.delay(MVarAsync(initial))

/**
* Like [[of]] but initializes state using another effect constructor
*/
def in[F[_], G[_], A](initial: A)(implicit F: Sync[F], G: Concurrent[G]): F[MVar[G, A]] =
def in[F[_], G[_], A](initial: A)(implicit F: Sync[F], G: Concurrent[G]): F[MVar2[G, A]] =
F.delay(MVarConcurrent(initial))

/**
* Like [[empty]] but initializes state using another effect constructor
*/
def emptyIn[F[_], G[_], A](implicit F: Sync[F], G: Concurrent[G]): F[MVar[G, A]] =
def emptyIn[F[_], G[_], A](implicit F: Sync[F], G: Concurrent[G]): F[MVar2[G, A]] =
F.delay(MVarConcurrent.empty)

/**
* Like [[uncancelableOf]] but initializes state using another effect constructor
*/
def uncancelableIn[F[_], G[_], A](initial: A)(implicit F: Sync[F], G: Async[G]): F[MVar[G, A]] =
def uncancelableIn[F[_], G[_], A](initial: A)(implicit F: Sync[F], G: Async[G]): F[MVar2[G, A]] =
F.delay(MVarAsync(initial))

/**
* Like [[uncancelableEmpty]] but initializes state using another effect constructor
*/
def uncancelableEmptyIn[F[_], G[_], A](implicit F: Sync[F], G: Async[G]): F[MVar[G, A]] =
def uncancelableEmptyIn[F[_], G[_], A](implicit F: Sync[F], G: Async[G]): F[MVar2[G, A]] =
F.delay(MVarAsync.empty)

/**
Expand All @@ -241,15 +276,15 @@ object MVar {
*
* @see documentation for [[MVar.of]]
*/
def of[A](a: A): F[MVar[F, A]] =
def of[A](a: A): F[MVar2[F, A]] =
MVar.of(a)(F)

/**
* Builds an empty `MVar`.
*
* @see documentation for [[MVar.empty]]
*/
def empty[A]: F[MVar[F, A]] =
def empty[A]: F[MVar2[F, A]] =
MVar.empty(F)
}

Expand All @@ -262,4 +297,16 @@ object MVar {
override def tryTake: G[Option[A]] = trans(underlying.tryTake)
override def read: G[A] = trans(underlying.read)
}

final private[concurrent] class TransformedMVar2[F[_], G[_], A](underlying: MVar2[F, A], trans: F ~> G)
extends MVar2[G, A] {
override def isEmpty: G[Boolean] = trans(underlying.isEmpty)
override def put(a: A): G[Unit] = trans(underlying.put(a))
override def tryPut(a: A): G[Boolean] = trans(underlying.tryPut(a))
override def take: G[A] = trans(underlying.take)
override def tryTake: G[Option[A]] = trans(underlying.tryTake)
override def read: G[A] = trans(underlying.read)
override def tryRead: G[Option[A]] = trans(underlying.tryRead)
override def swap(newValue: A): G[A] = trans(underlying.swap(newValue))
}
}
22 changes: 18 additions & 4 deletions core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@ package cats.effect
package internals

import java.util.concurrent.atomic.AtomicReference
import cats.effect.concurrent.MVar

import cats.effect.concurrent.MVar2
import cats.effect.internals.Callback.rightUnit

import scala.annotation.tailrec
import scala.collection.immutable.Queue

/**
* [[MVar]] implementation for [[Async]] data types.
*/
final private[effect] class MVarAsync[F[_], A] private (initial: MVarAsync.State[A])(implicit F: Async[F])
extends MVar[F, A] {
extends MVar2[F, A] {
import MVarAsync._

/** Shared mutable state. */
Expand Down Expand Up @@ -55,6 +57,18 @@ final private[effect] class MVarAsync[F[_], A] private (initial: MVarAsync.State
def read: F[A] =
F.async(unsafeRead)

def tryRead: F[Option[A]] = F.delay {
stateRef.get match {
case WaitForTake(value, _) => Some(value)
case WaitForPut(_, _) => None
}
}

def swap(newValue: A): F[A] =
F.flatMap(take) { oldValue =>
F.map(put(newValue))(_ => oldValue)
}

def isEmpty: F[Boolean] =
F.delay {
stateRef.get match {
Expand Down Expand Up @@ -226,11 +240,11 @@ final private[effect] class MVarAsync[F[_], A] private (initial: MVarAsync.State
private[effect] object MVarAsync {

/** Builds an [[MVarAsync]] instance with an `initial` value. */
def apply[F[_], A](initial: A)(implicit F: Async[F]): MVar[F, A] =
def apply[F[_], A](initial: A)(implicit F: Async[F]): MVar2[F, A] =
new MVarAsync[F, A](State(initial))

/** Returns an empty [[MVarAsync]] instance. */
def empty[F[_], A](implicit F: Async[F]): MVar[F, A] =
def empty[F[_], A](implicit F: Async[F]): MVar2[F, A] =
new MVarAsync[F, A](State.empty)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@ package cats.effect
package internals

import java.util.concurrent.atomic.AtomicReference
import cats.effect.concurrent.MVar

import cats.effect.concurrent.MVar2
import cats.effect.internals.Callback.rightUnit

import scala.annotation.tailrec

/**
* [[MVar]] implementation for [[Concurrent]] data types.
*/
final private[effect] class MVarConcurrent[F[_], A] private (initial: MVarConcurrent.State[A])(
implicit F: Concurrent[F]
) extends MVar[F, A] {
) extends MVar2[F, A] {
import MVarConcurrent._

/** Shared mutable state. */
Expand Down Expand Up @@ -58,6 +60,14 @@ final private[effect] class MVarConcurrent[F[_], A] private (initial: MVarConcur
val read: F[A] =
F.cancelable(unsafeRead)

def tryRead =
F.delay {
stateRef.get match {
case WaitForTake(value, _) => Some(value)
case WaitForPut(_, _) => None
}
}

def isEmpty: F[Boolean] =
F.delay {
stateRef.get match {
Expand All @@ -66,6 +76,11 @@ final private[effect] class MVarConcurrent[F[_], A] private (initial: MVarConcur
}
}

def swap(newValue: A): F[A] =
F.flatMap(take) { oldValue =>
F.map(put(newValue))(_ => oldValue)
}

@tailrec private def unsafeTryPut(a: A): F[Boolean] =
stateRef.get match {
case WaitForTake(_, _) => F.pure(false)
Expand Down Expand Up @@ -280,11 +295,11 @@ final private[effect] class MVarConcurrent[F[_], A] private (initial: MVarConcur
private[effect] object MVarConcurrent {

/** Builds an [[MVarConcurrent]] instance with an `initial` value. */
def apply[F[_], A](initial: A)(implicit F: Concurrent[F]): MVar[F, A] =
def apply[F[_], A](initial: A)(implicit F: Concurrent[F]): MVar2[F, A] =
new MVarConcurrent[F, A](State(initial))

/** Returns an empty [[MVarConcurrent]] instance. */
def empty[F[_], A](implicit F: Concurrent[F]): MVar[F, A] =
def empty[F[_], A](implicit F: Concurrent[F]): MVar2[F, A] =
new MVarConcurrent[F, A](State.empty)

/**
Expand Down
Loading

0 comments on commit 9399999

Please sign in to comment.