-
Notifications
You must be signed in to change notification settings - Fork 521
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Rename worker threads in blocking regions #3012
Conversation
"rename itself when entering and exiting blocking region" in real { | ||
for { | ||
computeThreadName <- getThreadName | ||
blockerThreadName <- blocking { getThreadName } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it correctly understood that this works the same as using IO.blocking
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure, so Vasil will have to comment. But my understanding was the IO.blocking
calls can be shifted to an existing blocking thread if available. However because blocking { }
is synchronous, it forces the current thread to become a blocking thread. So possibly not the same.
Also the semantics here seem a bit strange, maybe you actually want IO(blocking(Thread.currentThread().getName()))
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would not test scala.concurrent.blocking
and I would advise against using it to wrap effects.
The more correct usage is to wrap effectful code directly, like Arman suggested.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure, so Vasil will have to comment. But my understanding was the IO.blocking calls can be shifted to an existing blocking thread if available. However because blocking { } is synchronous, it forces the current thread to become a blocking thread. So possibly not the same.
IO.blocking
calls are either:
- Rewritten as
IO(scala.concurrent.blocking(thunk))
and then executed in place, by making the current compute thread become a blocking thread and replacing it with a cached/new thread. - Shifted completely to the
blocking
pool and then shifted back to thecompute
pool.
Cached threads that blocked at one time can be used as replacements for compute threads again, but not directly as blocking threads.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cached threads that blocked at one time can be used as replacements for compute threads again, but not directly as blocking threads.
Oh interesting, at the risk of going on a tangent why is that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changing it to IO.blocking
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh interesting, at the risk of going on a tangent why is that?
If you shift a fiber to a dedicated blocking thread (regardless of whether it's cached or spawned just now) you incur 2 context switches (one by switching to the blocking thread and one by switching back to the compute pool). Runnin the blocking code in place and replacing the thread only incurs 1 context switch, when you go back to the compute pool. This is demonstrably quicker on benchmarks, something CE was criticized for (inefficiency of blocking code execution) now it benches the same as the best of them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"rename itself when entering and exiting blocking region" in real { | ||
for { | ||
computeThreadName <- getThreadName | ||
blockerThreadName <- blocking { getThreadName } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure, so Vasil will have to comment. But my understanding was the IO.blocking
calls can be shifted to an existing blocking thread if available. However because blocking { }
is synchronous, it forces the current thread to become a blocking thread. So possibly not the same.
Also the semantics here seem a bit strange, maybe you actually want IO(blocking(Thread.currentThread().getName()))
?
_ <- IO.cede | ||
resetComputeThreadName <- getThreadName |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From my understanding, this doesn't check if the thread name is reset. All it's checking is that the fiber is now running on a compute thread, which is not the same thing.
It seems to me all of your assertions should check that the underlying thread is the same (by reference equality) so that we are in fact seeing name changes (and not simply thread jumps).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right. My first test that didn't work, without the cede
, had the same thread id for all three fibers. Now the first two are the same and correctly changes name, but the last one is on a different thread.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed my debug println (😳) and updated the test.
What it does now is
- create a runtime with a single thread threadpool,
- check that the compute thread is correctly renamed when entering a blocking region,
- then force two threads to become blocking,
- thus forcing the previously blocking thread to be readded to the compute pool
- and verify that the thread has been renamed again
It works, but feels iffy.
Oh also, this PR can probably be targeted at 3.3.x, so we can release it sooner :) |
b9229b9
to
667dffa
Compare
I guess that involves rebasing on series/3.3.x and changing the target branch here, right? Just confirming before I force push. |
667dffa
to
649aa49
Compare
/poke @armanbilge |
I would like to review this PR too. Please give me a few days. Sorry for the delay. |
Sorry, missed your message! Yes, you should rebase and force push. |
649aa49
to
df35555
Compare
It should be ready now, targeting |
Holding the merge on this until @vasilmkd approves. |
@vasilmkd Any availability to review within the next day or so? I'd like to get this merged. All good if you're not able to get to it. |
I will review it tomorrow, I don't have a computer at the moment. I'm sorry for the delay. 😄 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work, thanks for chasing this down, and sorry for the delay.
This adds a new parameter to the
WorkStealingThreadPool
constructor, but it looks like the constructor is private to the project.Closes #3010