Skip to content

Commit

Permalink
Merge pull request #1695 from etspaceman/supervisorBackpressure
Browse files Browse the repository at this point in the history
Add Backpressure / Supervisor
  • Loading branch information
djspiewak authored Feb 20, 2021
2 parents 9b6ae8d + 1eeb45c commit c2e9db5
Show file tree
Hide file tree
Showing 4 changed files with 331 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright (c) 2017-2021 The Typelevel Cats-effect Project Developers
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.concurrent

import cats.effect._
import cats.effect.implicits._
import cats.implicits._

/**
* Utility to apply backpressure semantics to the execution of an Effect.
* Backpressure instances will apply a [[Backpressure.Strategy]] to the
* execution where each strategy works as follows:
*
* [[Backpressure.Strategy.Lossy]] will mean that effects will not be run in
* the presence of backpressure, meaning the result will be None
*
* [[Backpressure.Strategy.Lossless]] will mean that effects will run in the
* presence of backpressure, meaning the effect will semantically block until
* backpressure is alleviated
*/
trait Backpressure[F[_]] {

/**
* Applies rate limiting to an effect based on backpressure semantics
*
* @param f the effect that backpressure is applied to
* @return an Option where Option denotes if the effect was run or not
* according to backpressure semantics
*/
def metered[A](f: F[A]): F[Option[A]]
}

object Backpressure {

/**
* Creates an instance of Backpressure that can be used to rate limit effects
* @param strategy strategy to apply for this backpressure instance
* @param bound depth of the queue that the backpressure instance should manage
* @return a [[Backpressure]] instance
*/
def apply[F[_]: Concurrent](
strategy: Strategy,
bound: Int
): F[Backpressure[F]] = {
require(bound > 0)
Semaphore[F](bound.toLong).map(sem =>
strategy match {
case Strategy.Lossy =>
new Backpressure[F] {
override def metered[A](f: F[A]): F[Option[A]] =
sem.tryAcquire.bracket {
case true => f.map(_.some)
case false => none[A].pure[F]
} {
case true => sem.release
case false => Concurrent[F].unit
}
}
case Strategy.Lossless =>
new Backpressure[F] {
override def metered[A](f: F[A]): F[Option[A]] =
sem.withPermit(f).map(_.some)
}
}
)

}

sealed trait Strategy
object Strategy {
case object Lossy extends Strategy
case object Lossless extends Strategy
}
}
136 changes: 136 additions & 0 deletions core/shared/src/main/scala/cats/effect/concurrent/Supervisor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright (c) 2017-2021 The Typelevel Cats-effect Project Developers
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.concurrent

import cats.Parallel
import cats.effect._
import cats.effect.implicits._
import cats.implicits._

/**
* A fiber-based supervisor that monitors the lifecycle of all fibers
* that are started via its interface. The supervisor is managed by a singular
* fiber to which the lifecycles of all spawned fibers are bound.
*
* Whereas [[Concurrent.background]] links the lifecycle of the spawned fiber to
* the calling fiber, starting a fiber via a [[Supervisor]] links the lifecycle
* of the spawned fiber to the supervisor fiber. This is useful when the scope
* of some fiber must survive the spawner, but should still be confined within
* some "larger" scope.
*
* The fibers started via the supervisor are guaranteed to be terminated when
* the supervisor fiber is terminated. When a supervisor fiber is canceled, all
* active and queued fibers will be safely finalized before finalization of
* the supervisor is complete.
*
* The following diagrams illustrate the lifecycle of a fiber spawned via
* [[Concurrent.start]], [[Concurrent.background]], and [[Supervisor]]. In each
* example, some fiber A is spawning another fiber B. Each box represents the
* lifecycle of a fiber. If a box is enclosed within another box, it means that
* the lifecycle of the former is confined within the lifecycle of the latter.
* In other words, if an outer fiber terminates, the inner fibers are
* guaranteed to be terminated as well.
*
* start:
* {{{
* Fiber A lifecycle
* +---------------------+
* | | |
* +-----------------|---+
* |
* |A starts B
* Fiber B lifecycle |
* +-----------------|---+
* | + |
* +---------------------+
* }}}
*
* background:
* {{{
* Fiber A lifecycle
* +------------------------+
* | | |
* | Fiber B lifecycle |A starts B
* | +------------------|-+ |
* | | | | |
* | +--------------------+ |
* +------------------------+
* }}}
*
* Supervisor:
* {{{
* Supervisor lifecycle
* +---------------------+
* | Fiber B lifecycle |
* | +-----------------+ |
* | | + | |
* | +---------------|-+ |
* +-----------------|---+
* |
* | A starts B
* Fiber A lifecycle |
* +-----------------|---+
* | | |
* +---------------------+
* }}}
*
* [[Supervisor]] should be used when fire-and-forget semantics are desired.
*/
trait Supervisor[F[_]] {

/**
* Starts the supplied effect `fa` on the supervisor.
*
* @return a [[Fiber]] that represents a handle to the started fiber.
*/
def supervise[A](fa: F[A]): F[Fiber[F, A]]
}

