Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix cancelation leak in fromFuture, fromFutureCancelable #3892

Merged
merged 12 commits into from
Nov 23, 2023
14 changes: 7 additions & 7 deletions kernel/shared/src/main/scala/cats/effect/kernel/Async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -212,21 +212,21 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] {
* [[fromFutureCancelable]] for a cancelable version
*/
def fromFuture[A](fut: F[Future[A]]): F[A] =
flatMap(fut) { f =>
flatMap(executionContext) { implicit ec =>
async_[A](cb => f.onComplete(t => cb(t.toEither)))
flatMap(executionContext) { implicit ec =>
uncancelable { poll =>
flatMap(poll(fut)) { f => async_[A](cb => f.onComplete(t => cb(t.toEither))) }
}
}

/**
* Like [[fromFuture]], but is cancelable via the provided finalizer.
*/
def fromFutureCancelable[A](futCancel: F[(Future[A], F[Unit])]): F[A] =
flatMap(futCancel) {
case (fut, fin) =>
flatMap(executionContext) { implicit ec =>
flatMap(executionContext) { implicit ec =>
flatMap(uncancelable(_(futCancel))) {
case (fut, fin) =>
async[A](cb => as(delay(fut.onComplete(t => cb(t.toEither))), Some(fin)))
}
}
}

/**
Expand Down
66 changes: 65 additions & 1 deletion tests/shared/src/test/scala/cats/effect/kernel/AsyncSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,23 @@
package cats.effect
package kernel

import cats.syntax.all._
import cats.{Eq, Order, StackSafeMonad}
import cats.arrow.FunctionK
import cats.effect.laws.AsyncTests
import cats.laws.discipline.arbitrary._
import cats.effect.testkit.TestControl
import cats.effect.unsafe.IORuntimeConfig

import cats.effect.std.Random

import org.scalacheck.{Arbitrary, Cogen, Prop}
import org.scalacheck.Arbitrary.arbitrary
import org.typelevel.discipline.specs2.mutable.Discipline

import scala.concurrent.ExecutionContext
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import java.util.concurrent.atomic.AtomicBoolean

class AsyncSpec extends BaseSpec with Discipline {

Expand All @@ -43,6 +49,64 @@ class AsyncSpec extends BaseSpec with Discipline {
) /*(Parameters(seed = Some(Seed.fromBase64("ZxDXpm7_3Pdkl-Fvt8M90Cxfam9wKuzcifQ1QsIJxND=").get)))*/
}

"fromFuture" should {
"not leak the future on cancelation" in real {
import scala.concurrent.ExecutionContext.Implicits.global

implicit val rand: Random[IO] = Random.javaUtilConcurrentThreadLocalRandom[IO]

val jitter = Random[IO].betweenInt(1, 5).flatMap(n => IO.sleep(n.micros))

val run = IO(new AtomicBoolean(false) -> new AtomicBoolean(false)).flatMap {
case (started, finished) =>
val future = IO {
Future {
started.set(true)
Thread.sleep(20)
finished.set(true)
}
}

(jitter >> IO.fromFuture(future)).race(jitter).map { _ =>
val wasStarted = started.get
val wasFinished = finished.get

wasStarted mustEqual wasFinished
}

}

List.fill(100)(run).sequence
}
}

"fromFuture" should {
"backpressure on cancelation" in real {
// a non-cancelable, never-completing Future
def mkf() = Promise[Unit]().future
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def mkf() = Promise[Unit]().future
def mkf() = Future.never


def go = for {
started <- IO(new AtomicBoolean)
fiber <- IO.fromFuture {
IO {
started.set(true)
mkf()
}
}.start
_ <- IO.cede.whileM_(IO(!started.get))
_ <- fiber.cancel
} yield ()

TestControl
.executeEmbed(go, IORuntimeConfig(1, 2))
.as(false)
.recover { case _: TestControl.NonTerminationException => true }
.replicateA(1000)
.map(_.forall(identity(_)))
}

}

final class AsyncIO[A](val io: IO[A])

implicit def asyncForAsyncIO: Async[AsyncIO] = new Async[AsyncIO]
Expand Down
Loading