Skip to content

Commit

Permalink
CBG-3854: Integration of the skipped sequence slice into the cache (#…
Browse files Browse the repository at this point in the history
…6789)

* CBG-3854: Integration of the skipped sequence slice into the cache

* remove old changes that are no longer needed + fix GH Actions failures

* updates based on review. Move stat update to updateStats and alter some naming. Test updates to reflect stat changes too.

* remove incorrect incrementing of num current sequences inside PushSkippedSequenceEntry

* Fix race condition in updateStats()

* updates from review

* tidy CleanSkippedSequenceQueue
  • Loading branch information
gregns1 authored Apr 26, 2024
1 parent f890acf commit 3ae3cf4
Show file tree
Hide file tree
Showing 5 changed files with 305 additions and 229 deletions.
18 changes: 16 additions & 2 deletions base/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ type CacheStats struct {
NonMobileIgnoredCount *SgwIntStat `json:"non_mobile_ignored_count"`
// The total number of active channels.
NumActiveChannels *SgwIntStat `json:"num_active_channels"`
// The total number of skipped sequences.
// The total number of skipped sequences. This is a cumulative value.
NumSkippedSeqs *SgwIntStat `json:"num_skipped_seqs"`
// The total number of pending sequences. These are out-of-sequence entries waiting to be cached.
PendingSeqLen *SgwIntStat `json:"pending_seq_len"`
Expand All @@ -431,8 +431,12 @@ type CacheStats struct {
RevisionCacheHits *SgwIntStat `json:"rev_cache_hits"`
// The total number of revision cache misses.
RevisionCacheMisses *SgwIntStat `json:"rev_cache_misses"`
// The current length of the pending skipped sequence queue.
// The current length of the pending skipped sequence slice.
SkippedSeqLen *SgwIntStat `json:"skipped_seq_len"`
// The current capacity of the skipped sequence slice
SkippedSeqCap *SgwIntStat `json:"skipped_seq_cap"`
// The number of sequences currently in the skipped sequence slice
NumCurrentSeqsSkipped *SgwIntStat `json:"current_skipped_seq_count"`
// The total view_queries.
ViewQueries *SgwIntStat `json:"view_queries"`
}
Expand Down Expand Up @@ -1327,6 +1331,14 @@ func (d *DbStats) initCacheStats() error {
if err != nil {
return err
}
resUtil.SkippedSeqCap, err = NewIntStat(SubsystemCacheKey, "skipped_seq_cap", StatUnitNoUnits, SkippedSeqCapDesc, StatAddedVersion3dot2dot0, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, labelKeys, labelVals, prometheus.GaugeValue, 0)
if err != nil {
return err
}
resUtil.NumCurrentSeqsSkipped, err = NewIntStat(SubsystemCacheKey, "current_skipped_seq_count", StatUnitNoUnits, NumCurrentSkippedSeq, StatAddedVersion3dot2dot0, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, labelKeys, labelVals, prometheus.CounterValue, 0)
if err != nil {
return err
}
resUtil.ViewQueries, err = NewIntStat(SubsystemCacheKey, "view_queries", StatUnitNoUnits, ViewQueriesDesc, StatAddedVersion3dot0dot0, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, labelKeys, labelVals, prometheus.CounterValue, 0)
if err != nil {
return err
Expand Down Expand Up @@ -1357,6 +1369,8 @@ func (d *DbStats) unregisterCacheStats() {
prometheus.Unregister(d.CacheStats.NonMobileIgnoredCount)
prometheus.Unregister(d.CacheStats.NumActiveChannels)
prometheus.Unregister(d.CacheStats.NumSkippedSeqs)
prometheus.Unregister(d.CacheStats.SkippedSeqCap)
prometheus.Unregister(d.CacheStats.NumCurrentSeqsSkipped)
prometheus.Unregister(d.CacheStats.PendingSeqLen)
prometheus.Unregister(d.CacheStats.RevisionCacheBypass)
prometheus.Unregister(d.CacheStats.RevisionCacheHits)
Expand Down
8 changes: 6 additions & 2 deletions base/stats_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ const (

NumActiveChannelsDesc = "The total number of active channels."

NumSkippedSeqsDesc = "The total number of skipped sequences."
NumSkippedSeqsDesc = "The total number of skipped sequences. This is a cumulative value"

PendingSeqLengthDesc = "The total number of pending sequences. These are out-of-sequence entries waiting to be cached."

Expand All @@ -127,7 +127,11 @@ const (
RevCacheMissesDesc = "The total number of revision cache misses. This metric can be used to calculate the ratio of revision cache misses: " +
"Rev Cache Miss Ratio = rev_cache_misses / (rev_cache_hits + rev_cache_misses)"

SkippedSeqLengthDesc = "The current length of the pending skipped sequence queue."
SkippedSeqLengthDesc = "The current length of the pending skipped sequence slice."

SkippedSeqCapDesc = "The current capacity of the skipped sequence slice."

NumCurrentSkippedSeq = "The number of sequences currently in the skipped sequence slice."

ViewQueriesDesc = "The total view_queries."

Expand Down
184 changes: 28 additions & 156 deletions db/change_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package db

import (
"container/heap"
"container/list"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -59,7 +58,7 @@ type changeCache struct {
notifyChange func(context.Context, channels.Set) // Client callback that notifies of channel changes
started base.AtomicBool // Set by the Start method
stopped base.AtomicBool // Set by the Stop method
skippedSeqs *SkippedSequenceList // Skipped sequences still pending on the TAP feed
skippedSeqs *SkippedSequenceSlice // Skipped sequences still pending on the DCP caching feed
lock sync.RWMutex // Coordinates access to struct fields
options CacheOptions // Cache config
terminator chan bool // Signal termination of background goroutines
Expand All @@ -86,11 +85,17 @@ func (c *changeCache) updateStats(ctx context.Context) {
if c.db == nil {
return
}
// grab skipped sequence stats
skippedSequenceListStats := c.skippedSeqs.getStats()

c.db.DbStats.Database().HighSeqFeed.SetIfMax(int64(c.internalStats.highSeqFeed))
c.db.DbStats.Cache().PendingSeqLen.Set(int64(c.internalStats.pendingSeqLen))
c.db.DbStats.CBLReplicationPull().MaxPending.SetIfMax(int64(c.internalStats.maxPending))
c.db.DbStats.Cache().HighSeqStable.Set(int64(c._getMaxStableCached(ctx)))

c.db.DbStats.Cache().NumCurrentSeqsSkipped.Set(skippedSequenceListStats.NumCurrentSkippedSequencesStat)
c.db.DbStats.Cache().NumSkippedSeqs.Set(skippedSequenceListStats.NumCumulativeSkippedSequencesStat)
c.db.DbStats.Cache().SkippedSeqLen.Set(skippedSequenceListStats.ListLengthStat)
c.db.DbStats.Cache().SkippedSeqCap.Set(skippedSequenceListStats.ListCapacityStat)
}

type LogEntry channels.LogEntry
Expand Down Expand Up @@ -125,11 +130,6 @@ type LogEntries []*LogEntry
// A priority-queue of LogEntries, kept ordered by increasing sequence #.
type LogPriorityQueue []*LogEntry

// SkippedSequence pairs a sequence number with the time at which it was recorded
// as skipped.
type SkippedSequence struct {
seq uint64 // the skipped sequence number
timeAdded time.Time // when the entry was created; getOlderThan compares this against the expiry
}

type CacheOptions struct {
ChannelCacheOptions
CachePendingSeqMaxWait time.Duration // Max wait for pending sequence before skipping
Expand Down Expand Up @@ -170,7 +170,7 @@ func (c *changeCache) Init(ctx context.Context, dbContext *DatabaseContext, chan
c.receivedSeqs = make(map[uint64]struct{})
c.terminator = make(chan bool)
c.initTime = time.Now()
c.skippedSeqs = NewSkippedSequenceList()
c.skippedSeqs = NewSkippedSequenceSlice(DefaultClipCapacityHeadroom)
c.lastAddPendingTime = time.Now().UnixNano()
c.sgCfgPrefix = dbContext.MetadataKeys.SGCfgPrefix(c.db.Options.GroupID)
c.metaKeys = metaKeys
Expand Down Expand Up @@ -300,23 +300,20 @@ func (c *changeCache) InsertPendingEntries(ctx context.Context) error {

// Cleanup function, invoked periodically.
// Removes skipped entries from skippedSeqs that have been waiting longer
// than MaxChannelLogMissingWaitTime from the queue. Attempts view retrieval
// prior to removal. Only locks skipped sequence queue to build the initial set (GetSkippedSequencesOlderThanMaxWait)
// and subsequent removal (RemoveSkipped).
// than CacheSkippedSeqMaxWait from the slice.
func (c *changeCache) CleanSkippedSequenceQueue(ctx context.Context) error {

oldSkippedSequences := c.GetSkippedSequencesOlderThanMaxWait()
if len(oldSkippedSequences) == 0 {
base.InfofCtx(ctx, base.KeyCache, "Starting CleanSkippedSequenceQueue for database %s", base.MD(c.db.Name))

compactedSequences := c.skippedSeqs.SkippedSequenceCompact(ctx, int64(c.options.CacheSkippedSeqMaxWait.Seconds()))
if compactedSequences == 0 {
base.InfofCtx(ctx, base.KeyCache, "CleanSkippedSequenceQueue complete. No sequences to be compacted from skipped sequence list for database %s.", base.MD(c.db.Name))
return nil
}

base.InfofCtx(ctx, base.KeyCache, "Starting CleanSkippedSequenceQueue, found %d skipped sequences older than max wait for database %s", len(oldSkippedSequences), base.MD(c.db.Name))

// Purge sequences not found from the skipped sequence queue
numRemoved := c.RemoveSkippedSequences(ctx, oldSkippedSequences)
c.db.DbStats.Cache().AbandonedSeqs.Add(numRemoved)
c.db.DbStats.Cache().AbandonedSeqs.Add(compactedSequences)

base.InfofCtx(ctx, base.KeyCache, "CleanSkippedSequenceQueue complete. Not Found:%d for database %s.", len(oldSkippedSequences), base.MD(c.db.Name))
base.InfofCtx(ctx, base.KeyCache, "CleanSkippedSequenceQueue complete. Cleaned %d sequences from skipped list for database %s.", compactedSequences, base.MD(c.db.Name))
return nil
}

Expand Down Expand Up @@ -781,15 +778,15 @@ func (c *changeCache) _addPendingLogs(ctx context.Context) channels.Set {
var changedChannels channels.Set

for len(c.pendingLogs) > 0 {
change := c.pendingLogs[0]
isNext := change.Sequence == c.nextSequence
oldestPending := c.pendingLogs[0]
isNext := oldestPending.Sequence == c.nextSequence
if isNext {
heap.Pop(&c.pendingLogs)
changedChannels = changedChannels.UpdateWithSlice(c._addToCache(ctx, change))
changedChannels = changedChannels.UpdateWithSlice(c._addToCache(ctx, oldestPending))
} else if len(c.pendingLogs) > c.options.CachePendingSeqMaxNum || time.Since(c.pendingLogs[0].TimeReceived) >= c.options.CachePendingSeqMaxWait {
c.db.DbStats.Cache().NumSkippedSeqs.Add(1)
c.PushSkipped(ctx, c.nextSequence)
c.nextSequence++
// Skip all sequences up to the oldest Pending
c.PushSkipped(ctx, c.nextSequence, oldestPending.Sequence-1)
c.nextSequence = oldestPending.Sequence
} else {
break
}
Expand Down Expand Up @@ -878,33 +875,21 @@ func (h *LogPriorityQueue) Pop() interface{} {
// ////// SKIPPED SEQUENCE QUEUE

func (c *changeCache) RemoveSkipped(x uint64) error {
err := c.skippedSeqs.Remove(x)
c.db.DbStats.Cache().SkippedSeqLen.Set(int64(c.skippedSeqs.skippedList.Len()))
err := c.skippedSeqs.removeSeq(x)
return err
}

// Removes a set of sequences. Logs warning on removal error, returns count of successfully removed.
func (c *changeCache) RemoveSkippedSequences(ctx context.Context, sequences []uint64) (removedCount int64) {
numRemoved := c.skippedSeqs.RemoveSequences(ctx, sequences)
c.db.DbStats.Cache().SkippedSeqLen.Set(int64(c.skippedSeqs.skippedList.Len()))
return numRemoved
func (c *changeCache) RemoveSkippedSequences(ctx context.Context, sequences []uint64) {
// this can be used for removal of ranges, pending CBG-3855
}

// WasSkipped reports whether c.skippedSeqs contains sequence x.
func (c *changeCache) WasSkipped(x uint64) bool {
return c.skippedSeqs.Contains(x)
}

// PushSkipped records sequence as skipped, timestamped with the current time.
// If the push is rejected, the error is logged at info level and the SkippedSeqLen
// stat is left untouched; otherwise the stat is set to the current list length.
func (c *changeCache) PushSkipped(ctx context.Context, sequence uint64) {
	entry := &SkippedSequence{seq: sequence, timeAdded: time.Now()}
	if pushErr := c.skippedSeqs.Push(entry); pushErr != nil {
		base.InfofCtx(ctx, base.KeyCache, "Error pushing skipped sequence: %d, %v", sequence, pushErr)
		return
	}
	skippedLen := int64(c.skippedSeqs.skippedList.Len())
	c.db.DbStats.Cache().SkippedSeqLen.Set(skippedLen)
}

func (c *changeCache) GetSkippedSequencesOlderThanMaxWait() (oldSequences []uint64) {
return c.skippedSeqs.getOlderThan(c.options.CacheSkippedSeqMaxWait)
func (c *changeCache) PushSkipped(ctx context.Context, startSeq uint64, endSeq uint64) {
c.skippedSeqs.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(startSeq, endSeq))
}

// waitForSequence blocks up to maxWaitTime until the given sequence has been received.
Expand Down Expand Up @@ -957,116 +942,3 @@ func (c *changeCache) _getMaxStableCached(ctx context.Context) uint64 {
}
return c.nextSequence - 1
}

// SkippedSequenceList stores the set of skipped sequences as an ordered list of *SkippedSequence with an associated map
// for sequence-based lookup. Push and _remove update the list and the map together.
type SkippedSequenceList struct {
skippedList *list.List // Ordered list of skipped sequences; Push only appends strictly increasing sequences
skippedMap map[uint64]*list.Element // Map from sequence to list elements
lock sync.RWMutex // Coordinates access to skippedSequenceList
}

// NewSkippedSequenceList returns an empty SkippedSequenceList with its backing
// list and lookup map initialised and ready for use.
func NewSkippedSequenceList() *SkippedSequenceList {
	skipped := new(SkippedSequenceList)
	skipped.skippedList = list.New()
	skipped.skippedMap = make(map[uint64]*list.Element)
	return skipped
}

// getOldest returns the sequence of the first element in the skippedSequenceList,
// or 0 when the list is empty.
//
// The read lock is released with defer so that it cannot be leaked if the front
// element does not hold a *SkippedSequence and the type assertion panics.
func (l *SkippedSequenceList) getOldest() (oldestSkippedSeq uint64) {
	l.lock.RLock()
	defer l.lock.RUnlock()

	firstElement := l.skippedList.Front()
	if firstElement == nil {
		return 0
	}
	return firstElement.Value.(*SkippedSequence).seq
}

// Remove deletes the entry for sequence x from the list, holding the write lock
// for the duration of the removal. It returns the error from _remove when x is
// not present.
func (l *SkippedSequenceList) Remove(x uint64) error {
	l.lock.Lock()
	defer l.lock.Unlock()
	return l._remove(x)
}

// RemoveSequences removes every sequence in sequences under a single write lock.
// A sequence that cannot be removed is logged as a warning and skipped; the
// return value is the number of sequences that were actually removed.
func (l *SkippedSequenceList) RemoveSequences(ctx context.Context, sequences []uint64) (removedCount int64) {
	l.lock.Lock()
	defer l.lock.Unlock()

	for i := range sequences {
		if removeErr := l._remove(sequences[i]); removeErr != nil {
			base.WarnfCtx(ctx, "Error purging skipped sequence %d from skipped sequence queue: %v", sequences[i], removeErr)
			continue
		}
		removedCount++
	}
	return removedCount
}

// _remove drops sequence x from both the ordered list and the lookup map.
// Callers must already hold l.lock for writing. An error is returned when x has
// no entry in the map.
func (l *SkippedSequenceList) _remove(x uint64) error {
	element, found := l.skippedMap[x]
	if !found {
		return errors.New("Value not found")
	}
	l.skippedList.Remove(element)
	delete(l.skippedMap, x)
	return nil
}

// Contains reports whether sequence x has an entry in the lookup map. The check
// is made under the read lock.
func (l *SkippedSequenceList) Contains(x uint64) bool {
	l.lock.RLock()
	defer l.lock.RUnlock()
	_, present := l.skippedMap[x]
	return present
}

// Push appends x to the end of the SkippedSequenceList and indexes it in the
// lookup map. Sequence ordering is validated: x.seq must be strictly greater than
// the sequence of the current last element, otherwise an error is returned and the
// list is left unchanged.
//
// A nil x, or a last element that does not hold a usable *SkippedSequence, is
// reported as an error rather than dereferenced, so a bad entry can neither be
// appended to the list nor cause a panic while the lock is held. The write lock is
// released with defer on every path.
func (l *SkippedSequenceList) Push(x *SkippedSequence) (err error) {
	if x == nil {
		return errors.New("Can't push nil skipped sequence")
	}

	l.lock.Lock()
	defer l.lock.Unlock()

	if lastElement := l.skippedList.Back(); lastElement != nil {
		lastSkipped, ok := lastElement.Value.(*SkippedSequence)
		if !ok || lastSkipped == nil {
			return errors.New("Unexpected entry at end of skipped sequence list")
		}
		// Equal sequences are rejected too: only strictly increasing pushes are valid.
		if lastSkipped.seq >= x.seq {
			return errors.New("Can't push sequence lower than existing maximum")
		}
	}

	l.skippedMap[x.seq] = l.skippedList.PushBack(x)
	return nil
}

// getOlderThan returns the sequences, starting from the front of the
// skippedSequenceList, whose entries were added more than skippedExpiry ago.
// Iteration stops at the first entry that is still inside the time window. The
// returned slice is empty (never nil) when nothing has expired.
//
// The read lock is released with defer so it cannot be leaked if an element does
// not hold a *SkippedSequence and the type assertion panics.
func (l *SkippedSequenceList) getOlderThan(skippedExpiry time.Duration) []uint64 {
	l.lock.RLock()
	defer l.lock.RUnlock()

	oldSequences := make([]uint64, 0)
	for e := l.skippedList.Front(); e != nil; e = e.Next() {
		skippedSeq := e.Value.(*SkippedSequence)
		if time.Since(skippedSeq.timeAdded) <= skippedExpiry {
			// skippedSeqs are ordered by arrival time, so can stop iterating once we find one
			// still inside the time window
			break
		}
		oldSequences = append(oldSequences, skippedSeq.seq)
	}
	return oldSequences
}
Loading

0 comments on commit 3ae3cf4

Please sign in to comment.