-
Notifications
You must be signed in to change notification settings - Fork 138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CBG-3853: Implementation of range sequence entry for skipped sequence slice #6764
Changes from 5 commits
f647afe
6b8d43d
ca92d1d
5bd6a02
1ec43ad
6b1db5f
6df3cf8
45dbf87
632a7b3
463583b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,10 +27,12 @@ const ( | |
type SkippedSequenceListEntry interface { | ||
getTimestamp() int64 | ||
getLastSeq() uint64 | ||
setStartSeq(seq uint64) | ||
setLastSeq(seq uint64) | ||
getStartSeq() uint64 | ||
isRange() bool | ||
getNumSequencesInEntry() int | ||
getNumSequencesInEntry() int64 | ||
setRangeForIncomingContiguousEntry(lastSeq uint64, timeStamp int64) | ||
} | ||
|
||
// SkippedSequenceSlice stores the set of skipped sequences as an ordered slice of single skipped sequences | ||
|
@@ -42,13 +44,21 @@ type SkippedSequenceSlice struct { | |
} | ||
|
||
var _ SkippedSequenceListEntry = &SingleSkippedSequence{} | ||
var _ SkippedSequenceListEntry = &SkippedSequenceRange{} | ||
|
||
// SingleSkippedSequence contains a single skipped sequence value + unix timestamp of the time it's created | ||
type SingleSkippedSequence struct { | ||
seq uint64 | ||
timestamp int64 | ||
} | ||
|
||
// SkippedSequenceRange contains a skipped sequence range + unix timestamp of the time it's created | ||
type SkippedSequenceRange struct { | ||
start uint64 | ||
end uint64 | ||
timestamp int64 | ||
} | ||
|
||
func NewSkippedSequenceSlice(clipHeadroom int) *SkippedSequenceSlice { | ||
return &SkippedSequenceSlice{ | ||
list: []SkippedSequenceListEntry{}, | ||
|
@@ -58,10 +68,20 @@ func NewSkippedSequenceSlice(clipHeadroom int) *SkippedSequenceSlice { | |
|
||
// NewSingleSkippedSequenceEntry returns a SingleSkippedSequence with the specified sequence and the current | ||
// time in unix time | ||
func NewSingleSkippedSequenceEntry(sequence uint64) *SingleSkippedSequence { | ||
func NewSingleSkippedSequenceEntry(sequence uint64, timeStamp int64) *SingleSkippedSequence { | ||
gregns1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return &SingleSkippedSequence{ | ||
seq: sequence, | ||
timestamp: time.Now().Unix(), | ||
timestamp: timeStamp, | ||
} | ||
} | ||
|
||
// NewSkippedSequenceRangeEntry returns a SkippedSequenceRange with the specified sequence range and the current | ||
// time in unix time | ||
func NewSkippedSequenceRangeEntry(start, end uint64, timeStamp int64) *SkippedSequenceRange { | ||
return &SkippedSequenceRange{ | ||
start: start, | ||
end: end, | ||
timestamp: timeStamp, | ||
} | ||
} | ||
|
||
|
@@ -75,11 +95,16 @@ func (s *SingleSkippedSequence) getLastSeq() uint64 { | |
return s.seq | ||
} | ||
|
||
// setLastSeq sets the last sequence in the range on the entry, for single items its a no-op | ||
// setLastSeq sets the last sequence in the range on the entry, for single items it's a no-op | ||
func (s *SingleSkippedSequence) setLastSeq(seq uint64) { | ||
// no-op | ||
} | ||
|
||
// setStartSeq sets the last sequence in the range on the entry, for single items it's a no-op | ||
func (s *SingleSkippedSequence) setStartSeq(seq uint64) { | ||
// no-op | ||
} | ||
|
||
// getStartSeq gets the start sequence in the range on the entry, for single items the sequence will be returned | ||
func (s *SingleSkippedSequence) getStartSeq() uint64 { | ||
return s.seq | ||
|
@@ -90,12 +115,62 @@ func (s *SingleSkippedSequence) isRange() bool { | |
return false | ||
} | ||
|
||
// getNumSequencesInEntry returns the number of sequences a entry in skipped slice holds, | ||
// getNumSequencesInEntry returns the number of sequences an entry in skipped slice holds, | ||
// for single entries it will return just 1 | ||
func (s *SingleSkippedSequence) getNumSequencesInEntry() int { | ||
func (s *SingleSkippedSequence) getNumSequencesInEntry() int64 { | ||
return 1 | ||
} | ||
|
||
// setRangeForIncomingContiguousEntry will set the range last seq to the incoming contiguous entry's last seq | ||
// + set the timestamp. | ||
func (s *SingleSkippedSequence) setRangeForIncomingContiguousEntry(lastSeq uint64, timeStamp int64) { | ||
// no-op | ||
} | ||
|
||
// getTimestamp returns the timestamp of the entry | ||
func (s *SkippedSequenceRange) getTimestamp() int64 { | ||
return s.timestamp | ||
} | ||
|
||
// getStartSeq gets the start sequence in the range on the entry, for single items the sequence will be returned | ||
func (s *SkippedSequenceRange) getStartSeq() uint64 { | ||
return s.start | ||
} | ||
|
||
// getLastSeq gets the last sequence in the range on the entry, for single items the sequence will be returned | ||
func (s *SkippedSequenceRange) getLastSeq() uint64 { | ||
return s.end | ||
} | ||
|
||
// isRange returns true if the entry is a sequence range entry, false if not | ||
func (s *SkippedSequenceRange) isRange() bool { | ||
return true | ||
} | ||
|
||
// setLastSeq sets the last sequence in the range on the entry, for single items it's a no-op | ||
func (s *SkippedSequenceRange) setLastSeq(seq uint64) { | ||
s.end = seq | ||
} | ||
|
||
// setStartSeq sets the last sequence in the range on the entry, for single items it's a no-op | ||
func (s *SkippedSequenceRange) setStartSeq(seq uint64) { | ||
s.start = seq | ||
} | ||
|
||
// getNumSequencesInEntry returns the number of sequences an entry in skipped slice holds, | ||
// for single entries it will return just 1 | ||
func (s *SkippedSequenceRange) getNumSequencesInEntry() int64 { | ||
numSequences := (s.end - s.start) + 1 | ||
return int64(numSequences) | ||
} | ||
|
||
// setRangeForIncomingContiguousEntry will set the range last seq to the incoming contiguous entry's last seq | ||
// + set the timestamp. | ||
func (s *SkippedSequenceRange) setRangeForIncomingContiguousEntry(lastSeq uint64, timeStamp int64) { | ||
gregns1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
s.timestamp = timeStamp | ||
s.setLastSeq(lastSeq) | ||
} | ||
|
||
// Contains returns true if a given sequence exists in the skipped sequence slice | ||
func (s *SkippedSequenceSlice) Contains(x uint64) bool { | ||
s.lock.RLock() | ||
|
@@ -106,11 +181,11 @@ func (s *SkippedSequenceSlice) Contains(x uint64) bool { | |
|
||
// SkippedSequenceCompact will compact the entries with timestamp old enough. It will also clip | ||
// the capacity of the slice to length + 100 if the current capacity is 2.5x the length | ||
func (s *SkippedSequenceSlice) SkippedSequenceCompact(ctx context.Context, maxWait int64) (numSequencesCompacted int) { | ||
func (s *SkippedSequenceSlice) SkippedSequenceCompact(ctx context.Context, maxWait int64) (numSequencesCompacted int64) { | ||
s.lock.Lock() | ||
defer s.lock.Unlock() | ||
|
||
timeNow := time.Now().Unix() | ||
timeNow := getCurrentUnixTime() | ||
indexToDelete := -1 | ||
for _, v := range s.list { | ||
timeStamp := v.getTimestamp() | ||
|
@@ -153,19 +228,23 @@ func (s *SkippedSequenceSlice) findSequence(x uint64) (int, bool) { | |
return i, found | ||
} | ||
|
||
// binarySearchFunc contains the custom search function for searching the skipped sequence slice for a particular sequence | ||
func binarySearchFunc(a SkippedSequenceListEntry, seq uint64) int { | ||
singleSeq, ok := a.(*SingleSkippedSequence) | ||
if ok { | ||
if singleSeq.seq > seq { | ||
if !a.isRange() { | ||
if a.getStartSeq() > seq { | ||
return 1 | ||
} else if a.getStartSeq() == seq { | ||
return 0 | ||
} | ||
if singleSeq.seq == seq { | ||
return -1 | ||
} else { | ||
if a.getStartSeq() > seq { | ||
gregns1 marked this conversation as resolved.
Show resolved
Hide resolved
gregns1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return 1 | ||
} else if a.getLastSeq() >= seq { | ||
return 0 | ||
} | ||
return -1 | ||
} | ||
// should never get here as it stands, will have extra handling here pending CBG-3853 | ||
return 1 | ||
} | ||
|
||
// removeSeq will remove a given sequence from the slice if it exists | ||
|
@@ -181,34 +260,105 @@ func (s *SkippedSequenceSlice) removeSeq(x uint64) error { | |
// if not a range, we can just remove the single entry from the slice | ||
s.list = slices.Delete(s.list, index, index+1) | ||
return nil | ||
} else { | ||
// range entry handling | ||
rangeElem := s.list[index] | ||
if rangeElem.getNumSequencesInEntry() == 1 { | ||
gregns1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// simple delete case | ||
s.list = slices.Delete(s.list, index, index+1) | ||
return nil | ||
} | ||
|
||
// if x == startSeq set startSeq+1, if x == lastSeq set lastSeq-1 | ||
if rangeElem.getStartSeq() == x { | ||
rangeElem.setStartSeq(x + 1) | ||
return nil | ||
} | ||
if rangeElem.getLastSeq() == x { | ||
rangeElem.setLastSeq(x - 1) | ||
return nil | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If Does that impact anything the next time it's run through code that does There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So the original approach was if the above code lines makes a range of 1, we were going to delete this and insert a new single entry in its place. Adam suggested we could avoid extra allocations by just altering the range to be one sequence. It seems to work well! |
||
|
||
if rangeElem.getNumSequencesInEntry() == 3 { | ||
// if we get here then x is in middle of the 3 sequence range. x being == startSeq or | ||
// lastSeq is handled above | ||
// add new single entry at elem + 1 then modify range | ||
newElem := NewSingleSkippedSequenceEntry(rangeElem.getLastSeq(), rangeElem.getTimestamp()) | ||
s.insert(index+1, newElem) | ||
rangeElem.setLastSeq(x - 1) | ||
return nil | ||
} | ||
|
||
// handling for if x is startSeq+1 or lastSeq-1 | ||
if rangeElem.getStartSeq() == x-1 { | ||
// insert single skipped entry + alter start seq to x + 1 | ||
newElem := NewSingleSkippedSequenceEntry(x-1, rangeElem.getTimestamp()) | ||
s.insert(index, newElem) | ||
rangeElem.setStartSeq(x + 1) | ||
return nil | ||
} | ||
|
||
if rangeElem.getLastSeq() == x+1 { | ||
// insert single skipped entry at index+1 + alter last seq to x - 1 | ||
newElem := NewSingleSkippedSequenceEntry(x+1, rangeElem.getTimestamp()) | ||
s.insert(index+1, newElem) | ||
rangeElem.setLastSeq(x - 1) | ||
return nil | ||
} | ||
|
||
// if we get here we can assume that startSeq < x < lastSeq | ||
// split index range | ||
newElem := NewSkippedSequenceRangeEntry(x+1, rangeElem.getLastSeq(), rangeElem.getTimestamp()) | ||
s.insert(index+1, newElem) | ||
rangeElem.setLastSeq(x - 1) | ||
} | ||
// more range handling here CBG-3853, temporarily error as we shouldn't get to this point at this time | ||
return fmt.Errorf("entered range handling code") | ||
return nil | ||
} | ||
|
||
// insert will insert element in middle of slice maintaining order of rest of slice | ||
func (s *SkippedSequenceSlice) insert(index int, entry SkippedSequenceListEntry) { | ||
s.lock.Lock() | ||
defer s.lock.Unlock() | ||
gregns1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
s.list = append(s.list, nil) | ||
copy(s.list[index+1:], s.list[index:]) | ||
s.list[index] = entry | ||
} | ||
|
||
// PushSkippedSequenceEntry will append a new skipped sequence entry to the end of the slice, if adding a contiguous | ||
// sequence function will expand the last entry of the slice to reflect this | ||
func (s *SkippedSequenceSlice) PushSkippedSequenceEntry(entry *SingleSkippedSequence) { | ||
func (s *SkippedSequenceSlice) PushSkippedSequenceEntry(entry SkippedSequenceListEntry) { | ||
s.lock.Lock() | ||
defer s.lock.Unlock() | ||
|
||
if len(s.list) == 0 { | ||
s.list = append(s.list, entry) | ||
return | ||
} | ||
// adding contiguous sequence handling here, pending CBG-3853 | ||
s.list = append(s.list, entry) | ||
// get index of last entry + last seq of entry | ||
index := len(s.list) - 1 | ||
lastEntryLastSeq := s.list[index].getLastSeq() | ||
gregns1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (lastEntryLastSeq + 1) == entry.getStartSeq() { | ||
// adding contiguous sequence | ||
if s.list[index].isRange() { | ||
// set last seq in the range to the new arriving sequence + alter timestamp to incoming entries timestamp | ||
s.list[index].setRangeForIncomingContiguousEntry(entry.getLastSeq(), entry.getTimestamp()) | ||
} else { | ||
// take the sequence from the single entry and create a new range entry in its place | ||
lastEntryStartSeq := s.list[index].getStartSeq() | ||
if entry.isRange() { | ||
// avoid extra allocation by altering its start seq and simply replacing last entry with the incoming entry | ||
entry.setStartSeq(lastEntryStartSeq) | ||
s.list[index] = entry | ||
} else { | ||
endSeq := entry.getLastSeq() | ||
s.list[index] = NewSkippedSequenceRangeEntry(lastEntryStartSeq, endSeq, getCurrentUnixTime()) | ||
} | ||
} | ||
} else { | ||
s.list = append(s.list, entry) | ||
} | ||
|
||
} | ||
|
||
// getOldest returns the start sequence of the first element in the skipped sequence slice | ||
func (s *SkippedSequenceSlice) getOldest() uint64 { | ||
s.lock.RLock() | ||
defer s.lock.RUnlock() | ||
|
@@ -218,3 +368,8 @@ func (s *SkippedSequenceSlice) getOldest() uint64 { | |
// grab fist element in slice and take the start seq of that range/single sequence | ||
return s.list[0].getStartSeq() | ||
} | ||
|
||
// getCurrentUnixTime will return the current time in unix time format | ||
gregns1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
func getCurrentUnixTime() int64 { | ||
return time.Now().Unix() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm curious what the overhead of the interface will be vs. a single concrete type that can handle single sequences and ranges. ( https://syslog.ravelin.com/go-interfaces-but-at-what-cost-961e0f58a07b )
Most of the difference in behaviour is already in
isRange
checks, and it'd save all of the no-op method implementations.Having said that, this would be closer to a micro-optimisation than the planned refactor of skipped sequences, so I'm OK waiting for all of that work to be done first, since the refactor to change the interface to a single type wouldn't be too intrusive.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with this idea, and am fine doing it in a later optimization. However, it does make me wonder whether anything that's a no-op belongs on the interface. (setStartSeq/setLastSeq/extendRange). Looking at the code, this goes back to the fact that we're using isRange instead of a type switch. Was this intentional for performance reasons?