Skip to content

Commit

Permalink
MVar: add "tryRead" fixes #732
Browse files Browse the repository at this point in the history
Modification:

- add tryRead: F[Option[A]] similar to Control-Concurrent-MVar#tryRead
- add a test and a line into the docs
- add MVar2 to keep binary compatibility with MVar
  • Loading branch information
kubum committed Feb 8, 2020
1 parent 91c8198 commit cdc6eb4
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 62 deletions.
55 changes: 44 additions & 11 deletions core/shared/src/main/scala/cats/effect/concurrent/MVar.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package cats.effect
package concurrent

import cats.effect.concurrent.MVar.TransformedMVar
import cats.effect.concurrent.MVar.{TransformedMVar, TransformedMVar2}
import cats.effect.internals.{MVarAsync, MVarConcurrent}
import cats.~>

Expand All @@ -37,6 +37,7 @@ import cats.~>
* - [[read]] which reads the current value without touching it,
* assuming there is one, or otherwise it waits until a value
* is made available via `put`
* - `tryRead` returns a variable if it exists. Implemented in the successor [[MVar2]]
* - [[isEmpty]] returns true if currently empty
*
* The `MVar` is appropriate for building synchronization
Expand Down Expand Up @@ -119,6 +120,27 @@ abstract class MVar[F[_], A] {
new TransformedMVar(this, f)
}

/**
* The `MVar2` is the successor of `MVar` with [[tryRead]]. It was implemented separately only to maintain binary
* compatibility with `MVar`.
*/
abstract class MVar2[F[_], A] extends MVar[F, A] {
/**
* Returns the value without waiting or modifying.
*
* This operation is atomic.
*
* @return an Option holding the current value, None means it was empty
*/
def tryRead: F[Option[A]]

/**
* Modify the context `F` using transformation `f`.
*/
override def mapK[G[_]](f: F ~> G): MVar2[G, A] =
new TransformedMVar2(this, f)
}

/** Builders for [[MVar]]. */
object MVar {
/**
Expand Down Expand Up @@ -154,7 +176,7 @@ object MVar {
* @param F is a [[Concurrent]] constraint, needed in order to
* describe cancelable operations
*/
def empty[F[_], A](implicit F: Concurrent[F]): F[MVar[F, A]] =
def empty[F[_], A](implicit F: Concurrent[F]): F[MVar2[F, A]] =
F.delay(MVarConcurrent.empty)

/**
Expand All @@ -170,7 +192,7 @@ object MVar {
*
* @see [[empty]] for creating cancelable MVars
*/
def uncancelableEmpty[F[_], A](implicit F: Async[F]): F[MVar[F, A]] =
def uncancelableEmpty[F[_], A](implicit F: Async[F]): F[MVar2[F, A]] =
F.delay(MVarAsync.empty)

/**
Expand All @@ -185,7 +207,7 @@ object MVar {
* @param F is a [[Concurrent]] constraint, needed in order to
* describe cancelable operations
*/
def of[F[_], A](initial: A)(implicit F: Concurrent[F]): F[MVar[F, A]] =
def of[F[_], A](initial: A)(implicit F: Concurrent[F]): F[MVar2[F, A]] =
F.delay(MVarConcurrent(initial))

/**
Expand All @@ -202,31 +224,31 @@ object MVar {
*
* @see [[of]] for creating cancelable MVars
*/
def uncancelableOf[F[_], A](initial: A)(implicit F: Async[F]): F[MVar[F, A]] =
def uncancelableOf[F[_], A](initial: A)(implicit F: Async[F]): F[MVar2[F, A]] =
F.delay(MVarAsync(initial))

/**
* Like [[of]] but initializes state using another effect constructor
*/
def in[F[_], G[_], A](initial: A)(implicit F: Sync[F], G: Concurrent[G]): F[MVar[G, A]] =
def in[F[_], G[_], A](initial: A)(implicit F: Sync[F], G: Concurrent[G]): F[MVar2[G, A]] =
F.delay(MVarConcurrent(initial))

/**
* Like [[empty]] but initializes state using another effect constructor
*/
def emptyIn[F[_], G[_], A](implicit F: Sync[F], G: Concurrent[G]): F[MVar[G, A]] =
def emptyIn[F[_], G[_], A](implicit F: Sync[F], G: Concurrent[G]): F[MVar2[G, A]] =
F.delay(MVarConcurrent.empty)

/**
* Like [[uncancelableOf]] but initializes state using another effect constructor
*/
def uncancelableIn[F[_], G[_], A](initial: A)(implicit F: Sync[F], G: Async[G]): F[MVar[G, A]] =
def uncancelableIn[F[_], G[_], A](initial: A)(implicit F: Sync[F], G: Async[G]): F[MVar2[G, A]] =
F.delay(MVarAsync(initial))

/**
* Like [[uncancelableEmpty]] but initializes state using another effect constructor
*/
def uncancelableEmptyIn[F[_], G[_], A](implicit F: Sync[F], G: Async[G]): F[MVar[G, A]] =
def uncancelableEmptyIn[F[_], G[_], A](implicit F: Sync[F], G: Async[G]): F[MVar2[G, A]] =
F.delay(MVarAsync.empty)

/**
Expand All @@ -238,15 +260,15 @@ object MVar {
*
* @see documentation for [[MVar.of]]
*/
def of[A](a: A): F[MVar[F, A]] =
def of[A](a: A): F[MVar2[F, A]] =
MVar.of(a)(F)

/**
* Builds an empty `MVar`.
*
* @see documentation for [[MVar.empty]]
*/
def empty[A]: F[MVar[F, A]] =
def empty[A]: F[MVar2[F, A]] =
MVar.empty(F)
}

