Skip to content

Commit

Permalink
[3.1.6 Backport] CBG-3826: log warning if released unused sequence co…
Browse files Browse the repository at this point in the history
…unt > 1 million (#6798)

Co-authored-by: Tor Colvin <tor.colvin@couchbase.com>
  • Loading branch information
bbrks and torcolvin authored May 1, 2024
1 parent 4e64afd commit 203c1d6
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 29 deletions.
11 changes: 9 additions & 2 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import (
)

const (
kMaxRecentSequences = 20 // Maximum number of sequences stored in RecentSequences before pruning is triggered
kMaxRecentSequences = 20 // Maximum number of sequences stored in RecentSequences before pruning is triggered
unusedSequenceWarningThreshold = 10000 // Warn when releasing more than this many sequences due to existing sequence on the document
)

// ErrForbidden is returned when the user requests a document without a revision that they do not have access to.
Expand Down Expand Up @@ -1550,17 +1551,23 @@ func (db *DatabaseContext) assignSequence(ctx context.Context, docSequence uint6
if docSequence, err = db.sequences.nextSequence(ctx); err != nil {
return unusedSequences, err
}
firstAllocatedSequence := docSequence

// If the assigned sequence is less than or equal to the previous sequence on the document, release
// the assigned sequence and acquire one using nextSequenceGreaterThan
if docSequence <= doc.Sequence {
if err = db.sequences.releaseSequence(ctx, docSequence); err != nil {
base.WarnfCtx(ctx, "Error returned when releasing sequence %d. Falling back to skipped sequence handling. Error:%v", docSequence, err)
}
docSequence, err = db.sequences.nextSequenceGreaterThan(ctx, doc.Sequence)
var releasedSequenceCount uint64
docSequence, releasedSequenceCount, err = db.sequences.nextSequenceGreaterThan(ctx, doc.Sequence)
if err != nil {
return unusedSequences, err
}
if releasedSequenceCount > unusedSequenceWarningThreshold {
base.WarnfCtx(ctx, "Doc %s / %s had an existing sequence %d that is higher than the next db sequence value %d, resulting in the release of %d unused sequences. This may indicate documents being migrated between databases by an external process.", base.UD(doc.ID), doc.CurrentRev, doc.Sequence, firstAllocatedSequence, releasedSequenceCount)
}

}
}

Expand Down
37 changes: 21 additions & 16 deletions db/sequence_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (s *sequenceAllocator) releaseUnusedSequences(ctx context.Context) {
return
}
if s.last < s.max {
err := s.releaseSequenceRange(ctx, s.last+1, s.max)
_, err := s.releaseSequenceRange(ctx, s.last+1, s.max)
if err != nil {
base.WarnfCtx(ctx, "Error returned when releasing sequence range [%d-%d]. Falling back to skipped sequence handling. Error:%v", s.last+1, s.max, err)
}
Expand Down Expand Up @@ -185,7 +185,7 @@ func (s *sequenceAllocator) nextSequence(ctx context.Context) (sequence uint64,
// nextSequenceGreaterThan increments _sync:seq such that it's greater than existingSequence + s.sequenceBatchSize
// In the case where our local s.max < _sync:seq (another node has incremented _sync:seq), we may be releasing
// sequences greater than existingSequence, but we will only ever release sequences allocated by this node's incr operation
func (s *sequenceAllocator) nextSequenceGreaterThan(ctx context.Context, existingSequence uint64) (sequence uint64, err error) {
func (s *sequenceAllocator) nextSequenceGreaterThan(ctx context.Context, existingSequence uint64) (sequence uint64, releasedSequenceCount uint64, err error) {

targetSequence := existingSequence + 1
s.mutex.Lock()
Expand All @@ -194,13 +194,13 @@ func (s *sequenceAllocator) nextSequenceGreaterThan(ctx context.Context, existin
sequence, sequencesReserved, err := s._nextSequence(ctx)
s.mutex.Unlock()
if err != nil {
return 0, err
return 0, 0, err
}
if sequencesReserved {
s.reserveNotify <- struct{}{}
}
s.dbStats.SequenceAssignedCount.Add(1)
return sequence, nil
return sequence, 0, nil
}

// If the target sequence is in our existing batch (between s.last and s.max), we want to release all unused sequences in the batch earlier
Expand All @@ -210,12 +210,14 @@ func (s *sequenceAllocator) nextSequenceGreaterThan(ctx context.Context, existin
s.last = targetSequence
s.mutex.Unlock()
if releaseFrom < targetSequence {
if err := s.releaseSequenceRange(ctx, releaseFrom, targetSequence-1); err != nil {
released, err := s.releaseSequenceRange(ctx, releaseFrom, targetSequence-1)
if err != nil {
base.WarnfCtx(ctx, "Error returned when releasing sequence range [%d-%d] from existing batch. Will be handled by skipped sequence handling. Error:%v", releaseFrom, targetSequence-1, err)
}
releasedSequenceCount += released
}
s.dbStats.SequenceAssignedCount.Add(1)
return targetSequence, nil
return targetSequence, releasedSequenceCount, nil

}

Expand All @@ -237,7 +239,7 @@ func (s *sequenceAllocator) nextSequenceGreaterThan(ctx context.Context, existin
if err != nil {
base.WarnfCtx(ctx, "Error from incrementSequence in nextSequenceGreaterThan(%d): %v", existingSequence, err)
s.mutex.Unlock()
return 0, err
return 0, 0, err
}

s.max = allocatedToSeq
Expand All @@ -252,22 +254,23 @@ func (s *sequenceAllocator) nextSequenceGreaterThan(ctx context.Context, existin
s.dbStats.SequenceAssignedCount.Add(1)

// Release previously allocated sequences (c), if any
err = s.releaseSequenceRange(ctx, prevAllocReleaseFrom, prevAllocReleaseTo)
released, err := s.releaseSequenceRange(ctx, prevAllocReleaseFrom, prevAllocReleaseTo)
if err != nil {
base.WarnfCtx(ctx, "Error returned when releasing sequence range [%d-%d] for previously allocated sequences. Will be handled by skipped sequence handling. Error:%v", prevAllocReleaseFrom, prevAllocReleaseTo, err)
}

releasedSequenceCount += released
// Release the newly allocated sequences that were used to catch up to existingSequence (d)
if numberToRelease > 0 {
releaseTo := allocatedToSeq - numberToAllocate
releaseFrom := releaseTo - numberToRelease + 1 // +1, as releaseSequenceRange is inclusive
err = s.releaseSequenceRange(ctx, releaseFrom, releaseTo)
released, err := s.releaseSequenceRange(ctx, releaseFrom, releaseTo)
if err != nil {
base.WarnfCtx(ctx, "Error returned when releasing sequence range [%d-%d] to reach target sequence. Will be handled by skipped sequence handling. Error:%v", releaseFrom, releaseTo, err)
}
releasedSequenceCount += released
}

return sequence, err
return sequence, releasedSequenceCount, err
}

// _nextSequence reserves if needed, and then returns the next sequence
Expand Down Expand Up @@ -349,23 +352,25 @@ func (s *sequenceAllocator) releaseSequence(ctx context.Context, sequence uint64
// releaseSequenceRange writes a binary document with the key _sync:unusedSeqs:fromSeq:toSeq.
// fromSeq and toSeq are inclusive (i.e. both fromSeq and toSeq are unused).
// From and to seq are stored as the document contents to avoid null doc issues.
func (s *sequenceAllocator) releaseSequenceRange(ctx context.Context, fromSequence, toSequence uint64) error {
// Returns the number of sequences released.
func (s *sequenceAllocator) releaseSequenceRange(ctx context.Context, fromSequence, toSequence uint64) (uint64, error) {

// Exit if there's nothing to release
if toSequence == 0 || toSequence < fromSequence {
return nil
return 0, nil
}
key := s.metaKeys.UnusedSeqRangeKey(fromSequence, toSequence)
body := make([]byte, 16)
binary.LittleEndian.PutUint64(body[:8], fromSequence)
binary.LittleEndian.PutUint64(body[8:16], toSequence)
_, err := s.datastore.AddRaw(key, UnusedSequenceTTL, body)
if err != nil {
return err
return 0, err
}
s.dbStats.SequenceReleasedCount.Add(int64(toSequence - fromSequence + 1))
count := toSequence - fromSequence + 1
s.dbStats.SequenceReleasedCount.Add(int64(count))
base.DebugfCtx(ctx, base.KeyCRUD, "Released unused sequences #%d-#%d", fromSequence, toSequence)
return nil
return count, nil
}

// waitForReleasedSequences blocks for 'releaseSequenceWait' past the provided startTime.
Expand Down
33 changes: 22 additions & 11 deletions db/sequence_allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,47 +262,54 @@ func TestNextSequenceGreaterThanSingleNode(t *testing.T) {
assert.NoError(t, err, "error retrieving last sequence")

// nextSequenceGreaterThan(0) should perform initial batch allocation of size 10, and not release any sequences
nextSequence, err := a.nextSequenceGreaterThan(ctx, 0)
nextSequence, releasedSequenceCount, err := a.nextSequenceGreaterThan(ctx, 0)
assert.NoError(t, err)
assert.Equal(t, uint64(1), nextSequence)
require.Equal(t, 0, int(releasedSequenceCount))
assertNewAllocatorStats(t, testStats, 1, 10, 1, 0) // incr, reserved, assigned, released counts

// Calling the same again should use from the existing batch
nextSequence, err = a.nextSequenceGreaterThan(ctx, 0)
nextSequence, releasedSequenceCount, err = a.nextSequenceGreaterThan(ctx, 0)
assert.NoError(t, err)
assert.Equal(t, uint64(2), nextSequence)
assert.Equal(t, 0, int(releasedSequenceCount))
assertNewAllocatorStats(t, testStats, 1, 10, 2, 0)

// Test case where greaterThan == s.Last + 1
nextSequence, err = a.nextSequenceGreaterThan(ctx, 2)
nextSequence, releasedSequenceCount, err = a.nextSequenceGreaterThan(ctx, 2)
assert.NoError(t, err)
assert.Equal(t, uint64(3), nextSequence)
require.Equal(t, 0, int(releasedSequenceCount))
assertNewAllocatorStats(t, testStats, 1, 10, 3, 0)

// When requested nextSequenceGreaterThan is > s.Last + 1, we should release previously allocated sequences but
// don't require a new incr
nextSequence, err = a.nextSequenceGreaterThan(ctx, 5)
nextSequence, releasedSequenceCount, err = a.nextSequenceGreaterThan(ctx, 5)
assert.NoError(t, err)
assert.Equal(t, uint64(6), nextSequence)
require.Equal(t, 2, int(releasedSequenceCount))
assertNewAllocatorStats(t, testStats, 1, 10, 4, 2)

// Test when requested nextSequenceGreaterThan == s.Max; should release previously allocated sequences and allocate a new batch
nextSequence, err = a.nextSequenceGreaterThan(ctx, 10)
nextSequence, releasedSequenceCount, err = a.nextSequenceGreaterThan(ctx, 10)
assert.NoError(t, err)
assert.Equal(t, uint64(11), nextSequence)
assert.Equal(t, 4, int(releasedSequenceCount))
assertNewAllocatorStats(t, testStats, 2, 20, 5, 6)

// Test when requested nextSequenceGreaterThan = s.Max + 1; should release previously allocated sequences AND max+1
nextSequence, err = a.nextSequenceGreaterThan(ctx, 21)
nextSequence, releasedSequenceCount, err = a.nextSequenceGreaterThan(ctx, 21)
assert.NoError(t, err)
assert.Equal(t, uint64(22), nextSequence)
assert.Equal(t, 10, int(releasedSequenceCount))
assertNewAllocatorStats(t, testStats, 3, 31, 6, 16)

// Test when requested nextSequenceGreaterThan > s.Max + batch size; should release 9 previously allocated sequences (23-31)
// and 19 in the gap to the requested sequence (32-50)
nextSequence, err = a.nextSequenceGreaterThan(ctx, 50)
nextSequence, releasedSequenceCount, err = a.nextSequenceGreaterThan(ctx, 50)
assert.NoError(t, err)
assert.Equal(t, uint64(51), nextSequence)
assert.Equal(t, 28, int(releasedSequenceCount))
assertNewAllocatorStats(t, testStats, 4, 60, 7, 44)

}
Expand Down Expand Up @@ -350,31 +357,35 @@ func TestNextSequenceGreaterThanMultiNode(t *testing.T) {
assert.NoError(t, err, "error retrieving last sequence")

// nextSequenceGreaterThan(0) on A should perform initial batch allocation of size 10 (allocs 1-10), and not release any sequences
nextSequence, err := a.nextSequenceGreaterThan(ctx, 0)
nextSequence, releasedSequenceCount, err := a.nextSequenceGreaterThan(ctx, 0)
assert.NoError(t, err)
assert.Equal(t, uint64(1), nextSequence)
assert.Equal(t, 0, int(releasedSequenceCount))
assertNewAllocatorStats(t, dbStatsA, 1, 10, 1, 0) // incr, reserved, assigned, released counts

// nextSequenceGreaterThan(0) on B should perform initial batch allocation of size 10 (allocs 11-20), and not release any sequences
nextSequence, err = b.nextSequenceGreaterThan(ctx, 0)
nextSequence, releasedSequenceCount, err = b.nextSequenceGreaterThan(ctx, 0)
assert.NoError(t, err)
assert.Equal(t, uint64(11), nextSequence)
assert.Equal(t, 0, int(releasedSequenceCount))
assertNewAllocatorStats(t, dbStatsB, 1, 10, 1, 0)

// calling nextSequenceGreaterThan(15) on B will assign from the existing batch, and release 12-15
nextSequence, err = b.nextSequenceGreaterThan(ctx, 15)
nextSequence, releasedSequenceCount, err = b.nextSequenceGreaterThan(ctx, 15)
assert.NoError(t, err)
assert.Equal(t, uint64(16), nextSequence)
assert.Equal(t, 4, int(releasedSequenceCount))
assertNewAllocatorStats(t, dbStatsB, 1, 10, 2, 4)

// calling nextSequenceGreaterThan(15) on A will increment _sync:seq by 5 on its previously allocated sequence (10).
// Since node B has already updated _sync:seq to 20, will result in:
// node A releasing sequences 2-10 from its existing buffer
// node A allocating and releasing sequences 21-25
// node A adding sequences 26-35 to its buffer, and assigning 26 to the current request
nextSequence, err = a.nextSequenceGreaterThan(ctx, 15)
nextSequence, releasedSequenceCount, err = a.nextSequenceGreaterThan(ctx, 15)
assert.NoError(t, err)
assert.Equal(t, uint64(26), nextSequence)
assert.Equal(t, 14, int(releasedSequenceCount))
assertNewAllocatorStats(t, dbStatsA, 2, 25, 2, 14)

}

0 comments on commit 203c1d6

Please sign in to comment.