Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Backpressure / Supervisor #1695

Merged
merged 7 commits into from
Feb 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright (c) 2017-2021 The Typelevel Cats-effect Project Developers
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.concurrent

import cats.effect._
import cats.effect.implicits._
import cats.implicits._

/**
* Utility to apply backpressure semantics to the execution of an Effect.
* Backpressure instances will apply a [[Backpressure.Strategy]] to the
* execution where each strategy works as follows:
*
* [[Backpressure.Strategy.Lossy]] will mean that effects will not be run in
* the presence of backpressure, meaning the result will be None
*
* [[Backpressure.Strategy.Lossless]] will mean that effects will run in the
* presence of backpressure, meaning the effect will semantically block until
* backpressure is alleviated
*/
trait Backpressure[F[_]] {

/**
* Applies rate limiting to an effect based on backpressure semantics
*
* @param f the effect that backpressure is applied to
* @return an Option where Option denotes if the effect was run or not
* according to backpressure semantics
*/
def metered[A](f: F[A]): F[Option[A]]
}

object Backpressure {

/**
* Creates an instance of Backpressure that can be used to rate limit effects
* @param strategy strategy to apply for this backpressure instance
* @param bound depth of the queue that the backpressure instance should manage
* @return a [[Backpressure]] instance
*/
def apply[F[_]: Concurrent](
strategy: Strategy,
bound: Int
): F[Backpressure[F]] = {
require(bound > 0)
Semaphore[F](bound.toLong).map(sem =>
strategy match {
case Strategy.Lossy =>
new Backpressure[F] {
override def metered[A](f: F[A]): F[Option[A]] =
sem.tryAcquire.bracket {
case true => f.map(_.some)
case false => none[A].pure[F]
} {
case true => sem.release
case false => Concurrent[F].unit
}
}
case Strategy.Lossless =>
new Backpressure[F] {
override def metered[A](f: F[A]): F[Option[A]] =
sem.withPermit(f).map(_.some)
}
}
)

}

sealed trait Strategy
object Strategy {
case object Lossy extends Strategy
case object Lossless extends Strategy
}
}
136 changes: 136 additions & 0 deletions core/shared/src/main/scala/cats/effect/concurrent/Supervisor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright (c) 2017-2021 The Typelevel Cats-effect Project Developers
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.concurrent

import cats.Parallel
import cats.effect._
import cats.effect.implicits._
import cats.implicits._

/**
* A fiber-based supervisor that monitors the lifecycle of all fibers
* that are started via its interface. The supervisor is managed by a singular
* fiber to which the lifecycles of all spawned fibers are bound.
*
* Whereas [[Concurrent.background]] links the lifecycle of the spawned fiber to
* the calling fiber, starting a fiber via a [[Supervisor]] links the lifecycle
* of the spawned fiber to the supervisor fiber. This is useful when the scope
* of some fiber must survive the spawner, but should still be confined within
* some "larger" scope.
*
* The fibers started via the supervisor are guaranteed to be terminated when
* the supervisor fiber is terminated. When a supervisor fiber is canceled, all
* active and queued fibers will be safely finalized before finalization of
* the supervisor is complete.
*
* The following diagrams illustrate the lifecycle of a fiber spawned via
* [[Concurrent.start]], [[Concurrent.background]], and [[Supervisor]]. In each
* example, some fiber A is spawning another fiber B. Each box represents the
* lifecycle of a fiber. If a box is enclosed within another box, it means that
* the lifecycle of the former is confined within the lifecycle of the latter.
* In other words, if an outer fiber terminates, the inner fibers are
* guaranteed to be terminated as well.
*
* start:
* {{{
* Fiber A lifecycle
* +---------------------+
* | | |
* +-----------------|---+
* |
* |A starts B
* Fiber B lifecycle |
* +-----------------|---+
* | + |
* +---------------------+
* }}}
*
* background:
* {{{
* Fiber A lifecycle
* +------------------------+
* | | |
* | Fiber B lifecycle |A starts B
* | +------------------|-+ |
* | | | | |
* | +--------------------+ |
* +------------------------+
* }}}
*
* Supervisor:
* {{{
* Supervisor lifecycle
* +---------------------+
* | Fiber B lifecycle |
* | +-----------------+ |
* | | + | |
* | +---------------|-+ |
* +-----------------|---+
* |
* | A starts B
* Fiber A lifecycle |
* +-----------------|---+
* | | |
* +---------------------+
* }}}
*
* [[Supervisor]] should be used when fire-and-forget semantics are desired.
*/
trait Supervisor[F[_]] {

/**
* Starts the supplied effect `fa` on the supervisor.
*
* @return a [[Fiber]] that represents a handle to the started fiber.
*/
def supervise[A](fa: F[A]): F[Fiber[F, A]]
}

