Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First cut at Dispatcher #1303

Merged
merged 18 commits into from
Oct 18, 2020
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions core/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2020 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect
package std

import cats.effect.kernel.Deferred
import cats.syntax.all._

import scala.concurrent.duration._

class DispatcherSpec extends BaseSpec {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having some examples of this being used with a callback-driven API would be nice (not necessarily in the form of tests)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. I think this needs some more documentation. Probably just scaladoc alone is sufficient.


"async dispatcher" should {
"run a synchronous IO" in real {
val ioa = IO(1).map(_ + 2)
val rec = Dispatcher[IO, Int](runner => IO.fromFuture(IO(runner.unsafeToFuture(ioa))))
rec.use(i => IO(i mustEqual 3))
}

"run an asynchronous IO" in real {
val ioa = (IO(1) <* IO.cede).map(_ + 2)
val rec = Dispatcher[IO, Int](runner => IO.fromFuture(IO(runner.unsafeToFuture(ioa))))
rec.use(i => IO(i mustEqual 3))
}

"run several IOs back to back" in real {
@volatile
var counter = 0
val increment = IO(counter += 1)

val num = 10

val rec = Dispatcher[IO, Unit] { runner =>
IO.fromFuture(IO(runner.unsafeToFuture(increment))).replicateA(num).void
}

rec.use(_ => IO(counter mustEqual num))
}

"run multiple IOs in parallel" in real {
val num = 10

for {
latches <- (0 until num).toList.traverse(_ => Deferred[IO, Unit])
awaitAll = latches.parTraverse_(_.get)

// engineer a deadlock: all subjects must be run in parallel or this will hang
subjects = latches.map(latch => latch.complete(()) >> awaitAll)

_ <- {
val rec = Dispatcher[IO, Unit] { runner =>
subjects.parTraverse_(act => IO(runner.unsafeRunAndForget(act)))
}

rec.use(_ => IO.unit)
}
} yield ok
}

"forward cancelation onto the inner action" in real {
var canceled = false

val rec = Dispatcher[IO, Unit] { runner =>
val run = IO {
runner.unsafeToFutureCancelable(IO.never.onCancel(IO { canceled = true }))._2
}

run.flatMap(ct => IO.sleep(100.millis) >> IO.fromFuture(IO(ct())))
}

rec.use(_ => IO(canceled must beTrue))
}
}
}
22 changes: 22 additions & 0 deletions std/js/src/main/scala/cats/effect/std/DispatcherPlatform.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2020 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.std

private[std] trait DispatcherPlatform { this: Dispatcher.type =>

protected trait RunnerPlatform[F[_]] { this: Runner[F] => }
}
39 changes: 39 additions & 0 deletions std/jvm/src/main/scala/cats/effect/std/DispatcherPlatform.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2020 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.std

import scala.concurrent.{Await, TimeoutException}
import scala.concurrent.duration.Duration

private[std] trait DispatcherPlatform { this: Dispatcher.type =>

protected trait RunnerPlatform[F[_]] { this: Runner[F] =>

def unsafeRunSync[A](fa: F[A]): A =
unsafeRunTimed(fa, Duration.Inf)

def unsafeRunTimed[A](fa: F[A], timeout: Duration): A = {
val (fut, cancel) = unsafeToFutureCancelable(fa)
try Await.result(fut, timeout)
catch {
case t: TimeoutException =>
cancel()
throw t
}
}
}
}
168 changes: 168 additions & 0 deletions std/shared/src/main/scala/cats/effect/std/Dispatcher.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Copyright 2020 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.std

import cats.effect.kernel.{Async, Deferred, Fiber, MonadCancel, Ref, Resource, Sync}
import cats.effect.kernel.syntax.all._
import cats.syntax.all._

import scala.annotation.tailrec
import scala.collection.immutable.LongMap
import scala.concurrent.{Future, Promise}

import java.util.concurrent.atomic.AtomicReference

