Skip to content

Commit

Permalink
server: set multiple concurrentReadTx instances share one txReadBuffer.
Browse files Browse the repository at this point in the history
  • Loading branch information
wilsonwang371 committed May 24, 2021
1 parent 46b49a6 commit 9c82e8c
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 4 deletions.
80 changes: 78 additions & 2 deletions server/mvcc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ type Snapshot interface {
Close() error
}

// txReadBufferCache holds a cached copy of a txReadBuffer together with the
// buffer version that copy corresponds to.
type txReadBufferCache struct {
// mu is locked around reads and writes of buf and bufVersion.
mu sync.Mutex
// buf is the cached buffer copy; nil means nothing has been cached yet.
buf *txReadBuffer
// bufVersion is the txReadBuffer.bufVersion that was recorded when buf was stored.
bufVersion uint64
}

type backend struct {
// size and commits are used with atomic operations so they must be
// 64-bit aligned, otherwise 32-bit tests will crash
Expand All @@ -102,6 +108,11 @@ type backend struct {
batchTx *batchTxBuffered

readTx *readTx
// txReadBufferCache mirrors "txReadBuffer" within "readTx" -- readTx.baseReadTx.buf.
// When creating "concurrentReadTx":
// - if the cache is up-to-date, "readTx.baseReadTx.buf" copy can be skipped
// - if the cache is empty or outdated, "readTx.baseReadTx.buf" copy is required
txReadBufferCache txReadBufferCache

stopc chan struct{}
donec chan struct{}
Expand Down Expand Up @@ -183,19 +194,26 @@ func newBackend(bcfg BackendConfig) *backend {
readTx: &readTx{
baseReadTx: baseReadTx{
buf: txReadBuffer{
txBuffer: txBuffer{make(map[BucketID]*bucketBuffer)},
txBuffer: txBuffer{make(map[BucketID]*bucketBuffer)},
bufVersion: 0,
},
buckets: make(map[BucketID]*bolt.Bucket),
txWg: new(sync.WaitGroup),
txMu: new(sync.RWMutex),
},
},
txReadBufferCache: txReadBufferCache{
mu: sync.Mutex{},
bufVersion: 0,
buf: nil,
},

stopc: make(chan struct{}),
donec: make(chan struct{}),

lg: bcfg.Logger,
}

b.batchTx = newBatchTxBuffered(b)
// We set it after newBatchTxBuffered to skip the 'empty' commit.
b.hooks = bcfg.Hooks
Expand All @@ -221,10 +239,68 @@ func (b *backend) ConcurrentReadTx() ReadTx {
defer b.readTx.RUnlock()
// prevent boltdb read Tx from being rolled back until store read Tx is done. Needs to be called when holding readTx.RLock().
b.readTx.txWg.Add(1)

// TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval.

// inspect/update cache recency iff there's no ongoing update to the cache
// this falls through if there's no cache update

// by this line, the "ConcurrentReadTx" code path is already protected against concurrent "writeback" operations,
// which require the write lock to update "readTx.baseReadTx.buf".
// Which means setting "buf *txReadBuffer" with "readTx.buf.unsafeCopy()" is guaranteed to be up-to-date,
// whereas "txReadBufferCache.buf" may be stale from concurrent "writeback" operations.
// We only update "txReadBufferCache.buf" if we know "buf *txReadBuffer" is up-to-date.
// The update to "txReadBufferCache.buf" will benefit the following "ConcurrentReadTx" creation
// by avoiding copying "readTx.baseReadTx.buf".
b.txReadBufferCache.mu.Lock()

curCache := b.txReadBufferCache.buf
curCacheVer := b.txReadBufferCache.bufVersion
curBufVer := b.readTx.buf.bufVersion

isEmptyCache := curCache == nil
isStaleCache := curCacheVer != curBufVer

var buf *txReadBuffer
switch {
case isEmptyCache:
// perform safe copy of buffer while holding "b.txReadBufferCache.mu.Lock"
// this is only supposed to run once so there won't be much overhead
curBuf := b.readTx.buf.unsafeCopy()
buf = &curBuf
case isStaleCache:
// to maximize the concurrency, try unsafe copy of buffer
// release the lock while copying buffer -- cache may become stale again and
// get overwritten by someone else.
// therefore, we need to check the readTx buffer version again
b.txReadBufferCache.mu.Unlock()
curBuf := b.readTx.buf.unsafeCopy()
b.txReadBufferCache.mu.Lock()
buf = &curBuf
default:
// neither empty nor stale cache, just use the current buffer
buf = curCache
}
// txReadBufferCache.bufVersion can be modified while we are doing an unsafeCopy();
// as a result, curCacheVer may no longer be the same as
// txReadBufferCache.bufVersion
// if !isEmptyCache && curCacheVer != b.txReadBufferCache.bufVersion
// then the cache became stale while copying "readTx.baseReadTx.buf".
// It is safe to not update "txReadBufferCache.buf", because the next following
// "ConcurrentReadTx" creation will trigger a new "readTx.baseReadTx.buf" copy
// and "buf" is still used for the current "concurrentReadTx.baseReadTx.buf".
if isEmptyCache || curCacheVer == b.txReadBufferCache.bufVersion {
// continue if the cache is never set or no one has modified the cache
b.txReadBufferCache.buf = buf
b.txReadBufferCache.bufVersion = curBufVer
}

b.txReadBufferCache.mu.Unlock()

// concurrentReadTx is not supposed to write to its txReadBuffer
return &concurrentReadTx{
baseReadTx: baseReadTx{
buf: b.readTx.buf.unsafeCopy(),
buf: *buf,
txMu: b.readTx.txMu,
tx: b.readTx.tx,
buckets: b.readTx.buckets,
Expand Down
13 changes: 11 additions & 2 deletions server/mvcc/backend/tx_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"sort"
)

// bucketBufferInitialSize is the length of the kv slice that newBucketBuffer
// allocates for a fresh bucketBuffer.
const bucketBufferInitialSize = 512

// txBuffer handles functionality shared between txWriteBuffer and txReadBuffer.
type txBuffer struct {
buckets map[BucketID]*bucketBuffer
Expand Down Expand Up @@ -88,10 +90,16 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
rb.merge(wb)
}
txw.reset()
// increase the buffer version
txr.bufVersion++
}

// txReadBuffer accesses buffered updates.
type txReadBuffer struct{ txBuffer }
type txReadBuffer struct {
txBuffer
// bufVersion identifies the state of the buffer contents: writeback
// increments it after merging pending writes into this buffer, so two
// differing values mean the buffer may have changed in between.
// Copies produced by unsafeCopy start with bufVersion 0.
bufVersion uint64
}

func (txr *txReadBuffer) Range(bucket Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
if b := txr.buckets[bucket.ID()]; b != nil {
Expand All @@ -113,6 +121,7 @@ func (txr *txReadBuffer) unsafeCopy() txReadBuffer {
txBuffer: txBuffer{
buckets: make(map[BucketID]*bucketBuffer, len(txr.txBuffer.buckets)),
},
bufVersion: 0,
}
for bucketName, bucket := range txr.txBuffer.buckets {
txrCopy.txBuffer.buckets[bucketName] = bucket.Copy()
Expand All @@ -133,7 +142,7 @@ type bucketBuffer struct {
}

// newBucketBuffer returns a bucketBuffer with a preallocated kv slice of
// bucketBufferInitialSize (512) entries and a used count of 0.
func newBucketBuffer() *bucketBuffer {
return &bucketBuffer{buf: make([]kv, 512), used: 0}
return &bucketBuffer{buf: make([]kv, bucketBufferInitialSize), used: 0}
}

func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
Expand Down

0 comments on commit 9c82e8c

Please sign in to comment.