Skip to content

Commit

Permalink
Add some uncancelables
Browse files Browse the repository at this point in the history
  • Loading branch information
armanbilge committed Dec 20, 2022
1 parent 72b05a7 commit d18fa76
Showing 1 changed file with 50 additions and 46 deletions.
96 changes: 50 additions & 46 deletions core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -109,58 +109,62 @@ object EpollSystem extends PollingSystem {

def pollReadRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] =
readSemaphore.permit.surround {
def go(a: A, before: Int): IO[B] =
f(a).flatMap {
case Left(a) =>
IO(readReadyCounter).flatMap { after =>
if (before != after)
// there was a read-ready notification since we started, try again immediately
go(a, after)
else
IO.async[Int] { cb =>
IO {
readCallback = cb
// check again before we suspend
val now = readReadyCounter
if (now != before) {
cb(Right(now))
readCallback = null
None
} else Some(IO(this.readCallback = null))
}
}.flatMap(go(a, _))
}
case Right(b) => IO.pure(b)
}
IO.uncancelable { poll =>
def go(a: A, before: Int): IO[B] =
poll(f(a)).flatMap {
case Left(a) =>
IO(readReadyCounter).flatMap { after =>
if (before != after)
// there was a read-ready notification since we started, try again immediately
go(a, after)
else
poll(IO.async[Int] { cb =>
IO {
readCallback = cb
// check again before we suspend
val now = readReadyCounter
if (now != before) {
cb(Right(now))
readCallback = null
None
} else Some(IO(this.readCallback = null))
}
}).flatMap(go(a, _))
}
case Right(b) => IO.pure(b)
}
}

IO(readReadyCounter).flatMap(go(a, _))
}

def pollWriteRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] =
writeSemaphore.permit.surround {
def go(a: A, before: Int): IO[B] =
f(a).flatMap {
case Left(a) =>
IO(writeReadyCounter).flatMap { after =>
if (before != after)
// there was a write-ready notification since we started, try again immediately
go(a, after)
else
IO.async[Int] { cb =>
IO {
writeCallback = cb
// check again before we suspend
val now = writeReadyCounter
if (now != before) {
cb(Right(now))
writeCallback = null
None
} else Some(IO(this.writeCallback = null))
}
}.flatMap(go(a, _))
}
case Right(b) => IO.pure(b)
}
IO.uncancelable { poll =>
def go(a: A, before: Int): IO[B] =
poll(f(a)).flatMap {
case Left(a) =>
IO(writeReadyCounter).flatMap { after =>
if (before != after)
// there was a write-ready notification since we started, try again immediately
go(a, after)
else
poll(IO.async[Int] { cb =>
IO {
writeCallback = cb
// check again before we suspend
val now = writeReadyCounter
if (now != before) {
cb(Right(now))
writeCallback = null
None
} else Some(IO(this.writeCallback = null))
}
}).flatMap(go(a, _))
}
case Right(b) => IO.pure(b)
}
}

IO(writeReadyCounter).flatMap(go(a, _))
}
Expand Down

0 comments on commit d18fa76

Please sign in to comment.