object Supervisor {

private class Token

/**
* Creates a [[Resource]] scope within which fibers can be monitored. When
* this scope exits, all supervised fibers will be finalized.
*/
def apply[F[_]](
implicit F: Concurrent[F],
P: Parallel[F]
): Resource[F, Supervisor[F]] =
for {
stateRef <- Resource.make(Ref.of[F, Map[Token, F[Unit]]](Map())) { state =>
state.get.flatMap { fibers =>
fibers.values.toList.parSequence
}.void
}
} yield {
new Supervisor[F] {
override def supervise[A](fa: F[A]): F[Fiber[F, A]] =
F.uncancelable {
Deferred[F, Unit].flatMap { gate =>
val token = new Token
val action = fa.guarantee(gate.get *> stateRef.update(_ - token))
F.start(action).flatMap { fiber =>
stateRef.update(_ + (token -> fiber.cancel)).as(fiber) <* gate
.complete(())
}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2017-2021 The Typelevel Cats-effect Project Developers
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.concurrent

import cats.effect._
import cats.syntax.all._

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

class BackpressureTests extends CatsEffectSuite {

implicit val executionContext: ExecutionContext = ExecutionContext.Implicits.global
implicit val timer: cats.effect.Timer[IO] = IO.timer(executionContext)
implicit val cs: ContextShift[IO] = IO.contextShift(executionContext)

test("Lossy Strategy should return IO[None] when no permits are available") {
for {
backpressure <- Backpressure[IO](Backpressure.Strategy.Lossy, 1)
never = backpressure.metered(IO.never)
lost <- IO.race(never, never)
} yield assert(lost.fold(identity, identity).isEmpty)
}

test("Lossless Strategy should complete effects even when no permits are available") {
for {
backpressure <- Backpressure[IO](Backpressure.Strategy.Lossless, 1)
f1 <- backpressure.metered(IO.sleep(1.second) *> 1.pure[IO]).start
f2 <- backpressure.metered(IO.sleep(1.second) *> 2.pure[IO]).start
tup <- (f1, f2).tupled.join
(res1, res2) = tup
} yield assertEquals((res1, res2), (Some(1), Some(2)))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2017-2021 The Typelevel Cats-effect Project Developers
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.concurrent

import cats.effect._

import scala.concurrent.ExecutionContext

class SupervisorTests extends CatsEffectSuite {

implicit val executionContext: ExecutionContext = ExecutionContext.Implicits.global
implicit val timer: cats.effect.Timer[IO] = IO.timer(executionContext)
implicit val cs: ContextShift[IO] = IO.contextShift(executionContext)

test("start a fiber that completes successfully") {
Supervisor[IO]
.use { supervisor =>
supervisor.supervise(IO(1)).flatMap(_.join)
}
.map(x => assertEquals(x, 1))
}

test("start a fiber that raises an error") {
val t = new Throwable("failed")
Supervisor[IO]
.use { supervisor =>
supervisor.supervise(IO.raiseError[Unit](t)).flatMap(_.join)
}
.attempt
.map(x => assertEquals(x, Left(t)))
}

test("cancel active fibers when supervisor exits") {
for {
testPassed <- Deferred[IO, Boolean]
gate <- Deferred[IO, Unit]
_ <- Supervisor[IO].use { supervisor =>
supervisor.supervise(
(gate.complete(()) *> IO.never).guarantee(testPassed.complete(true))
) *> gate.get
}
result <- testPassed.get
} yield assert(result)
}
}