Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix cancelation leak in fromFuture, fromFutureCancelable #3892

Merged
merged 12 commits into from
Nov 23, 2023
18 changes: 10 additions & 8 deletions kernel/shared/src/main/scala/cats/effect/kernel/Async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -212,21 +212,23 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] {
* [[fromFutureCancelable]] for a cancelable version
*/
def fromFuture[A](fut: F[Future[A]]): F[A] =
flatMap(fut) { f =>
flatMap(executionContext) { implicit ec =>
async_[A](cb => f.onComplete(t => cb(t.toEither)))
async_[A] { cb =>
flatMap(fut) { f =>
map(executionContext) { implicit ec => f.onComplete(t => cb(t.toEither)) }
}
}

/**
* Like [[fromFuture]], but is cancelable via the provided finalizer.
*/
def fromFutureCancelable[A](futCancel: F[(Future[A], F[Unit])]): F[A] =
flatMap(futCancel) {
case (fut, fin) =>
flatMap(executionContext) { implicit ec =>
async[A](cb => as(delay(fut.onComplete(t => cb(t.toEither))), Some(fin)))
}
async[A] { cb =>
flatMap(futCancel) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we have to use uncancelable+poll. Otherwise starting the Future with futCancel is not cancelable, and it can be. It's only after we've started it, that we need guarantee the async is setup appropriately.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hope that is sufficient anyway, otherwise we'd have to drop down to cont 😬

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well I did only say that I'd fix the leak 😛

Yeah I was wondering about that as I wrote it. I'll give it a go with uncancelable/poll. Although I did notice that in AsyncPlatform#fromCompletableFuture we changed it to use cont. I don't suppose you remember why?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't suppose you remember why?

That's because the cancelation protocol for CompletableFuture is more complex: even if you request cancelation, it may indicate to you that it hasn't canceled in which case you need to fallback to waiting for the result. The best way to express this is with cont.

(Aside, but CE doesn't really handle this well, we still have leaks even in the cont-based implementation #3474).

case (fut, fin) =>
flatMap(executionContext) { implicit ec =>
as(delay(fut.onComplete(t => cb(t.toEither))), Some(fin))
}
}
}

/**
Expand Down
Loading