Skip to content

Commit

Permalink
Merge pull request #4067 from durban/issue4066
Browse files Browse the repository at this point in the history
Fix #4066: shut down executors when IORuntime.global shuts down
  • Loading branch information
djspiewak authored Apr 26, 2024
2 parents ad5f8c5 + 1e554c4 commit 771a51c
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,15 @@ private[unsafe] abstract class IORuntimeCompanionPlatform { this: IORuntime.type
def global: IORuntime = {
if (_global == null) {
installGlobal {
val (compute, _) = createWorkStealingComputeThreadPool()
val (blocking, _) = createDefaultBlockingExecutionContext()
val (compute, computeDown) = createWorkStealingComputeThreadPool()
val (blocking, blockingDown) = createDefaultBlockingExecutionContext()
val shutdown = () => {
computeDown()
blockingDown()
resetGlobal()
}

IORuntime(compute, blocking, compute, () => resetGlobal(), IORuntimeConfig())
IORuntime(compute, blocking, compute, shutdown, IORuntimeConfig())
}
}

Expand Down
7 changes: 7 additions & 0 deletions tests/jvm/src/test/scala/cats/effect/IOAppSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,13 @@ class IOAppSpec extends Specification {
ok
}

"shut down WSTP on fatal error without IOApp" in {
val h = platform(FatalErrorShutsDownRt, List.empty)
h.awaitStatus()
h.stdout() must not(contain("sadness"))
h.stdout() must contain("done")
}

"support main thread evaluation" in {
val h = platform(EvalOnMainThread, List.empty)
h.awaitStatus() mustEqual 0
Expand Down
33 changes: 33 additions & 0 deletions tests/shared/src/main/scala/catseffect/examples.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ import cats.effect.std.{Console, Random}
import cats.effect.unsafe.{IORuntime, IORuntimeConfig, Scheduler}
import cats.syntax.all._

import scala.concurrent.Await
import scala.concurrent.duration._

package examples {

import java.util.concurrent.TimeoutException

object HelloWorld extends IOApp.Simple {
def run: IO[Unit] =
IO(println("Hello, World!"))
Expand Down Expand Up @@ -58,6 +61,36 @@ package examples {
}
}

object FatalErrorShutsDownRt extends RawApp {
def main(args: Array[String]): Unit = {
val rt = cats.effect.unsafe.IORuntime.global
@volatile var thread: Thread = null
val action = for {
// make sure a blocking thread exists, save it:
_ <- IO.blocking {
thread = Thread.currentThread()
}
// get back on the WSTP:
_ <- IO.cede
// fatal error on the WSTP thread:
_ <- IO {
throw new OutOfMemoryError("Boom!")
}.attempt.flatMap(_ => IO.println("sadness (attempt)"))
} yield ()
val fut = action.unsafeToFuture()(rt)
try {
Await.ready(fut, atMost = 2.seconds)
} catch {
case _: TimeoutException => println("sadness (timeout)")
}
Thread.sleep(500L)
// by now the WSTP (and all its threads) must've been shut down:
if (thread eq null) println("sadness (thread is null)")
else if (thread.isAlive()) println("sadness (thread is alive)")
println("done")
}
}

object RaiseFatalErrorAttempt extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
IO.raiseError[Unit](new OutOfMemoryError("Boom!"))
Expand Down

0 comments on commit 771a51c

Please sign in to comment.