object Supervisor {

private class Token

/**
* Creates a [[Resource]] scope within which fibers can be monitored. When
* this scope exits, all supervised fibers will be finalized.
*/
def apply[F[_]](
implicit F: Concurrent[F],
P: Parallel[F]
): Resource[F, Supervisor[F]] =
for {
stateRef <- Resource.make(Ref.of[F, Map[Token, F[Unit]]](Map())) { state =>
state.get.flatMap { fibers =>
fibers.values.toList.parSequence
}.void
}
} yield {
new Supervisor[F] {
override def supervise[A](fa: F[A]): F[Fiber[F, A]] =
F.uncancelable {
Deferred[F, Unit].flatMap { gate =>
val token = new Token
val action = fa.guarantee(gate.get *> stateRef.update(_ - token))
F.start(action).flatMap { fiber =>
stateRef.update(_ + (token -> fiber.cancel)).as(fiber) <* gate
.complete(())
}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2017-2021 The Typelevel Cats-effect Project Developers
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.concurrent

import cats.effect._
import cats.syntax.all._

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

class BackpressureTests extends CatsEffectSuite {

implicit val executionContext: ExecutionContext = ExecutionContext.Implicits.global
implicit val timer: cats.effect.Timer[IO] = IO.timer(executionContext)
implicit val cs: ContextShift[IO] = IO.contextShift(executionContext)

test("Lossy Strategy should return IO[None] when no permits are available") {
for {
backpressure <- Backpressure[IO](Backpressure.Strategy.Lossy, 1)
never = backpressure.metered(IO.never)
lost <- IO.race(never, never)
} yield assert(lost.fold(identity, identity).isEmpty)
}

test("Lossless Strategy should complete effects even when no permits are available") {
for {
backpressure <- Backpressure[IO](Backpressure.Strategy.Lossless, 1)
f1 <- backpressure.metered(IO.sleep(1.second) *> 1.pure[IO]).start
f2 <- backpressure.metered(IO.sleep(1.second) *> 2.pure[IO]).start
tup <- (f1, f2).tupled.join
(res1, res2) = tup
} yield assertEquals((res1, res2), (Some(1), Some(2)))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2017-2021 The Typelevel Cats-effect Project Developers
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.concurrent

import cats.effect._

import scala.concurrent.ExecutionContext

class SupervisorTests extends CatsEffectSuite {

implicit val executionContext: ExecutionContext = ExecutionContext.Implicits.global
implicit val timer: cats.effect.Timer[IO] = IO.timer(executionContext)
implicit val cs: ContextShift[IO] = IO.contextShift(executionContext)

test("start a fiber that completes successfully") {
Supervisor[IO]
.use { supervisor =>
supervisor.supervise(IO(1)).flatMap(_.join)
}
.map(x => assertEquals(x, 1))
}

test("start a fiber that raises an error") {
val t = new Throwable("failed")
Supervisor[IO]
.use { supervisor =>
supervisor.supervise(IO.raiseError[Unit](t)).flatMap(_.join)
}
.attempt
.map(x => assertEquals(x, Left(t)))
}

test("cancel active fibers when supervisor exits") {
for {
testPassed <- Deferred[IO, Boolean]
gate <- Deferred[IO, Unit]
_ <- Supervisor[IO].use { supervisor =>
supervisor.supervise(
(gate.complete(()) *> IO.never).guarantee(testPassed.complete(true))
) *> gate.get
}
result <- testPassed.get
} yield assert(result)
}
}

0 comments on commit c2e9db5

Please sign in to comment.