Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First cut at Dispatcher #1303

Merged
merged 18 commits into from
Oct 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions core/js/src/main/scala/cats/effect/IOApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ trait IOApp {

protected val runtime: unsafe.IORuntime = unsafe.IORuntime.global

protected implicit val unsafeRunForIO: unsafe.UnsafeRun[IO] = runtime.unsafeRunForIO

final def main(args: Array[String]): Unit = {
// An infinite heartbeat to keep main alive. This is similar to
// `IO.never`, except `IO.never` doesn't schedule any tasks and is
Expand Down
2 changes: 0 additions & 2 deletions core/jvm/src/main/scala/cats/effect/IOApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ trait IOApp {

protected val runtime: unsafe.IORuntime = unsafe.IORuntime.global

protected implicit val unsafeRunForIO: unsafe.UnsafeRun[IO] = runtime.unsafeRunForIO

final def main(args: Array[String]): Unit = {

val rt = Runtime.getRuntime()
Expand Down
3 changes: 0 additions & 3 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -556,9 +556,6 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {

implicit def asyncForIO: kernel.Async[IO] = _asyncForIO

implicit def unsafeRunForIO(implicit runtime: unsafe.IORuntime): unsafe.UnsafeRun[IO] =
runtime.unsafeRunForIO

private[this] val _parallelForIO: Parallel.Aux[IO, ParallelF[IO, *]] =
parallelForGenSpawn[IO, Throwable]

Expand Down
16 changes: 1 addition & 15 deletions core/shared/src/main/scala/cats/effect/unsafe/IORuntime.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package cats.effect
package unsafe

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.ExecutionContext

@annotation.implicitNotFound("""Could not find an implicit IORuntime.

Expand All @@ -39,20 +39,6 @@ final class IORuntime private[effect] (
val scheduler: Scheduler,
val shutdown: () => Unit) {

private implicit val self: IORuntime = this

val unsafeRunForIO: UnsafeRun[IO] =
new UnsafeRun[IO] {
def unsafeRunFutureCancelable[A](fa: IO[A]): (Future[A], () => Future[Unit]) = {
val p = Promise[A]()
val fiber = fa.unsafeRunFiber(true) {
case Left(t) => p.failure(t)
case Right(a) => p.success(a)
}
(p.future, () => fiber.cancel.unsafeToFuture())
}
}

override def toString: String = s"IORuntime($compute, $scheduler)"
}

Expand Down
132 changes: 132 additions & 0 deletions core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright 2020 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect
package std

import cats.effect.kernel.Deferred
import cats.syntax.all._

import scala.concurrent.duration._

class DispatcherSpec extends BaseSpec {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having some examples of this being used with a callback-driven API would be nice (not necessarily in the form of tests)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. I think this needs some more documentation. Probably just scaladoc alone is sufficient.


"async dispatcher" should {
"run a synchronous IO" in real {
val ioa = IO(1).map(_ + 2)
val rec = Dispatcher[IO].flatMap(runner =>
Resource.liftF(IO.fromFuture(IO(runner.unsafeToFuture(ioa)))))
rec.use(i => IO(i mustEqual 3))
}

"run an asynchronous IO" in real {
val ioa = (IO(1) <* IO.cede).map(_ + 2)
val rec = Dispatcher[IO].flatMap(runner =>
Resource.liftF(IO.fromFuture(IO(runner.unsafeToFuture(ioa)))))
rec.use(i => IO(i mustEqual 3))
}

"run several IOs back to back" in real {
@volatile
var counter = 0
val increment = IO(counter += 1)

val num = 10

val rec = Dispatcher[IO] flatMap { runner =>
Resource.liftF(IO.fromFuture(IO(runner.unsafeToFuture(increment))).replicateA(num).void)
}

rec.use(_ => IO(counter mustEqual num))
}

"run multiple IOs in parallel" in real {
val num = 10

for {
latches <- (0 until num).toList.traverse(_ => Deferred[IO, Unit])
awaitAll = latches.parTraverse_(_.get)

// engineer a deadlock: all subjects must be run in parallel or this will hang
subjects = latches.map(latch => latch.complete(()) >> awaitAll)

_ <- {
val rec = Dispatcher[IO] flatMap { runner =>
Resource.liftF(subjects.parTraverse_(act => IO(runner.unsafeRunAndForget(act))))
}

rec.use(_ => IO.unit)
}
} yield ok
}

"forward cancelation onto the inner action" in real {
var canceled = false

val rec = Dispatcher[IO] flatMap { runner =>
val run = IO {
runner.unsafeToFutureCancelable(IO.never.onCancel(IO { canceled = true }))._2
}

Resource liftF {
run.flatMap(ct => IO.sleep(100.millis) >> IO.fromFuture(IO(ct())))
}
}

rec.use(_ => IO(canceled must beTrue))
}

"cancel all inner effects when canceled" in real {
@volatile
var canceledA = false
@volatile
var canceledB = false

val rec = Dispatcher[IO] flatMap { runner =>
Resource liftF {
IO {
// these finalizers never return, so this test is intentionally designed to hang
// they flip their booleans first though; this is just testing that both run in parallel
val a = IO.never.onCancel(IO { canceledA = true } *> IO.never)
val b = IO.never.onCancel(IO { canceledB = true } *> IO.never)

runner.unsafeRunAndForget(a)
runner.unsafeRunAndForget(b)
}
}
}

for {
_ <- rec.use(_ => IO.sleep(50.millis)).start
_ <- IO.sleep(100.millis) // scope should be closed by now

r <- IO {
// if we don't run the finalizers in parallel, one of these will be false
canceledA must beTrue
canceledB must beTrue
}
} yield r
}

"raise an error on leaked runner" in real {
Dispatcher[IO].use(IO.pure(_)) flatMap { runner =>
IO {
runner.unsafeRunAndForget(IO(ko)) must throwAn[IllegalStateException]
}
}
}
}
}
57 changes: 0 additions & 57 deletions core/shared/src/test/scala/cats/effect/unsafe/UnsafeRunSpec.scala

This file was deleted.

32 changes: 0 additions & 32 deletions kernel/shared/src/main/scala/cats/effect/unsafe/UnsafeRun.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@
* limitations under the License.
*/

package cats.effect.unsafe
package cats.effect.std

private[unsafe] trait UnsafeRunPlatform[F[_]] {}
private[std] trait DispatcherPlatform[F[_]] { this: Dispatcher[F] => }
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@
* limitations under the License.
*/

package cats.effect.unsafe
package cats.effect.std

import scala.concurrent.{Await, TimeoutException}
import scala.concurrent.duration.Duration

private[unsafe] trait UnsafeRunPlatform[F[_]] { self: UnsafeRun[F] =>
private[std] trait DispatcherPlatform[F[_]] { this: Dispatcher[F] =>

def unsafeRunSync[A](fa: F[A]): A =
unsafeRunTimed(fa, Duration.Inf)

def unsafeRunTimed[A](fa: F[A], timeout: Duration): A = {
val (fut, cancel) = unsafeRunFutureCancelable(fa)
val (fut, cancel) = unsafeToFutureCancelable(fa)
try Await.result(fut, timeout)
catch {
case t: TimeoutException =>
Expand Down
Loading