Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented unhandled error reporting for orphan fibers #2868

Merged
merged 3 commits into from
Mar 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,10 @@ lazy val core = crossProject(JSPlatform, JVMPlatform)
ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.WorkerThread$Data"),
// introduced by #2844, Thread local fallback weak bag
// changes to `cats.effect.unsafe` package private code
ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.SynchronizedWeakBag")
ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.SynchronizedWeakBag"),
// introduced by #2868
// added signaling from CallbackStack to indicate successful invocation
ProblemFilters.exclude[DirectMissingMethodProblem]("cats.effect.CallbackStack.apply")
) ++ {
if (isDotty.value) {
// Scala 3 specific exclusions
Expand Down
18 changes: 12 additions & 6 deletions core/shared/src/main/scala/cats/effect/CallbackStack.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,25 @@ private final class CallbackStack[A](private[this] var callback: OutcomeIO[A] =>
}

/**
* Invokes *all* non-null callbacks in the queue, starting with the current one.
* Invokes *all* non-null callbacks in the queue, starting with the current one. Returns true
* iff *any* callbacks were invoked.
*/
@tailrec
def apply(oc: OutcomeIO[A]): Unit = {
def apply(oc: OutcomeIO[A], invoked: Boolean): Boolean = {
val cb = callback
if (cb != null) {

val invoked2 = if (cb != null) {
cb(oc)
true
} else {
invoked
}

val next = get()
if (next != null) {
next(oc)
}
if (next != null)
next(oc, invoked2)
else
invoked2
}

/**
Expand Down
7 changes: 6 additions & 1 deletion core/shared/src/main/scala/cats/effect/IOFiber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1022,7 +1022,12 @@ private final class IOFiber[A] private (
outcome = oc

try {
callbacks(oc)
if (!callbacks(oc, false)) {
oc match {
case Outcome.Errored(e) => currentCtx.reportFailure(e)
case _ => ()
}
}
} finally {
callbacks.lazySet(null) /* avoid leaks */
}
Expand Down
42 changes: 42 additions & 0 deletions tests/shared/src/test/scala/cats/effect/IOSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,48 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification {
}(_ => IO.raiseError(WrongException))
io.attempt must completeAs(Left(TestException))
})

"report unhandled failure to the execution context" in ticked { implicit ticker =>
case object TestException extends RuntimeException

val action = IO.executionContext flatMap { ec =>
IO defer {
var ts: List[Throwable] = Nil

val ec2 = new ExecutionContext {
def reportFailure(t: Throwable) = ts ::= t
def execute(r: Runnable) = ec.execute(r)
}

IO.raiseError(TestException).start.evalOn(ec2) *> IO.sleep(10.millis) *> IO(ts)
}
}

action must completeAs(List(TestException))
}

"not report observed failures to the execution context" in ticked { implicit ticker =>
case object TestException extends RuntimeException

val action = IO.executionContext flatMap { ec =>
IO defer {
var ts: List[Throwable] = Nil

val ec2 = new ExecutionContext {
def reportFailure(t: Throwable) = ts ::= t
def execute(r: Runnable) = ec.execute(r)
}

for {
f <- (IO.sleep(10.millis) *> IO.raiseError(TestException)).start.evalOn(ec2)
_ <- f.join
back <- IO(ts)
} yield back
}
}

action must completeAs(Nil)
}
}

"suspension of side effects" should {
Expand Down