object Dispatcher extends DispatcherPlatform {

def apply[F[_]: Async, A](unsafe: Runner[F] => F[A]): Resource[F, A] = {
djspiewak marked this conversation as resolved.
Show resolved Hide resolved
final case class State(
begin: Long,
end: Long,
registry: LongMap[(F[Unit], F[Unit] => Unit)]) {
// efficiency on the CAS
override def equals(that: Any) = this eq that.asInstanceOf[AnyRef]
djspiewak marked this conversation as resolved.
Show resolved Hide resolved
override def hashCode = System.identityHashCode(this)
}

val Open = () => ()
val Empty = State(0, 0, LongMap())
djspiewak marked this conversation as resolved.
Show resolved Hide resolved

for {
latch <- Resource.liftF(Sync[F].delay(new AtomicReference[() => Unit]))
state <- Resource.liftF(Sync[F].delay(new AtomicReference[State](Empty)))

active <- Resource.make(Ref[F].of(Set[Fiber[F, Throwable, Unit]]())) { ref =>
ref.get.flatMap(_.toList.traverse_(_.cancel))
djspiewak marked this conversation as resolved.
Show resolved Hide resolved
djspiewak marked this conversation as resolved.
Show resolved Hide resolved
}

dispatcher = for {
_ <- Sync[F].delay(latch.set(null)) // reset to null
s <- Sync[F].delay(state.getAndSet(Empty))

State(begin, end, registry) = s
pairs = (begin until end).toList.flatMap(registry.get)
djspiewak marked this conversation as resolved.
Show resolved Hide resolved

_ <-
if (pairs.isEmpty) {
Async[F].async_[Unit] { cb =>
if (!latch.compareAndSet(null, () => cb(Right(())))) {
// state was changed between when we last set the latch and now; complete the callback immediately
cb(Right(()))
}
}
} else {
MonadCancel[F] uncancelable { _ =>
for {
fibers <- pairs traverse {
case (action, f) =>
for {
fiberDef <- Deferred[F, Fiber[F, Throwable, Unit]]

enriched = action guarantee {
fiberDef.get.flatMap(fiber => active.update(_ - fiber))
}

fiber <- enriched.start
_ <- fiberDef.complete(fiber)
_ <- Sync[F].delay(f(fiber.cancel))
} yield fiber
}

_ <- active.update(_ ++ fibers)
djspiewak marked this conversation as resolved.
Show resolved Hide resolved
} yield ()
}
}
} yield ()

_ <- dispatcher.foreverM[Unit].background

back <- Resource liftF {
unsafe {
new Runner[F] {
def unsafeToFutureCancelable[E](fe: F[E]): (Future[E], () => Future[Unit]) = {
val promise = Promise[E]()

val action = fe
.flatMap(e => Sync[F].delay(promise.success(e)))
.onError { case t => Sync[F].delay(promise.failure(t)) }
.void

@volatile
var cancelToken: F[Unit] = null.asInstanceOf[F[Unit]]

def registerCancel(token: F[Unit]): Unit =
cancelToken = token

@tailrec
def enqueue(): Long = {
val s @ State(_, end, registry) = state.get()
val registry2 = registry.updated(end, (action, registerCancel _))

if (!state.compareAndSet(s, s.copy(end = end + 1, registry = registry2)))
enqueue()
else
end
}

@tailrec
def dequeue(id: Long): Unit = {
val s @ State(_, _, registry) = state.get()
val registry2 = registry - id

if (!state.compareAndSet(s, s.copy(registry = registry2))) {
dequeue(id)
}
}

val id = enqueue()

val f = latch.getAndSet(Open)
if (f != null) {
f()
}

val cancel = { () =>
dequeue(id)

val token = cancelToken
if (token != null)
unsafeToFuture(token)
else
Future.unit
}

(promise.future, cancel)
}
}
}
}
} yield back
}

sealed trait Runner[F[_]] extends RunnerPlatform[F] {

def unsafeToFutureCancelable[A](fa: F[A]): (Future[A], () => Future[Unit])

def unsafeToFuture[A](fa: F[A]): Future[A] =
unsafeToFutureCancelable(fa)._1

def unsafeRunAndForget[A](fa: F[A]): Unit = {
unsafeToFutureCancelable(fa)
()
}
}
}