From b91fae112a6ccf396a36faa2af531216651d5016 Mon Sep 17 00:00:00 2001 From: Eric Meisel Date: Tue, 23 Mar 2021 23:05:11 -0500 Subject: [PATCH] Backpressure Interface --- .../scala/cats/effect/std/Backpressure.scala | 89 +++++++++++++++++++ .../cats/effect/std/BackpressureSpec.scala | 51 +++++++++++ 2 files changed, 140 insertions(+) create mode 100644 std/shared/src/main/scala/cats/effect/std/Backpressure.scala create mode 100644 tests/shared/src/test/scala/cats/effect/std/BackpressureSpec.scala diff --git a/std/shared/src/main/scala/cats/effect/std/Backpressure.scala b/std/shared/src/main/scala/cats/effect/std/Backpressure.scala new file mode 100644 index 0000000000..ec027f35ad --- /dev/null +++ b/std/shared/src/main/scala/cats/effect/std/Backpressure.scala @@ -0,0 +1,89 @@ +/* + * Copyright 2020-2021 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.std + +import cats.effect.kernel._ +import cats.effect.kernel.implicits._ +import cats.syntax.all._ + +/** + * Utility to apply backpressure semantics to the execution of an Effect. + * Backpressure instances will apply a [[Backpressure.Strategy]] to the + * execution where each strategy works as follows: + * + * [[Backpressure.Strategy.Lossy]] will mean that effects will not be run in + * the presence of backpressure, meaning the result will be None + * + * [[Backpressure.Strategy.Lossless]] will mean that effects will run in the + * presence of backpressure, meaning the effect will semantically block until + * backpressure is alleviated + */ +trait Backpressure[F[_]] { + + /** + * Applies rate limiting to an effect based on backpressure semantics + * + * @param f the effect that backpressure is applied to + * @return an Option where Option denotes if the effect was run or not + * according to backpressure semantics + */ + def metered[A](f: F[A]): F[Option[A]] +} + +object Backpressure { + + /** + * Creates an instance of Backpressure that can be used to rate limit effects + * @param strategy strategy to apply for this backpressure instance + * @param bound depth of the queue that the backpressure instance should manage + * @return a [[Backpressure]] instance + */ + def apply[F[_]]( + strategy: Strategy, + bound: Int + )(implicit GC: GenConcurrent[F, Throwable]): F[Backpressure[F]] = { + require(bound > 0) + Semaphore[F](bound.toLong).map(sem => + strategy match { + case Strategy.Lossy => + new Backpressure[F] { + override def metered[A](f: F[A]): F[Option[A]] = + sem + .tryAcquire + .bracket { + case true => f.map(_.some) + case false => none[A].pure[F] + } { + case true => sem.release + case false => GC.unit + } + } + case Strategy.Lossless => + new Backpressure[F] { + override def metered[A](f: F[A]): F[Option[A]] = + sem.permit.use(_ => f).map(_.some) + } + }) + + } + + sealed trait Strategy + object Strategy { + case object Lossy extends Strategy + case object Lossless extends Strategy + } +} diff --git a/tests/shared/src/test/scala/cats/effect/std/BackpressureSpec.scala b/tests/shared/src/test/scala/cats/effect/std/BackpressureSpec.scala new file mode 100644 index 0000000000..ec4be251be --- /dev/null +++ b/tests/shared/src/test/scala/cats/effect/std/BackpressureSpec.scala @@ -0,0 +1,51 @@ +/* + * Copyright 2020-2021 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package std + +import cats.syntax.all._ + +import scala.concurrent.duration._ + +class BackpressureTests extends BaseSpec { + + "Backpressure" should { + "Lossy Strategy should return IO[None] when no permits are available" in ticked { + implicit ticker => + val test = for { + backpressure <- Backpressure[IO](Backpressure.Strategy.Lossy, 1) + never = backpressure.metered(IO.never) + lost <- IO.race(never, never) + } yield lost.fold(identity, identity).isEmpty + + test must completeAs(true) + } + + "Lossless Strategy should complete effects even when no permits are available" in ticked { + implicit ticker => + val test = for { + backpressure <- Backpressure[IO](Backpressure.Strategy.Lossless, 1) + f1 <- backpressure.metered(IO.sleep(1.second) *> 1.pure[IO]).start + f2 <- backpressure.metered(IO.sleep(1.second) *> 2.pure[IO]).start + res1 <- f1.joinWithNever + res2 <- f2.joinWithNever + } yield (res1, res2) + + test must completeAs((Some(1), Some(2))) + } + } +}