Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Execute IO.blocking on WSTP without BlockContext indirection #3903

Merged
merged 9 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -485,14 +485,16 @@ private[effect] final class WorkStealingThreadPool(
}

/**
* Checks if the blocking code can be executed in the current context (only returns true for
* worker threads that belong to this execution context).
* Checks if the blocking code can be executed in the current context and, if so, prepares it
* for blocking. Only returns true for worker threads that belong to this execution context.
*/
private[effect] def canExecuteBlockingCode(): Boolean = {
val thread = Thread.currentThread()
if (thread.isInstanceOf[WorkerThread]) {
val worker = thread.asInstanceOf[WorkerThread]
worker.canExecuteBlockingCodeOn(this)
if (worker.canExecuteBlockingCodeOn(this))
worker.prepareBlocking()
true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this logic up into IOFiber instead? I don't think there's any particular reason for it to be here, and you're making something that looks like a boolean check into a side-effecting method.

Copy link
Member Author

@armanbilge armanbilge Nov 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well conceptually I liked that if canExecuteBlockingCode() returns true then you can run blocking code, immediately.

I do see your point. We can expose this as a separate method which IOFiber calls. This requires:

  1. Doing the whole Thread.currentThread() dance a second time. How should we handle the case where the current thread is not a WorkerThread, or does not belong to the current pool? Do we care about those cases?

  2. Also exposing whatever new API we invent on the JS WorkStealingThreadPool shim. Not a big deal, just annoying.

Worth it?

Copy link
Member Author

@armanbilge armanbilge Nov 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh lol, also I just realized this is bugged as-written 😅 fixed in c2ef8bc.

} else {
false
}
Expand Down
22 changes: 13 additions & 9 deletions core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ private final class WorkerThread(
}

/**
* A mechanism for executing support code before executing a blocking action.
* Support code that must be run before executing a blocking action on this thread.
*
* The current thread creates a replacement worker thread (or reuses a cached one) that will
* take its place in the pool and does a complete transfer of ownership of the data structures
Expand All @@ -797,16 +797,14 @@ private final class WorkerThread(
* There is no reason to enclose any code in a `try/catch` block because the only way this
* code path can be exercised is through `IO.delay`, which already handles exceptions.
*/
override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
def prepareBlocking(): Unit = {
val rnd = random

pool.notifyParked(rnd)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated, but I wonder why we invoke notifyParked() unconditionally, even if this thread is already blocking = true.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. I'm guessing there's no reason and we can probably guard that notification as you say.


if (blocking) {
// This `WorkerThread` is already inside an enclosing blocking region.
// There is no need to spawn another `WorkerThread`. Instead, directly
// execute the blocking action.
thunk
// This `WorkerThread` has already been prepared for blocking.
// There is no need to spawn another `WorkerThread`.
} else {
// Spawn a new `WorkerThread` to take the place of this thread, as the
// current thread prepares to execute a blocking action.
Expand All @@ -819,7 +817,7 @@ private final class WorkerThread(
cedeBypass = null
}

// Logically enter the blocking region.
// Logically become a blocking thread.
blocking = true

val prefix = pool.blockerThreadPrefix
Expand Down Expand Up @@ -853,11 +851,17 @@ private final class WorkerThread(
pool.blockedWorkerThreadCounter.incrementAndGet()
clone.start()
}

thunk
}
}

/**
* A mechanism for executing support code before executing a blocking action.
*/
override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
prepareBlocking()
thunk
}

private[this] def init(newIdx: Int): Unit = {
_index = newIdx
queue = pool.localQueues(newIdx)
Expand Down
2 changes: 1 addition & 1 deletion core/shared/src/main/scala/cats/effect/IOFiber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,7 @@ private final class IOFiber[A](
var error: Throwable = null
val r =
try {
scala.concurrent.blocking(cur.thunk())
cur.thunk()
} catch {
case t if NonFatal(t) =>
error = t
Expand Down
Loading