Even faster async mutex #3409

Merged (10 commits) on Feb 14, 2023
Conversation

armanbilge
Member

This PR tries my idea from #3346 (comment) of reusing the UnsafeUnbounded data structure from the async queue to implement the async mutex. /cc @BalmungSan
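
Roughly, the shape of the idea is the following. This is a much-simplified sketch, not the actual diff: ConcurrentLinkedQueue stands in for UnsafeUnbounded, cancellation of parked acquirers is ignored, and it assumes the asyncCheckAttempt combinator available on series/3.x.

import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean

import cats.effect.{Async, Resource}

final class SketchMutex[F[_]](implicit F: Async[F]) {
  private[this] val locked = new AtomicBoolean(false)
  private[this] val waiters = new ConcurrentLinkedQueue[Either[Throwable, Unit] => Unit]

  private[this] val acquire: F[Unit] =
    F.asyncCheckAttempt[Unit] { cb =>
      F.delay {
        if (locked.compareAndSet(false, true)) Right(()) // fast path: got the lock immediately
        else { waiters.offer(cb); Left(None) } // park until a releaser completes our callback
      }
    }

  private[this] val release: F[Unit] =
    F.delay {
      val next = waiters.poll()
      if (next ne null) next(Right(())) // hand the (still-held) lock directly to a waiter
      else locked.set(false) // nobody waiting: unlock
      // NB: the real implementation re-checks the queue after unlocking, so a waiter
      // that raced in between poll() and set(false) is not lost.
    }

  def lock: Resource[F, Unit] = Resource.make(acquire)(_ => release)
}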

this PR

[info] Benchmark                           (fibers)  (iterations)   Mode  Cnt    Score   Error  Units
[info] MutexBenchmark.cancellationAsync          10          1000  thrpt   20   15.434 ± 1.058  ops/s
[info] MutexBenchmark.cancellationAsync         100          1000  thrpt   20    3.203 ± 0.464  ops/s
[info] MutexBenchmark.happyPathAsync             10          1000  thrpt   20  145.271 ± 5.933  ops/s
[info] MutexBenchmark.happyPathAsync            100          1000  thrpt   20   13.297 ± 1.656  ops/s
[info] MutexBenchmark.highContentionAsync        10          1000  thrpt   20   22.955 ± 2.932  ops/s
[info] MutexBenchmark.highContentionAsync       100          1000  thrpt   20    2.953 ± 0.479  ops/s

series/3.x

[info] Benchmark                           (fibers)  (iterations)   Mode  Cnt    Score   Error  Units
[info] MutexBenchmark.cancellationAsync          10          1000  thrpt   20   14.627 ± 1.514  ops/s
[info] MutexBenchmark.cancellationAsync         100          1000  thrpt   20    3.234 ± 0.658  ops/s
[info] MutexBenchmark.happyPathAsync             10          1000  thrpt   20  124.522 ± 7.762  ops/s
[info] MutexBenchmark.happyPathAsync            100          1000  thrpt   20   11.987 ± 0.379  ops/s
[info] MutexBenchmark.highContentionAsync        10          1000  thrpt   20   16.659 ± 2.072  ops/s
[info] MutexBenchmark.highContentionAsync       100          1000  thrpt   20    2.131 ± 0.127  ops/s

Comment on lines 21 to 22
private[std] val FailureSignal: Throwable = new RuntimeException
with scala.util.control.NoStackTrace
Member Author

Instead of placing this in the package object we could also make a top-level object FailureSignal extends RuntimeException with NoStackTrace.

Member

I think that either approach introduces a memory barrier unless we locally cache the value. If we locally cache the value, then they're the same. I would prefer the top-level object approach, tbh, but we really need to make sure it's pre-cached within scopes.
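
For concreteness, the top-level variant plus a local cache would look something like this (a hedged sketch only; SomeHotPath is a made-up name, not anything in this PR):

package cats.effect.std

import scala.util.control.NoStackTrace

// top-level object instead of a member of the package object
private[std] object FailureSignal extends RuntimeException with NoStackTrace

private final class SomeHotPath {
  // cache the reference once per instance so hot-path reads don't go back
  // through the module/package-object access each time
  private[this] val failureSignal: Throwable = FailureSignal

  def fail(): Nothing = throw failureSignal
}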


import java.util.concurrent.atomic.AtomicReference

private final class UnsafeUnbounded[A] {
Member Author

Didn't make any changes here, just copy-pasta.

Member Author

Oh, another advantage of pulling this out into a top-level class: it is much easier to provide a specialized implementation for JS without Platform shenanigans.

Member

I originally planned on pulling it out to the top-level but got lazy.

Comment on lines +129 to +136
} catch { // no waiter found
case FailureSignal =>
locked.set(false) // release
try {
var waiter = waiters.take()
while (waiter eq null) waiter = waiters.take()
waiter(RightFalse) // wake any new waiters
} catch {
Member Author

There's some fairness corruption here under contention, where an acquirer may cut in line ahead of an acquirer that had already placed itself in the queue.

Contributor

Are the FIFO semantics of the current Mutex something we would like to preserve? If so, should we be louder about it on the docs?

BTW, does the Semaphore-based one guarantee that as well?

Member Author

Overall it's still FIFO (or should be; we should add a test if we don't have one already). It's just that under contention it can be slightly corrupted. But under contention, who was really "first" anyway?

I think this was the long-running debate Daniel and Fabio had for the async queue :)
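
A rough sketch of what such a test could look like (illustrative only, and not from this PR; it leans on sleeps to get each fiber parked in submission order, so it's not airtight):

import scala.concurrent.duration._

import cats.effect.{IO, Ref}
import cats.effect.std.Mutex
import cats.syntax.all._

def fifoCheck: IO[Boolean] =
  for {
    mutex <- Mutex[IO]
    order <- Ref[IO].of(List.empty[Int])
    unlock <- mutex.lock.allocated.map(_._2) // hold the lock so every fiber parks
    fibers <- (1 to 10).toList.traverse { i =>
      mutex.lock.surround(order.update(_ :+ i)).start <* IO.sleep(10.millis)
    }
    _ <- unlock // release; waiters should acquire in the order they parked
    _ <- fibers.traverse_(_.join)
    seen <- order.get
  } yield seen == (1 to 10).toList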

mutex
.flatMap { m => m.lock.use_.replicateA_(fibers) }
.replicateA_(iterations)
.unsafeRunSync()
mutex.flatMap { m => m.lock.use_.replicateA_(fibers * iterations) }.unsafeRunSync()
Contributor

I wonder what the rationale for this change is? And why only for the happy path?

Member Author

Maybe I misunderstood the purpose of the benchmark, but we want to replicate many acquires/releases of the same mutex from the same fiber. We don't need to allocate a new mutex in each iteration, and "fibers" is not really an accurate term; it's just iterations in the end.

Actually you are right, we can probably make a similar change to the other benchmarks.

F.asyncCheckAttempt[Unit] { thisCB =>
F.delay {
val previousCell = state.getAndSet(thisCell)
private[this] val locked = new AtomicBoolean(false)
Member Author

If we swap this AtomicBoolean for an AtomicInteger then do we basically have an AsyncSemaphore?
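
Purely as a thought experiment (hypothetical, not something in this PR), the fast paths would generalize to a permit counter along these lines:

import java.util.concurrent.atomic.AtomicInteger

final class PermitState(n: Int) {
  private[this] val permits = new AtomicInteger(n)

  // fast-path acquire: CAS-decrement while a permit is available; on failure the
  // caller would enqueue itself as a waiter, like the mutex does today
  @annotation.tailrec
  def tryAcquire(): Boolean = {
    val cur = permits.get()
    if (cur <= 0) false
    else if (permits.compareAndSet(cur, cur - 1)) true
    else tryAcquire()
  }

  // release: a real implementation would first try to hand the permit to a waiter
  def release(): Unit = {
    permits.incrementAndGet()
    ()
  }
}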

@djspiewak (Member) left a comment

Minor change

@@ -99,6 +99,7 @@ object Mutex {

private[this] val locked = new AtomicBoolean(false)
private[this] val waiters = new UnsafeUnbounded[Either[Throwable, Boolean] => Unit]
private[this] val failureSignal = FailureSignal // prefetch
Member

Super minor thing but I'd prefer if we shadow FailureSignal here by prefetching the fully-qualified name. This avoids the need for the backticks, and also prevents people from accidentally introducing memory barriers by referring to the non-prefetched thing.
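
For illustration (a hedged sketch, assuming the package-level val is reachable as cats.effect.std.FailureSignal), the shadowing would look like:

// prefetch under the same name, shadowing the package-level val within this class
private[this] val FailureSignal: Throwable = cats.effect.std.FailureSignal
// any later reference to FailureSignal in this scope now hits the cached field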

Member Author

Yeah I thought about this as well. I was more afraid that I would forget to prefetch it, unless I always referred to it under a name only available as a prefetch.

Member

Well now this way your only option is to prefetch.

Member Author

If you remembered to shadow it in the first place 😆 If the shadow doesn't exist in your current scope, you have no way of knowing that you are going through the memory barrier.

Member

I feel like we introduce new scopes less often than we introduce new code in scopes. :-P
