Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-3854: Integration of the skipped sequence slice into the cache #6789

Merged
merged 7 commits into from
Apr 26, 2024
Merged
18 changes: 16 additions & 2 deletions base/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ type CacheStats struct {
NonMobileIgnoredCount *SgwIntStat `json:"non_mobile_ignored_count"`
// The total number of active channels.
NumActiveChannels *SgwIntStat `json:"num_active_channels"`
// The total number of skipped sequences.
// The total number of skipped sequences. This is a cumulative value.
NumSkippedSeqs *SgwIntStat `json:"num_skipped_seqs"`
// The total number of pending sequences. These are out-of-sequence entries waiting to be cached.
PendingSeqLen *SgwIntStat `json:"pending_seq_len"`
Expand All @@ -431,8 +431,12 @@ type CacheStats struct {
RevisionCacheHits *SgwIntStat `json:"rev_cache_hits"`
// The total number of revision cache misses.
RevisionCacheMisses *SgwIntStat `json:"rev_cache_misses"`
// The current length of the pending skipped sequence queue.
// The current length of the pending skipped sequence slice.
SkippedSeqLen *SgwIntStat `json:"skipped_seq_len"`
// The current capacity of the skipped sequence slice.
SkippedSeqCap *SgwIntStat `json:"skipped_seq_cap"`
// The number of sequences currently in the skipped sequence slice.
NumCurrentSeqsSkipped *SgwIntStat `json:"current_skipped_seq_count"`
// The total view_queries.
ViewQueries *SgwIntStat `json:"view_queries"`
}
Expand Down Expand Up @@ -1327,6 +1331,14 @@ func (d *DbStats) initCacheStats() error {
if err != nil {
return err
}
resUtil.SkippedSeqCap, err = NewIntStat(SubsystemCacheKey, "skipped_seq_cap", StatUnitNoUnits, SkippedSeqCapDesc, StatAddedVersion3dot2dot0, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, labelKeys, labelVals, prometheus.GaugeValue, 0)
if err != nil {
return err
}
resUtil.NumCurrentSeqsSkipped, err = NewIntStat(SubsystemCacheKey, "current_skipped_seq_count", StatUnitNoUnits, NumCurrentSkippedSeq, StatAddedVersion3dot2dot0, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, labelKeys, labelVals, prometheus.CounterValue, 0)
if err != nil {
return err
}
resUtil.ViewQueries, err = NewIntStat(SubsystemCacheKey, "view_queries", StatUnitNoUnits, ViewQueriesDesc, StatAddedVersion3dot0dot0, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, labelKeys, labelVals, prometheus.CounterValue, 0)
if err != nil {
return err
Expand Down Expand Up @@ -1357,6 +1369,8 @@ func (d *DbStats) unregisterCacheStats() {
prometheus.Unregister(d.CacheStats.NonMobileIgnoredCount)
prometheus.Unregister(d.CacheStats.NumActiveChannels)
prometheus.Unregister(d.CacheStats.NumSkippedSeqs)
prometheus.Unregister(d.CacheStats.SkippedSeqCap)
prometheus.Unregister(d.CacheStats.NumCurrentSeqsSkipped)
prometheus.Unregister(d.CacheStats.PendingSeqLen)
prometheus.Unregister(d.CacheStats.RevisionCacheBypass)
prometheus.Unregister(d.CacheStats.RevisionCacheHits)
Expand Down
8 changes: 6 additions & 2 deletions base/stats_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ const (

NumActiveChannelsDesc = "The total number of active channels."

NumSkippedSeqsDesc = "The total number of skipped sequences."
NumSkippedSeqsDesc = "The total number of skipped sequences. This is a cumulative value"

PendingSeqLengthDesc = "The total number of pending sequences. These are out-of-sequence entries waiting to be cached."

Expand All @@ -127,7 +127,11 @@ const (
RevCacheMissesDesc = "The total number of revision cache misses. This metric can be used to calculate the ratio of revision cache misses: " +
"Rev Cache Miss Ratio = rev_cache_misses / (rev_cache_hits + rev_cache_misses)"

SkippedSeqLengthDesc = "The current length of the pending skipped sequence queue."
SkippedSeqLengthDesc = "The current length of the pending skipped sequence slice."

SkippedSeqCapDesc = "The current capacity of the skipped sequence slice."

NumCurrentSkippedSeq = "The number of sequences currently in the skipped sequence slice."

ViewQueriesDesc = "The total view_queries."

Expand Down
184 changes: 28 additions & 156 deletions db/change_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package db

import (
"container/heap"
"container/list"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -59,7 +58,7 @@ type changeCache struct {
notifyChange func(context.Context, channels.Set) // Client callback that notifies of channel changes
started base.AtomicBool // Set by the Start method
stopped base.AtomicBool // Set by the Stop method
skippedSeqs *SkippedSequenceList // Skipped sequences still pending on the TAP feed
skippedSeqs *SkippedSequenceSlice // Skipped sequences still pending on the DCP caching feed
lock sync.RWMutex // Coordinates access to struct fields
options CacheOptions // Cache config
terminator chan bool // Signal termination of background goroutines
Expand All @@ -86,11 +85,17 @@ func (c *changeCache) updateStats(ctx context.Context) {
if c.db == nil {
return
}
// grab skipped sequence stats
skippedSequenceListStats := c.skippedSeqs.getStats()

c.db.DbStats.Database().HighSeqFeed.SetIfMax(int64(c.internalStats.highSeqFeed))
c.db.DbStats.Cache().PendingSeqLen.Set(int64(c.internalStats.pendingSeqLen))
c.db.DbStats.CBLReplicationPull().MaxPending.SetIfMax(int64(c.internalStats.maxPending))
c.db.DbStats.Cache().HighSeqStable.Set(int64(c._getMaxStableCached(ctx)))

c.db.DbStats.Cache().NumCurrentSeqsSkipped.Set(skippedSequenceListStats.NumCurrentSkippedSequencesStat)
c.db.DbStats.Cache().NumSkippedSeqs.Set(skippedSequenceListStats.NumCumulativeSkippedSequencesStat)
c.db.DbStats.Cache().SkippedSeqLen.Set(skippedSequenceListStats.ListLengthStat)
c.db.DbStats.Cache().SkippedSeqCap.Set(skippedSequenceListStats.ListCapacityStat)
}

type LogEntry channels.LogEntry
Expand Down Expand Up @@ -125,11 +130,6 @@ type LogEntries []*LogEntry
// A priority-queue of LogEntries, kept ordered by increasing sequence #.
type LogPriorityQueue []*LogEntry

// SkippedSequence is a single skipped sequence number paired with the time at which it was recorded.
type SkippedSequence struct {
// seq is the skipped sequence number.
seq uint64
// timeAdded is the time the entry was created; it is compared against the skipped sequence expiry.
timeAdded time.Time
}

type CacheOptions struct {
ChannelCacheOptions
CachePendingSeqMaxWait time.Duration // Max wait for pending sequence before skipping
Expand Down Expand Up @@ -170,7 +170,7 @@ func (c *changeCache) Init(ctx context.Context, dbContext *DatabaseContext, chan
c.receivedSeqs = make(map[uint64]struct{})
c.terminator = make(chan bool)
c.initTime = time.Now()
c.skippedSeqs = NewSkippedSequenceList()
c.skippedSeqs = NewSkippedSequenceSlice(DefaultClipCapacityHeadroom)
c.lastAddPendingTime = time.Now().UnixNano()
c.sgCfgPrefix = dbContext.MetadataKeys.SGCfgPrefix(c.db.Options.GroupID)
c.metaKeys = metaKeys
Expand Down Expand Up @@ -300,23 +300,20 @@ func (c *changeCache) InsertPendingEntries(ctx context.Context) error {

// Cleanup function, invoked periodically.
// Removes skipped entries from skippedSeqs that have been waiting longer
// than MaxChannelLogMissingWaitTime from the queue. Attempts view retrieval
// prior to removal. Only locks skipped sequence queue to build the initial set (GetSkippedSequencesOlderThanMaxWait)
// and subsequent removal (RemoveSkipped).
// than CacheSkippedSeqMaxWait from the slice.
func (c *changeCache) CleanSkippedSequenceQueue(ctx context.Context) error {

oldSkippedSequences := c.GetSkippedSequencesOlderThanMaxWait()
if len(oldSkippedSequences) == 0 {
base.InfofCtx(ctx, base.KeyCache, "Starting CleanSkippedSequenceQueue for database %s", base.MD(c.db.Name))

compactedSequences := c.skippedSeqs.SkippedSequenceCompact(ctx, int64(c.options.CacheSkippedSeqMaxWait.Seconds()))
if compactedSequences == 0 {
base.InfofCtx(ctx, base.KeyCache, "CleanSkippedSequenceQueue complete. No sequences to be compacted from skipped sequence list for database %s.", base.MD(c.db.Name))
return nil
}

base.InfofCtx(ctx, base.KeyCache, "Starting CleanSkippedSequenceQueue, found %d skipped sequences older than max wait for database %s", len(oldSkippedSequences), base.MD(c.db.Name))

// Purge sequences not found from the skipped sequence queue
numRemoved := c.RemoveSkippedSequences(ctx, oldSkippedSequences)
c.db.DbStats.Cache().AbandonedSeqs.Add(numRemoved)
c.db.DbStats.Cache().AbandonedSeqs.Add(compactedSequences)

base.InfofCtx(ctx, base.KeyCache, "CleanSkippedSequenceQueue complete. Not Found:%d for database %s.", len(oldSkippedSequences), base.MD(c.db.Name))
base.InfofCtx(ctx, base.KeyCache, "CleanSkippedSequenceQueue complete. Cleaned %d sequences from skipped list for database %s.", compactedSequences, base.MD(c.db.Name))
return nil
}

Expand Down Expand Up @@ -781,15 +778,15 @@ func (c *changeCache) _addPendingLogs(ctx context.Context) channels.Set {
var changedChannels channels.Set

for len(c.pendingLogs) > 0 {
change := c.pendingLogs[0]
isNext := change.Sequence == c.nextSequence
oldestPending := c.pendingLogs[0]
isNext := oldestPending.Sequence == c.nextSequence
if isNext {
heap.Pop(&c.pendingLogs)
changedChannels = changedChannels.UpdateWithSlice(c._addToCache(ctx, change))
changedChannels = changedChannels.UpdateWithSlice(c._addToCache(ctx, oldestPending))
} else if len(c.pendingLogs) > c.options.CachePendingSeqMaxNum || time.Since(c.pendingLogs[0].TimeReceived) >= c.options.CachePendingSeqMaxWait {
c.db.DbStats.Cache().NumSkippedSeqs.Add(1)
c.PushSkipped(ctx, c.nextSequence)
c.nextSequence++
// Skip all sequences up to the oldest Pending
c.PushSkipped(ctx, c.nextSequence, oldestPending.Sequence-1)
c.nextSequence = oldestPending.Sequence
} else {
break
}
Expand Down Expand Up @@ -878,33 +875,21 @@ func (h *LogPriorityQueue) Pop() interface{} {
// ////// SKIPPED SEQUENCE QUEUE

func (c *changeCache) RemoveSkipped(x uint64) error {
err := c.skippedSeqs.Remove(x)
c.db.DbStats.Cache().SkippedSeqLen.Set(int64(c.skippedSeqs.skippedList.Len()))
err := c.skippedSeqs.removeSeq(x)
return err
}

// Removes a set of sequences. Logs warning on removal error, returns count of successfully removed.
func (c *changeCache) RemoveSkippedSequences(ctx context.Context, sequences []uint64) (removedCount int64) {
numRemoved := c.skippedSeqs.RemoveSequences(ctx, sequences)
c.db.DbStats.Cache().SkippedSeqLen.Set(int64(c.skippedSeqs.skippedList.Len()))
return numRemoved
func (c *changeCache) RemoveSkippedSequences(ctx context.Context, sequences []uint64) {
// this can be used for removal of ranges, pending CBG-3855
}

// WasSkipped reports whether sequence x is currently tracked as skipped, by delegating to skippedSeqs.Contains.
func (c *changeCache) WasSkipped(x uint64) bool {
return c.skippedSeqs.Contains(x)
}

func (c *changeCache) PushSkipped(ctx context.Context, sequence uint64) {
err := c.skippedSeqs.Push(&SkippedSequence{seq: sequence, timeAdded: time.Now()})
if err != nil {
base.InfofCtx(ctx, base.KeyCache, "Error pushing skipped sequence: %d, %v", sequence, err)
return
}
c.db.DbStats.Cache().SkippedSeqLen.Set(int64(c.skippedSeqs.skippedList.Len()))
}

func (c *changeCache) GetSkippedSequencesOlderThanMaxWait() (oldSequences []uint64) {
return c.skippedSeqs.getOlderThan(c.options.CacheSkippedSeqMaxWait)
func (c *changeCache) PushSkipped(ctx context.Context, startSeq uint64, endSeq uint64) {
c.skippedSeqs.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(startSeq, endSeq))
}

// waitForSequence blocks up to maxWaitTime until the given sequence has been received.
Expand Down Expand Up @@ -957,116 +942,3 @@ func (c *changeCache) _getMaxStableCached(ctx context.Context) uint64 {
}
return c.nextSequence - 1
}

// SkippedSequenceList stores the set of skipped sequences as an ordered list of *SkippedSequence with an associated map
// for sequence-based lookup. Push only appends a sequence greater than the current last entry, so the list is kept in
// increasing sequence order. Methods without a leading underscore acquire lock themselves; _remove expects the caller
// to already hold it.
type SkippedSequenceList struct {
skippedList *list.List // Ordered list of skipped sequences; element values are *SkippedSequence
skippedMap map[uint64]*list.Element // Map from sequence to its element in skippedList
lock sync.RWMutex // Coordinates access to skippedList and skippedMap
}

// NewSkippedSequenceList returns an empty SkippedSequenceList whose ordered list and lookup map are both
// initialised and ready for use.
func NewSkippedSequenceList() *SkippedSequenceList {
	l := new(SkippedSequenceList)
	l.skippedList = list.New()
	l.skippedMap = make(map[uint64]*list.Element)
	return l
}

// getOldest returns the sequence of the first element in the skippedSequenceList, or 0 when the list is empty.
// The read lock is held for the duration of the lookup.
func (l *SkippedSequenceList) getOldest() (oldestSkippedSeq uint64) {
	l.lock.RLock()
	// Deferred so the read lock is released even if the type assertion below panics.
	defer l.lock.RUnlock()

	firstElement := l.skippedList.Front()
	if firstElement == nil {
		return 0
	}
	return firstElement.Value.(*SkippedSequence).seq
}

// Remove deletes the entry for sequence x from the list while holding the write lock.
// It returns the error produced by _remove when x is not present.
func (l *SkippedSequenceList) Remove(x uint64) error {
	l.lock.Lock()
	defer l.lock.Unlock()
	return l._remove(x)
}

// RemoveSequences removes each of the given sequences under a single write lock. A sequence that cannot be
// removed is logged as a warning and skipped; removedCount is the number of sequences actually removed.
func (l *SkippedSequenceList) RemoveSequences(ctx context.Context, sequences []uint64) (removedCount int64) {
	l.lock.Lock()
	defer l.lock.Unlock()

	for i := range sequences {
		if err := l._remove(sequences[i]); err != nil {
			base.WarnfCtx(ctx, "Error purging skipped sequence %d from skipped sequence queue: %v", sequences[i], err)
			continue
		}
		removedCount++
	}
	return removedCount
}

// _remove deletes sequence x from both the ordered list and the lookup map, returning an error when x is
// not tracked. Expects callers to hold l.lock.Lock.
func (l *SkippedSequenceList) _remove(x uint64) error {
	listElement, found := l.skippedMap[x]
	if !found {
		return errors.New("Value not found")
	}
	l.skippedList.Remove(listElement)
	delete(l.skippedMap, x)
	return nil
}

// Contains reports whether sequence x is present, using a map lookup performed under the read lock.
func (l *SkippedSequenceList) Contains(x uint64) bool {
	l.lock.RLock()
	defer l.lock.RUnlock()

	_, present := l.skippedMap[x]
	return present
}

// Push appends x to the end of the SkippedSequenceList and records it in the lookup map.
// Ordering is validated first: when the list is non-empty, x.seq must be strictly greater than the
// sequence of the current last element, otherwise an error is returned and the list is left unchanged.
// An error is also returned (rather than panicking on a nil dereference) if the last element does not
// hold a *SkippedSequence.
func (l *SkippedSequenceList) Push(x *SkippedSequence) (err error) {
	l.lock.Lock()
	// Deferred so every return path, including a panic, releases the write lock.
	defer l.lock.Unlock()

	if lastElement := l.skippedList.Back(); lastElement != nil {
		lastSkipped, ok := lastElement.Value.(*SkippedSequence)
		if !ok {
			return errors.New("unexpected element type in skipped sequence list")
		}
		// Equal sequences are rejected as well: only a strictly greater sequence keeps the list ordered.
		if lastSkipped.seq >= x.seq {
			return errors.New("Can't push sequence lower than existing maximum")
		}
	}

	l.skippedMap[x.seq] = l.skippedList.PushBack(x)
	return nil
}

// getOlderThan returns the sequences at the front of the skippedSequenceList whose timeAdded is more than
// skippedExpiry in the past. Iteration stops at the first entry still inside the time window. The result
// is always a non-nil slice, empty when nothing has expired.
func (l *SkippedSequenceList) getOlderThan(skippedExpiry time.Duration) []uint64 {
	l.lock.RLock()
	// Deferred so the read lock is released even if the type assertion below panics.
	defer l.lock.RUnlock()

	oldSequences := make([]uint64, 0)
	for e := l.skippedList.Front(); e != nil; e = e.Next() {
		skippedSeq := e.Value.(*SkippedSequence)
		if time.Since(skippedSeq.timeAdded) <= skippedExpiry {
			// skippedSeqs are ordered by arrival time, so can stop iterating once we find one
			// still inside the time window
			break
		}
		oldSequences = append(oldSequences, skippedSeq.seq)
	}
	return oldSequences
}
Loading
Loading