diff --git a/app.dhall b/app.dhall index b4e70f8..54e9700 100644 --- a/app.dhall +++ b/app.dhall @@ -1,6 +1,6 @@ -let http4sVersion = "0.22.15" +let http4sVersion = "0.23.21" -let finagleVersion = "22.3.0" +let finagleVersion = "22.12.0" in { version = "${http4sVersion}-${finagleVersion}" , http4sVersion diff --git a/build.sbt b/build.sbt index 5b023fe..f95d5d1 100644 --- a/build.sbt +++ b/build.sbt @@ -1,11 +1,9 @@ -import Dependencies._ - val scala213 = "2.13.8" val scala212 = "2.12.15" val dotty = "3.0.2" -val Http4sVersion = "0.22.15" -val FinagleVersion = "22.3.0" +val Http4sVersion = "0.23.21" +val FinagleVersion = "22.12.0" val supportedScalaVersions = List(scala213,scala212,dotty) inScope(Scope.GlobalScope)( diff --git a/project/Dependencies.scala b/project/Dependencies.scala deleted file mode 100644 index e1ac989..0000000 --- a/project/Dependencies.scala +++ /dev/null @@ -1,5 +0,0 @@ -import sbt._ - -object Dependencies { - lazy val scalaTest = "org.scalatest" %% "scalatest" % "3.1.1" -} diff --git a/src/main/scala/org/http4s/finagle/Finagle.scala b/src/main/scala/org/http4s/finagle/Finagle.scala index 3b53c52..d32f961 100644 --- a/src/main/scala/org/http4s/finagle/Finagle.scala +++ b/src/main/scala/org/http4s/finagle/Finagle.scala @@ -3,6 +3,7 @@ package finagle import org.http4s.client._ import cats.effect._ +import cats.effect.std.Dispatcher import cats.syntax.functor._ import com.twitter.finagle.{Http, Service} import com.twitter.finagle.http.{Method, Version, Request => Req, Response => Resp} @@ -15,28 +16,29 @@ import com.twitter.util.Promise import cats.syntax.apply._ import com.twitter.finagle.http.Fields import com.twitter.util.Base64StringEncoder - import java.nio.charset.StandardCharsets import org.http4s.{Method => H4Method} import org.typelevel.ci._ +import scala.util.{Failure, Success} +import scala.concurrent.ExecutionContext object Finagle { - def mkClient[F[_]](dest: String)(implicit F: ConcurrentEffect[F]): Resource[F, Client[F]] = + def mkClient[F[_]: Async](dest: String): Resource[F, Client[F]] = mkClient(Http.newService(dest)) - def mkClient[F[_]](svc: Service[Req, Resp])( - implicit F: ConcurrentEffect[F]): Resource[F, Client[F]] = + def mkClient[F[_]: Async](svc: Service[Req, Resp]): Resource[F, Client[F]] = Resource .make(allocate(svc)) { _ => - F.delay(()) + Async[F].delay(()) } - def mkService[F[_]: ConcurrentEffect](route: HttpApp[F]): Service[Req, Resp] = - (req: Req) => toFuture(route.local(toHttp4sReq[F]).flatMapF(fromHttp4sResponse[F]).run(req)) + def mkService[F[_]: Async](route: HttpApp[F])(implicit ec: ExecutionContext): Resource[F, Service[Req, Resp]] = + Dispatcher.parallel[F] map { dispatcher => + (req: Req) => toFuture(dispatcher, route.local(toHttp4sReq[F]).flatMapF(fromHttp4sResponse[F]).run(req)) + } - private def allocate[F[_]](svc: Service[Req, Resp])( - implicit F: ConcurrentEffect[F]): F[Client[F]] = - F.delay(Client[F] { req => + private def allocate[F[_]: Async](svc: Service[Req, Resp]): F[Client[F]] = + Async[F].delay(Client[F] { req => Resource .eval(for { freq <- fromHttp4sReq(req) @@ -56,7 +58,7 @@ object Finagle { Request(method, uri, version, headers, body) } - private def fromHttp4sResponse[F[_]: Concurrent](resp: Response[F]): F[Resp] = { + private def fromHttp4sResponse[F[_]: Async](resp: Response[F]): F[Resp] = { import com.twitter.finagle.http.Status val status = Status(resp.status.code) val headers = resp.headers.headers.map(h => (h.name.show, h.value)) @@ -75,7 +77,7 @@ object Finagle { writeBody.as(finagleResp) } - def fromHttp4sReq[F[_]](req: Request[F])(implicit F: Concurrent[F]): F[Req] = { + def fromHttp4sReq[F[_]: Async](req: Request[F]): F[Req] = { val method = Method(req.method.name) val version = Version(req.httpVersion.major, req.httpVersion.minor) val request = Req(version, method, req.uri.toString) @@ -92,7 +94,7 @@ object Finagle { if (req.isChunked) { request.headerMap.remove(Fields.TransferEncoding) request.setChunked(true) - Concurrent[F].start(streamBody(req.body, request.writer).compile.drain) *> F.delay(request) + Spawn[F].start(streamBody(req.body, request.writer).compile.drain) *> Async[F].delay(request) } else { req.as[Array[Byte]].map { b => if(b.nonEmpty) { @@ -100,7 +102,7 @@ object Finagle { request.content = content request.contentLength = content.length.longValue() } - } *> F.delay(request) + } *> Async[F].delay(request) } } @@ -127,21 +129,22 @@ object Finagle { } } - private def toF[F[_], A](f: Future[A])(implicit F: Async[F]): F[A] = F.async { cb => - f.respond { - case Return(value) => cb(Right(value)) - case Throw(exception) => cb(Left(exception)) + private def toF[F[_]: Async, A](f: Future[A]): F[A] = Async[F].async_ { cb => + f.respond { + case Return(value) => + cb(Right(value)) + case Throw(exception) => + cb(Left(exception)) + } + () } - () - } - private def toFuture[F[_]: Effect, A](f: F[A]): Future[A] = { - val promise: Promise[A] = Promise() - Effect[F] - .runAsync(f) { - case Right(value) => IO(promise.setValue(value)) - case Left(exception) => IO(promise.setException(exception)) + + private def toFuture[F[_], A](dispatcher: Dispatcher[F], f: F[A])(implicit ec: ExecutionContext): Future[A] = { + val promise: Promise[A] = Promise() + dispatcher.unsafeToFuture(f).onComplete { + case Success(value) => promise.setValue(value) + case Failure(exception) => promise.setException(exception) } - .unsafeRunSync() promise } } diff --git a/src/test/scala/org/http4s/finagle/FinagleSpec.scala b/src/test/scala/org/http4s/finagle/FinagleSpec.scala index 7063de8..792f0fe 100644 --- a/src/test/scala/org/http4s/finagle/FinagleSpec.scala +++ b/src/test/scala/org/http4s/finagle/FinagleSpec.scala @@ -6,44 +6,47 @@ import org.http4s.multipart._ import org.http4s._ import org.http4s.dsl.io._ import org.http4s.client.dsl.io._ -import org.http4s.implicits._ +import org.http4s.syntax.all._ import cats.implicits._ -import scala.concurrent.ExecutionContext import client._ import cats.effect._ + import scala.concurrent.duration._ import fs2._ import org.scalacheck.Prop._ import com.twitter.finagle.http.RequestBuilder +import cats.effect.unsafe.implicits.{global => runtime} + +import scala.concurrent.ExecutionContext class FinagleSpec extends munit.FunSuite with munit.ScalaCheckSuite { - implicit val context: ContextShift[IO] = IO.contextShift(ExecutionContext.global) - implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) + implicit val ec: ExecutionContext = runtime.compute val service = Finagle.mkService{HttpRoutes.of[IO] { case req @ _ -> Root / "echo" => Ok(req.as[String]) case GET -> Root / "simple" => Ok("simple path") case req @ POST -> Root / "chunked" => Response[IO](Ok) .withEntity(Stream.emits(req.as[String].unsafeRunSync().toSeq.map(_.toString)).covary[IO]) .pure[IO] - case GET -> Root / "delayed" => timer.sleep(1.second) *> + case GET -> Root / "delayed" => IO.sleep(1.second) *> Ok("delayed path") case GET -> Root / "no-content" => NoContent() case GET -> Root / "not-found" => NotFound("not found") case GET -> Root / "empty-not-found" => NotFound() case GET -> Root / "internal-error" => InternalServerError() - }.orNotFound} + }.orNotFound}.allocated.unsafeRunSync() var client: (Client[IO], IO[Unit]) = null var server: com.twitter.finagle.ListeningServer = null override def beforeAll(): Unit = { - client = Finagle.mkClient[IO]("localhost:8080").allocated[IO, Client[IO]].unsafeRunSync() - server = com.twitter.finagle.Http.serve(":8080", service) + client = Finagle.mkClient[IO]("localhost:8080").allocated[Client[IO]].unsafeRunSync() + server = com.twitter.finagle.Http.serve(":8080", service._1) () } override def afterAll():Unit = { server.close() client._2.unsafeRunSync() + service._2.unsafeRunSync() () } val localhost = Uri.unsafeFromString("http://localhost:8080") @@ -109,7 +112,7 @@ class FinagleSpec extends munit.FunSuite with munit.ScalaCheckSuite { request = POST(multipart, localhost / "echo").withHeaders(multipart.headers) } yield request assert( - client._1.expect[String](req).unsafeRunSync().contains(value) == true + req.flatMap(r => client._1.expect[String](r)).unsafeRunSync().contains(value) ) } }