Skip to content

Commit

Permalink
Merge pull request #1817 from etspaceman/backpressure
Browse files Browse the repository at this point in the history
  • Loading branch information
kubukoz authored Jul 26, 2021
2 parents 8e514f4 + b91fae1 commit 07d1828
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 0 deletions.
89 changes: 89 additions & 0 deletions std/shared/src/main/scala/cats/effect/std/Backpressure.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2020-2021 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.std

import cats.effect.kernel._
import cats.effect.kernel.implicits._
import cats.syntax.all._

/**
* Utility to apply backpressure semantics to the execution of an Effect.
* Backpressure instances will apply a [[Backpressure.Strategy]] to the
* execution where each strategy works as follows:
*
* [[Backpressure.Strategy.Lossy]] will mean that effects will not be run in
* the presence of backpressure, meaning the result will be None
*
* [[Backpressure.Strategy.Lossless]] will mean that effects will run in the
* presence of backpressure, meaning the effect will semantically block until
* backpressure is alleviated
*/
trait Backpressure[F[_]] {

/**
* Applies rate limiting to an effect based on backpressure semantics
*
* @param f the effect that backpressure is applied to
* @return an Option where Option denotes if the effect was run or not
* according to backpressure semantics
*/
def metered[A](f: F[A]): F[Option[A]]
}

object Backpressure {

/**
* Creates an instance of Backpressure that can be used to rate limit effects
* @param strategy strategy to apply for this backpressure instance
* @param bound depth of the queue that the backpressure instance should manage
* @return a [[Backpressure]] instance
*/
def apply[F[_]](
strategy: Strategy,
bound: Int
)(implicit GC: GenConcurrent[F, Throwable]): F[Backpressure[F]] = {
require(bound > 0)
Semaphore[F](bound.toLong).map(sem =>
strategy match {
case Strategy.Lossy =>
new Backpressure[F] {
override def metered[A](f: F[A]): F[Option[A]] =
sem
.tryAcquire
.bracket {
case true => f.map(_.some)
case false => none[A].pure[F]
} {
case true => sem.release
case false => GC.unit
}
}
case Strategy.Lossless =>
new Backpressure[F] {
override def metered[A](f: F[A]): F[Option[A]] =
sem.permit.use(_ => f).map(_.some)
}
})

}

sealed trait Strategy
object Strategy {
case object Lossy extends Strategy
case object Lossless extends Strategy
}
}
51 changes: 51 additions & 0 deletions tests/shared/src/test/scala/cats/effect/std/BackpressureSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2020-2021 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect
package std

import cats.syntax.all._

import scala.concurrent.duration._

class BackpressureTests extends BaseSpec {

"Backpressure" should {
"Lossy Strategy should return IO[None] when no permits are available" in ticked {
implicit ticker =>
val test = for {
backpressure <- Backpressure[IO](Backpressure.Strategy.Lossy, 1)
never = backpressure.metered(IO.never)
lost <- IO.race(never, never)
} yield lost.fold(identity, identity).isEmpty

test must completeAs(true)
}

"Lossless Strategy should complete effects even when no permits are available" in ticked {
implicit ticker =>
val test = for {
backpressure <- Backpressure[IO](Backpressure.Strategy.Lossless, 1)
f1 <- backpressure.metered(IO.sleep(1.second) *> 1.pure[IO]).start
f2 <- backpressure.metered(IO.sleep(1.second) *> 2.pure[IO]).start
res1 <- f1.joinWithNever
res2 <- f2.joinWithNever
} yield (res1, res2)

test must completeAs((Some(1), Some(2)))
}
}
}

0 comments on commit 07d1828

Please sign in to comment.