Skip to content

Commit

Permalink
Merge pull request typelevel#2954 from b3nk3i/duration
Browse files Browse the repository at this point in the history
Duration
  • Loading branch information
djspiewak authored Jun 18, 2022
2 parents 25c2777 + 4a37b01 commit 3b4920d
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 13 deletions.
27 changes: 24 additions & 3 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import cats.{
}
import cats.data.Ior
import cats.effect.instances.spawn
import cats.effect.kernel.GenTemporal.handleFinite
import cats.effect.std.{Console, Env, UUIDGen}
import cats.effect.tracing.{Tracing, TracingEvent}
import cats.syntax.all._
Expand Down Expand Up @@ -666,7 +667,10 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
* is the time span for which we wait for the source to complete; in the event that the
* specified time has passed without the source completing, a `TimeoutException` is raised
*/
def timeout[A2 >: A](duration: FiniteDuration): IO[A2] =
def timeout[A2 >: A](duration: Duration): IO[A2] =
handleFinite(this, duration)(finiteDuration => timeout(finiteDuration))

private[effect] def timeout(duration: FiniteDuration): IO[A] =
timeoutTo(duration, IO.defer(IO.raiseError(new TimeoutException(duration.toString))))

/**
Expand All @@ -685,11 +689,16 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
* @param fallback
* is the task evaluated after the duration has passed and the source canceled
*/
def timeoutTo[A2 >: A](duration: FiniteDuration, fallback: IO[A2]): IO[A2] =
def timeoutTo[A2 >: A](duration: Duration, fallback: IO[A2]): IO[A2] = {
handleFinite[IO, A2](this, duration)(finiteDuration => timeoutTo(finiteDuration, fallback))
}

private[effect] def timeoutTo[A2 >: A](duration: FiniteDuration, fallback: IO[A2]): IO[A2] = {
race(IO.sleep(duration)).flatMap {
case Right(_) => fallback
case Left(value) => IO.pure(value)
}
}

/**
* Returns an IO that either completes with the result of the source within the specified time
Expand All @@ -709,7 +718,14 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
* @see
* [[timeout]] for a variant which respects backpressure and does not leak fibers
*/
def timeoutAndForget(duration: FiniteDuration): IO[A] =
private[effect] def timeoutAndForget(duration: FiniteDuration): IO[A] =
Temporal[IO].timeoutAndForget(this, duration)

/**
* @see
* [[timeoutAndForget]]
*/
def timeoutAndForget(duration: Duration): IO[A] =
Temporal[IO].timeoutAndForget(this, duration)

def timed: IO[(FiniteDuration, A)] =
Expand Down Expand Up @@ -1576,6 +1592,11 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
fa.timeout(duration)
}

override def timeout[A](fa: IO[A], duration: Duration)(
implicit ev: TimeoutException <:< Throwable): IO[A] = {
fa.timeout(duration)
}

def handleErrorWith[A](fa: IO[A])(f: Throwable => IO[A]): IO[A] =
fa.handleErrorWith(f)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2020-2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.kernel.syntax

import cats.effect.kernel.GenTemporal

import scala.concurrent.duration.FiniteDuration

import java.util.concurrent.TimeoutException

private[syntax] trait GenTemporalOps_CompanionCompat {

@deprecated("Preserved for binary-compatibility", "3.4.0")
def timeoutTo$extension[F[_], A](
wrapped: F[A],
duration: FiniteDuration,
fallback: F[A],
F: GenTemporal[F, _]): F[A] =
F.timeoutTo(wrapped, duration, fallback)

}

