Skip to content

Commit

Permalink
Merge pull request typelevel#2868 from djspiewak/bug/report-fiber-fai…
Browse files Browse the repository at this point in the history
…lure
  • Loading branch information
djspiewak authored Mar 19, 2022
2 parents 544b95d + 489770d commit 1a0893c
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 8 deletions.
5 changes: 4 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,10 @@ lazy val core = crossProject(JSPlatform, JVMPlatform)
ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.WorkerThread$Data"),
// introduced by #2844, Thread local fallback weak bag
// changes to `cats.effect.unsafe` package private code
ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.SynchronizedWeakBag")
ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.SynchronizedWeakBag"),
// introduced by #2868
// added signaling from CallbackStack to indicate successful invocation
ProblemFilters.exclude[DirectMissingMethodProblem]("cats.effect.CallbackStack.apply")
) ++ {
if (isDotty.value) {
// Scala 3 specific exclusions
Expand Down
18 changes: 12 additions & 6 deletions core/shared/src/main/scala/cats/effect/CallbackStack.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,25 @@ private final class CallbackStack[A](private[this] var callback: OutcomeIO[A] =>
}

/**
* Invokes *all* non-null callbacks in the queue, starting with the current one.
* Invokes *all* non-null callbacks in the queue, starting with the current one. Returns true
* iff *any* callbacks were invoked.
*/
@tailrec
def apply(oc: OutcomeIO[A]): Unit = {
def apply(oc: OutcomeIO[A], invoked: Boolean): Boolean = {
val cb = callback
if (cb != null) {

val invoked2 = if (cb != null) {
cb(oc)
true
} else {
invoked
}

val next = get()
if (next != null) {
next(oc)
}
if (next != null)
next(oc, invoked2)
else
invoked2
}

/**
Expand Down
7 changes: 6 additions & 1 deletion core/shared/src/main/scala/cats/effect/IOFiber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1022,7 +1022,12 @@ private final class IOFiber[A] private (
outcome = oc

try {
callbacks(oc)
if (!callbacks(oc, false)) {
oc match {
case Outcome.Errored(e) => currentCtx.reportFailure(e)
case _ => ()
}
}
} finally {
callbacks.lazySet(null) /* avoid leaks */
}
Expand Down
42 changes: 42 additions & 0 deletions tests/shared/src/test/scala/cats/effect/IOSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,48 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification {
}(_ => IO.raiseError(WrongException))
io.attempt must completeAs(Left(TestException))
})

"report unhandled failure to the execution context" in ticked { implicit ticker =>
case object TestException extends RuntimeException

val action = IO.executionContext flatMap { ec =>
IO defer {
var ts: List[Throwable] = Nil

val ec2 = new ExecutionContext {
def reportFailure(t: Throwable) = ts ::= t
def execute(r: Runnable) = ec.execute(r)
}

IO.raiseError(TestException).start.evalOn(ec2) *> IO.sleep(10.millis) *> IO(ts)
}
}

action must completeAs(List(TestException))
}

"not report observed failures to the execution context" in ticked { implicit ticker =>
case object TestException extends RuntimeException

val action = IO.executionContext flatMap { ec =>
IO defer {
var ts: List[Throwable] = Nil

val ec2 = new ExecutionContext {
def reportFailure(t: Throwable) = ts ::= t
def execute(r: Runnable) = ec.execute(r)
}

for {
f <- (IO.sleep(10.millis) *> IO.raiseError(TestException)).start.evalOn(ec2)
_ <- f.join
back <- IO(ts)
} yield back
}
}

action must completeAs(Nil)
}
}

"suspension of side effects" should {
Expand Down

0 comments on commit 1a0893c

Please sign in to comment.