diff --git a/base/stats.go b/base/stats.go index a927633ed4..8bb4b6372d 100644 --- a/base/stats.go +++ b/base/stats.go @@ -422,7 +422,7 @@ type CacheStats struct { // The total number of active channels. NumActiveChannels *SgwIntStat `json:"num_active_channels"` // The total number of skipped sequences. This is a cumulative value. - NumSkippedSeqs *SgwIntStat `json:"cumulative_num_skipped_seqs"` + NumSkippedSeqs *SgwIntStat `json:"num_skipped_seqs"` // The total number of pending sequences. These are out-of-sequence entries waiting to be cached. PendingSeqLen *SgwIntStat `json:"pending_seq_len"` // The total number of revision cache bypass operations performed. @@ -1307,7 +1307,7 @@ func (d *DbStats) initCacheStats() error { if err != nil { return err } - resUtil.NumSkippedSeqs, err = NewIntStat(SubsystemCacheKey, "cumulative_num_skipped_seqs", StatUnitNoUnits, NumSkippedSeqsDesc, StatAddedVersion3dot0dot0, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, labelKeys, labelVals, prometheus.CounterValue, 0) + resUtil.NumSkippedSeqs, err = NewIntStat(SubsystemCacheKey, "num_skipped_seqs", StatUnitNoUnits, NumSkippedSeqsDesc, StatAddedVersion3dot0dot0, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, labelKeys, labelVals, prometheus.CounterValue, 0) if err != nil { return err } diff --git a/db/change_cache.go b/db/change_cache.go index 0fe690d83d..312cc400c6 100644 --- a/db/change_cache.go +++ b/db/change_cache.go @@ -85,15 +85,17 @@ func (c *changeCache) updateStats(ctx context.Context) { if c.db == nil { return } + // grab skipped sequence stats + skippedSequenceListStats := c.skippedSeqs.getStats() + c.db.DbStats.Database().HighSeqFeed.SetIfMax(int64(c.internalStats.highSeqFeed)) c.db.DbStats.Cache().PendingSeqLen.Set(int64(c.internalStats.pendingSeqLen)) c.db.DbStats.CBLReplicationPull().MaxPending.SetIfMax(int64(c.internalStats.maxPending)) c.db.DbStats.Cache().HighSeqStable.Set(int64(c._getMaxStableCached(ctx))) - c.db.DbStats.Cache().SkippedSeqLen.Set(c.skippedSeqs.getSliceLength()) - c.db.DbStats.Cache().SkippedSeqCap.Set(c.skippedSeqs.getSliceCapacity()) - c.db.DbStats.Cache().NumSkippedSeqs.Set(c.skippedSeqs.getCumulativeNumSkippedSequenceValue()) - c.db.DbStats.Cache().NumCurrentSeqsSkipped.Set(c.skippedSeqs.getNumCurrentSkippedSequenceValue()) - + c.db.DbStats.Cache().NumCurrentSeqsSkipped.Set(skippedSequenceListStats.NumCurrentSkippedSequencesStat) + c.db.DbStats.Cache().NumSkippedSeqs.Set(skippedSequenceListStats.NumCumulativeSkippedSequencesStat) + c.db.DbStats.Cache().SkippedSeqLen.Set(skippedSequenceListStats.ListLengthStat) + c.db.DbStats.Cache().SkippedSeqCap.Set(skippedSequenceListStats.ListCapacityStat) } type LogEntry channels.LogEntry @@ -304,9 +306,6 @@ func (c *changeCache) CleanSkippedSequenceQueue(ctx context.Context) error { base.InfofCtx(ctx, base.KeyCache, "Starting CleanSkippedSequenceQueue for database %s", base.MD(c.db.Name)) maxWait := int64(c.options.CacheSkippedSeqMaxWait.Seconds()) - if maxWait < 1 { - maxWait = 1 - } compactedSequences := c.skippedSeqs.SkippedSequenceCompact(ctx, maxWait) if compactedSequences == 0 { base.InfofCtx(ctx, base.KeyCache, "CleanSkippedSequenceQueue complete. No sequences to be compacted from skipped sequence list for database %s.", base.MD(c.db.Name)) @@ -780,14 +779,15 @@ func (c *changeCache) _addPendingLogs(ctx context.Context) channels.Set { var changedChannels channels.Set for len(c.pendingLogs) > 0 { - change := c.pendingLogs[0] - isNext := change.Sequence == c.nextSequence + oldestPending := c.pendingLogs[0] + isNext := oldestPending.Sequence == c.nextSequence if isNext { heap.Pop(&c.pendingLogs) - changedChannels = changedChannels.UpdateWithSlice(c._addToCache(ctx, change)) + changedChannels = changedChannels.UpdateWithSlice(c._addToCache(ctx, oldestPending)) } else if len(c.pendingLogs) > c.options.CachePendingSeqMaxNum || time.Since(c.pendingLogs[0].TimeReceived) >= c.options.CachePendingSeqMaxWait { - c.PushSkipped(ctx, c.nextSequence, change.Sequence-1) - c.nextSequence = change.Sequence + // Skip all sequences up to the oldest Pending + c.PushSkipped(ctx, c.nextSequence, oldestPending.Sequence-1) + c.nextSequence = oldestPending.Sequence } else { break } diff --git a/db/skipped_sequence.go b/db/skipped_sequence.go index 36b0ea3f63..2e6a4698a2 100644 --- a/db/skipped_sequence.go +++ b/db/skipped_sequence.go @@ -43,6 +43,14 @@ type SkippedSequenceListEntry struct { timestamp int64 // timestamp this entry was created in unix format } +// SkippedSequenceStats will hold all stats associated with the skipped sequence slice, used for getStats() +type SkippedSequenceStats struct { + NumCurrentSkippedSequencesStat int64 + NumCumulativeSkippedSequencesStat int64 + ListCapacityStat int64 + ListLengthStat int64 +} + func NewSkippedSequenceSlice(clipHeadroom int) *SkippedSequenceSlice { return &SkippedSequenceSlice{ list: []*SkippedSequenceListEntry{}, @@ -302,30 +310,15 @@ func (s *SkippedSequenceSlice) getOldest() uint64 { return s.list[0].getStartSeq() } -// getSliceLength retrieves the current skipped sequence slice length -func (s *SkippedSequenceSlice) getSliceLength() int64 { - s.lock.RLock() - defer s.lock.RUnlock() - return int64(len(s.list)) -} - -// getSliceCapacity retrieves the current skipped sequence slice capacity -func (s *SkippedSequenceSlice) getSliceCapacity() int64 { - s.lock.RLock() - defer s.lock.RUnlock() - return int64(cap(s.list)) -} - -// getNumCurrentSkippedSequenceValue retrieves the current skipped sequence count -func (s *SkippedSequenceSlice) getNumCurrentSkippedSequenceValue() int64 { +// getStats will return all associated stats with the skipped sequence slice +func (s *SkippedSequenceSlice) getStats() SkippedSequenceStats { s.lock.RLock() defer s.lock.RUnlock() - return s.NumCurrentSkippedSequences -} -// getCumulativeNumSkippedSequenceValue retrieves the cumulative skipped sequence count -func (s *SkippedSequenceSlice) getCumulativeNumSkippedSequenceValue() int64 { - s.lock.RLock() - defer s.lock.RUnlock() - return s.NumCumulativeSkippedSequences + return SkippedSequenceStats{ + NumCumulativeSkippedSequencesStat: s.NumCumulativeSkippedSequences, + NumCurrentSkippedSequencesStat: s.NumCurrentSkippedSequences, + ListCapacityStat: int64(cap(s.list)), + ListLengthStat: int64(len(s.list)), + } }