Skip to content

Commit

Permalink
Merge pull request #1910 from armanbilge/feature/weakasync-liftio
Browse files Browse the repository at this point in the history
Add `WeakAsync.liftIO`
  • Loading branch information
jatcwang authored Sep 5, 2023
2 parents 7040127 + 0681750 commit eca423e
Showing 1 changed file with 19 additions and 12 deletions.
31 changes: 19 additions & 12 deletions modules/free/src/main/scala/doobie/WeakAsync.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package doobie

import cats.~>
import cats.implicits._
import cats.effect.{ IO, LiftIO }
import cats.effect.kernel.{ Async, Poll, Resource, Sync }
import cats.effect.std.Dispatcher
import scala.concurrent.Future
Expand Down Expand Up @@ -44,19 +45,25 @@ object WeakAsync {
* `cats.effect.std.Dispatcher` the trasformation is based on is stateful and requires finalization.
* Leaking it from it's resource scope will lead to erorrs at runtime. */
def liftK[F[_], G[_]](implicit F: Async[F], G: WeakAsync[G]): Resource[F, F ~> G] =
Dispatcher.parallel[F].map(dispatcher =>
new(F ~> G) {
def apply[T](fa: F[T]) = // first try to interpret directly into G, then fallback to the Dispatcher
F.syncStep[G, T](fa, Int.MaxValue).flatMap { // MaxValue b/c we assume G will implement ceding/fairness
case Left(fa) =>
G.fromFutureCancelable {
G.delay(dispatcher.unsafeToFutureCancelable(fa)).map { case (fut, cancel) =>
(fut, G.fromFuture(G.delay(cancel())))
}
}
case Right(a) => G.pure(a)
Dispatcher.parallel[F].map(new Lifter(_))

/** Like [[liftK]] but specifically returns a [[LiftIO]] */
def liftIO[F[_]](implicit F: WeakAsync[F]): Resource[IO, LiftIO[F]] =
Dispatcher.parallel[IO].map(new Lifter(_) with LiftIO[F] {
def liftIO[A](ioa: IO[A]) = super[Lifter].apply(ioa)
})

private class Lifter[F[_], G[_]](dispatcher: Dispatcher[F])(implicit F: Async[F], G: WeakAsync[G]) extends (F ~> G) {
def apply[T](fa: F[T]) = // first try to interpret directly into G, then fallback to the Dispatcher
F.syncStep[G, T](fa, Int.MaxValue).flatMap { // MaxValue b/c we assume G will implement ceding/fairness
case Left(fa) =>
G.fromFutureCancelable {
G.delay(dispatcher.unsafeToFutureCancelable(fa)).map { case (fut, cancel) =>
(fut, G.fromFuture(G.delay(cancel())))
}
}
case Right(a) => G.pure(a)
}
)
}

}

0 comments on commit eca423e

Please sign in to comment.