Skip to content

Commit

Permalink
Merge pull request #3074 from djspiewak/bug/interruptibleMany
Browse files Browse the repository at this point in the history
Resolve race condition in `interruptibleMany` after interruption
  • Loading branch information
armanbilge authored Jul 9, 2022
2 parents 07359bb + a3ae9d6 commit eacb835
Showing 1 changed file with 36 additions and 12 deletions.
48 changes: 36 additions & 12 deletions core/jvm/src/main/scala/cats/effect/IOFiberPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ private[effect] abstract class IOFiberPlatform[A] extends AtomicBoolean(false) {
cb <- IO(new AtomicReference[Either[Throwable, Unit] => Unit](null))

canInterrupt <- IO(new juc.Semaphore(0))
manyDone <- IO(new AtomicBoolean(false))

target <- IO uncancelable { _ =>
IO.async[Thread] { initCb =>
Expand All @@ -58,23 +59,41 @@ private[effect] abstract class IOFiberPlatform[A] extends AtomicBoolean(false) {
val result =
try {
canInterrupt.release()
val back = Right(cur.thunk())

val back =
try {
Right(cur.thunk())
} catch {
// this won't suppress the interruption
case NonFatal(t) => Left(t)
}

// this is why it has to be a semaphore rather than an atomic boolean
// this needs to hard-block if we're in the process of being interrupted
// once we acquire this lock, we cannot be interrupted
canInterrupt.acquire()

if (many) {
manyDone.set(true) // in this case, we weren't interrupted
}

back
} catch {
case _: InterruptedException =>
null

case NonFatal(t) =>
Left(t)
} finally {
canInterrupt.tryAcquire()
done.set(true)

if (!many) {
if (many) {
// wait for the hot loop to finish
// we can probably also do this with canInterrupt, but that seems confusing
// this needs to be a busy-wait otherwise it will be interrupted
while (!manyDone.get()) {}
Thread.interrupted() // clear the status

()
} else {
val cb0 = cb.getAndSet(null)
if (cb0 != null) {
cb0(RightUnit)
Expand Down Expand Up @@ -115,16 +134,21 @@ private[effect] abstract class IOFiberPlatform[A] extends AtomicBoolean(false) {

val repeat = if (many) {
IO {
while (!done.get()) {
if (canInterrupt.tryAcquire()) {
try {
while (!done.get()) {
target.interrupt() // it's hammer time!
// we need the outer try because we may be done *before* we enter
try {
while (!done.get()) {
if (canInterrupt.tryAcquire()) {
try {
while (!done.get()) {
target.interrupt() // it's hammer time!
}
} finally {
canInterrupt.release()
}
} finally {
canInterrupt.release()
}
}
} finally {
manyDone.set(true) // signal that we're done looping
}

finCb(RightUnit)
Expand Down

0 comments on commit eacb835

Please sign in to comment.