From f84cde7337aa6ee8356e32f1b4b19168afc94e28 Mon Sep 17 00:00:00 2001 From: Fabio Labella Date: Fri, 13 Nov 2020 13:34:07 +0000 Subject: [PATCH 01/16] Remove outdated comment --- .../src/test/scala/cats/effect/std/CyclicBarrierSpec.scala | 5 ----- 1 file changed, 5 deletions(-) diff --git a/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala b/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala index 8b516d476b..96498d4f98 100644 --- a/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala +++ b/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala @@ -14,11 +14,6 @@ * limitations under the License. */ -/* - * These tests have been inspired by and adapted from `monix-catnap`'s `ConcurrentQueueSuite`, available at - * https://github.com/monix/monix/blob/series/3.x/monix-catnap/shared/src/test/scala/monix/catnap/ConcurrentQueueSuite.scala. - */ - package cats.effect package std From f20a2f6ed7bf262b6a22f3d98d6f47ec73bec785 Mon Sep 17 00:00:00 2001 From: Fabio Labella Date: Fri, 13 Nov 2020 14:02:34 +0000 Subject: [PATCH 02/16] Test renames --- .../cats/effect/std/CyclicBarrierSpec.scala | 56 +++++++++---------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala b/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala index 96498d4f98..d042af7923 100644 --- a/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala +++ b/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala @@ -35,9 +35,9 @@ class CyclicBarrierSpec extends BaseSpec { private def cyclicBarrierTests( name: String, - constructor: Int => IO[CyclicBarrier[IO]]): Fragments = { + newBarrier: Int => IO[CyclicBarrier[IO]]): Fragments = { s"$name - raise an exception when constructed with a negative capacity" in real { - val test = IO.defer(constructor(-1)).attempt + val test = IO.defer(newBarrier(-1)).attempt test.flatMap { res => IO { res must beLike { @@ -48,7 +48,7 @@ class CyclicBarrierSpec extends BaseSpec { } s"$name - raise an exception when constructed with zero capacity" in real { - val test = IO.defer(constructor(0)).attempt + val test = IO.defer(newBarrier(0)).attempt test.flatMap { res => IO { res must beLike { @@ -60,21 +60,21 @@ class CyclicBarrierSpec extends BaseSpec { s"$name - remaining when contructed" in real { for { - cb <- constructor(5) - awaiting <- cb.awaiting + barrier <- newBarrier(5) + awaiting <- barrier.awaiting _ <- IO(awaiting must beEqualTo(0)) - r <- cb.remaining + r <- barrier.remaining res <- IO(r must beEqualTo(5)) } yield res } s"$name - await releases all fibers" in real { for { - cb <- constructor(2) - f1 <- cb.await.start - f2 <- cb.await.start + barrier <- newBarrier(2) + f1 <- barrier.await.start + f2 <- barrier.await.start r <- (f1.joinAndEmbedNever, f2.joinAndEmbedNever).tupled - awaiting <- cb.awaiting + awaiting <- barrier.awaiting _ <- IO(awaiting must beEqualTo(0)) res <- IO(r must beEqualTo(((), ()))) } yield res @@ -82,8 +82,8 @@ class CyclicBarrierSpec extends BaseSpec { s"$name - await is blocking" in real { for { - cb <- constructor(2) - r <- cb.await.timeout(5.millis).attempt + barrier <- newBarrier(2) + r <- barrier.await.timeout(5.millis).attempt res <- IO(r must beLike { case Left(e) => e must haveClass[TimeoutException] }) @@ -92,12 +92,12 @@ class CyclicBarrierSpec extends BaseSpec { s"$name - await is cancelable" in real { for { - cb <- constructor(2) - f <- cb.await.start + barrier <- newBarrier(2) + f <- barrier.await.start _ <- IO.sleep(1.milli) _ <- f.cancel r <- f.join - awaiting <- cb.awaiting + awaiting <- barrier.awaiting _ <- IO(awaiting must beEqualTo(0)) res <- IO(r must beEqualTo(Outcome.Canceled())) } yield res @@ -105,15 +105,15 @@ class CyclicBarrierSpec extends BaseSpec { s"$name - reset once full" in real { for { - cb <- constructor(2) - f1 <- cb.await.start - f2 <- cb.await.start + barrier <- newBarrier(2) + f1 <- barrier.await.start + f2 <- barrier.await.start r <- (f1.joinAndEmbedNever, f2.joinAndEmbedNever).tupled _ <- IO(r must beEqualTo(((), ()))) //Should have reset at this point - awaiting <- cb.awaiting + awaiting <- barrier.awaiting _ <- IO(awaiting must beEqualTo(0)) - r <- cb.await.timeout(5.millis).attempt + r <- barrier.await.timeout(5.millis).attempt res <- IO(r must beLike { case Left(e) => e must haveClass[TimeoutException] }) @@ -122,15 +122,15 @@ class CyclicBarrierSpec extends BaseSpec { s"$name - clean up upon cancellation of await" in real { for { - cb <- constructor(2) + barrier <- newBarrier(2) //This should time out and reduce the current capacity to 0 again - _ <- cb.await.timeout(5.millis).attempt + _ <- barrier.await.timeout(5.millis).attempt //Therefore the capacity should only be 1 when this awaits so will block again - r <- cb.await.timeout(5.millis).attempt + r <- barrier.await.timeout(5.millis).attempt _ <- IO(r must beLike { case Left(e) => e must haveClass[TimeoutException] }) - awaiting <- cb.awaiting + awaiting <- barrier.awaiting res <- IO(awaiting must beEqualTo(0)) // } yield res } @@ -144,10 +144,10 @@ class CyclicBarrierSpec extends BaseSpec { val iterations = 100 val run = for { - cb <- constructor(2) - f <- cb.await.start - _ <- IO.race(cb.await, f.cancel) - awaiting <- cb.awaiting + barrier <- newBarrier(2) + f <- barrier.await.start + _ <- IO.race(barrier.await, f.cancel) + awaiting <- barrier.awaiting res <- IO(awaiting must beGreaterThanOrEqualTo(0)) } yield res From efd03c5c0162ec91426b371b61f5e2b691ac2ad7 Mon Sep 17 00:00:00 2001 From: Fabio Labella Date: Fri, 13 Nov 2020 14:21:07 +0000 Subject: [PATCH 03/16] Abstract assertions on failure type in CyclicBarrierSpec --- .../cats/effect/std/CyclicBarrierSpec.scala | 55 ++++++++----------- 1 file changed, 24 insertions(+), 31 deletions(-) diff --git a/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala b/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala index d042af7923..5b0d14f96b 100644 --- a/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala +++ b/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala @@ -19,10 +19,12 @@ package std import cats.implicits._ import cats.arrow.FunctionK -import org.specs2.specification.core.Fragments - import scala.concurrent.duration._ + +import org.specs2.specification.core.Fragments +import org.specs2.matcher.Matcher import java.util.concurrent.TimeoutException +import scala.reflect.ClassTag class CyclicBarrierSpec extends BaseSpec { @@ -33,29 +35,26 @@ class CyclicBarrierSpec extends BaseSpec { CyclicBarrier.apply[IO](_).map(_.mapK(FunctionK.id))) } - private def cyclicBarrierTests( - name: String, - newBarrier: Int => IO[CyclicBarrier[IO]]): Fragments = { - s"$name - raise an exception when constructed with a negative capacity" in real { - val test = IO.defer(newBarrier(-1)).attempt - test.flatMap { res => + implicit class Fails(fa: IO[_]) { + def mustFailWith[E <: Throwable: ClassTag] = + fa.attempt.flatMap { res => IO { res must beLike { - case Left(e) => e must haveClass[IllegalArgumentException] + case Left(e) => e must haveClass[E] } } - } + } + } + + private def cyclicBarrierTests( + name: String, + newBarrier: Int => IO[CyclicBarrier[IO]]): Fragments = { + s"$name - raise an exception when constructed with a negative capacity" in real { + IO.defer(newBarrier(-1)).mustFailWith[IllegalArgumentException] } s"$name - raise an exception when constructed with zero capacity" in real { - val test = IO.defer(newBarrier(0)).attempt - test.flatMap { res => - IO { - res must beLike { - case Left(e) => e must haveClass[IllegalArgumentException] - } - } - } + IO.defer(newBarrier(0)).mustFailWith[IllegalArgumentException] } s"$name - remaining when contructed" in real { @@ -83,10 +82,8 @@ class CyclicBarrierSpec extends BaseSpec { s"$name - await is blocking" in real { for { barrier <- newBarrier(2) - r <- barrier.await.timeout(5.millis).attempt - res <- IO(r must beLike { - case Left(e) => e must haveClass[TimeoutException] - }) + r = barrier.await.timeout(5.millis) + res <- r.mustFailWith[TimeoutException] } yield res } @@ -113,10 +110,8 @@ class CyclicBarrierSpec extends BaseSpec { //Should have reset at this point awaiting <- barrier.awaiting _ <- IO(awaiting must beEqualTo(0)) - r <- barrier.await.timeout(5.millis).attempt - res <- IO(r must beLike { - case Left(e) => e must haveClass[TimeoutException] - }) + r = barrier.await.timeout(5.millis) + res <- r.mustFailWith[TimeoutException] } yield res } @@ -126,12 +121,10 @@ class CyclicBarrierSpec extends BaseSpec { //This should time out and reduce the current capacity to 0 again _ <- barrier.await.timeout(5.millis).attempt //Therefore the capacity should only be 1 when this awaits so will block again - r <- barrier.await.timeout(5.millis).attempt - _ <- IO(r must beLike { - case Left(e) => e must haveClass[TimeoutException] - }) + r = barrier.await.timeout(5.millis) + _ <- r.mustFailWith[TimeoutException] awaiting <- barrier.awaiting - res <- IO(awaiting must beEqualTo(0)) // + res <- IO(awaiting must beEqualTo(0)) } yield res } From 3185b673fdb9859ac2f16e07a704d801931d2743 Mon Sep 17 00:00:00 2001 From: Fabio Labella Date: Fri, 13 Nov 2020 14:38:16 +0000 Subject: [PATCH 04/16] Abstract over successful assertions --- .../cats/effect/std/CyclicBarrierSpec.scala | 57 ++++++++----------- 1 file changed, 25 insertions(+), 32 deletions(-) diff --git a/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala b/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala index 5b0d14f96b..bb7d204fa5 100644 --- a/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala +++ b/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala @@ -22,7 +22,6 @@ import cats.arrow.FunctionK import scala.concurrent.duration._ import org.specs2.specification.core.Fragments -import org.specs2.matcher.Matcher import java.util.concurrent.TimeoutException import scala.reflect.ClassTag @@ -35,7 +34,7 @@ class CyclicBarrierSpec extends BaseSpec { CyclicBarrier.apply[IO](_).map(_.mapK(FunctionK.id))) } - implicit class Fails(fa: IO[_]) { + implicit class Assertions[A](fa: IO[A]) { def mustFailWith[E <: Throwable: ClassTag] = fa.attempt.flatMap { res => IO { @@ -43,6 +42,10 @@ class CyclicBarrierSpec extends BaseSpec { case Left(e) => e must haveClass[E] } } + } + + def mustEqual(a: A) = fa.flatMap { res => + IO(res must beEqualTo(a)) } } @@ -57,14 +60,11 @@ class CyclicBarrierSpec extends BaseSpec { IO.defer(newBarrier(0)).mustFailWith[IllegalArgumentException] } - s"$name - remaining when contructed" in real { - for { - barrier <- newBarrier(5) - awaiting <- barrier.awaiting - _ <- IO(awaiting must beEqualTo(0)) - r <- barrier.remaining - res <- IO(r must beEqualTo(5)) - } yield res + s"$name - remaining when constructed" in real { + newBarrier(5).flatMap { barrier => + barrier.awaiting.mustEqual(0) >> + barrier.remaining.mustEqual(5) + } } s"$name - await releases all fibers" in real { @@ -72,31 +72,28 @@ class CyclicBarrierSpec extends BaseSpec { barrier <- newBarrier(2) f1 <- barrier.await.start f2 <- barrier.await.start - r <- (f1.joinAndEmbedNever, f2.joinAndEmbedNever).tupled - awaiting <- barrier.awaiting - _ <- IO(awaiting must beEqualTo(0)) - res <- IO(r must beEqualTo(((), ()))) + r = (f1.joinAndEmbedNever, f2.joinAndEmbedNever).tupled + res <- r.mustEqual(((), ())) + _ <- barrier.awaiting.mustEqual(0) } yield res } + // TODO ticker here s"$name - await is blocking" in real { - for { - barrier <- newBarrier(2) - r = barrier.await.timeout(5.millis) - res <- r.mustFailWith[TimeoutException] - } yield res + newBarrier(2).flatMap { + _.await.timeout(100.millis).mustFailWith[TimeoutException] + } } + // TODO ticker here s"$name - await is cancelable" in real { for { barrier <- newBarrier(2) f <- barrier.await.start - _ <- IO.sleep(1.milli) + _ <- IO.sleep(100.millis) _ <- f.cancel - r <- f.join - awaiting <- barrier.awaiting - _ <- IO(awaiting must beEqualTo(0)) - res <- IO(r must beEqualTo(Outcome.Canceled())) + res <- f.join.mustEqual(Outcome.Canceled()) + _ <- barrier.awaiting.mustEqual(0) } yield res } @@ -108,10 +105,8 @@ class CyclicBarrierSpec extends BaseSpec { r <- (f1.joinAndEmbedNever, f2.joinAndEmbedNever).tupled _ <- IO(r must beEqualTo(((), ()))) //Should have reset at this point - awaiting <- barrier.awaiting - _ <- IO(awaiting must beEqualTo(0)) - r = barrier.await.timeout(5.millis) - res <- r.mustFailWith[TimeoutException] + _ <- barrier.awaiting.mustEqual(0) + res <- barrier.await.timeout(5.millis).mustFailWith[TimeoutException] } yield res } @@ -121,10 +116,8 @@ class CyclicBarrierSpec extends BaseSpec { //This should time out and reduce the current capacity to 0 again _ <- barrier.await.timeout(5.millis).attempt //Therefore the capacity should only be 1 when this awaits so will block again - r = barrier.await.timeout(5.millis) - _ <- r.mustFailWith[TimeoutException] - awaiting <- barrier.awaiting - res <- IO(awaiting must beEqualTo(0)) + _ <- barrier.await.timeout(5.millis).mustFailWith[TimeoutException] + res <- barrier.awaiting.mustEqual(0) } yield res } From fa4b5b2c7174b3f6444db79a2d3dcc6a2f3ad91d Mon Sep 17 00:00:00 2001 From: Fabio Labella Date: Fri, 13 Nov 2020 16:14:20 +0000 Subject: [PATCH 05/16] Change CyclicBarrier state machine to mirror latch --- .../scala/cats/effect/std/CyclicBarrier.scala | 79 +++++++++---------- 1 file changed, 37 insertions(+), 42 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala b/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala index 4aed746723..0e733f65d2 100644 --- a/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala +++ b/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala @@ -66,50 +66,45 @@ abstract class CyclicBarrier[F[_]] { self => } object CyclicBarrier { - def apply[F[_]](n: Int)(implicit F: GenConcurrent[F, _]): F[CyclicBarrier[F]] = - if (n < 1) + def apply[F[_]](capacity: Int)(implicit F: GenConcurrent[F, _]): F[CyclicBarrier[F]] = { + if (capacity < 1) throw new IllegalArgumentException( - s"Cyclic barrier constructed with capacity $n. Must be > 0") - else - for { - state <- State.initial[F] - ref <- F.ref(state) - } yield new ConcurrentCyclicBarrier(n, ref) - - private[std] class ConcurrentCyclicBarrier[F[_]](capacity: Int, state: Ref[F, State[F]])( - implicit F: GenConcurrent[F, _]) - extends CyclicBarrier[F] { - - val await: F[Unit] = - F.deferred[Unit].flatMap { newSignal => - F.uncancelable { poll => - state.modify { - case State(awaiting, epoch, signal) => - if (awaiting < capacity - 1) { - val cleanup = state.update { s => - if (epoch == s.epoch) - //The cyclic barrier hasn't been reset since the cancelled fiber start to await - s.copy(awaiting = s.awaiting - 1) - else s - } - - val nextState = State(awaiting + 1, epoch, signal) - (nextState, poll(signal.get).onCancel(cleanup)) - } else (State(0, epoch + 1, newSignal), signal.complete(()).void) - }.flatten + s"Cyclic barrier constructed with capacity $capacity. Must be > 0") + + case class State[F[_]](awaiting: Int, epoch: Long, unblock: Deferred[F, Unit]) + + F.deferred[Unit] + .map(gate => State(awaiting = capacity, epoch = 0, unblock = gate)) + .flatMap(F.ref) + .map { state => + new CyclicBarrier[F] { + val await: F[Unit] = + F.deferred[Unit].flatMap { gate => + F.uncancelable { poll => + state.modify { + case State(awaiting, epoch, unblock) => + val awaitingNow = awaiting - 1 + + if (awaitingNow == 0) + State(capacity, epoch + 1, gate) -> unblock.complete(()).void + else { + val newState = State(awaitingNow, epoch, unblock) + val cleanup = state.update { s => + if (s.epoch == epoch) + s.copy(awaiting = s.awaiting + 1) + else s + } + + newState -> poll(unblock.get).onCancel(cleanup) + } + + }.flatten + } + } + + val remaining: F[Int] = state.get.map(_.awaiting) + val awaiting: F[Int] = state.get.map(s => capacity - s.awaiting) } } - - val remaining: F[Int] = state.get.map(s => capacity - s.awaiting) - - val awaiting: F[Int] = state.get.map(_.awaiting) - - } - - private[std] case class State[F[_]](awaiting: Int, epoch: Long, signal: Deferred[F, Unit]) - - private[std] object State { - def initial[F[_]](implicit F: GenConcurrent[F, _]): F[State[F]] = - F.deferred[Unit].map { signal => State(0, 0, signal) } } } From a6d1d7f0883114a04b4cd989c96e2831f595008b Mon Sep 17 00:00:00 2001 From: Fabio Labella Date: Fri, 13 Nov 2020 23:41:23 +0000 Subject: [PATCH 06/16] Use non termination to test blocking in barrier --- .../test/scala/cats/effect/std/CyclicBarrierSpec.scala | 10 ++++------ .../src/main/scala/cats/effect/std/CyclicBarrier.scala | 2 +- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala b/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala index bb7d204fa5..838fd5cd97 100644 --- a/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala +++ b/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala @@ -60,6 +60,10 @@ class CyclicBarrierSpec extends BaseSpec { IO.defer(newBarrier(0)).mustFailWith[IllegalArgumentException] } + s"$name - await is blocking" in ticked { implicit ticker => + newBarrier(2).flatMap(_.await) must nonTerminate + } + s"$name - remaining when constructed" in real { newBarrier(5).flatMap { barrier => barrier.awaiting.mustEqual(0) >> @@ -78,12 +82,6 @@ class CyclicBarrierSpec extends BaseSpec { } yield res } - // TODO ticker here - s"$name - await is blocking" in real { - newBarrier(2).flatMap { - _.await.timeout(100.millis).mustFailWith[TimeoutException] - } - } // TODO ticker here s"$name - await is cancelable" in real { diff --git a/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala b/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala index 0e733f65d2..1ac78dbdef 100644 --- a/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala +++ b/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala @@ -74,7 +74,7 @@ object CyclicBarrier { case class State[F[_]](awaiting: Int, epoch: Long, unblock: Deferred[F, Unit]) F.deferred[Unit] - .map(gate => State(awaiting = capacity, epoch = 0, unblock = gate)) + .map(gate => State(capacity,0, gate)) .flatMap(F.ref) .map { state => new CyclicBarrier[F] { From 5c9a86a2e67ac1d937f02bb8f79f733e86af878b Mon Sep 17 00:00:00 2001 From: Fabio Labella Date: Fri, 13 Nov 2020 23:45:00 +0000 Subject: [PATCH 07/16] Use parMapN to test completion of barrier --- .../cats/effect/std/CyclicBarrierSpec.scala | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala b/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala index 838fd5cd97..e1f6602ec5 100644 --- a/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala +++ b/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala @@ -64,6 +64,12 @@ class CyclicBarrierSpec extends BaseSpec { newBarrier(2).flatMap(_.await) must nonTerminate } + s"$name - await releases all fibers" in real { + newBarrier(2).flatMap { barrier => + (barrier.await, barrier.await).parTupled.void.mustEqual(()) + } + } + s"$name - remaining when constructed" in real { newBarrier(5).flatMap { barrier => barrier.awaiting.mustEqual(0) >> @@ -71,16 +77,6 @@ class CyclicBarrierSpec extends BaseSpec { } } - s"$name - await releases all fibers" in real { - for { - barrier <- newBarrier(2) - f1 <- barrier.await.start - f2 <- barrier.await.start - r = (f1.joinAndEmbedNever, f2.joinAndEmbedNever).tupled - res <- r.mustEqual(((), ())) - _ <- barrier.awaiting.mustEqual(0) - } yield res - } // TODO ticker here From ef6662198d81c0f51b564f1384c9d17d0054dbcf Mon Sep 17 00:00:00 2001 From: Fabio Labella Date: Fri, 13 Nov 2020 23:54:26 +0000 Subject: [PATCH 08/16] Deterministically test cancelation of CyclicBarrier.await --- .../cats/effect/std/CyclicBarrierSpec.scala | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala b/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala index e1f6602ec5..822a899eec 100644 --- a/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala +++ b/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala @@ -70,6 +70,13 @@ class CyclicBarrierSpec extends BaseSpec { } } + s"$name - await is cancelable" in ticked { implicit ticker => + newBarrier(2) + .flatMap(_.await) + .timeoutTo(1.second, IO.unit) must completeAs(()) + } + + s"$name - remaining when constructed" in real { newBarrier(5).flatMap { barrier => barrier.awaiting.mustEqual(0) >> @@ -79,17 +86,6 @@ class CyclicBarrierSpec extends BaseSpec { - // TODO ticker here - s"$name - await is cancelable" in real { - for { - barrier <- newBarrier(2) - f <- barrier.await.start - _ <- IO.sleep(100.millis) - _ <- f.cancel - res <- f.join.mustEqual(Outcome.Canceled()) - _ <- barrier.awaiting.mustEqual(0) - } yield res - } s"$name - reset once full" in real { for { From 09fbc5938786af9f5460ce7cf62e35b7c6e53fcb Mon Sep 17 00:00:00 2001 From: Fabio Labella Date: Sat, 14 Nov 2020 00:09:43 +0000 Subject: [PATCH 09/16] Do not rely on knowing internal barrier count during tests --- .../cats/effect/std/CyclicBarrierSpec.scala | 56 ++++++++----------- 1 file changed, 22 insertions(+), 34 deletions(-) diff --git a/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala b/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala index 822a899eec..90f1cdf89b 100644 --- a/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala +++ b/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala @@ -64,51 +64,32 @@ class CyclicBarrierSpec extends BaseSpec { newBarrier(2).flatMap(_.await) must nonTerminate } - s"$name - await releases all fibers" in real { - newBarrier(2).flatMap { barrier => - (barrier.await, barrier.await).parTupled.void.mustEqual(()) - } - } - s"$name - await is cancelable" in ticked { implicit ticker => newBarrier(2) .flatMap(_.await) .timeoutTo(1.second, IO.unit) must completeAs(()) } - - s"$name - remaining when constructed" in real { - newBarrier(5).flatMap { barrier => - barrier.awaiting.mustEqual(0) >> - barrier.remaining.mustEqual(5) + s"$name - await releases all fibers" in real { + newBarrier(2).flatMap { barrier => + (barrier.await, barrier.await).parTupled.void.mustEqual(()) } } - - - - s"$name - reset once full" in real { - for { - barrier <- newBarrier(2) - f1 <- barrier.await.start - f2 <- barrier.await.start - r <- (f1.joinAndEmbedNever, f2.joinAndEmbedNever).tupled - _ <- IO(r must beEqualTo(((), ()))) - //Should have reset at this point - _ <- barrier.awaiting.mustEqual(0) - res <- barrier.await.timeout(5.millis).mustFailWith[TimeoutException] - } yield res + s"$name - reset once full" in ticked { implicit ticker => + newBarrier(2).flatMap { barrier => + (barrier.await, barrier.await).parTupled >> + barrier.await + } must nonTerminate } - s"$name - clean up upon cancellation of await" in real { - for { - barrier <- newBarrier(2) - //This should time out and reduce the current capacity to 0 again - _ <- barrier.await.timeout(5.millis).attempt - //Therefore the capacity should only be 1 when this awaits so will block again - _ <- barrier.await.timeout(5.millis).mustFailWith[TimeoutException] - res <- barrier.awaiting.mustEqual(0) - } yield res + s"$name - clean up upon cancellation of await" in ticked { implicit ticker => + newBarrier(2).flatMap { barrier => + // This should time out, so count goes back to 2 + barrier.await.timeoutTo(1.second, IO.unit) >> + // Therefore count goes only down to 1 when this awaits, and will block again + barrier.await + } must nonTerminate } /* @@ -129,5 +110,12 @@ class CyclicBarrierSpec extends BaseSpec { List.fill(iterations)(run).reduce(_ >> _) } + + s"$name - remaining when constructed" in real { + newBarrier(5).flatMap { barrier => + barrier.awaiting.mustEqual(0) >> + barrier.remaining.mustEqual(5) + } + } } } From 7e3cfb6c2d17f9e3776928277d8aaa32b38b6c11 Mon Sep 17 00:00:00 2001 From: Fabio Labella Date: Sat, 14 Nov 2020 00:58:46 +0000 Subject: [PATCH 10/16] Cancelation race test does not depend on inspecting barrier state --- .../cats/effect/std/CyclicBarrierSpec.scala | 28 +++++++++++++------ .../scala/cats/effect/std/CyclicBarrier.scala | 1 + 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala b/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala index 90f1cdf89b..2244dc5a69 100644 --- a/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala +++ b/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala @@ -85,13 +85,19 @@ class CyclicBarrierSpec extends BaseSpec { s"$name - clean up upon cancellation of await" in ticked { implicit ticker => newBarrier(2).flatMap { barrier => - // This should time out, so count goes back to 2 + // This will time out, so count goes back to 2 barrier.await.timeoutTo(1.second, IO.unit) >> // Therefore count goes only down to 1 when this awaits, and will block again barrier.await } must nonTerminate } + s"$name - barrier of capacity 1 is a no op" in real { + newBarrier(1) + .flatMap(_.await) + .mustEqual(()) + } + /* * Original implementation in b31d5a486757f7793851814ec30e056b9c6e40b8 * had a race between cancellation of an awaiting fiber and @@ -100,13 +106,19 @@ class CyclicBarrierSpec extends BaseSpec { s"$name - race fiber cancel and barrier full" in real { val iterations = 100 - val run = for { - barrier <- newBarrier(2) - f <- barrier.await.start - _ <- IO.race(barrier.await, f.cancel) - awaiting <- barrier.awaiting - res <- IO(awaiting must beGreaterThanOrEqualTo(0)) - } yield res + val run = newBarrier(2).flatMap { barrier => + barrier.await.start.flatMap { fiber => + barrier.await.race(fiber.cancel).flatMap { + case Left(_) => + // without the epoch check in CyclicBarrier, + // a late cancelation would increment the count + // after the barrier has already reset, + // causing this code to never terminate (test times out) + (barrier.await, barrier.await).parTupled.void + case Right(_) => IO.unit + } + } + }.mustEqual(()) List.fill(iterations)(run).reduce(_ >> _) } diff --git a/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala b/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala index 1ac78dbdef..807690b5c1 100644 --- a/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala +++ b/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala @@ -90,6 +90,7 @@ object CyclicBarrier { else { val newState = State(awaitingNow, epoch, unblock) val cleanup = state.update { s => + // if the barrier has reset, do not modify the count if (s.epoch == epoch) s.copy(awaiting = s.awaiting + 1) else s From 08067cefc9ae67e651d20cacc60185ddcbfbf67d Mon Sep 17 00:00:00 2001 From: Fabio Labella Date: Sat, 14 Nov 2020 01:00:32 +0000 Subject: [PATCH 11/16] Remove unused imports --- .../src/test/scala/cats/effect/std/CyclicBarrierSpec.scala | 1 - std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala b/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala index 2244dc5a69..e0c96b59bc 100644 --- a/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala +++ b/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala @@ -22,7 +22,6 @@ import cats.arrow.FunctionK import scala.concurrent.duration._ import org.specs2.specification.core.Fragments -import java.util.concurrent.TimeoutException import scala.reflect.ClassTag class CyclicBarrierSpec extends BaseSpec { diff --git a/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala b/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala index 807690b5c1..68080e0a81 100644 --- a/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala +++ b/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala @@ -17,7 +17,7 @@ package cats.effect.std import cats.~> -import cats.effect.kernel.{Deferred, GenConcurrent, Ref} +import cats.effect.kernel.{Deferred, GenConcurrent} import cats.effect.kernel.syntax.all._ import cats.syntax.all._ From 7cf1d7cabcf1696d967e33bbf1bcf00315bdc4ff Mon Sep 17 00:00:00 2001 From: Fabio Labella Date: Sat, 14 Nov 2020 01:01:42 +0000 Subject: [PATCH 12/16] Remove methods to inspect CyclicBarrier internal count --- .../scala/cats/effect/std/CyclicBarrierSpec.scala | 7 ------- .../scala/cats/effect/std/CyclicBarrier.scala | 15 --------------- 2 files changed, 22 deletions(-) diff --git a/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala b/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala index e0c96b59bc..17de545e14 100644 --- a/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala +++ b/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala @@ -121,12 +121,5 @@ class CyclicBarrierSpec extends BaseSpec { List.fill(iterations)(run).reduce(_ >> _) } - - s"$name - remaining when constructed" in real { - newBarrier(5).flatMap { barrier => - barrier.awaiting.mustEqual(0) >> - barrier.remaining.mustEqual(5) - } - } } } diff --git a/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala b/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala index 68080e0a81..7664e9b83e 100644 --- a/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala +++ b/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala @@ -39,16 +39,6 @@ abstract class CyclicBarrier[F[_]] { self => */ def await: F[Unit] - /* - * The number of fibers required to trip the barrier - */ - def remaining: F[Int] - - /* - * The number of fibers currently awaiting - */ - def awaiting: F[Int] - /** * Modifies the context in which this cyclic barrier is executed using the natural * transformation `f`. @@ -59,8 +49,6 @@ abstract class CyclicBarrier[F[_]] { self => def mapK[G[_]](f: F ~> G): CyclicBarrier[G] = new CyclicBarrier[G] { def await: G[Unit] = f(self.await) - def remaining: G[Int] = f(self.remaining) - def awaiting: G[Int] = f(self.awaiting) } } @@ -102,9 +90,6 @@ object CyclicBarrier { }.flatten } } - - val remaining: F[Int] = state.get.map(_.awaiting) - val awaiting: F[Int] = state.get.map(s => capacity - s.awaiting) } } } From f4102e47b3b50d1a32aa4def7e4d8b02a235f9c0 Mon Sep 17 00:00:00 2001 From: Fabio Labella Date: Sat, 14 Nov 2020 01:31:17 +0000 Subject: [PATCH 13/16] Formatting --- .../cats/effect/std/CyclicBarrierSpec.scala | 42 +++++++++---------- .../scala/cats/effect/std/CyclicBarrier.scala | 6 +-- 2 files changed, 22 insertions(+), 26 deletions(-) diff --git a/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala b/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala index 17de545e14..cf0cfee624 100644 --- a/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala +++ b/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala @@ -43,9 +43,7 @@ class CyclicBarrierSpec extends BaseSpec { } } - def mustEqual(a: A) = fa.flatMap { res => - IO(res must beEqualTo(a)) - } + def mustEqual(a: A) = fa.flatMap { res => IO(res must beEqualTo(a)) } } private def cyclicBarrierTests( @@ -64,9 +62,7 @@ class CyclicBarrierSpec extends BaseSpec { } s"$name - await is cancelable" in ticked { implicit ticker => - newBarrier(2) - .flatMap(_.await) - .timeoutTo(1.second, IO.unit) must completeAs(()) + newBarrier(2).flatMap(_.await).timeoutTo(1.second, IO.unit) must completeAs(()) } s"$name - await releases all fibers" in real { @@ -78,7 +74,7 @@ class CyclicBarrierSpec extends BaseSpec { s"$name - reset once full" in ticked { implicit ticker => newBarrier(2).flatMap { barrier => (barrier.await, barrier.await).parTupled >> - barrier.await + barrier.await } must nonTerminate } @@ -86,15 +82,13 @@ class CyclicBarrierSpec extends BaseSpec { newBarrier(2).flatMap { barrier => // This will time out, so count goes back to 2 barrier.await.timeoutTo(1.second, IO.unit) >> - // Therefore count goes only down to 1 when this awaits, and will block again - barrier.await + // Therefore count goes only down to 1 when this awaits, and will block again + barrier.await } must nonTerminate } s"$name - barrier of capacity 1 is a no op" in real { - newBarrier(1) - .flatMap(_.await) - .mustEqual(()) + newBarrier(1).flatMap(_.await).mustEqual(()) } /* @@ -105,19 +99,21 @@ class CyclicBarrierSpec extends BaseSpec { s"$name - race fiber cancel and barrier full" in real { val iterations = 100 - val run = newBarrier(2).flatMap { barrier => - barrier.await.start.flatMap { fiber => - barrier.await.race(fiber.cancel).flatMap { - case Left(_) => - // without the epoch check in CyclicBarrier, - // a late cancelation would increment the count - // after the barrier has already reset, - // causing this code to never terminate (test times out) - (barrier.await, barrier.await).parTupled.void - case Right(_) => IO.unit + val run = newBarrier(2) + .flatMap { barrier => + barrier.await.start.flatMap { fiber => + barrier.await.race(fiber.cancel).flatMap { + case Left(_) => + // without the epoch check in CyclicBarrier, + // a late cancelation would increment the count + // after the barrier has already reset, + // causing this code to never terminate (test times out) + (barrier.await, barrier.await).parTupled.void + case Right(_) => IO.unit + } } } - }.mustEqual(()) + .mustEqual(()) List.fill(iterations)(run).reduce(_ >> _) } diff --git a/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala b/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala index 7664e9b83e..3ba8eda79f 100644 --- a/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala +++ b/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala @@ -77,10 +77,10 @@ object CyclicBarrier { State(capacity, epoch + 1, gate) -> unblock.complete(()).void else { val newState = State(awaitingNow, epoch, unblock) + // reincrement count if this await gets canceled, + // but only if the barrier hasn't reset in the meantime val cleanup = state.update { s => - // if the barrier has reset, do not modify the count - if (s.epoch == epoch) - s.copy(awaiting = s.awaiting + 1) + if (s.epoch == epoch) s.copy(awaiting = s.awaiting + 1) else s } From 3a0feac3ec491e7254c9191ae5db7fd5cb4a316a Mon Sep 17 00:00:00 2001 From: Fabio Labella Date: Sat, 14 Nov 2020 16:59:29 +0000 Subject: [PATCH 14/16] More assertion helpers to Runner --- .../src/test/scala/cats/effect/Runners.scala | 17 +++++++++++++++++ .../cats/effect/std/CyclicBarrierSpec.scala | 14 -------------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/core/shared/src/test/scala/cats/effect/Runners.scala b/core/shared/src/test/scala/cats/effect/Runners.scala index ea223db83e..58055380b7 100644 --- a/core/shared/src/test/scala/cats/effect/Runners.scala +++ b/core/shared/src/test/scala/cats/effect/Runners.scala @@ -43,6 +43,7 @@ import scala.concurrent.{ TimeoutException } import scala.concurrent.duration._ +import scala.reflect.ClassTag import scala.util.Try import java.io.{ByteArrayOutputStream, PrintStream} @@ -272,6 +273,22 @@ trait Runners extends SpecificationLike with RunnersPlatform { outer => (oc eqv expected, s"${oc.show} !== ${expected.show}") } + // useful for tests in the `real` context + implicit class Assertions[A](fa: IO[A]) { + def mustFailWith[E <: Throwable: ClassTag] = + fa.attempt.flatMap { res => + IO { + res must beLike { + case Left(e) => e must haveClass[E] + } + } + } + + def mustEqual(a: A) = fa.flatMap { res => IO(res must beEqualTo(a)) } + } + + + def unsafeRun[A](ioa: IO[A])(implicit ticker: Ticker): Outcome[Option, Throwable, A] = try { var results: Outcome[Option, Throwable, A] = Outcome.Succeeded(None) diff --git a/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala b/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala index cf0cfee624..244d7f19ec 100644 --- a/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala +++ b/core/shared/src/test/scala/cats/effect/std/CyclicBarrierSpec.scala @@ -22,7 +22,6 @@ import cats.arrow.FunctionK import scala.concurrent.duration._ import org.specs2.specification.core.Fragments -import scala.reflect.ClassTag class CyclicBarrierSpec extends BaseSpec { @@ -33,19 +32,6 @@ class CyclicBarrierSpec extends BaseSpec { CyclicBarrier.apply[IO](_).map(_.mapK(FunctionK.id))) } - implicit class Assertions[A](fa: IO[A]) { - def mustFailWith[E <: Throwable: ClassTag] = - fa.attempt.flatMap { res => - IO { - res must beLike { - case Left(e) => e must haveClass[E] - } - } - } - - def mustEqual(a: A) = fa.flatMap { res => IO(res must beEqualTo(a)) } - } - private def cyclicBarrierTests( name: String, newBarrier: Int => IO[CyclicBarrier[IO]]): Fragments = { From 501d5686175f79489709085d5889a562333126bb Mon Sep 17 00:00:00 2001 From: Fabio Labella Date: Sat, 14 Nov 2020 19:39:27 +0000 Subject: [PATCH 15/16] Formatting --- .../scala/cats/effect/std/CyclicBarrier.scala | 49 +++++++++---------- 1 file changed, 23 insertions(+), 26 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala b/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala index 3ba8eda79f..c4de61c7fd 100644 --- a/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala +++ b/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala @@ -61,36 +61,33 @@ object CyclicBarrier { case class State[F[_]](awaiting: Int, epoch: Long, unblock: Deferred[F, Unit]) - F.deferred[Unit] - .map(gate => State(capacity,0, gate)) - .flatMap(F.ref) - .map { state => - new CyclicBarrier[F] { - val await: F[Unit] = - F.deferred[Unit].flatMap { gate => - F.uncancelable { poll => - state.modify { - case State(awaiting, epoch, unblock) => - val awaitingNow = awaiting - 1 + F.deferred[Unit].map(State(capacity, 0, _)).flatMap(F.ref).map { state => + new CyclicBarrier[F] { + val await: F[Unit] = + F.deferred[Unit].flatMap { gate => + F.uncancelable { poll => + state.modify { + case State(awaiting, epoch, unblock) => + val awaitingNow = awaiting - 1 - if (awaitingNow == 0) - State(capacity, epoch + 1, gate) -> unblock.complete(()).void - else { - val newState = State(awaitingNow, epoch, unblock) - // reincrement count if this await gets canceled, - // but only if the barrier hasn't reset in the meantime - val cleanup = state.update { s => - if (s.epoch == epoch) s.copy(awaiting = s.awaiting + 1) - else s - } - - newState -> poll(unblock.get).onCancel(cleanup) + if (awaitingNow == 0) + State(capacity, epoch + 1, gate) -> unblock.complete(()).void + else { + val newState = State(awaitingNow, epoch, unblock) + // reincrement count if this await gets canceled, + // but only if the barrier hasn't reset in the meantime + val cleanup = state.update { s => + if (s.epoch == epoch) s.copy(awaiting = s.awaiting + 1) + else s } - }.flatten - } + newState -> poll(unblock.get).onCancel(cleanup) + } + + }.flatten } - } + } } + } } } From 20577221a4a327c85a2529d62e148a03c2413ef0 Mon Sep 17 00:00:00 2001 From: Fabio Labella Date: Sat, 14 Nov 2020 19:43:31 +0000 Subject: [PATCH 16/16] Remove redundant type parameter --- std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala b/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala index c4de61c7fd..bb20826240 100644 --- a/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala +++ b/std/shared/src/main/scala/cats/effect/std/CyclicBarrier.scala @@ -59,7 +59,7 @@ object CyclicBarrier { throw new IllegalArgumentException( s"Cyclic barrier constructed with capacity $capacity. Must be > 0") - case class State[F[_]](awaiting: Int, epoch: Long, unblock: Deferred[F, Unit]) + case class State(awaiting: Int, epoch: Long, unblock: Deferred[F, Unit]) F.deferred[Unit].map(State(capacity, 0, _)).flatMap(F.ref).map { state => new CyclicBarrier[F] {