private[syntax] trait GenTemporalOpsCompanionCompat {

@deprecated("Preserved for binary-compatibility", "3.4.0")
def timeout$extension[F[_], A, E](
wrapped: F[A],
duration: FiniteDuration,
F: GenTemporal[F, E],
timeoutToE: TimeoutException <:< E): F[A] = F.timeout(wrapped, duration)(timeoutToE)

@deprecated("Preserved for binary-compatibility", "3.4.0")
def timeoutAndForget$extension[F[_], A, E](
wrapped: F[A],
duration: FiniteDuration,
F: GenTemporal[F, E],
timeoutToE: TimeoutException <:< E): F[A] =
F.timeoutAndForget(wrapped, duration)(timeoutToE)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2020-2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.kernel.syntax

private[syntax] trait GenTemporalOps_CompanionCompat

private[syntax] trait GenTemporalOpsCompanionCompat
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2020-2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.kernel.syntax

private[syntax] trait GenTemporalOps_CompanionCompat

private[syntax] trait GenTemporalOpsCompanionCompat
40 changes: 32 additions & 8 deletions kernel/shared/src/main/scala/cats/effect/kernel/GenTemporal.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cats.effect.kernel

import cats.{Applicative, MonadError, Monoid, Semigroup}
import cats.data._
import cats.effect.kernel.GenTemporal.handleFinite
import cats.syntax.all._

import scala.concurrent.TimeoutException
Expand Down Expand Up @@ -78,7 +79,10 @@ trait GenTemporal[F[_], E] extends GenConcurrent[F, E] with Clock[F] {
* @param fallback
* The task evaluated after the duration has passed and the source canceled
*/
def timeoutTo[A](fa: F[A], duration: FiniteDuration, fallback: F[A]): F[A] =
def timeoutTo[A](fa: F[A], duration: Duration, fallback: F[A]): F[A] =
handleFinite(fa, duration)(finiteDuration => timeoutTo(fa, finiteDuration, fallback))

protected def timeoutTo[A](fa: F[A], duration: FiniteDuration, fallback: F[A]): F[A] =
flatMap(race(fa, sleep(duration))) {
case Left(a) => pure(a)
case Right(_) => fallback
Expand All @@ -97,7 +101,11 @@ trait GenTemporal[F[_], E] extends GenConcurrent[F, E] with Clock[F] {
* The time span for which we wait for the source to complete; in the event that the
* specified time has passed without the source completing, a `TimeoutException` is raised
*/
def timeout[A](fa: F[A], duration: FiniteDuration)(
def timeout[A](fa: F[A], duration: Duration)(implicit ev: TimeoutException <:< E): F[A] = {
handleFinite(fa, duration)(finiteDuration => timeout(fa, finiteDuration))
}

protected def timeout[A](fa: F[A], duration: FiniteDuration)(
implicit ev: TimeoutException <:< E): F[A] = {
flatMap(race(fa, sleep(duration))) {
case Left(a) => pure(a)
Expand All @@ -110,20 +118,28 @@ trait GenTemporal[F[_], E] extends GenConcurrent[F, E] with Clock[F] {
* time `duration` or otherwise raises a `TimeoutException`.
*
* The source is canceled in the event that it takes longer than the specified time duration
* to complete. Unlike [[timeout]], the cancelation of the source will be ''requested'' but
* not awaited, and the exception will be raised immediately upon the completion of the timer.
* This may more closely match intuitions about timeouts, but it also violates backpressure
* guarantees and intentionally leaks fibers.
* to complete. Unlike
* [[timeout[A](fa:F[A],duration:scala\.concurrent\.duration\.Duration)* timeout]], the
* cancelation of the source will be ''requested'' but not awaited, and the exception will be
* raised immediately upon the completion of the timer. This may more closely match intuitions
* about timeouts, but it also violates backpressure guarantees and intentionally leaks
* fibers.
*
* This combinator should be applied very carefully.
*
* @param duration
* The time span for which we wait for the source to complete; in the event that the
* specified time has passed without the source completing, a `TimeoutException` is raised
* @see
* [[timeout]] for a variant which respects backpressure and does not leak fibers
* [[timeout[A](fa:F[A],duration:scala\.concurrent\.duration\.Duration)* timeout]] for a
* variant which respects backpressure and does not leak fibers
*/
def timeoutAndForget[A](fa: F[A], duration: FiniteDuration)(
def timeoutAndForget[A](fa: F[A], duration: Duration)(
implicit ev: TimeoutException <:< E): F[A] = {
handleFinite(fa, duration)(finiteDuration => timeoutAndForget(fa, finiteDuration))
}

protected def timeoutAndForget[A](fa: F[A], duration: FiniteDuration)(
implicit ev: TimeoutException <:< E): F[A] =
uncancelable { poll =>
implicit val F: GenTemporal[F, E] = this
Expand Down Expand Up @@ -263,6 +279,14 @@ object GenTemporal {
instantiateGenTemporalForWriterT(temporal)
}

private[effect] def handleFinite[F[_], A](fa: F[A], duration: Duration)(
f: FiniteDuration => F[A]): F[A] = {
duration match {
case _: Duration.Infinite => fa
case finite: FiniteDuration => f(finite)
}
}

private[kernel] def instantiateGenTemporalForWriterT[F[_], L, E](F0: GenTemporal[F, E])(
implicit L0: Monoid[L]): WriterTTemporal[F, L, E] =
new WriterTTemporal[F, L, E] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package cats.effect.kernel.syntax

import cats.effect.kernel.GenTemporal

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.{Duration, FiniteDuration}

import java.util.concurrent.TimeoutException

Expand All @@ -40,6 +40,9 @@ trait GenTemporalSyntax {
final class GenTemporalOps_[F[_], A] private[syntax] (private val wrapped: F[A])
extends AnyVal {

def timeoutTo(duration: Duration, fallback: F[A])(implicit F: GenTemporal[F, _]): F[A] =
F.timeoutTo(wrapped, duration, fallback)

def timeoutTo(duration: FiniteDuration, fallback: F[A])(implicit F: GenTemporal[F, _]): F[A] =
F.timeoutTo(wrapped, duration, fallback)

Expand All @@ -50,6 +53,8 @@ final class GenTemporalOps_[F[_], A] private[syntax] (private val wrapped: F[A])
F.andWait(wrapped, time)
}

object GenTemporalOps_ extends GenTemporalOps_CompanionCompat

final class GenTemporalOps[F[_], A, E] private[syntax] (private val wrapped: F[A])
extends AnyVal {

Expand All @@ -58,8 +63,20 @@ final class GenTemporalOps[F[_], A, E] private[syntax] (private val wrapped: F[A
timeoutToE: TimeoutException <:< E
): F[A] = F.timeout(wrapped, duration)

def timeout(duration: Duration)(
implicit F: GenTemporal[F, E],
timeoutToE: TimeoutException <:< E
): F[A] = F.timeout(wrapped, duration)

def timeoutAndForget(duration: FiniteDuration)(
implicit F: GenTemporal[F, E],
timeoutToE: TimeoutException <:< E
): F[A] = F.timeoutAndForget(wrapped, duration)

def timeoutAndForget(duration: Duration)(
implicit F: GenTemporal[F, E],
timeoutToE: TimeoutException <:< E
): F[A] = F.timeoutAndForget(wrapped, duration)
}

object GenTemporalOps extends GenTemporalOpsCompanionCompat
15 changes: 14 additions & 1 deletion kernel/shared/src/test/scala/cats/effect/kernel/SyntaxSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import cats.implicits._
import org.specs2.mutable.Specification

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.{Duration, FiniteDuration}

class SyntaxSpec extends Specification {

Expand Down Expand Up @@ -157,6 +157,13 @@ class SyntaxSpec extends Specification {
result: F[A]
}

{
val param1: Duration = null.asInstanceOf[Duration]
val param2: F[A] = null.asInstanceOf[F[A]]
val result = target.timeoutTo(param1, param2)
result: F[A]
}

{
val param: FiniteDuration = null.asInstanceOf[FiniteDuration]
val result = target.delayBy(param)
Expand All @@ -180,6 +187,12 @@ class SyntaxSpec extends Specification {
val result = target.timeout(param)
result: F[A]
}

{
val param: Duration = null.asInstanceOf[Duration]
val result = target.timeout(param)
result: F[A]
}
}

def asyncSyntax[F[_], A](target: F[A])(implicit F: Async[F]) = {
Expand Down
24 changes: 24 additions & 0 deletions laws/shared/src/test/scala/cats/effect/laws/GenTemporalSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,30 @@ class GenTemporalSpec extends Specification { outer =>

val loop: TimeT[F, Unit] = F.sleep(5.millis).foreverM

"temporal" should {
"timeout" should {
"return identity when infinite duration" in {
val fa = F.pure(true)
F.timeout(fa, Duration.Inf) mustEqual fa
}
}

"timeoutTo" should {
"return identity when infinite duration" in {
val fa: TimeT[F, Boolean] = F.pure(true)
val fallback: TimeT[F, Boolean] = F.raiseError(new RuntimeException)
F.timeoutTo(fa, Duration.Inf, fallback) mustEqual fa
}
}

"timeoutAndForget" should {
"return identity when infinite duration" in {
val fa: TimeT[F, Boolean] = F.pure(true)
F.timeoutAndForget(fa, Duration.Inf) mustEqual fa
}
}
}

// TODO enable these tests once Temporal for TimeT is fixed
/*"temporal" should {
"timeout" should {
Expand Down

0 comments on commit 3b4920d

Please sign in to comment.