diff --git a/rbmutex.go b/rbmutex.go index 0cc9e49..868b114 100644 --- a/rbmutex.go +++ b/rbmutex.go @@ -64,22 +64,41 @@ func NewRBMutex() *RBMutex { return &mu } -// TryRLock tries to retrieve a reader lock token via -// the fast path or gives up returning nil. -func (mu *RBMutex) TryRLock() *RToken { - if _, mux := mu.fastTryRlock(); mux != nil { - return mux +// TryRLock tries to lock m for reading without blocking. +// When TryRLock succeeds, it returns true and a reader token. +// In case of a failure, a false is returned. +func (mu *RBMutex) TryRLock() (bool, *RToken) { + if t := mu.fastRlock(); t != nil { + return true, t } - return nil + // Optimistic slow path. + if mu.rw.TryRLock() { + if atomic.LoadInt32(&mu.rbias) == 0 && time.Now().After(mu.inhibitUntil) { + atomic.StoreInt32(&mu.rbias, 1) + } + return true, nil + } + return false, nil } -// TryLock tries to acquire a write lock without blocking, -// exposing the underlying TryLock method of sync.RWMutex. -func (mu *RBMutex) TryLock() bool { - return mu.rw.TryLock() +// RLock locks m for reading and returns a reader token. The +// token must be used in the later RUnlock call. +// +// Should not be used for recursive read locking; a blocked Lock +// call excludes new readers from acquiring the lock. +func (mu *RBMutex) RLock() *RToken { + if t := mu.fastRlock(); t != nil { + return t + } + // Slow path. + mu.rw.RLock() + if atomic.LoadInt32(&mu.rbias) == 0 && time.Now().After(mu.inhibitUntil) { + atomic.StoreInt32(&mu.rbias, 1) + } + return nil } -func (mu *RBMutex) fastTryRlock() (*rslot, *RToken) { +func (mu *RBMutex) fastRlock() *RToken { if atomic.LoadInt32(&mu.rbias) == 1 { t, ok := rtokenPool.Get().(*RToken) if !ok { @@ -95,36 +114,16 @@ func (mu *RBMutex) fastTryRlock() (*rslot, *RToken) { if atomic.LoadInt32(&mu.rbias) == 1 { // Hot path succeeded. t.slot = slot - return rslot, t + return t } + // The mutex is no longer reader biased. Roll back. + atomic.AddInt32(&rslot.mu, -1) rtokenPool.Put(t) - return rslot, nil + return nil } // Contention detected. Give a try with the next slot. } } - return nil, nil -} - -// RLock locks m for reading and returns a reader token. The -// token must be used in the later RUnlock call. -// -// Should not be used for recursive read locking; a blocked Lock -// call excludes new readers from acquiring the lock. -func (mu *RBMutex) RLock() *RToken { - rslot, r := mu.fastTryRlock() - if r != nil { - return r - } - if rslot != nil { - // The mutex is no longer reader biased. Go to the slow path. - atomic.AddInt32(&rslot.mu, -1) - } - // Slow path. - mu.rw.RLock() - if atomic.LoadInt32(&mu.rbias) == 0 && time.Now().After(mu.inhibitUntil) { - atomic.StoreInt32(&mu.rbias, 1) - } return nil } @@ -143,6 +142,26 @@ func (mu *RBMutex) RUnlock(t *RToken) { rtokenPool.Put(t) } +// TryLock tries to lock m for writing without blocking. +func (mu *RBMutex) TryLock() bool { + if mu.rw.TryLock() { + if atomic.LoadInt32(&mu.rbias) == 1 { + atomic.StoreInt32(&mu.rbias, 0) + for i := 0; i < len(mu.rslots); i++ { + if atomic.LoadInt32(&mu.rslots[i].mu) > 0 { + // There is a reader. Roll back. + atomic.StoreInt32(&mu.rbias, 1) + mu.rw.Unlock() + return false + } + } + mu.inhibitUntil = time.Now() + } + return true + } + return false +} + // Lock locks m for writing. If the lock is already locked for // reading or writing, Lock blocks until the lock is available. func (mu *RBMutex) Lock() { diff --git a/rbmutex_test.go b/rbmutex_test.go index 4808578..22e0eea 100644 --- a/rbmutex_test.go +++ b/rbmutex_test.go @@ -15,23 +15,55 @@ import ( ) func TestRBMutexSerialReader(t *testing.T) { - const numIters = 10 + const numCalls = 10 mu := NewRBMutex() - var rtokens [numIters]*RToken - for i := 0; i < numIters; i++ { - rtokens[i] = mu.RLock() + for i := 0; i < 3; i++ { + var rtokens [numCalls]*RToken + for j := 0; j < numCalls; j++ { + rtokens[j] = mu.RLock() + } + for j := 0; j < numCalls; j++ { + mu.RUnlock(rtokens[j]) + } + } +} +func TestRBMutexSerialOptimisticReader(t *testing.T) { + const numCalls = 10 + mu := NewRBMutex() + for i := 0; i < 3; i++ { + var rtokens [numCalls]*RToken + for j := 0; j < numCalls; j++ { + ok, rt := mu.TryRLock() + if !ok { + t.Fatalf("TryRLock failed for %d", j) + } + if rt == nil { + t.Fatalf("nil reader token for %d", j) + } + rtokens[j] = rt + } + for j := 0; j < numCalls; j++ { + mu.RUnlock(rtokens[j]) + } } - for i := 0; i < numIters; i++ { - mu.RUnlock(rtokens[i]) +} + +func TestRBMutexSerialOptimisticWriter(t *testing.T) { + mu := NewRBMutex() + for i := 0; i < 3; i++ { + if !mu.TryLock() { + t.Fatal("TryLock failed") + } + mu.Unlock() } } func parallelReader(mu *RBMutex, clocked, cunlock, cdone chan bool) { - tk := mu.RLock() + t := mu.RLock() clocked <- true <-cunlock - mu.RUnlock(tk) + mu.RUnlock(t) cdone <- true } @@ -66,16 +98,16 @@ func TestRBMutexParallelReaders(t *testing.T) { func reader(mu *RBMutex, numIterations int, activity *int32, cdone chan bool) { for i := 0; i < numIterations; i++ { - tk := mu.RLock() + t := mu.RLock() n := atomic.AddInt32(activity, 1) if n < 1 || n >= 10000 { - mu.RUnlock(tk) + mu.RUnlock(t) panic(fmt.Sprintf("rlock(%d)\n", n)) } for i := 0; i < 100; i++ { } atomic.AddInt32(activity, -1) - mu.RUnlock(tk) + mu.RUnlock(t) } cdone <- true } @@ -132,6 +164,112 @@ func TestRBMutex(t *testing.T) { hammerRBMutex(10, 5, n) } +func optimisticReader(mu *RBMutex, numIterations int, activity *int32, cdone chan bool) { + for i := 0; i < numIterations; i++ { + if ok, t := mu.TryRLock(); ok { + n := atomic.AddInt32(activity, 1) + if n < 1 || n >= 10000 { + mu.RUnlock(t) + panic(fmt.Sprintf("rlock(%d)\n", n)) + } + for i := 0; i < 100; i++ { + } + atomic.AddInt32(activity, -1) + mu.RUnlock(t) + } + } + cdone <- true +} + +func optimisticWriter(mu *RBMutex, numIterations int, activity *int32, cdone chan bool) { + for i := 0; i < numIterations; i++ { + if mu.TryLock() { + n := atomic.AddInt32(activity, 10000) + if n != 10000 { + mu.Unlock() + panic(fmt.Sprintf("wlock(%d)\n", n)) + } + for i := 0; i < 100; i++ { + } + atomic.AddInt32(activity, -10000) + mu.Unlock() + } + } + cdone <- true +} + +func hammerOptimisticRBMutex(gomaxprocs, numReaders, numIterations int) { + runtime.GOMAXPROCS(gomaxprocs) + // Number of active readers + 10000 * number of active writers. + var activity int32 + mu := NewRBMutex() + cdone := make(chan bool) + go optimisticWriter(mu, numIterations, &activity, cdone) + var i int + for i = 0; i < numReaders/2; i++ { + go optimisticReader(mu, numIterations, &activity, cdone) + } + go optimisticWriter(mu, numIterations, &activity, cdone) + for ; i < numReaders; i++ { + go optimisticReader(mu, numIterations, &activity, cdone) + } + // Wait for the 2 writers and all readers to finish. + for i := 0; i < 2+numReaders; i++ { + <-cdone + } +} + +func TestRBMutex_Optimistic(t *testing.T) { + const n = 1000 + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(0)) + hammerOptimisticRBMutex(1, 1, n) + hammerOptimisticRBMutex(1, 3, n) + hammerOptimisticRBMutex(1, 10, n) + hammerOptimisticRBMutex(4, 1, n) + hammerOptimisticRBMutex(4, 3, n) + hammerOptimisticRBMutex(4, 10, n) + hammerOptimisticRBMutex(10, 1, n) + hammerOptimisticRBMutex(10, 3, n) + hammerOptimisticRBMutex(10, 10, n) + hammerOptimisticRBMutex(10, 5, n) +} + +func hammerMixedRBMutex(gomaxprocs, numReaders, numIterations int) { + runtime.GOMAXPROCS(gomaxprocs) + // Number of active readers + 10000 * number of active writers. + var activity int32 + mu := NewRBMutex() + cdone := make(chan bool) + go writer(mu, numIterations, &activity, cdone) + var i int + for i = 0; i < numReaders/2; i++ { + go reader(mu, numIterations, &activity, cdone) + } + go optimisticWriter(mu, numIterations, &activity, cdone) + for ; i < numReaders; i++ { + go optimisticReader(mu, numIterations, &activity, cdone) + } + // Wait for the 2 writers and all readers to finish. + for i := 0; i < 2+numReaders; i++ { + <-cdone + } +} + +func TestRBMutex_Mixed(t *testing.T) { + const n = 1000 + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(0)) + hammerMixedRBMutex(1, 1, n) + hammerMixedRBMutex(1, 3, n) + hammerMixedRBMutex(1, 10, n) + hammerMixedRBMutex(4, 1, n) + hammerMixedRBMutex(4, 3, n) + hammerMixedRBMutex(4, 10, n) + hammerMixedRBMutex(10, 1, n) + hammerMixedRBMutex(10, 3, n) + hammerMixedRBMutex(10, 10, n) + hammerMixedRBMutex(10, 5, n) +} + func benchmarkRBMutex(b *testing.B, parallelism, localWork, writeRatio int) { mu := NewRBMutex() b.SetParallelism(parallelism)