Expand All @@ -259,4 +281,15 @@ object MVar {
override def tryTake: G[Option[A]] = trans(underlying.tryTake)
override def read: G[A] = trans(underlying.read)
}

final private[concurrent] class TransformedMVar2[F[_], G[_], A](underlying: MVar2[F, A], trans: F ~> G)
extends MVar2[G, A] {
override def isEmpty: G[Boolean] = trans(underlying.isEmpty)
override def put(a: A): G[Unit] = trans(underlying.put(a))
override def tryPut(a: A): G[Boolean] = trans(underlying.tryPut(a))
override def take: G[A] = trans(underlying.take)
override def tryTake: G[Option[A]] = trans(underlying.tryTake)
override def read: G[A] = trans(underlying.read)
override def tryRead: G[Option[A]] = trans(underlying.tryRead)
}
}
17 changes: 13 additions & 4 deletions core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@ package cats.effect
package internals

import java.util.concurrent.atomic.AtomicReference
import cats.effect.concurrent.MVar

import cats.effect.concurrent.MVar2
import cats.effect.internals.Callback.rightUnit

import scala.annotation.tailrec
import scala.collection.immutable.Queue

/**
* [[MVar]] implementation for [[Async]] data types.
*/
final private[effect] class MVarAsync[F[_], A] private (initial: MVarAsync.State[A])(implicit F: Async[F])
extends MVar[F, A] {
extends MVar2[F, A] {
import MVarAsync._

/** Shared mutable state. */
Expand Down Expand Up @@ -55,6 +57,13 @@ final private[effect] class MVarAsync[F[_], A] private (initial: MVarAsync.State
def read: F[A] =
F.async(unsafeRead)

def tryRead: F[Option[A]] = F.delay {
stateRef.get match {
case WaitForTake(value, _) => Some(value)
case WaitForPut(_, _) => None
}
}

def isEmpty: F[Boolean] =
F.delay {
stateRef.get match {
Expand Down Expand Up @@ -225,11 +234,11 @@ final private[effect] class MVarAsync[F[_], A] private (initial: MVarAsync.State

private[effect] object MVarAsync {
/** Builds an [[MVarAsync]] instance with an `initial` value. */
def apply[F[_], A](initial: A)(implicit F: Async[F]): MVar[F, A] =
def apply[F[_], A](initial: A)(implicit F: Async[F]): MVar2[F, A] =
new MVarAsync[F, A](State(initial))

/** Returns an empty [[MVarAsync]] instance. */
def empty[F[_], A](implicit F: Async[F]): MVar[F, A] =
def empty[F[_], A](implicit F: Async[F]): MVar2[F, A] =
new MVarAsync[F, A](State.empty)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@ package cats.effect
package internals

import java.util.concurrent.atomic.AtomicReference
import cats.effect.concurrent.MVar

import cats.effect.concurrent.MVar2
import cats.effect.internals.Callback.rightUnit

import scala.annotation.tailrec

/**
* [[MVar]] implementation for [[Concurrent]] data types.
*/
final private[effect] class MVarConcurrent[F[_], A] private (initial: MVarConcurrent.State[A])(
implicit F: Concurrent[F]
) extends MVar[F, A] {
) extends MVar2[F, A] {
import MVarConcurrent._

/** Shared mutable state. */
Expand Down Expand Up @@ -58,6 +60,14 @@ final private[effect] class MVarConcurrent[F[_], A] private (initial: MVarConcur
val read: F[A] =
F.cancelable(unsafeRead)

def tryRead =
F.delay {
stateRef.get match {
case WaitForTake(value, _) => Some(value)
case WaitForPut(_, _) => None
}
}

def isEmpty: F[Boolean] =
F.delay {
stateRef.get match {
Expand Down Expand Up @@ -278,11 +288,11 @@ final private[effect] class MVarConcurrent[F[_], A] private (initial: MVarConcur

private[effect] object MVarConcurrent {
/** Builds an [[MVarConcurrent]] instance with an `initial` value. */
def apply[F[_], A](initial: A)(implicit F: Concurrent[F]): MVar[F, A] =
def apply[F[_], A](initial: A)(implicit F: Concurrent[F]): MVar2[F, A] =
new MVarConcurrent[F, A](State(initial))

/** Returns an empty [[MVarConcurrent]] instance. */
def empty[F[_], A](implicit F: Concurrent[F]): MVar[F, A] =
def empty[F[_], A](implicit F: Concurrent[F]): MVar2[F, A] =
new MVarConcurrent[F, A](State.empty)

/**
Expand Down
35 changes: 25 additions & 10 deletions core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

class MVarConcurrentTests extends BaseMVarTests {
def init[A](a: A): IO[MVar[IO, A]] =
def init[A](a: A): IO[MVar2[IO, A]] =
MVar[IO].of(a)

def empty[A]: IO[MVar[IO, A]] =
def empty[A]: IO[MVar2[IO, A]] =
MVar[IO].empty[A]

test("put is cancelable") {
Expand Down Expand Up @@ -87,10 +87,10 @@ class MVarConcurrentTests extends BaseMVarTests {
}

class MVarAsyncTests extends BaseMVarTests {
def init[A](a: A): IO[MVar[IO, A]] =
def init[A](a: A): IO[MVar2[IO, A]] =
MVar.uncancelableOf(a)

def empty[A]: IO[MVar[IO, A]] =
def empty[A]: IO[MVar2[IO, A]] =
MVar.uncancelableEmpty
}

Expand All @@ -102,8 +102,8 @@ abstract class BaseMVarTests extends AsyncFunSuite with Matchers {
implicit val cs: ContextShift[IO] =
IO.contextShift(executionContext)

def init[A](a: A): IO[MVar[IO, A]]
def empty[A]: IO[MVar[IO, A]]
def init[A](a: A): IO[MVar2[IO, A]]
def empty[A]: IO[MVar2[IO, A]]

test("empty; put; take; put; take") {
val task = for {
Expand Down Expand Up @@ -232,6 +232,21 @@ abstract class BaseMVarTests extends AsyncFunSuite with Matchers {
}
}

test("empty; tryRead; read; put; tryRead; read") {
val task = for {
av <- empty[Int]
tryReadEmpty <- av.tryRead
read <- av.read.start
_ <- av.put(10)
tryReadContains <- av.tryRead
r <- read.join
} yield (tryReadEmpty, tryReadContains, r)

for (v <- task.unsafeToFuture()) yield {
v shouldBe ((None, Some(10), 10))
}
}

test("put(null) works") {
val task = empty[String].flatMap { mvar =>
mvar.put(null) *> mvar.read
Expand All @@ -243,7 +258,7 @@ abstract class BaseMVarTests extends AsyncFunSuite with Matchers {

test("producer-consumer parallel loop") {
// Signaling option, because we need to detect completion
type Channel[A] = MVar[IO, Option[A]]
type Channel[A] = MVar2[IO, Option[A]]

def producer(ch: Channel[Int], list: List[Int]): IO[Unit] =
list match {
Expand Down Expand Up @@ -281,7 +296,7 @@ abstract class BaseMVarTests extends AsyncFunSuite with Matchers {

test("stack overflow test") {
// Signaling option, because we need to detect completion
type Channel[A] = MVar[IO, Option[A]]
type Channel[A] = MVar2[IO, Option[A]]
val count = 10000

def consumer(ch: Channel[Int], sum: Long): IO[Long] =
Expand Down Expand Up @@ -313,7 +328,7 @@ abstract class BaseMVarTests extends AsyncFunSuite with Matchers {
}

test("take/put test is stack safe") {
def loop(n: Int, acc: Int)(ch: MVar[IO, Int]): IO[Int] =
def loop(n: Int, acc: Int)(ch: MVar2[IO, Int]): IO[Int] =
if (n <= 0) IO.pure(acc)
else
ch.take.flatMap { x =>
Expand All @@ -328,7 +343,7 @@ abstract class BaseMVarTests extends AsyncFunSuite with Matchers {
}
}

def testStackSequential(channel: MVar[IO, Int]): (Int, IO[Int], IO[Unit]) = {
def testStackSequential(channel: MVar2[IO, Int]): (Int, IO[Int], IO[Unit]) = {
val count = if (Platform.isJvm) 10000 else 5000

def readLoop(n: Int, acc: Int): IO[Int] =
Expand Down
Loading

0 comments on commit cdc6eb4

Please sign in to comment.