Skip to content

Commit

Permalink
Fix race condition in updateStats()
Browse files Browse the repository at this point in the history
  • Loading branch information
gregns1 committed Apr 24, 2024
1 parent cf36b3a commit a79f17d
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 16 deletions.
18 changes: 5 additions & 13 deletions db/change_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ func (c *changeCache) updateStats(ctx context.Context) {
c.db.DbStats.Cache().PendingSeqLen.Set(int64(c.internalStats.pendingSeqLen))
c.db.DbStats.CBLReplicationPull().MaxPending.SetIfMax(int64(c.internalStats.maxPending))
c.db.DbStats.Cache().HighSeqStable.Set(int64(c._getMaxStableCached(ctx)))
c.db.DbStats.Cache().SkippedSeqLen.Set(int64(len(c.skippedSeqs.list)))
c.db.DbStats.Cache().SkippedSeqCap.Set(int64(cap(c.skippedSeqs.list)))
c.db.DbStats.Cache().NumSkippedSeqs.Set(c.skippedSeqs.NumCumulativeSkippedSequences)
c.db.DbStats.Cache().NumCurrentSeqsSkipped.Set(c.skippedSeqs.NumCurrentSkippedSequences)
c.db.DbStats.Cache().SkippedSeqLen.Set(c.skippedSeqs.getSliceLength())
c.db.DbStats.Cache().SkippedSeqCap.Set(c.skippedSeqs.getSliceCapacity())
c.db.DbStats.Cache().NumSkippedSeqs.Set(c.skippedSeqs.getCumulativeNumSkippedSequenceValue())
c.db.DbStats.Cache().NumCurrentSeqsSkipped.Set(c.skippedSeqs.getNumCurrentSkippedSequenceValue())

}

Expand Down Expand Up @@ -314,7 +314,6 @@ func (c *changeCache) CleanSkippedSequenceQueue(ctx context.Context) error {
}

c.db.DbStats.Cache().AbandonedSeqs.Add(compactedSequences)
c.skippedSeqs.NumCurrentSkippedSequences -= compactedSequences

base.InfofCtx(ctx, base.KeyCache, "CleanSkippedSequenceQueue complete. Cleaned %d sequences from skipped list for database %s.", compactedSequences, base.MD(c.db.Name))
return nil
Expand Down Expand Up @@ -878,9 +877,6 @@ func (h *LogPriorityQueue) Pop() interface{} {

func (c *changeCache) RemoveSkipped(x uint64) error {
err := c.skippedSeqs.removeSeq(x)
if err == nil {
c.skippedSeqs.NumCurrentSkippedSequences -= 1
}
return err
}

Expand All @@ -894,11 +890,7 @@ func (c *changeCache) WasSkipped(x uint64) bool {
}

func (c *changeCache) PushSkipped(ctx context.Context, startSeq uint64, endSeq uint64) {
newEntry := NewSkippedSequenceRangeEntry(startSeq, endSeq)
numSequences := newEntry.getNumSequencesInEntry()
c.skippedSeqs.NumCurrentSkippedSequences += numSequences
c.skippedSeqs.NumCumulativeSkippedSequences += numSequences
c.skippedSeqs.PushSkippedSequenceEntry(newEntry)
c.skippedSeqs.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(startSeq, endSeq))
}

// waitForSequence blocks up to maxWaitTime until the given sequence has been received.
Expand Down
3 changes: 0 additions & 3 deletions db/change_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2334,9 +2334,6 @@ func TestSkippedSequenceCompact(t *testing.T) {
}
_ = testChangeCache.processEntry(ctx, highEntry)

// update cache stats for assertions
testChangeCache.updateStats(ctx)

// assert this pushes an entry on the skipped sequence slice
require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Equal(c, 1, len(testChangeCache.skippedSeqs.list))
Expand Down
38 changes: 38 additions & 0 deletions db/skipped_sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ func (s *SkippedSequenceSlice) SkippedSequenceCompact(ctx context.Context, maxWa
}
// resize slice to reclaim memory if we need to
s._clip(ctx)
// decrement number of current skipped sequences by the number of sequences compacted
s.NumCurrentSkippedSequences -= numSequencesCompacted

return numSequencesCompacted
}

Expand Down Expand Up @@ -209,6 +212,8 @@ func (s *SkippedSequenceSlice) removeSeq(x uint64) error {
if !found {
return fmt.Errorf("sequence %d not found in the skipped list", x)
}
// if found we need to decrement the current num skipped sequences stat
s.NumCurrentSkippedSequences -= 1

// take the element at the index and handle cases required to removal of a sequence
rangeElem := s.list[index]
Expand Down Expand Up @@ -264,6 +269,11 @@ func (s *SkippedSequenceSlice) PushSkippedSequenceEntry(entry *SkippedSequenceLi
s.lock.Lock()
defer s.lock.Unlock()

// update num current skipped sequences count + the cumulative count of skipped sequences
numSequencesIncoming := entry.getNumSequencesInEntry()
s.NumCurrentSkippedSequences += numSequencesIncoming
s.NumCumulativeSkippedSequences += numSequencesIncoming

if len(s.list) == 0 {
s.list = append(s.list, entry)
return
Expand Down Expand Up @@ -291,3 +301,31 @@ func (s *SkippedSequenceSlice) getOldest() uint64 {
// grab fist element in slice and take the start seq of that range/single sequence
return s.list[0].getStartSeq()
}

// getSliceLength retrieves the current skipped sequence slice length
func (s *SkippedSequenceSlice) getSliceLength() int64 {
s.lock.RLock()
defer s.lock.RUnlock()
return int64(len(s.list))
}

// getSliceCapacity retrieves the current skipped sequence slice capacity
func (s *SkippedSequenceSlice) getSliceCapacity() int64 {
s.lock.RLock()
defer s.lock.RUnlock()
return int64(cap(s.list))
}

// getNumCurrentSkippedSequenceValue retrieves the current skipped sequence count
func (s *SkippedSequenceSlice) getNumCurrentSkippedSequenceValue() int64 {
s.lock.RLock()
defer s.lock.RUnlock()
return s.NumCurrentSkippedSequences
}

// getCumulativeNumSkippedSequenceValue retrieves the cumulative skipped sequence count
func (s *SkippedSequenceSlice) getCumulativeNumSkippedSequenceValue() int64 {
s.lock.RLock()
defer s.lock.RUnlock()
return s.NumCumulativeSkippedSequences
}

0 comments on commit a79f17d

Please sign in to comment.