Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backpressure Interface for CE3 #1817

Merged
merged 2 commits into from
Jul 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions std/shared/src/main/scala/cats/effect/std/Backpressure.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2020-2021 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.std

import cats.effect.kernel._
import cats.effect.kernel.implicits._
import cats.syntax.all._

/**
* Utility to apply backpressure semantics to the execution of an Effect.
* Backpressure instances will apply a [[Backpressure.Strategy]] to the
* execution where each strategy works as follows:
*
* [[Backpressure.Strategy.Lossy]] will mean that effects will not be run in
* the presence of backpressure, meaning the result will be None
*
* [[Backpressure.Strategy.Lossless]] will mean that effects will run in the
* presence of backpressure, meaning the effect will semantically block until
* backpressure is alleviated
*/
trait Backpressure[F[_]] {

/**
* Applies rate limiting to an effect based on backpressure semantics
*
* @param f the effect that backpressure is applied to
* @return an Option where Option denotes if the effect was run or not
* according to backpressure semantics
*/
def metered[A](f: F[A]): F[Option[A]]
}

object Backpressure {

/**
* Creates an instance of Backpressure that can be used to rate limit effects
* @param strategy strategy to apply for this backpressure instance
* @param bound depth of the queue that the backpressure instance should manage
* @return a [[Backpressure]] instance
*/
def apply[F[_]](
strategy: Strategy,
bound: Int
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is passed to the underlying semaphore, which accepts Long arguments, should we maybe have this be a Long too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potentially, though the CE2 version uses an Int. Also, Long would be quite a large bound for what Backpressure is; Long.MaxValue would be a heck of a lot of concurrent effects 😅

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. I was just raising a question on the API design. I'm fine either way.

)(implicit GC: GenConcurrent[F, Throwable]): F[Backpressure[F]] = {
require(bound > 0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to raise than throw here? I'm surprised to find require in the code in a couple places, so there is precedent for this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah... these instances exist because of the GenConcurrent constraint...

Semaphore[F](bound.toLong).map(sem =>
strategy match {
case Strategy.Lossy =>
new Backpressure[F] {
override def metered[A](f: F[A]): F[Option[A]] =
sem
.tryAcquire
.bracket {
case true => f.map(_.some)
case false => none[A].pure[F]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could cache this value so it doesn't need to be recreated on every backpressured call, at the expense of always creating it once.

} {
case true => sem.release
case false => GC.unit
}
}
case Strategy.Lossless =>
new Backpressure[F] {
override def metered[A](f: F[A]): F[Option[A]] =
sem.permit.use(_ => f).map(_.some)
}
})

}

sealed trait Strategy
object Strategy {
case object Lossy extends Strategy
case object Lossless extends Strategy
}
}
51 changes: 51 additions & 0 deletions tests/shared/src/test/scala/cats/effect/std/BackpressureSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2020-2021 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect
package std

import cats.syntax.all._

import scala.concurrent.duration._

class BackpressureTests extends BaseSpec {

"Backpressure" should {
"Lossy Strategy should return IO[None] when no permits are available" in ticked {
implicit ticker =>
val test = for {
backpressure <- Backpressure[IO](Backpressure.Strategy.Lossy, 1)
never = backpressure.metered(IO.never)
lost <- IO.race(never, never)
} yield lost.fold(identity, identity).isEmpty

test must completeAs(true)
}

"Lossless Strategy should complete effects even when no permits are available" in ticked {
implicit ticker =>
val test = for {
backpressure <- Backpressure[IO](Backpressure.Strategy.Lossless, 1)
f1 <- backpressure.metered(IO.sleep(1.second) *> 1.pure[IO]).start
f2 <- backpressure.metered(IO.sleep(1.second) *> 2.pure[IO]).start
res1 <- f1.joinWithNever
res2 <- f2.joinWithNever
} yield (res1, res2)

test must completeAs((Some(1), Some(2)))
}
}
}