Skip to content

Commit

Permalink
updates off review
Browse files Browse the repository at this point in the history
  • Loading branch information
gregns1 committed Apr 26, 2024
1 parent a79f17d commit 51baa32
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 38 deletions.
4 changes: 2 additions & 2 deletions base/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ type CacheStats struct {
// The total number of active channels.
NumActiveChannels *SgwIntStat `json:"num_active_channels"`
// The total number of skipped sequences. This is a cumulative value.
NumSkippedSeqs *SgwIntStat `json:"cumulative_num_skipped_seqs"`
NumSkippedSeqs *SgwIntStat `json:"num_skipped_seqs"`
// The total number of pending sequences. These are out-of-sequence entries waiting to be cached.
PendingSeqLen *SgwIntStat `json:"pending_seq_len"`
// The total number of revision cache bypass operations performed.
Expand Down Expand Up @@ -1307,7 +1307,7 @@ func (d *DbStats) initCacheStats() error {
if err != nil {
return err
}
resUtil.NumSkippedSeqs, err = NewIntStat(SubsystemCacheKey, "cumulative_num_skipped_seqs", StatUnitNoUnits, NumSkippedSeqsDesc, StatAddedVersion3dot0dot0, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, labelKeys, labelVals, prometheus.CounterValue, 0)
resUtil.NumSkippedSeqs, err = NewIntStat(SubsystemCacheKey, "num_skipped_seqs", StatUnitNoUnits, NumSkippedSeqsDesc, StatAddedVersion3dot0dot0, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, labelKeys, labelVals, prometheus.CounterValue, 0)
if err != nil {
return err
}
Expand Down
26 changes: 13 additions & 13 deletions db/change_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,17 @@ func (c *changeCache) updateStats(ctx context.Context) {
if c.db == nil {
return
}
// grab skipped sequence stats
skippedSequenceListStats := c.skippedSeqs.getStats()

c.db.DbStats.Database().HighSeqFeed.SetIfMax(int64(c.internalStats.highSeqFeed))
c.db.DbStats.Cache().PendingSeqLen.Set(int64(c.internalStats.pendingSeqLen))
c.db.DbStats.CBLReplicationPull().MaxPending.SetIfMax(int64(c.internalStats.maxPending))
c.db.DbStats.Cache().HighSeqStable.Set(int64(c._getMaxStableCached(ctx)))
c.db.DbStats.Cache().SkippedSeqLen.Set(c.skippedSeqs.getSliceLength())
c.db.DbStats.Cache().SkippedSeqCap.Set(c.skippedSeqs.getSliceCapacity())
c.db.DbStats.Cache().NumSkippedSeqs.Set(c.skippedSeqs.getCumulativeNumSkippedSequenceValue())
c.db.DbStats.Cache().NumCurrentSeqsSkipped.Set(c.skippedSeqs.getNumCurrentSkippedSequenceValue())

c.db.DbStats.Cache().NumCurrentSeqsSkipped.Set(skippedSequenceListStats.NumCurrentSkippedSequencesStat)
c.db.DbStats.Cache().NumSkippedSeqs.Set(skippedSequenceListStats.NumCumulativeSkippedSequencesStat)
c.db.DbStats.Cache().SkippedSeqLen.Set(skippedSequenceListStats.ListLengthStat)
c.db.DbStats.Cache().SkippedSeqCap.Set(skippedSequenceListStats.ListCapacityStat)
}

type LogEntry channels.LogEntry
Expand Down Expand Up @@ -304,9 +306,6 @@ func (c *changeCache) CleanSkippedSequenceQueue(ctx context.Context) error {
base.InfofCtx(ctx, base.KeyCache, "Starting CleanSkippedSequenceQueue for database %s", base.MD(c.db.Name))

maxWait := int64(c.options.CacheSkippedSeqMaxWait.Seconds())
if maxWait < 1 {
maxWait = 1
}
compactedSequences := c.skippedSeqs.SkippedSequenceCompact(ctx, maxWait)
if compactedSequences == 0 {
base.InfofCtx(ctx, base.KeyCache, "CleanSkippedSequenceQueue complete. No sequences to be compacted from skipped sequence list for database %s.", base.MD(c.db.Name))
Expand Down Expand Up @@ -780,14 +779,15 @@ func (c *changeCache) _addPendingLogs(ctx context.Context) channels.Set {
var changedChannels channels.Set

for len(c.pendingLogs) > 0 {
change := c.pendingLogs[0]
isNext := change.Sequence == c.nextSequence
oldestPending := c.pendingLogs[0]
isNext := oldestPending.Sequence == c.nextSequence
if isNext {
heap.Pop(&c.pendingLogs)
changedChannels = changedChannels.UpdateWithSlice(c._addToCache(ctx, change))
changedChannels = changedChannels.UpdateWithSlice(c._addToCache(ctx, oldestPending))
} else if len(c.pendingLogs) > c.options.CachePendingSeqMaxNum || time.Since(c.pendingLogs[0].TimeReceived) >= c.options.CachePendingSeqMaxWait {
c.PushSkipped(ctx, c.nextSequence, change.Sequence-1)
c.nextSequence = change.Sequence
// Skip all sequences up to the oldest Pending
c.PushSkipped(ctx, c.nextSequence, oldestPending.Sequence-1)
c.nextSequence = oldestPending.Sequence
} else {
break
}
Expand Down
39 changes: 16 additions & 23 deletions db/skipped_sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ type SkippedSequenceListEntry struct {
timestamp int64 // timestamp this entry was created in unix format
}

// SkippedSequenceStats will hold all stats associated with the skipped sequence slice, used for getStats()
type SkippedSequenceStats struct {
NumCurrentSkippedSequencesStat int64
NumCumulativeSkippedSequencesStat int64
ListCapacityStat int64
ListLengthStat int64
}

func NewSkippedSequenceSlice(clipHeadroom int) *SkippedSequenceSlice {
return &SkippedSequenceSlice{
list: []*SkippedSequenceListEntry{},
Expand Down Expand Up @@ -302,30 +310,15 @@ func (s *SkippedSequenceSlice) getOldest() uint64 {
return s.list[0].getStartSeq()
}

// getSliceLength retrieves the current skipped sequence slice length
func (s *SkippedSequenceSlice) getSliceLength() int64 {
s.lock.RLock()
defer s.lock.RUnlock()
return int64(len(s.list))
}

// getSliceCapacity retrieves the current skipped sequence slice capacity
func (s *SkippedSequenceSlice) getSliceCapacity() int64 {
s.lock.RLock()
defer s.lock.RUnlock()
return int64(cap(s.list))
}

// getNumCurrentSkippedSequenceValue retrieves the current skipped sequence count
func (s *SkippedSequenceSlice) getNumCurrentSkippedSequenceValue() int64 {
// getStats will return all associated stats with the skipped sequence slice
func (s *SkippedSequenceSlice) getStats() SkippedSequenceStats {
s.lock.RLock()
defer s.lock.RUnlock()
return s.NumCurrentSkippedSequences
}

// getCumulativeNumSkippedSequenceValue retrieves the cumulative skipped sequence count
func (s *SkippedSequenceSlice) getCumulativeNumSkippedSequenceValue() int64 {
s.lock.RLock()
defer s.lock.RUnlock()
return s.NumCumulativeSkippedSequences
return SkippedSequenceStats{
NumCumulativeSkippedSequencesStat: s.NumCumulativeSkippedSequences,
NumCurrentSkippedSequencesStat: s.NumCurrentSkippedSequences,
ListCapacityStat: int64(cap(s.list)),
ListLengthStat: int64(len(s.list)),
}
}

0 comments on commit 51baa32

Please sign in to comment.