Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Execute IO.blocking on WSTP without BlockContext indirection #3903

Merged
merged 9 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ private[effect] sealed abstract class WorkStealingThreadPool private ()
task: Runnable,
fallback: Scheduler): Runnable
private[effect] def canExecuteBlockingCode(): Boolean
private[effect] def prepareBlocking(): Unit
private[unsafe] def liveTraces(): (
Map[Runnable, Trace],
Map[WorkerThread, (Thread.State, Option[(Runnable, Trace)], Map[Runnable, Trace])],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,16 @@ private[effect] final class WorkStealingThreadPool(
}
}

/**
* Prepares the current thread for running blocking code. This should be called only if
* [[canExecuteBlockingCode]] returns `true`.
*/
private[effect] def prepareBlocking(): Unit = {
val thread = Thread.currentThread()
val worker = thread.asInstanceOf[WorkerThread]
worker.prepareBlocking()
}

/**
* Schedules a fiber for execution on this thread pool originating from an external thread (a
* thread which is not owned by this thread pool).
Expand Down
22 changes: 13 additions & 9 deletions core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ private final class WorkerThread(
}

/**
* A mechanism for executing support code before executing a blocking action.
* Support code that must be run before executing a blocking action on this thread.
*
* The current thread creates a replacement worker thread (or reuses a cached one) that will
* take its place in the pool and does a complete transfer of ownership of the data structures
Expand All @@ -797,16 +797,14 @@ private final class WorkerThread(
* There is no reason to enclose any code in a `try/catch` block because the only way this
* code path can be exercised is through `IO.delay`, which already handles exceptions.
*/
override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
def prepareBlocking(): Unit = {
val rnd = random

pool.notifyParked(rnd)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated, but I wonder why we invoke notifyParked() unconditionally, even if this thread is already blocking = true.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. I'm guessing there's no reason and we can probably guard that notification as you say.


if (blocking) {
// This `WorkerThread` is already inside an enclosing blocking region.
// There is no need to spawn another `WorkerThread`. Instead, directly
// execute the blocking action.
thunk
// This `WorkerThread` has already been prepared for blocking.
// There is no need to spawn another `WorkerThread`.
} else {
// Spawn a new `WorkerThread` to take the place of this thread, as the
// current thread prepares to execute a blocking action.
Expand All @@ -819,7 +817,7 @@ private final class WorkerThread(
cedeBypass = null
}

// Logically enter the blocking region.
// Logically become a blocking thread.
blocking = true

val prefix = pool.blockerThreadPrefix
Expand Down Expand Up @@ -853,11 +851,17 @@ private final class WorkerThread(
pool.blockedWorkerThreadCounter.incrementAndGet()
clone.start()
}

thunk
}
}

/**
* A mechanism for executing support code before executing a blocking action.
*/
override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
prepareBlocking()
thunk
}

private[this] def init(newIdx: Int): Unit = {
_index = newIdx
queue = pool.localQueues(newIdx)
Expand Down
4 changes: 3 additions & 1 deletion core/shared/src/main/scala/cats/effect/IOFiber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -991,10 +991,12 @@ private final class IOFiber[A](
if (ec.isInstanceOf[WorkStealingThreadPool]) {
val wstp = ec.asInstanceOf[WorkStealingThreadPool]
if (wstp.canExecuteBlockingCode()) {
wstp.prepareBlocking()

var error: Throwable = null
val r =
try {
scala.concurrent.blocking(cur.thunk())
cur.thunk()
} catch {
case t if NonFatal(t) =>
error = t
Expand Down
Loading