Skip to content

Commit

Permalink
CBG-3853: Implementation of range sequence entry for skipped sequence…
Browse files Browse the repository at this point in the history
… slice (#6764)
  • Loading branch information
gregns1 authored Apr 19, 2024
1 parent 64bad65 commit e8a27bf
Show file tree
Hide file tree
Showing 2 changed files with 613 additions and 179 deletions.
209 changes: 140 additions & 69 deletions db/skipped_sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,89 +24,123 @@ const (
DefaultClipCapacityHeadroom = 1000
)

type SkippedSequenceListEntry interface {
getTimestamp() int64
getLastSeq() uint64
setLastSeq(seq uint64)
getStartSeq() uint64
isRange() bool
getNumSequencesInEntry() int
}

// SkippedSequenceSlice stores the set of skipped sequences as an ordered slice of single skipped sequences
// or skipped sequence ranges
type SkippedSequenceSlice struct {
list []SkippedSequenceListEntry
list []*SkippedSequenceListEntry
ClipCapacityHeadroom int
lock sync.RWMutex
}

var _ SkippedSequenceListEntry = &SingleSkippedSequence{}

// SingleSkippedSequence contains a single skipped sequence value + unix timestamp of the time it's created
type SingleSkippedSequence struct {
seq uint64
timestamp int64
// SkippedSequenceListEntry contains start + end sequence for a range of skipped sequences +
// a timestamp at which the entry was created in unix format. If an entry is a singular skipped sequence the start and
// end sequence will be equal
type SkippedSequenceListEntry struct {
start uint64 // start sequence of a range
end uint64 // end sequence of the range (0 if a singular skipped sequence)
timestamp int64 // timestamp this entry was created in unix format
}

func NewSkippedSequenceSlice(clipHeadroom int) *SkippedSequenceSlice {
return &SkippedSequenceSlice{
list: []SkippedSequenceListEntry{},
list: []*SkippedSequenceListEntry{},
ClipCapacityHeadroom: clipHeadroom,
}
}

// NewSingleSkippedSequenceEntry returns a SingleSkippedSequence with the specified sequence and the current
// time in unix time
func NewSingleSkippedSequenceEntry(sequence uint64) *SingleSkippedSequence {
return &SingleSkippedSequence{
seq: sequence,
// NewSkippedSequenceRangeEntry returns a SkippedSequenceListEntry with the specified sequence range and the current
// timestamp in unix time
func NewSkippedSequenceRangeEntry(start, end uint64) *SkippedSequenceListEntry {
return &SkippedSequenceListEntry{
start: start,
end: end,
timestamp: time.Now().Unix(),
}
}

// NewSkippedSequenceRangeEntryAt returns a SkippedSequenceListEntry with the specified sequences and the supplied
// timestamp in unix time
func NewSkippedSequenceRangeEntryAt(start, end uint64, timeStamp int64) *SkippedSequenceListEntry {
return &SkippedSequenceListEntry{
start: start,
end: end,
timestamp: timeStamp,
}
}

// NewSingleSkippedSequenceEntry returns a SkippedSequenceListEntry with start and end seq defined as equal
// with the current timestamp in unix time
func NewSingleSkippedSequenceEntry(seq uint64) *SkippedSequenceListEntry {
return &SkippedSequenceListEntry{
start: seq,
end: seq,
timestamp: time.Now().Unix(),
}
}

// NewSingleSkippedSequenceEntryAt returns a SkippedSequenceListEntry with start and end seq defined as equal
// and the supplied timestamp in unix time
func NewSingleSkippedSequenceEntryAt(seq uint64, timeStamp int64) *SkippedSequenceListEntry {
return &SkippedSequenceListEntry{
start: seq,
end: seq,
timestamp: timeStamp,
}
}

// getTimestamp returns the timestamp of the entry
func (s *SingleSkippedSequence) getTimestamp() int64 {
func (s *SkippedSequenceListEntry) getTimestamp() int64 {
return s.timestamp
}

// getLastSeq gets the last sequence in the range on the entry, for single items the sequence will be returned
func (s *SingleSkippedSequence) getLastSeq() uint64 {
return s.seq
// getStartSeq gets the start sequence in the range on the entry, for single items the sequence (startSeq) will be returned
func (s *SkippedSequenceListEntry) getStartSeq() uint64 {
return s.start
}

// setLastSeq sets the last sequence in the range on the entry, for single items its a no-op
func (s *SingleSkippedSequence) setLastSeq(seq uint64) {
// no-op
// getLastSeq gets the last sequence in the range on the entry, for single items the sequence 0 will be returned
func (s *SkippedSequenceListEntry) getLastSeq() uint64 {
return s.end
}

// getStartSeq gets the start sequence in the range on the entry, for single items the sequence will be returned
func (s *SingleSkippedSequence) getStartSeq() uint64 {
return s.seq
// setLastSeq sets the last sequence in the range on the entry
func (s *SkippedSequenceListEntry) setLastSeq(seq uint64) {
s.end = seq
}

// isRange returns true if the entry is a sequence range entry, false if not
func (s *SingleSkippedSequence) isRange() bool {
return false
// setStartSeq sets the last sequence in the range on the entry
func (s *SkippedSequenceListEntry) setStartSeq(seq uint64) {
s.start = seq
}

// getNumSequencesInEntry returns the number of sequences a entry in skipped slice holds,
// getNumSequencesInEntry returns the number of sequences an entry in skipped slice holds,
// for single entries it will return just 1
func (s *SingleSkippedSequence) getNumSequencesInEntry() int {
return 1
func (s *SkippedSequenceListEntry) getNumSequencesInEntry() int64 {
if s.end == s.start {
return 1
}
numSequences := (s.end - s.start) + 1
return int64(numSequences)
}

// extendRange will set the range last seq to the incoming contiguous entry's last seq
// + set the timestamp.
func (s *SkippedSequenceListEntry) extendRange(lastSeq uint64, timeStamp int64) {
s.timestamp = timeStamp
s.setLastSeq(lastSeq)
}

// Contains returns true if a given sequence exists in the skipped sequence slice
func (s *SkippedSequenceSlice) Contains(x uint64) bool {
s.lock.RLock()
defer s.lock.RUnlock()
_, found := s.findSequence(x)
_, found := s._findSequence(x)
return found
}

// SkippedSequenceCompact will compact the entries with timestamp old enough. It will also clip
// the capacity of the slice to length + 100 if the current capacity is 2.5x the length
func (s *SkippedSequenceSlice) SkippedSequenceCompact(ctx context.Context, maxWait int64) (numSequencesCompacted int) {
func (s *SkippedSequenceSlice) SkippedSequenceCompact(ctx context.Context, maxWait int64) (numSequencesCompacted int64) {
s.lock.Lock()
defer s.lock.Unlock()

Expand All @@ -128,13 +162,13 @@ func (s *SkippedSequenceSlice) SkippedSequenceCompact(ctx context.Context, maxWa
s.list = slices.Delete(s.list, 0, indexToDelete+1)
}
// resize slice to reclaim memory if we need to
s.clip(ctx)
s._clip(ctx)
return numSequencesCompacted
}

// clip will clip the capacity of the slice to the current length plus the configured headroom in ClipCapacityHeadroom
// _clip will clip the capacity of the slice to the current length plus the configured headroom in ClipCapacityHeadroom
// if the current capacity of the slice in 2.5x the length
func (s *SkippedSequenceSlice) clip(ctx context.Context) {
func (s *SkippedSequenceSlice) _clip(ctx context.Context) {
// threshold is 2.5x the current length of the slice
threshold := 2.5 * float64(len(s.list))

Expand All @@ -147,68 +181,105 @@ func (s *SkippedSequenceSlice) clip(ctx context.Context) {
}
}

// findSequence will use binary search to search the elements in the slice for a given sequence
func (s *SkippedSequenceSlice) findSequence(x uint64) (int, bool) {
// _findSequence will use binary search to search the elements in the slice for a given sequence
func (s *SkippedSequenceSlice) _findSequence(x uint64) (int, bool) {
i, found := slices.BinarySearchFunc(s.list, x, binarySearchFunc)
return i, found
}

func binarySearchFunc(a SkippedSequenceListEntry, seq uint64) int {
singleSeq, ok := a.(*SingleSkippedSequence)
if ok {
if singleSeq.seq > seq {
return 1
}
if singleSeq.seq == seq {
return 0
}
return -1
// binarySearchFunc contains the custom search function for searching the skipped sequence slice for a particular sequence
func binarySearchFunc(a *SkippedSequenceListEntry, seq uint64) int {
if a.getStartSeq() > seq {
return 1
}

if a.getLastSeq() >= seq {
return 0
}
// should never get here as it stands, will have extra handling here pending CBG-3853
return 1
return -1
}

// removeSeq will remove a given sequence from the slice if it exists
func (s *SkippedSequenceSlice) removeSeq(x uint64) error {
s.lock.Lock()
defer s.lock.Unlock()
index, found := s.findSequence(x)
index, found := s._findSequence(x)
if !found {
return fmt.Errorf("sequence %d not found in the skipped list", x)
}
// handle border cases where x is equal to start or end sequence on range (or just sequence for single entries)
if !s.list[index].isRange() {

// take the element at the index and handle cases required to removal of a sequence
rangeElem := s.list[index]
numSequences := rangeElem.getNumSequencesInEntry()

if numSequences == 1 {
// if not a range, we can just remove the single entry from the slice
s.list = slices.Delete(s.list, index, index+1)
return nil
}
// more range handling here CBG-3853, temporarily error as we shouldn't get to this point at this time
return fmt.Errorf("entered range handling code")

// range entry handling
// if x == startSeq set startSeq+1, if x == lastSeq set lastSeq-1
if rangeElem.getStartSeq() == x {
rangeElem.setStartSeq(x + 1)
return nil
}
if rangeElem.getLastSeq() == x {
rangeElem.setLastSeq(x - 1)
return nil
}

if numSequences == 3 {
// if we get here then x is in middle of the 3 sequence range. x being == startSeq or
// lastSeq is handled above
// add new single entry at elem + 1 then modify range
newElem := NewSingleSkippedSequenceEntryAt(rangeElem.getLastSeq(), rangeElem.getTimestamp())
s._insert(index+1, newElem)
// make rangeElem a single entry
rangeElem.setLastSeq(rangeElem.getStartSeq())
return nil
}

// if we get here we can assume that startSeq < x < lastSeq
// split index range
newElem := NewSkippedSequenceRangeEntryAt(x+1, rangeElem.getLastSeq(), rangeElem.getTimestamp())
s._insert(index+1, newElem)
rangeElem.setLastSeq(x - 1)

return nil
}

// insert will insert element in middle of slice maintaining order of rest of slice
func (s *SkippedSequenceSlice) insert(index int, entry SkippedSequenceListEntry) {
s.lock.Lock()
defer s.lock.Unlock()
// _insert will insert element in middle of slice maintaining order of rest of slice
func (s *SkippedSequenceSlice) _insert(index int, entry *SkippedSequenceListEntry) {
s.list = append(s.list, nil)
copy(s.list[index+1:], s.list[index:])
s.list[index] = entry
}

// PushSkippedSequenceEntry will append a new skipped sequence entry to the end of the slice, if adding a contiguous
// sequence function will expand the last entry of the slice to reflect this
func (s *SkippedSequenceSlice) PushSkippedSequenceEntry(entry *SingleSkippedSequence) {
func (s *SkippedSequenceSlice) PushSkippedSequenceEntry(entry *SkippedSequenceListEntry) {
s.lock.Lock()
defer s.lock.Unlock()

if len(s.list) == 0 {
s.list = append(s.list, entry)
return
}
// adding contiguous sequence handling here, pending CBG-3853
s.list = append(s.list, entry)
// get index of last entry + last seq of entry
index := len(s.list) - 1
lastEntryLastSeq := s.list[index].getLastSeq()
if (lastEntryLastSeq + 1) == entry.getStartSeq() {
// adding contiguous sequence
// set last seq in the range to the new arriving sequence + alter timestamp to incoming entries timestamp
s.list[index].extendRange(entry.getLastSeq(), entry.getTimestamp())
} else {
s.list = append(s.list, entry)
}

}

// getOldest returns the start sequence of the first element in the skipped sequence slice
func (s *SkippedSequenceSlice) getOldest() uint64 {
s.lock.RLock()
defer s.lock.RUnlock()
Expand Down
Loading

0 comments on commit e8a27bf

Please sign in to comment.