From 92bc61e5e5458b6c55eb9b719ab55e16b0dfc74a Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Sat, 10 Oct 2020 22:54:46 -0600 Subject: [PATCH 01/18] Added first cut of unsafeToFuture with Async --- .../scala/cats/effect/std/Dispatcher.scala | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 std/shared/src/main/scala/cats/effect/std/Dispatcher.scala diff --git a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala new file mode 100644 index 0000000000..59450a8050 --- /dev/null +++ b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala @@ -0,0 +1,65 @@ +/* + * Copyright 2020 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.std + +import cats.~> +import cats.effect.kernel.{Async, Resource, Sync} +import cats.effect.kernel.syntax.all._ +import cats.syntax.all._ + +import scala.concurrent.{Future, Promise} + +import java.util.concurrent.{Semaphore => JSemaphore} +import java.util.concurrent.atomic.AtomicReference + +object Dispatcher { + + def apply[F[_]: Async, A](unsafe: (F ~> Future) => F[A]): Resource[F, A] = + for { + invokeRef <- Resource.liftF(Sync[F].delay(new AtomicReference[F[Unit] => Unit])) + invokeLatch <- Resource.liftF(Sync[F].delay(new JSemaphore(1))) + _ <- Resource.liftF(Sync[F].delay(invokeLatch.acquire())) + + runner = { + val cont: F[F[Unit]] = Async[F] async_ { cb => + invokeRef.set(fu => cb(Right(fu))) + invokeLatch.release() + } + + cont.flatten + } + + _ <- runner.foreverM[Unit].background + + back <- Resource liftF { + def unsafeToFuture[E](fe: F[E]): Future[E] = { + val promise = Promise[E]() + + invokeLatch.acquire() + invokeRef.get() { + fe.flatMap(e => Sync[F].delay(promise.success(e))) + .onError { case t => Sync[F].delay(promise.failure(t)) } + .void + } + + promise.future + } + + unsafe(new (F ~> Future) { def apply[E](fe: F[E]) = unsafeToFuture(fe) }) + } + } yield back +} From ee6720957c862290db03642b0b6c27d963a62cee Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Sun, 11 Oct 2020 11:05:55 -0600 Subject: [PATCH 02/18] Added some simple tests --- .../cats/effect/std/DispatcherSpec.scala | 52 +++++++++++++++++++ .../scala/cats/effect/std/Dispatcher.scala | 44 +++++++++++----- 2 files changed, 83 insertions(+), 13 deletions(-) create mode 100644 core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala diff --git a/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala b/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala new file mode 100644 index 0000000000..a487f74662 --- /dev/null +++ b/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala @@ -0,0 +1,52 @@ +/* + * Copyright 2020 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package std + +import cats.syntax.all._ + +class DispatcherSpec extends BaseSpec { + + "async dispatcher" should { + "run a synchronous IO" in real { + val ioa = IO(1).map(_ + 2) + val rec = Dispatcher[IO, Int](runner => IO.fromFuture(IO(runner.unsafeToFuture(ioa)))) + rec.use(i => IO(i mustEqual 3)) + } + + "run an asynchronous IO" in real { + val ioa = (IO(1) <* IO.cede).map(_ + 2) + val rec = Dispatcher[IO, Int](runner => IO.fromFuture(IO(runner.unsafeToFuture(ioa)))) + rec.use(i => IO(i mustEqual 3)) + } + + "run several IOs back to back" in real { + @volatile + var counter = 0 + val increment = IO(counter += 1) + + val num = 10 + + val rec = Dispatcher[IO, Unit] { runner => + val act = IO(runner.unsafeRunAndForget(increment)) + (0 until num).toList.traverse_(_ => act) + } + + rec.use(_ => IO(counter mustEqual num)) + } + } +} diff --git a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala index 59450a8050..d12be7cda7 100644 --- a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala +++ b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala @@ -28,8 +28,9 @@ import java.util.concurrent.atomic.AtomicReference object Dispatcher { - def apply[F[_]: Async, A](unsafe: (F ~> Future) => F[A]): Resource[F, A] = + def apply[F[_]: Async, A](unsafe: Runner[F] => F[A]): Resource[F, A] = for { + // TODO we can make this non-blocking if we encode an inline async queue invokeRef <- Resource.liftF(Sync[F].delay(new AtomicReference[F[Unit] => Unit])) invokeLatch <- Resource.liftF(Sync[F].delay(new JSemaphore(1))) _ <- Resource.liftF(Sync[F].delay(invokeLatch.acquire())) @@ -40,26 +41,43 @@ object Dispatcher { invokeLatch.release() } + // TODO spawn a fiber here to manage the runtime cont.flatten } _ <- runner.foreverM[Unit].background back <- Resource liftF { - def unsafeToFuture[E](fe: F[E]): Future[E] = { - val promise = Promise[E]() - - invokeLatch.acquire() - invokeRef.get() { - fe.flatMap(e => Sync[F].delay(promise.success(e))) - .onError { case t => Sync[F].delay(promise.failure(t)) } - .void - } + unsafe { + new Runner[F] { + def unsafeToFutureCancelable[E](fe: F[E]): (Future[E], () => Future[Unit]) = { + val promise = Promise[E]() - promise.future - } + invokeLatch.acquire() + invokeRef.get() { + fe.flatMap(e => Sync[F].delay(promise.success(e))) + .onError { case t => Sync[F].delay(promise.failure(t)) } + .void + } - unsafe(new (F ~> Future) { def apply[E](fe: F[E]) = unsafeToFuture(fe) }) + // TODO cancel token + (promise.future, () => Future.successful(())) + } + } + } } } yield back + + sealed trait Runner[F[_]] { + + def unsafeToFutureCancelable[A](fa: F[A]): (Future[A], () => Future[Unit]) + + def unsafeToFuture[A](fa: F[A]): Future[A] = + unsafeToFutureCancelable(fa)._1 + + def unsafeRunAndForget[A](fa: F[A]): Unit = { + unsafeToFutureCancelable(fa) + () + } + } } From d595201fae4c3f9e25cbd763b5b6a0620484333e Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Sun, 11 Oct 2020 11:22:47 -0600 Subject: [PATCH 03/18] Added dispatcher platforms --- .../cats/effect/std/DispatcherSpec.scala | 2 +- .../cats/effect/std/DispatcherPlatform.scala | 23 +++++++++++ .../cats/effect/std/DispatcherPlatform.scala | 39 +++++++++++++++++++ .../scala/cats/effect/std/Dispatcher.scala | 4 +- 4 files changed, 65 insertions(+), 3 deletions(-) create mode 100644 std/js/src/main/scala/cats/effect/std/DispatcherPlatform.scala create mode 100644 std/jvm/src/main/scala/cats/effect/std/DispatcherPlatform.scala diff --git a/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala b/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala index a487f74662..e6c81e8a1c 100644 --- a/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala +++ b/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala @@ -43,7 +43,7 @@ class DispatcherSpec extends BaseSpec { val rec = Dispatcher[IO, Unit] { runner => val act = IO(runner.unsafeRunAndForget(increment)) - (0 until num).toList.traverse_(_ => act) + act.replicateA(num).void } rec.use(_ => IO(counter mustEqual num)) diff --git a/std/js/src/main/scala/cats/effect/std/DispatcherPlatform.scala b/std/js/src/main/scala/cats/effect/std/DispatcherPlatform.scala new file mode 100644 index 0000000000..e58fd4ce6d --- /dev/null +++ b/std/js/src/main/scala/cats/effect/std/DispatcherPlatform.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2020 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.std + +private trait DispatcherPlatform { this: Dispatcher.type => + + protected trait RunnerPlatform[F[_]] { this: Runner[F] => + } +} diff --git a/std/jvm/src/main/scala/cats/effect/std/DispatcherPlatform.scala b/std/jvm/src/main/scala/cats/effect/std/DispatcherPlatform.scala new file mode 100644 index 0000000000..ae2cbd4c89 --- /dev/null +++ b/std/jvm/src/main/scala/cats/effect/std/DispatcherPlatform.scala @@ -0,0 +1,39 @@ +/* + * Copyright 2020 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.std + +import scala.concurrent.{Await, TimeoutException} +import scala.concurrent.duration.Duration + +private trait DispatcherPlatform { this: Dispatcher.type => + + protected trait RunnerPlatform[F[_]] { this: Runner[F] => + + def unsafeRunSync[A](fa: F[A]): A = + unsafeRunTimed(fa, Duration.Inf) + + def unsafeRunTimed[A](fa: F[A], timeout: Duration): A = { + val (fut, cancel) = unsafeToFutureCancelable(fa) + try Await.result(fut, timeout) + catch { + case t: TimeoutException => + cancel() + throw t + } + } + } +} diff --git a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala index d12be7cda7..282ccb7ff1 100644 --- a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala +++ b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala @@ -26,7 +26,7 @@ import scala.concurrent.{Future, Promise} import java.util.concurrent.{Semaphore => JSemaphore} import java.util.concurrent.atomic.AtomicReference -object Dispatcher { +object Dispatcher extends DispatcherPlatform { def apply[F[_]: Async, A](unsafe: Runner[F] => F[A]): Resource[F, A] = for { @@ -68,7 +68,7 @@ object Dispatcher { } } yield back - sealed trait Runner[F[_]] { + sealed trait Runner[F[_]] extends RunnerPlatform[F] { def unsafeToFutureCancelable[A](fa: F[A]): (Future[A], () => Future[Unit]) From 270c87765a48a67522c145b7a5be2c72fe15ff86 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Sun, 11 Oct 2020 21:51:30 -0600 Subject: [PATCH 04/18] Reimplemented dispatcher to be nonblocking and otherwise awesome --- .../cats/effect/std/DispatcherSpec.scala | 24 +++- .../scala/cats/effect/std/Dispatcher.scala | 122 +++++++++++++++--- 2 files changed, 124 insertions(+), 22 deletions(-) diff --git a/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala b/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala index e6c81e8a1c..2c37146e6b 100644 --- a/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala +++ b/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala @@ -17,6 +17,7 @@ package cats.effect package std +import cats.effect.kernel.Deferred import cats.syntax.all._ class DispatcherSpec extends BaseSpec { @@ -42,11 +43,30 @@ class DispatcherSpec extends BaseSpec { val num = 10 val rec = Dispatcher[IO, Unit] { runner => - val act = IO(runner.unsafeRunAndForget(increment)) - act.replicateA(num).void + IO.fromFuture(IO(runner.unsafeToFuture(increment))).replicateA(num).void } rec.use(_ => IO(counter mustEqual num)) } + + "run multiple IOs in parallel" in real { + val num = 10 + + for { + latches <- (0 until num).toList.traverse(_ => Deferred[IO, Unit]) + awaitAll = latches.parTraverse_(_.get) + + // engineer a deadlock: all subjects must be run in parallel or this will hang + subjects = latches.map(latch => latch.complete(()) >> awaitAll) + + result <- { + val rec = Dispatcher[IO, Unit] { runner => + subjects.parTraverse_(act => IO(runner.unsafeRunAndForget(act))) + } + + rec.use(_ => IO.unit) + } + } yield ok + } } } diff --git a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala index 282ccb7ff1..610f7eaa53 100644 --- a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala +++ b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala @@ -17,10 +17,12 @@ package cats.effect.std import cats.~> -import cats.effect.kernel.{Async, Resource, Sync} +import cats.effect.kernel.{Async, Fiber, Deferred, MonadCancel, Ref, Resource, Sync} import cats.effect.kernel.syntax.all._ import cats.syntax.all._ +import scala.annotation.tailrec +import scala.collection.immutable.LongMap import scala.concurrent.{Future, Promise} import java.util.concurrent.{Semaphore => JSemaphore} @@ -28,24 +30,63 @@ import java.util.concurrent.atomic.AtomicReference object Dispatcher extends DispatcherPlatform { - def apply[F[_]: Async, A](unsafe: Runner[F] => F[A]): Resource[F, A] = + def apply[F[_]: Async, A](unsafe: Runner[F] => F[A]): Resource[F, A] = { + final case class State(begin: Long, end: Long, registry: LongMap[(F[Unit], F[Unit] => Unit)]) { + // efficiency on the CAS + override def equals(that: Any) = this eq that.asInstanceOf[AnyRef] + override def hashCode = System.identityHashCode(this) + } + + val Open = () => () + val Empty = State(0, 0, LongMap()) + for { - // TODO we can make this non-blocking if we encode an inline async queue - invokeRef <- Resource.liftF(Sync[F].delay(new AtomicReference[F[Unit] => Unit])) - invokeLatch <- Resource.liftF(Sync[F].delay(new JSemaphore(1))) - _ <- Resource.liftF(Sync[F].delay(invokeLatch.acquire())) - - runner = { - val cont: F[F[Unit]] = Async[F] async_ { cb => - invokeRef.set(fu => cb(Right(fu))) - invokeLatch.release() - } + latch <- Resource.liftF(Sync[F].delay(new AtomicReference[() => Unit])) + state <- Resource.liftF(Sync[F].delay(new AtomicReference[State](Empty))) - // TODO spawn a fiber here to manage the runtime - cont.flatten + active <- Resource.make(Ref[F].of(Set[Fiber[F, Throwable, Unit]]())) { ref => + ref.get.flatMap(_.toList.traverse_(_.cancel)) } - _ <- runner.foreverM[Unit].background + dispatcher = for { + _ <- Sync[F].delay(latch.set(null)) // reset to null + s <- Sync[F].delay(state.getAndSet(Empty)) + + State(begin, end, registry) = s + pairs = (begin until end).toList.flatMap(registry.get) + + _ <- if (pairs.isEmpty) { + Async[F].async_[Unit] { cb => + if (!latch.compareAndSet(null, () => cb(Right(())))) { + // state was changed between when we last set the latch and now; complete the callback immediately + cb(Right(())) + } + } + } else { + MonadCancel[F] uncancelable { _ => + for { + fibers <- pairs traverse { + case (action, f) => + for { + fiberDef <- Deferred[F, Fiber[F, Throwable, Unit]] + + enriched = action guarantee { + fiberDef.get.flatMap(fiber => active.update(_ - fiber)) + } + + fiber <- enriched.start + _ <- fiberDef.complete(fiber) + _ <- Sync[F].delay(f(fiber.cancel)) + } yield fiber + } + + _ <- active.update(_ ++ fibers) + } yield () + } + } + } yield () + + _ <- dispatcher.foreverM[Unit].background back <- Resource liftF { unsafe { @@ -53,20 +94,61 @@ object Dispatcher extends DispatcherPlatform { def unsafeToFutureCancelable[E](fe: F[E]): (Future[E], () => Future[Unit]) = { val promise = Promise[E]() - invokeLatch.acquire() - invokeRef.get() { - fe.flatMap(e => Sync[F].delay(promise.success(e))) + val action = fe.flatMap(e => Sync[F].delay(promise.success(e))) .onError { case t => Sync[F].delay(promise.failure(t)) } .void + + @volatile + var cancelToken: F[Unit] = null.asInstanceOf[F[Unit]] + + def registerCancel(token: F[Unit]): Unit = + cancelToken = token + + @tailrec + def enqueue(): Long = { + val s @ State(_, end, registry) = state.get() + val registry2 = registry.updated(end, (action, registerCancel)) + + if (!state.compareAndSet(s, s.copy(end = end + 1, registry = registry2))) + enqueue() + else + end } - // TODO cancel token - (promise.future, () => Future.successful(())) + @tailrec + def dequeue(id: Long): Unit = { + val s @ State(_, _, registry) = state.get() + val registry2 = registry - id + + if (!state.compareAndSet(s, s.copy(registry = registry2))) { + dequeue(id) + } + } + + val id = enqueue() + + val f = latch.getAndSet(Open) + if (f != null) { + f() + } + + val cancel = { () => + dequeue(id) + + val token = cancelToken + if (token != null) + unsafeToFuture(token) + else + Future.unit + } + + (promise.future, cancel) } } } } } yield back + } sealed trait Runner[F[_]] extends RunnerPlatform[F] { From 9bf2efd5e3c139b361976364467207f370a4951f Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Sun, 11 Oct 2020 21:57:06 -0600 Subject: [PATCH 05/18] Fixed compilation on Scala 2 --- .../src/test/scala/cats/effect/std/DispatcherSpec.scala | 2 +- .../src/main/scala/cats/effect/std/DispatcherPlatform.scala | 2 +- .../src/main/scala/cats/effect/std/DispatcherPlatform.scala | 2 +- std/shared/src/main/scala/cats/effect/std/Dispatcher.scala | 4 +--- 4 files changed, 4 insertions(+), 6 deletions(-) diff --git a/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala b/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala index 2c37146e6b..c3cd9af35b 100644 --- a/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala +++ b/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala @@ -59,7 +59,7 @@ class DispatcherSpec extends BaseSpec { // engineer a deadlock: all subjects must be run in parallel or this will hang subjects = latches.map(latch => latch.complete(()) >> awaitAll) - result <- { + _ <- { val rec = Dispatcher[IO, Unit] { runner => subjects.parTraverse_(act => IO(runner.unsafeRunAndForget(act))) } diff --git a/std/js/src/main/scala/cats/effect/std/DispatcherPlatform.scala b/std/js/src/main/scala/cats/effect/std/DispatcherPlatform.scala index e58fd4ce6d..9d75262a1d 100644 --- a/std/js/src/main/scala/cats/effect/std/DispatcherPlatform.scala +++ b/std/js/src/main/scala/cats/effect/std/DispatcherPlatform.scala @@ -16,7 +16,7 @@ package cats.effect.std -private trait DispatcherPlatform { this: Dispatcher.type => +private[std] trait DispatcherPlatform { this: Dispatcher.type => protected trait RunnerPlatform[F[_]] { this: Runner[F] => } diff --git a/std/jvm/src/main/scala/cats/effect/std/DispatcherPlatform.scala b/std/jvm/src/main/scala/cats/effect/std/DispatcherPlatform.scala index ae2cbd4c89..c0165da8d2 100644 --- a/std/jvm/src/main/scala/cats/effect/std/DispatcherPlatform.scala +++ b/std/jvm/src/main/scala/cats/effect/std/DispatcherPlatform.scala @@ -19,7 +19,7 @@ package cats.effect.std import scala.concurrent.{Await, TimeoutException} import scala.concurrent.duration.Duration -private trait DispatcherPlatform { this: Dispatcher.type => +private[std] trait DispatcherPlatform { this: Dispatcher.type => protected trait RunnerPlatform[F[_]] { this: Runner[F] => diff --git a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala index 610f7eaa53..758fb6ef76 100644 --- a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala +++ b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala @@ -16,7 +16,6 @@ package cats.effect.std -import cats.~> import cats.effect.kernel.{Async, Fiber, Deferred, MonadCancel, Ref, Resource, Sync} import cats.effect.kernel.syntax.all._ import cats.syntax.all._ @@ -25,7 +24,6 @@ import scala.annotation.tailrec import scala.collection.immutable.LongMap import scala.concurrent.{Future, Promise} -import java.util.concurrent.{Semaphore => JSemaphore} import java.util.concurrent.atomic.AtomicReference object Dispatcher extends DispatcherPlatform { @@ -107,7 +105,7 @@ object Dispatcher extends DispatcherPlatform { @tailrec def enqueue(): Long = { val s @ State(_, end, registry) = state.get() - val registry2 = registry.updated(end, (action, registerCancel)) + val registry2 = registry.updated(end, (action, registerCancel _)) if (!state.compareAndSet(s, s.copy(end = end + 1, registry = registry2))) enqueue() From bf4912c8d7be51b8c852c595d9d1c1affe1b4b70 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Sun, 11 Oct 2020 22:09:08 -0600 Subject: [PATCH 06/18] Added a test for cancelation --- .../scala/cats/effect/std/DispatcherSpec.scala | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala b/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala index c3cd9af35b..be4af1d144 100644 --- a/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala +++ b/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala @@ -20,6 +20,8 @@ package std import cats.effect.kernel.Deferred import cats.syntax.all._ +import scala.concurrent.duration._ + class DispatcherSpec extends BaseSpec { "async dispatcher" should { @@ -68,5 +70,17 @@ class DispatcherSpec extends BaseSpec { } } yield ok } + + "forward cancelation onto the inner action" in real { + var canceled = false + + val rec = Dispatcher[IO, Unit] { runner => + IO(runner.unsafeToFutureCancelable(IO.never.onCancel(IO { canceled = true }))._2) flatMap { ct => + IO.sleep(100.millis) >> IO.fromFuture(IO(ct())) + } + } + + rec.use(_ => IO(canceled must beTrue)) + } } } From 5116bf08ccbed7e6bb24bf1f24f07969b4850538 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Mon, 12 Oct 2020 08:33:33 -0600 Subject: [PATCH 07/18] Formatting --- .../src/test/scala/cats/effect/std/DispatcherSpec.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala b/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala index be4af1d144..f05680db0d 100644 --- a/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala +++ b/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala @@ -75,9 +75,11 @@ class DispatcherSpec extends BaseSpec { var canceled = false val rec = Dispatcher[IO, Unit] { runner => - IO(runner.unsafeToFutureCancelable(IO.never.onCancel(IO { canceled = true }))._2) flatMap { ct => - IO.sleep(100.millis) >> IO.fromFuture(IO(ct())) + val run = IO { + runner.unsafeToFutureCancelable(IO.never.onCancel(IO { canceled = true }))._2 } + + run.flatMap(ct => IO.sleep(100.millis) >> IO.fromFuture(IO(ct()))) } rec.use(_ => IO(canceled must beTrue)) From 0f1a4ed41a0ba73fbb9e557c9254ad329c92c10c Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Mon, 12 Oct 2020 09:02:24 -0600 Subject: [PATCH 08/18] Convinced scalafmt to apply to Dispatcher this time... --- .../cats/effect/std/DispatcherPlatform.scala | 3 +- .../scala/cats/effect/std/Dispatcher.scala | 65 ++++++++++--------- 2 files changed, 36 insertions(+), 32 deletions(-) diff --git a/std/js/src/main/scala/cats/effect/std/DispatcherPlatform.scala b/std/js/src/main/scala/cats/effect/std/DispatcherPlatform.scala index 9d75262a1d..62b1b12caf 100644 --- a/std/js/src/main/scala/cats/effect/std/DispatcherPlatform.scala +++ b/std/js/src/main/scala/cats/effect/std/DispatcherPlatform.scala @@ -18,6 +18,5 @@ package cats.effect.std private[std] trait DispatcherPlatform { this: Dispatcher.type => - protected trait RunnerPlatform[F[_]] { this: Runner[F] => - } + protected trait RunnerPlatform[F[_]] { this: Runner[F] => } } diff --git a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala index 758fb6ef76..7ec026c678 100644 --- a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala +++ b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala @@ -16,7 +16,7 @@ package cats.effect.std -import cats.effect.kernel.{Async, Fiber, Deferred, MonadCancel, Ref, Resource, Sync} +import cats.effect.kernel.{Async, Deferred, Fiber, MonadCancel, Ref, Resource, Sync} import cats.effect.kernel.syntax.all._ import cats.syntax.all._ @@ -29,7 +29,10 @@ import java.util.concurrent.atomic.AtomicReference object Dispatcher extends DispatcherPlatform { def apply[F[_]: Async, A](unsafe: Runner[F] => F[A]): Resource[F, A] = { - final case class State(begin: Long, end: Long, registry: LongMap[(F[Unit], F[Unit] => Unit)]) { + final case class State( + begin: Long, + end: Long, + registry: LongMap[(F[Unit], F[Unit] => Unit)]) { // efficiency on the CAS override def equals(that: Any) = this eq that.asInstanceOf[AnyRef] override def hashCode = System.identityHashCode(this) @@ -47,41 +50,42 @@ object Dispatcher extends DispatcherPlatform { } dispatcher = for { - _ <- Sync[F].delay(latch.set(null)) // reset to null + _ <- Sync[F].delay(latch.set(null)) // reset to null s <- Sync[F].delay(state.getAndSet(Empty)) State(begin, end, registry) = s pairs = (begin until end).toList.flatMap(registry.get) - _ <- if (pairs.isEmpty) { - Async[F].async_[Unit] { cb => - if (!latch.compareAndSet(null, () => cb(Right(())))) { - // state was changed between when we last set the latch and now; complete the callback immediately - cb(Right(())) - } - } - } else { - MonadCancel[F] uncancelable { _ => - for { - fibers <- pairs traverse { - case (action, f) => - for { - fiberDef <- Deferred[F, Fiber[F, Throwable, Unit]] - - enriched = action guarantee { - fiberDef.get.flatMap(fiber => active.update(_ - fiber)) - } - - fiber <- enriched.start - _ <- fiberDef.complete(fiber) - _ <- Sync[F].delay(f(fiber.cancel)) - } yield fiber + _ <- + if (pairs.isEmpty) { + Async[F].async_[Unit] { cb => + if (!latch.compareAndSet(null, () => cb(Right(())))) { + // state was changed between when we last set the latch and now; complete the callback immediately + cb(Right(())) } + } + } else { + MonadCancel[F] uncancelable { _ => + for { + fibers <- pairs traverse { + case (action, f) => + for { + fiberDef <- Deferred[F, Fiber[F, Throwable, Unit]] + + enriched = action guarantee { + fiberDef.get.flatMap(fiber => active.update(_ - fiber)) + } + + fiber <- enriched.start + _ <- fiberDef.complete(fiber) + _ <- Sync[F].delay(f(fiber.cancel)) + } yield fiber + } - _ <- active.update(_ ++ fibers) - } yield () + _ <- active.update(_ ++ fibers) + } yield () + } } - } } yield () _ <- dispatcher.foreverM[Unit].background @@ -92,7 +96,8 @@ object Dispatcher extends DispatcherPlatform { def unsafeToFutureCancelable[E](fe: F[E]): (Future[E], () => Future[Unit]) = { val promise = Promise[E]() - val action = fe.flatMap(e => Sync[F].delay(promise.success(e))) + val action = fe + .flatMap(e => Sync[F].delay(promise.success(e))) .onError { case t => Sync[F].delay(promise.failure(t)) } .void From 9cb567029179bab3798f5663aecaf52f76257f9b Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Wed, 14 Oct 2020 14:34:34 -0600 Subject: [PATCH 09/18] PR feedback --- .../cats/effect/std/DispatcherSpec.scala | 44 +++++- .../scala/cats/effect/std/Dispatcher.scala | 138 ++++++++++-------- 2 files changed, 116 insertions(+), 66 deletions(-) diff --git a/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala b/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala index f05680db0d..a9ce5b323b 100644 --- a/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala +++ b/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala @@ -27,13 +27,13 @@ class DispatcherSpec extends BaseSpec { "async dispatcher" should { "run a synchronous IO" in real { val ioa = IO(1).map(_ + 2) - val rec = Dispatcher[IO, Int](runner => IO.fromFuture(IO(runner.unsafeToFuture(ioa)))) + val rec = Dispatcher[IO, Int](runner => Resource.liftF(IO.fromFuture(IO(runner.unsafeToFuture(ioa))))) rec.use(i => IO(i mustEqual 3)) } "run an asynchronous IO" in real { val ioa = (IO(1) <* IO.cede).map(_ + 2) - val rec = Dispatcher[IO, Int](runner => IO.fromFuture(IO(runner.unsafeToFuture(ioa)))) + val rec = Dispatcher[IO, Int](runner => Resource.liftF(IO.fromFuture(IO(runner.unsafeToFuture(ioa))))) rec.use(i => IO(i mustEqual 3)) } @@ -45,7 +45,7 @@ class DispatcherSpec extends BaseSpec { val num = 10 val rec = Dispatcher[IO, Unit] { runner => - IO.fromFuture(IO(runner.unsafeToFuture(increment))).replicateA(num).void + Resource.liftF(IO.fromFuture(IO(runner.unsafeToFuture(increment))).replicateA(num).void) } rec.use(_ => IO(counter mustEqual num)) @@ -63,7 +63,7 @@ class DispatcherSpec extends BaseSpec { _ <- { val rec = Dispatcher[IO, Unit] { runner => - subjects.parTraverse_(act => IO(runner.unsafeRunAndForget(act))) + Resource.liftF(subjects.parTraverse_(act => IO(runner.unsafeRunAndForget(act)))) } rec.use(_ => IO.unit) @@ -79,10 +79,44 @@ class DispatcherSpec extends BaseSpec { runner.unsafeToFutureCancelable(IO.never.onCancel(IO { canceled = true }))._2 } - run.flatMap(ct => IO.sleep(100.millis) >> IO.fromFuture(IO(ct()))) + Resource liftF { + run.flatMap(ct => IO.sleep(100.millis) >> IO.fromFuture(IO(ct()))) + } } rec.use(_ => IO(canceled must beTrue)) } + + "cancel all inner effects when canceled" in real { + @volatile + var canceledA = false + @volatile + var canceledB = false + + val rec = Dispatcher[IO, Unit] { runner => + Resource liftF { + IO { + // these finalizers never return, so this test is intentionally designed to hang + // they flip their booleans first though; this is just testing that both run in parallel + val a = IO.never.onCancel(IO { canceledA = true } *> IO.never) + val b = IO.never.onCancel(IO { canceledB = true } *> IO.never) + + runner.unsafeRunAndForget(a) + runner.unsafeRunAndForget(b) + } + } + } + + for { + _ <- rec.use(_ => IO.sleep(50.millis)).start + _ <- IO.sleep(100.millis) // scope should be closed by now + + r <- IO { + // if we don't run the finalizers in parallel, one of these will be false + canceledA must beTrue + canceledB must beTrue + } + } yield r + } } } diff --git a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala index 7ec026c678..9d184c4e46 100644 --- a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala +++ b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala @@ -17,7 +17,7 @@ package cats.effect.std import cats.effect.kernel.{Async, Deferred, Fiber, MonadCancel, Ref, Resource, Sync} -import cats.effect.kernel.syntax.all._ +import cats.effect.kernel.implicits._ import cats.syntax.all._ import scala.annotation.tailrec @@ -28,36 +28,54 @@ import java.util.concurrent.atomic.AtomicReference object Dispatcher extends DispatcherPlatform { - def apply[F[_]: Async, A](unsafe: Runner[F] => F[A]): Resource[F, A] = { - final case class State( - begin: Long, - end: Long, - registry: LongMap[(F[Unit], F[Unit] => Unit)]) { - // efficiency on the CAS + private[this] val Open = () => () + + def apply[F[_]: Async, A](unsafe: Runner[F] => Resource[F, A]): Resource[F, A] = { + final case class Registration(action: F[Unit], prepareCancel: F[Unit] => Unit) + + final case class State(end: Long, registry: LongMap[Registration]) { + + /* + * We know we never need structural equality on this type. We do, however, + * perform a compare-and-swap relatively frequently, and that operation + * delegates to the `equals` implementation. I like having the case class + * for pattern matching and general convenience, so we simply override the + * equals/hashCode implementation to use pointer equality. + */ override def equals(that: Any) = this eq that.asInstanceOf[AnyRef] override def hashCode = System.identityHashCode(this) } - val Open = () => () - val Empty = State(0, 0, LongMap()) + val Empty = State(0, LongMap()) for { latch <- Resource.liftF(Sync[F].delay(new AtomicReference[() => Unit])) state <- Resource.liftF(Sync[F].delay(new AtomicReference[State](Empty))) active <- Resource.make(Ref[F].of(Set[Fiber[F, Throwable, Unit]]())) { ref => - ref.get.flatMap(_.toList.traverse_(_.cancel)) + ref.get.flatMap(_.toList.parTraverse_(_.cancel)) } dispatcher = for { _ <- Sync[F].delay(latch.set(null)) // reset to null - s <- Sync[F].delay(state.getAndSet(Empty)) - State(begin, end, registry) = s - pairs = (begin until end).toList.flatMap(registry.get) + s <- Sync[F] delay { + @tailrec + def loop(): State = { + val s = state.get() + if (!state.compareAndSet(s, s.copy(registry = s.registry.empty))) + loop() + else + s + } + + loop() + } + + State(end, registry) = s _ <- - if (pairs.isEmpty) { + if (registry.isEmpty) { Async[F].async_[Unit] { cb => if (!latch.compareAndSet(null, () => cb(Right(())))) { // state was changed between when we last set the latch and now; complete the callback immediately @@ -67,8 +85,8 @@ object Dispatcher extends DispatcherPlatform { } else { MonadCancel[F] uncancelable { _ => for { - fibers <- pairs traverse { - case (action, f) => + fibers <- registry.values.toList traverse { + case Registration(action, prepareCancel) => for { fiberDef <- Deferred[F, Fiber[F, Throwable, Unit]] @@ -78,7 +96,7 @@ object Dispatcher extends DispatcherPlatform { fiber <- enriched.start _ <- fiberDef.complete(fiber) - _ <- Sync[F].delay(f(fiber.cancel)) + _ <- Sync[F].delay(prepareCancel(fiber.cancel)) } yield fiber } @@ -90,63 +108,61 @@ object Dispatcher extends DispatcherPlatform { _ <- dispatcher.foreverM[Unit].background - back <- Resource liftF { - unsafe { - new Runner[F] { - def unsafeToFutureCancelable[E](fe: F[E]): (Future[E], () => Future[Unit]) = { - val promise = Promise[E]() + back <- unsafe { + new Runner[F] { + def unsafeToFutureCancelable[E](fe: F[E]): (Future[E], () => Future[Unit]) = { + val promise = Promise[E]() - val action = fe - .flatMap(e => Sync[F].delay(promise.success(e))) - .onError { case t => Sync[F].delay(promise.failure(t)) } - .void + val action = fe + .flatMap(e => Sync[F].delay(promise.success(e))) + .onError { case t => Sync[F].delay(promise.failure(t)) } + .void - @volatile - var cancelToken: F[Unit] = null.asInstanceOf[F[Unit]] + @volatile + var cancelToken: F[Unit] = null.asInstanceOf[F[Unit]] - def registerCancel(token: F[Unit]): Unit = - cancelToken = token + def registerCancel(token: F[Unit]): Unit = + cancelToken = token - @tailrec - def enqueue(): Long = { - val s @ State(_, end, registry) = state.get() - val registry2 = registry.updated(end, (action, registerCancel _)) + @tailrec + def enqueue(): Long = { + val s @ State(end, registry) = state.get() + val registry2 = registry.updated(end, Registration(action, registerCancel _)) - if (!state.compareAndSet(s, s.copy(end = end + 1, registry = registry2))) - enqueue() - else - end - } + if (!state.compareAndSet(s, State(end + 1, registry2))) + enqueue() + else + end + } - @tailrec - def dequeue(id: Long): Unit = { - val s @ State(_, _, registry) = state.get() - val registry2 = registry - id + @tailrec + def dequeue(id: Long): Unit = { + val s @ State(_, registry) = state.get() + val registry2 = registry - id - if (!state.compareAndSet(s, s.copy(registry = registry2))) { - dequeue(id) - } + if (!state.compareAndSet(s, s.copy(registry = registry2))) { + dequeue(id) } + } - val id = enqueue() - - val f = latch.getAndSet(Open) - if (f != null) { - f() - } + val id = enqueue() - val cancel = { () => - dequeue(id) + val f = latch.getAndSet(Open) + if (f != null) { + f() + } - val token = cancelToken - if (token != null) - unsafeToFuture(token) - else - Future.unit - } + val cancel = { () => + dequeue(id) - (promise.future, cancel) + val token = cancelToken + if (token != null) + unsafeToFuture(token) + else + Future.unit } + + (promise.future, cancel) } } } From 6c9cd9c7b89a7c80c9eff47d0cde8dd2c45904f2 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Wed, 14 Oct 2020 14:36:15 -0600 Subject: [PATCH 10/18] Slightly cleaner syntax --- .../scala/cats/effect/std/Dispatcher.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala index 9d184c4e46..4f8e9ad9be 100644 --- a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala +++ b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala @@ -16,7 +16,7 @@ package cats.effect.std -import cats.effect.kernel.{Async, Deferred, Fiber, MonadCancel, Ref, Resource, Sync} +import cats.effect.kernel.{Async, Deferred, Fiber, Ref, Resource} import cats.effect.kernel.implicits._ import cats.syntax.all._ @@ -30,7 +30,7 @@ object Dispatcher extends DispatcherPlatform { private[this] val Open = () => () - def apply[F[_]: Async, A](unsafe: Runner[F] => Resource[F, A]): Resource[F, A] = { + def apply[F[_], A](unsafe: Runner[F] => Resource[F, A])(implicit F: Async[F]): Resource[F, A] = { final case class Registration(action: F[Unit], prepareCancel: F[Unit] => Unit) final case class State(end: Long, registry: LongMap[Registration]) { @@ -49,17 +49,17 @@ object Dispatcher extends DispatcherPlatform { val Empty = State(0, LongMap()) for { - latch <- Resource.liftF(Sync[F].delay(new AtomicReference[() => Unit])) - state <- Resource.liftF(Sync[F].delay(new AtomicReference[State](Empty))) + latch <- Resource.liftF(F.delay(new AtomicReference[() => Unit])) + state <- Resource.liftF(F.delay(new AtomicReference[State](Empty))) active <- Resource.make(Ref[F].of(Set[Fiber[F, Throwable, Unit]]())) { ref => ref.get.flatMap(_.toList.parTraverse_(_.cancel)) } dispatcher = for { - _ <- Sync[F].delay(latch.set(null)) // reset to null + _ <- F.delay(latch.set(null)) // reset to null - s <- Sync[F] delay { + s <- F delay { @tailrec def loop(): State = { val s = state.get() @@ -76,14 +76,14 @@ object Dispatcher extends DispatcherPlatform { _ <- if (registry.isEmpty) { - Async[F].async_[Unit] { cb => + F.async_[Unit] { cb => if (!latch.compareAndSet(null, () => cb(Right(())))) { // state was changed between when we last set the latch and now; complete the callback immediately cb(Right(())) } } } else { - MonadCancel[F] uncancelable { _ => + F uncancelable { _ => for { fibers <- registry.values.toList traverse { case Registration(action, prepareCancel) => @@ -96,7 +96,7 @@ object Dispatcher extends DispatcherPlatform { fiber <- enriched.start _ <- fiberDef.complete(fiber) - _ <- Sync[F].delay(prepareCancel(fiber.cancel)) + _ <- F.delay(prepareCancel(fiber.cancel)) } yield fiber } @@ -114,8 +114,8 @@ object Dispatcher extends DispatcherPlatform { val promise = Promise[E]() val action = fe - .flatMap(e => Sync[F].delay(promise.success(e))) - .onError { case t => Sync[F].delay(promise.failure(t)) } + .flatMap(e => F.delay(promise.success(e))) + .onError { case t => F.delay(promise.failure(t)) } .void @volatile From 662575d4cb076e9a5042cc412942a415faf31a4c Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Wed, 14 Oct 2020 14:51:05 -0600 Subject: [PATCH 11/18] Revised active state management to eliminate Deferred and shut off race conditions --- .../scala/cats/effect/std/Dispatcher.scala | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala index 4f8e9ad9be..a288130707 100644 --- a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala +++ b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala @@ -16,7 +16,7 @@ package cats.effect.std -import cats.effect.kernel.{Async, Deferred, Fiber, Ref, Resource} +import cats.effect.kernel.{Async, Fiber, Resource} import cats.effect.kernel.implicits._ import cats.syntax.all._ @@ -52,8 +52,11 @@ object Dispatcher extends DispatcherPlatform { latch <- Resource.liftF(F.delay(new AtomicReference[() => Unit])) state <- Resource.liftF(F.delay(new AtomicReference[State](Empty))) - active <- Resource.make(Ref[F].of(Set[Fiber[F, Throwable, Unit]]())) { ref => - ref.get.flatMap(_.toList.parTraverse_(_.cancel)) + active <- Resource.make(F.ref(LongMap[Fiber[F, Throwable, Unit]]())) { active => + for { + fibers <- active.get + _ <- fibers.values.toList.parTraverse_(_.cancel) + } yield () } dispatcher = for { @@ -85,22 +88,26 @@ object Dispatcher extends DispatcherPlatform { } else { F uncancelable { _ => for { - fibers <- registry.values.toList traverse { - case Registration(action, prepareCancel) => - for { - fiberDef <- Deferred[F, Fiber[F, Throwable, Unit]] + // for catching race conditions where we finished before we were in the map + completed <- F.ref(LongMap[Unit]()) - enriched = action guarantee { - fiberDef.get.flatMap(fiber => active.update(_ - fiber)) - } + identifiedFibers <- registry.toList traverse { + case (id, Registration(action, prepareCancel)) => + val enriched = action guarantee { + completed.update(_.updated(id, ())) *> active.update(_ - id) + } + for { fiber <- enriched.start - _ <- fiberDef.complete(fiber) _ <- F.delay(prepareCancel(fiber.cancel)) - } yield fiber + } yield (id, fiber) } - _ <- active.update(_ ++ fibers) + _ <- active.update(_ ++ identifiedFibers) + + // some actions ran too fast, so we need to remove their fibers from the map + winners <- completed.get + _ <- active.update(_ -- winners.keys) } yield () } } From 35ba243e0bcc1e95770017ff9b821abd59ae22ba Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Wed, 14 Oct 2020 14:55:52 -0600 Subject: [PATCH 12/18] Resolve race condition where cancelToken could be missed --- .../scala/cats/effect/std/Dispatcher.scala | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala index a288130707..20774efde4 100644 --- a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala +++ b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala @@ -126,10 +126,20 @@ object Dispatcher extends DispatcherPlatform { .void @volatile - var cancelToken: F[Unit] = null.asInstanceOf[F[Unit]] + var cancelToken: () => Future[Unit] = null - def registerCancel(token: F[Unit]): Unit = - cancelToken = token + @volatile + var canceled = false + + def registerCancel(token: F[Unit]): Unit = { + cancelToken = () => unsafeToFuture(token) + + // double-check to resolve race condition here + if (canceled) { + cancelToken() + () + } + } @tailrec def enqueue(): Long = { @@ -160,11 +170,12 @@ object Dispatcher extends DispatcherPlatform { } val cancel = { () => + canceled = true dequeue(id) val token = cancelToken if (token != null) - unsafeToFuture(token) + token() else Future.unit } From 3fd496b73578bff8a1486923a072239cc557952b Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Wed, 14 Oct 2020 15:06:58 -0600 Subject: [PATCH 13/18] Handle error case where the runner is leaked --- .../cats/effect/std/DispatcherSpec.scala | 8 ++++ .../scala/cats/effect/std/Dispatcher.scala | 44 ++++++++++++------- 2 files changed, 36 insertions(+), 16 deletions(-) diff --git a/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala b/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala index a9ce5b323b..0d95fa6878 100644 --- a/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala +++ b/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala @@ -118,5 +118,13 @@ class DispatcherSpec extends BaseSpec { } } yield r } + + "raise an error on leaked runner" in real { + Dispatcher[IO, Dispatcher.Runner[IO]](Resource.pure(_)).use(IO.pure(_)) flatMap { runner => + IO { + runner.unsafeRunAndForget(IO(ko)) must throwAn[IllegalStateException] + } + } + } } } diff --git a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala index 20774efde4..7b281bd7f6 100644 --- a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala +++ b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala @@ -24,7 +24,7 @@ import scala.annotation.tailrec import scala.collection.immutable.LongMap import scala.concurrent.{Future, Promise} -import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} object Dispatcher extends DispatcherPlatform { @@ -52,6 +52,7 @@ object Dispatcher extends DispatcherPlatform { latch <- Resource.liftF(F.delay(new AtomicReference[() => Unit])) state <- Resource.liftF(F.delay(new AtomicReference[State](Empty))) + alive <- Resource.make(F.delay(new AtomicBoolean(true)))(ref => F.delay(ref.set(false))) active <- Resource.make(F.ref(LongMap[Fiber[F, Throwable, Unit]]())) { active => for { fibers <- active.get @@ -162,25 +163,36 @@ object Dispatcher extends DispatcherPlatform { } } - val id = enqueue() + if (alive.get()) { + val id = enqueue() - val f = latch.getAndSet(Open) - if (f != null) { - f() - } + val f = latch.getAndSet(Open) + if (f != null) { + f() + } - val cancel = { () => - canceled = true - dequeue(id) + val cancel = { () => + canceled = true + dequeue(id) - val token = cancelToken - if (token != null) - token() - else - Future.unit - } + val token = cancelToken + if (token != null) + token() + else + Future.unit + } - (promise.future, cancel) + // double-check after we already put things in the structure + if (alive.get()) { + (promise.future, cancel) + } else { + // we were shutdown *during* the enqueue + cancel() + throw new IllegalStateException("dispatcher already shutdown") + } + } else { + throw new IllegalStateException("dispatcher already shutdown") + } } } } From a8b98bd2b0acc28e9b246e43707f5d1ef84f885c Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Wed, 14 Oct 2020 15:07:58 -0600 Subject: [PATCH 14/18] Scalafmt --- .../scala/cats/effect/std/DispatcherSpec.scala | 17 ++++++++++------- .../main/scala/cats/effect/std/Dispatcher.scala | 3 ++- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala b/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala index 0d95fa6878..7312cfdd52 100644 --- a/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala +++ b/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala @@ -27,13 +27,15 @@ class DispatcherSpec extends BaseSpec { "async dispatcher" should { "run a synchronous IO" in real { val ioa = IO(1).map(_ + 2) - val rec = Dispatcher[IO, Int](runner => Resource.liftF(IO.fromFuture(IO(runner.unsafeToFuture(ioa))))) + val rec = Dispatcher[IO, Int](runner => + Resource.liftF(IO.fromFuture(IO(runner.unsafeToFuture(ioa))))) rec.use(i => IO(i mustEqual 3)) } "run an asynchronous IO" in real { val ioa = (IO(1) <* IO.cede).map(_ + 2) - val rec = Dispatcher[IO, Int](runner => Resource.liftF(IO.fromFuture(IO(runner.unsafeToFuture(ioa))))) + val rec = Dispatcher[IO, Int](runner => + Resource.liftF(IO.fromFuture(IO(runner.unsafeToFuture(ioa))))) rec.use(i => IO(i mustEqual 3)) } @@ -109,7 +111,7 @@ class DispatcherSpec extends BaseSpec { for { _ <- rec.use(_ => IO.sleep(50.millis)).start - _ <- IO.sleep(100.millis) // scope should be closed by now + _ <- IO.sleep(100.millis) // scope should be closed by now r <- IO { // if we don't run the finalizers in parallel, one of these will be false @@ -120,10 +122,11 @@ class DispatcherSpec extends BaseSpec { } "raise an error on leaked runner" in real { - Dispatcher[IO, Dispatcher.Runner[IO]](Resource.pure(_)).use(IO.pure(_)) flatMap { runner => - IO { - runner.unsafeRunAndForget(IO(ko)) must throwAn[IllegalStateException] - } + Dispatcher[IO, Dispatcher.Runner[IO]](Resource.pure(_)).use(IO.pure(_)) flatMap { + runner => + IO { + runner.unsafeRunAndForget(IO(ko)) must throwAn[IllegalStateException] + } } } } diff --git a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala index 7b281bd7f6..43bceeaefd 100644 --- a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala +++ b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala @@ -30,7 +30,8 @@ object Dispatcher extends DispatcherPlatform { private[this] val Open = () => () - def apply[F[_], A](unsafe: Runner[F] => Resource[F, A])(implicit F: Async[F]): Resource[F, A] = { + def apply[F[_], A](unsafe: Runner[F] => Resource[F, A])( + implicit F: Async[F]): Resource[F, A] = { final case class Registration(action: F[Unit], prepareCancel: F[Unit] => Unit) final case class State(end: Long, registry: LongMap[Registration]) { From 80f5cc51fb036d7ff367fd6d1f9d887cf5962317 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Wed, 14 Oct 2020 15:49:49 -0600 Subject: [PATCH 15/18] Removed UnsafeRun --- .../js/src/main/scala/cats/effect/IOApp.scala | 2 - .../src/main/scala/cats/effect/IOApp.scala | 2 - .../src/main/scala/cats/effect/IO.scala | 3 - .../scala/cats/effect/unsafe/IORuntime.scala | 16 +----- .../cats/effect/unsafe/UnsafeRunSpec.scala | 57 ------------------- .../effect/unsafe/UnsafeRunPlatform.scala | 19 ------- .../effect/unsafe/UnsafeRunPlatform.scala | 36 ------------ .../scala/cats/effect/unsafe/UnsafeRun.scala | 32 ----------- 8 files changed, 1 insertion(+), 166 deletions(-) delete mode 100644 core/shared/src/test/scala/cats/effect/unsafe/UnsafeRunSpec.scala delete mode 100644 kernel/js/src/main/scala/cats/effect/unsafe/UnsafeRunPlatform.scala delete mode 100644 kernel/jvm/src/main/scala/cats/effect/unsafe/UnsafeRunPlatform.scala delete mode 100644 kernel/shared/src/main/scala/cats/effect/unsafe/UnsafeRun.scala diff --git a/core/js/src/main/scala/cats/effect/IOApp.scala b/core/js/src/main/scala/cats/effect/IOApp.scala index dd995b7c12..937fef5615 100644 --- a/core/js/src/main/scala/cats/effect/IOApp.scala +++ b/core/js/src/main/scala/cats/effect/IOApp.scala @@ -25,8 +25,6 @@ trait IOApp { protected val runtime: unsafe.IORuntime = unsafe.IORuntime.global - protected implicit val unsafeRunForIO: unsafe.UnsafeRun[IO] = runtime.unsafeRunForIO - final def main(args: Array[String]): Unit = { // An infinite heartbeat to keep main alive. This is similar to // `IO.never`, except `IO.never` doesn't schedule any tasks and is diff --git a/core/jvm/src/main/scala/cats/effect/IOApp.scala b/core/jvm/src/main/scala/cats/effect/IOApp.scala index 46b4229632..5fa47034c3 100644 --- a/core/jvm/src/main/scala/cats/effect/IOApp.scala +++ b/core/jvm/src/main/scala/cats/effect/IOApp.scala @@ -24,8 +24,6 @@ trait IOApp { protected val runtime: unsafe.IORuntime = unsafe.IORuntime.global - protected implicit val unsafeRunForIO: unsafe.UnsafeRun[IO] = runtime.unsafeRunForIO - final def main(args: Array[String]): Unit = { val rt = Runtime.getRuntime() diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index d4fa45ed40..40b8b21060 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -556,9 +556,6 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits { implicit def asyncForIO: kernel.Async[IO] = _asyncForIO - implicit def unsafeRunForIO(implicit runtime: unsafe.IORuntime): unsafe.UnsafeRun[IO] = - runtime.unsafeRunForIO - private[this] val _parallelForIO: Parallel.Aux[IO, ParallelF[IO, *]] = parallelForGenSpawn[IO, Throwable] diff --git a/core/shared/src/main/scala/cats/effect/unsafe/IORuntime.scala b/core/shared/src/main/scala/cats/effect/unsafe/IORuntime.scala index 2b9dd6e8a1..28c10cb645 100644 --- a/core/shared/src/main/scala/cats/effect/unsafe/IORuntime.scala +++ b/core/shared/src/main/scala/cats/effect/unsafe/IORuntime.scala @@ -17,7 +17,7 @@ package cats.effect package unsafe -import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.concurrent.ExecutionContext @annotation.implicitNotFound("""Could not find an implicit IORuntime. @@ -39,20 +39,6 @@ final class IORuntime private[effect] ( val scheduler: Scheduler, val shutdown: () => Unit) { - private implicit val self: IORuntime = this - - val unsafeRunForIO: UnsafeRun[IO] = - new UnsafeRun[IO] { - def unsafeRunFutureCancelable[A](fa: IO[A]): (Future[A], () => Future[Unit]) = { - val p = Promise[A]() - val fiber = fa.unsafeRunFiber(true) { - case Left(t) => p.failure(t) - case Right(a) => p.success(a) - } - (p.future, () => fiber.cancel.unsafeToFuture()) - } - } - override def toString: String = s"IORuntime($compute, $scheduler)" } diff --git a/core/shared/src/test/scala/cats/effect/unsafe/UnsafeRunSpec.scala b/core/shared/src/test/scala/cats/effect/unsafe/UnsafeRunSpec.scala deleted file mode 100644 index 4027f73642..0000000000 --- a/core/shared/src/test/scala/cats/effect/unsafe/UnsafeRunSpec.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright 2020 Typelevel - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cats.effect -package unsafe - -class UnsafeRunSpec extends BaseSpec { - - "unsafe run" should { - "be able to execute a synchronous IO" in real { - var i = 42 - - val io = IO { - i += 1 - i - } - - val (future, _) = runtime().unsafeRunForIO.unsafeRunFutureCancelable(io) - IO.fromFuture(IO(future)).map(_ must beEqualTo(43)) - } - - "be able to execute an asynchronous IO" in real { - val io = IO.async_[Int](cb => cb(Right(42))) - - val (future, _) = runtime().unsafeRunForIO.unsafeRunFutureCancelable(io) - IO.fromFuture(IO(future)).map(_ must beEqualTo(42)) - } - - "be able to cancel the execution of an IO" in real { - var canceled = false - - val io = IO - .never - .onCancel(IO { - canceled = true - }) - - val (_, canceler) = runtime().unsafeRunForIO.unsafeRunFutureCancelable(io) - val future = canceler() - IO.fromFuture(IO(future)) - .map(_ => { val capture = canceled; capture must beEqualTo(true) }) - } - } -} diff --git a/kernel/js/src/main/scala/cats/effect/unsafe/UnsafeRunPlatform.scala b/kernel/js/src/main/scala/cats/effect/unsafe/UnsafeRunPlatform.scala deleted file mode 100644 index b81f58d3c8..0000000000 --- a/kernel/js/src/main/scala/cats/effect/unsafe/UnsafeRunPlatform.scala +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright 2020 Typelevel - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cats.effect.unsafe - -private[unsafe] trait UnsafeRunPlatform[F[_]] {} diff --git a/kernel/jvm/src/main/scala/cats/effect/unsafe/UnsafeRunPlatform.scala b/kernel/jvm/src/main/scala/cats/effect/unsafe/UnsafeRunPlatform.scala deleted file mode 100644 index fecdea0158..0000000000 --- a/kernel/jvm/src/main/scala/cats/effect/unsafe/UnsafeRunPlatform.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2020 Typelevel - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cats.effect.unsafe - -import scala.concurrent.{Await, TimeoutException} -import scala.concurrent.duration.Duration - -private[unsafe] trait UnsafeRunPlatform[F[_]] { self: UnsafeRun[F] => - - def unsafeRunSync[A](fa: F[A]): A = - unsafeRunTimed(fa, Duration.Inf) - - def unsafeRunTimed[A](fa: F[A], timeout: Duration): A = { - val (fut, cancel) = unsafeRunFutureCancelable(fa) - try Await.result(fut, timeout) - catch { - case t: TimeoutException => - cancel() - throw t - } - } -} diff --git a/kernel/shared/src/main/scala/cats/effect/unsafe/UnsafeRun.scala b/kernel/shared/src/main/scala/cats/effect/unsafe/UnsafeRun.scala deleted file mode 100644 index ebf0b4aa22..0000000000 --- a/kernel/shared/src/main/scala/cats/effect/unsafe/UnsafeRun.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2020 Typelevel - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cats.effect.unsafe - -import scala.concurrent.Future - -trait UnsafeRun[F[_]] extends UnsafeRunPlatform[F] { - def unsafeRunFutureCancelable[A](fa: F[A]): (Future[A], () => Future[Unit]) - - def unsafeRunAndForget[A](fa: F[A]): Unit = { - unsafeRunFutureCancelable(fa) - () - } -} - -object UnsafeRun { - def apply[F[_]](implicit F: UnsafeRun[F]): F.type = F -} From 381bfc8cb42d68f7c36d99d2565d0b6945bc18fb Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Sat, 17 Oct 2020 16:24:13 -0600 Subject: [PATCH 16/18] Removed unnecessary nesting --- .../cats/effect/std/DispatcherSpec.scala | 21 ++- .../scala/cats/effect/std/Dispatcher.scala | 121 +++++++++--------- 2 files changed, 69 insertions(+), 73 deletions(-) diff --git a/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala b/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala index 7312cfdd52..438bd24518 100644 --- a/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala +++ b/core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala @@ -27,14 +27,14 @@ class DispatcherSpec extends BaseSpec { "async dispatcher" should { "run a synchronous IO" in real { val ioa = IO(1).map(_ + 2) - val rec = Dispatcher[IO, Int](runner => + val rec = Dispatcher[IO].flatMap(runner => Resource.liftF(IO.fromFuture(IO(runner.unsafeToFuture(ioa))))) rec.use(i => IO(i mustEqual 3)) } "run an asynchronous IO" in real { val ioa = (IO(1) <* IO.cede).map(_ + 2) - val rec = Dispatcher[IO, Int](runner => + val rec = Dispatcher[IO].flatMap(runner => Resource.liftF(IO.fromFuture(IO(runner.unsafeToFuture(ioa))))) rec.use(i => IO(i mustEqual 3)) } @@ -46,7 +46,7 @@ class DispatcherSpec extends BaseSpec { val num = 10 - val rec = Dispatcher[IO, Unit] { runner => + val rec = Dispatcher[IO] flatMap { runner => Resource.liftF(IO.fromFuture(IO(runner.unsafeToFuture(increment))).replicateA(num).void) } @@ -64,7 +64,7 @@ class DispatcherSpec extends BaseSpec { subjects = latches.map(latch => latch.complete(()) >> awaitAll) _ <- { - val rec = Dispatcher[IO, Unit] { runner => + val rec = Dispatcher[IO] flatMap { runner => Resource.liftF(subjects.parTraverse_(act => IO(runner.unsafeRunAndForget(act)))) } @@ -76,7 +76,7 @@ class DispatcherSpec extends BaseSpec { "forward cancelation onto the inner action" in real { var canceled = false - val rec = Dispatcher[IO, Unit] { runner => + val rec = Dispatcher[IO] flatMap { runner => val run = IO { runner.unsafeToFutureCancelable(IO.never.onCancel(IO { canceled = true }))._2 } @@ -95,7 +95,7 @@ class DispatcherSpec extends BaseSpec { @volatile var canceledB = false - val rec = Dispatcher[IO, Unit] { runner => + val rec = Dispatcher[IO] flatMap { runner => Resource liftF { IO { // these finalizers never return, so this test is intentionally designed to hang @@ -122,11 +122,10 @@ class DispatcherSpec extends BaseSpec { } "raise an error on leaked runner" in real { - Dispatcher[IO, Dispatcher.Runner[IO]](Resource.pure(_)).use(IO.pure(_)) flatMap { - runner => - IO { - runner.unsafeRunAndForget(IO(ko)) must throwAn[IllegalStateException] - } + Dispatcher[IO].use(IO.pure(_)) flatMap { runner => + IO { + runner.unsafeRunAndForget(IO(ko)) must throwAn[IllegalStateException] + } } } } diff --git a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala index 43bceeaefd..26ef8dfc8f 100644 --- a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala +++ b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala @@ -30,8 +30,7 @@ object Dispatcher extends DispatcherPlatform { private[this] val Open = () => () - def apply[F[_], A](unsafe: Runner[F] => Resource[F, A])( - implicit F: Async[F]): Resource[F, A] = { + def apply[F[_]](implicit F: Async[F]): Resource[F, Runner[F]] = { final case class Registration(action: F[Unit], prepareCancel: F[Unit] => Unit) final case class State(end: Long, registry: LongMap[Registration]) { @@ -116,88 +115,86 @@ object Dispatcher extends DispatcherPlatform { } yield () _ <- dispatcher.foreverM[Unit].background + } yield { + new Runner[F] { + def unsafeToFutureCancelable[E](fe: F[E]): (Future[E], () => Future[Unit]) = { + val promise = Promise[E]() - back <- unsafe { - new Runner[F] { - def unsafeToFutureCancelable[E](fe: F[E]): (Future[E], () => Future[Unit]) = { - val promise = Promise[E]() + val action = fe + .flatMap(e => F.delay(promise.success(e))) + .onError { case t => F.delay(promise.failure(t)) } + .void - val action = fe - .flatMap(e => F.delay(promise.success(e))) - .onError { case t => F.delay(promise.failure(t)) } - .void + @volatile + var cancelToken: () => Future[Unit] = null - @volatile - var cancelToken: () => Future[Unit] = null + @volatile + var canceled = false - @volatile - var canceled = false + def registerCancel(token: F[Unit]): Unit = { + cancelToken = () => unsafeToFuture(token) - def registerCancel(token: F[Unit]): Unit = { - cancelToken = () => unsafeToFuture(token) - - // double-check to resolve race condition here - if (canceled) { - cancelToken() - () - } + // double-check to resolve race condition here + if (canceled) { + cancelToken() + () } + } - @tailrec - def enqueue(): Long = { - val s @ State(end, registry) = state.get() - val registry2 = registry.updated(end, Registration(action, registerCancel _)) + @tailrec + def enqueue(): Long = { + val s @ State(end, registry) = state.get() + val registry2 = registry.updated(end, Registration(action, registerCancel _)) - if (!state.compareAndSet(s, State(end + 1, registry2))) - enqueue() - else - end - } + if (!state.compareAndSet(s, State(end + 1, registry2))) + enqueue() + else + end + } - @tailrec - def dequeue(id: Long): Unit = { - val s @ State(_, registry) = state.get() - val registry2 = registry - id + @tailrec + def dequeue(id: Long): Unit = { + val s @ State(_, registry) = state.get() + val registry2 = registry - id - if (!state.compareAndSet(s, s.copy(registry = registry2))) { - dequeue(id) - } + if (!state.compareAndSet(s, s.copy(registry = registry2))) { + dequeue(id) } + } - if (alive.get()) { - val id = enqueue() + if (alive.get()) { + val id = enqueue() - val f = latch.getAndSet(Open) - if (f != null) { - f() - } + val f = latch.getAndSet(Open) + if (f != null) { + f() + } - val cancel = { () => - canceled = true - dequeue(id) + val cancel = { () => + canceled = true + dequeue(id) - val token = cancelToken - if (token != null) - token() - else - Future.unit - } + val token = cancelToken + if (token != null) + token() + else + Future.unit + } - // double-check after we already put things in the structure - if (alive.get()) { - (promise.future, cancel) - } else { - // we were shutdown *during* the enqueue - cancel() - throw new IllegalStateException("dispatcher already shutdown") - } + // double-check after we already put things in the structure + if (alive.get()) { + (promise.future, cancel) } else { + // we were shutdown *during* the enqueue + cancel() throw new IllegalStateException("dispatcher already shutdown") } + } else { + throw new IllegalStateException("dispatcher already shutdown") } } } - } yield back + } } sealed trait Runner[F[_]] extends RunnerPlatform[F] { From 219c2a196144a20b1998bf21906a928367462622 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Sat, 17 Oct 2020 16:33:00 -0600 Subject: [PATCH 17/18] s/Runner/Dispatcher/ --- .../cats/effect/std/DispatcherPlatform.scala | 4 +-- .../cats/effect/std/DispatcherPlatform.scala | 23 ++++++------- .../scala/cats/effect/std/Dispatcher.scala | 33 ++++++++++--------- 3 files changed, 28 insertions(+), 32 deletions(-) diff --git a/std/js/src/main/scala/cats/effect/std/DispatcherPlatform.scala b/std/js/src/main/scala/cats/effect/std/DispatcherPlatform.scala index 62b1b12caf..7761eaa05a 100644 --- a/std/js/src/main/scala/cats/effect/std/DispatcherPlatform.scala +++ b/std/js/src/main/scala/cats/effect/std/DispatcherPlatform.scala @@ -16,7 +16,5 @@ package cats.effect.std -private[std] trait DispatcherPlatform { this: Dispatcher.type => - - protected trait RunnerPlatform[F[_]] { this: Runner[F] => } +private[std] trait DispatcherPlatform[F[_]] { this: Dispatcher[F] => } diff --git a/std/jvm/src/main/scala/cats/effect/std/DispatcherPlatform.scala b/std/jvm/src/main/scala/cats/effect/std/DispatcherPlatform.scala index c0165da8d2..7f366ed3c9 100644 --- a/std/jvm/src/main/scala/cats/effect/std/DispatcherPlatform.scala +++ b/std/jvm/src/main/scala/cats/effect/std/DispatcherPlatform.scala @@ -19,21 +19,18 @@ package cats.effect.std import scala.concurrent.{Await, TimeoutException} import scala.concurrent.duration.Duration -private[std] trait DispatcherPlatform { this: Dispatcher.type => +private[std] trait DispatcherPlatform[F[_]] { this: Dispatcher[F] => - protected trait RunnerPlatform[F[_]] { this: Runner[F] => + def unsafeRunSync[A](fa: F[A]): A = + unsafeRunTimed(fa, Duration.Inf) - def unsafeRunSync[A](fa: F[A]): A = - unsafeRunTimed(fa, Duration.Inf) - - def unsafeRunTimed[A](fa: F[A], timeout: Duration): A = { - val (fut, cancel) = unsafeToFutureCancelable(fa) - try Await.result(fut, timeout) - catch { - case t: TimeoutException => - cancel() - throw t - } + def unsafeRunTimed[A](fa: F[A], timeout: Duration): A = { + val (fut, cancel) = unsafeToFutureCancelable(fa) + try Await.result(fut, timeout) + catch { + case t: TimeoutException => + cancel() + throw t } } } diff --git a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala index 26ef8dfc8f..3a6d62f8f3 100644 --- a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala +++ b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala @@ -26,11 +26,25 @@ import scala.concurrent.{Future, Promise} import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} -object Dispatcher extends DispatcherPlatform { +sealed trait Dispatcher[F[_]] extends DispatcherPlatform[F] { + + def unsafeToFutureCancelable[A](fa: F[A]): (Future[A], () => Future[Unit]) + + def unsafeToFuture[A](fa: F[A]): Future[A] = + unsafeToFutureCancelable(fa)._1 + + def unsafeRunAndForget[A](fa: F[A]): Unit = { + unsafeToFutureCancelable(fa) + () + } +} + + +object Dispatcher { private[this] val Open = () => () - def apply[F[_]](implicit F: Async[F]): Resource[F, Runner[F]] = { + def apply[F[_]](implicit F: Async[F]): Resource[F, Dispatcher[F]] = { final case class Registration(action: F[Unit], prepareCancel: F[Unit] => Unit) final case class State(end: Long, registry: LongMap[Registration]) { @@ -116,7 +130,7 @@ object Dispatcher extends DispatcherPlatform { _ <- dispatcher.foreverM[Unit].background } yield { - new Runner[F] { + new Dispatcher[F] { def unsafeToFutureCancelable[E](fe: F[E]): (Future[E], () => Future[Unit]) = { val promise = Promise[E]() @@ -196,17 +210,4 @@ object Dispatcher extends DispatcherPlatform { } } } - - sealed trait Runner[F[_]] extends RunnerPlatform[F] { - - def unsafeToFutureCancelable[A](fa: F[A]): (Future[A], () => Future[Unit]) - - def unsafeToFuture[A](fa: F[A]): Future[A] = - unsafeToFutureCancelable(fa)._1 - - def unsafeRunAndForget[A](fa: F[A]): Unit = { - unsafeToFutureCancelable(fa) - () - } - } } From 65103ae20fd0b4605b5109e77e369f1922f5eea2 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Sun, 18 Oct 2020 13:38:37 -0600 Subject: [PATCH 18/18] Scalafmt --- std/js/src/main/scala/cats/effect/std/DispatcherPlatform.scala | 3 +-- std/shared/src/main/scala/cats/effect/std/Dispatcher.scala | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/std/js/src/main/scala/cats/effect/std/DispatcherPlatform.scala b/std/js/src/main/scala/cats/effect/std/DispatcherPlatform.scala index 7761eaa05a..37faffb495 100644 --- a/std/js/src/main/scala/cats/effect/std/DispatcherPlatform.scala +++ b/std/js/src/main/scala/cats/effect/std/DispatcherPlatform.scala @@ -16,5 +16,4 @@ package cats.effect.std -private[std] trait DispatcherPlatform[F[_]] { this: Dispatcher[F] => -} +private[std] trait DispatcherPlatform[F[_]] { this: Dispatcher[F] => } diff --git a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala index 3a6d62f8f3..c6b6672aa3 100644 --- a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala +++ b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala @@ -39,7 +39,6 @@ sealed trait Dispatcher[F[_]] extends DispatcherPlatform[F] { } } - object Dispatcher { private[this] val Open = () => ()