diff --git a/db/skipped_sequence.go b/db/skipped_sequence.go index 7cd797163c..272284bd01 100644 --- a/db/skipped_sequence.go +++ b/db/skipped_sequence.go @@ -24,89 +24,123 @@ const ( DefaultClipCapacityHeadroom = 1000 ) -type SkippedSequenceListEntry interface { - getTimestamp() int64 - getLastSeq() uint64 - setLastSeq(seq uint64) - getStartSeq() uint64 - isRange() bool - getNumSequencesInEntry() int -} - // SkippedSequenceSlice stores the set of skipped sequences as an ordered slice of single skipped sequences // or skipped sequence ranges type SkippedSequenceSlice struct { - list []SkippedSequenceListEntry + list []*SkippedSequenceListEntry ClipCapacityHeadroom int lock sync.RWMutex } -var _ SkippedSequenceListEntry = &SingleSkippedSequence{} - -// SingleSkippedSequence contains a single skipped sequence value + unix timestamp of the time it's created -type SingleSkippedSequence struct { - seq uint64 - timestamp int64 +// SkippedSequenceListEntry contains start + end sequence for a range of skipped sequences + +// a timestamp at which the entry was created in unix format. If an entry is a singular skipped sequence the start and +// end sequence will be equal +type SkippedSequenceListEntry struct { + start uint64 // start sequence of a range + end uint64 // end sequence of the range (0 if a singular skipped sequence) + timestamp int64 // timestamp this entry was created in unix format } func NewSkippedSequenceSlice(clipHeadroom int) *SkippedSequenceSlice { return &SkippedSequenceSlice{ - list: []SkippedSequenceListEntry{}, + list: []*SkippedSequenceListEntry{}, ClipCapacityHeadroom: clipHeadroom, } } -// NewSingleSkippedSequenceEntry returns a SingleSkippedSequence with the specified sequence and the current -// time in unix time -func NewSingleSkippedSequenceEntry(sequence uint64) *SingleSkippedSequence { - return &SingleSkippedSequence{ - seq: sequence, +// NewSkippedSequenceRangeEntry returns a SkippedSequenceListEntry with the specified sequence range and the current +// timestamp in unix time +func NewSkippedSequenceRangeEntry(start, end uint64) *SkippedSequenceListEntry { + return &SkippedSequenceListEntry{ + start: start, + end: end, + timestamp: time.Now().Unix(), + } +} + +// NewSkippedSequenceRangeEntryAt returns a SkippedSequenceListEntry with the specified sequences and the supplied +// timestamp in unix time +func NewSkippedSequenceRangeEntryAt(start, end uint64, timeStamp int64) *SkippedSequenceListEntry { + return &SkippedSequenceListEntry{ + start: start, + end: end, + timestamp: timeStamp, + } +} + +// NewSingleSkippedSequenceEntry returns a SkippedSequenceListEntry with start and end seq defined as equal +// with the current timestamp in unix time +func NewSingleSkippedSequenceEntry(seq uint64) *SkippedSequenceListEntry { + return &SkippedSequenceListEntry{ + start: seq, + end: seq, timestamp: time.Now().Unix(), } } +// NewSingleSkippedSequenceEntryAt returns a SkippedSequenceListEntry with start and end seq defined as equal +// and the supplied timestamp in unix time +func NewSingleSkippedSequenceEntryAt(seq uint64, timeStamp int64) *SkippedSequenceListEntry { + return &SkippedSequenceListEntry{ + start: seq, + end: seq, + timestamp: timeStamp, + } +} + // getTimestamp returns the timestamp of the entry -func (s *SingleSkippedSequence) getTimestamp() int64 { +func (s *SkippedSequenceListEntry) getTimestamp() int64 { return s.timestamp } -// getLastSeq gets the last sequence in the range on the entry, for single items the sequence will be returned -func (s *SingleSkippedSequence) getLastSeq() uint64 { - return s.seq +// getStartSeq gets the start sequence in the range on the entry, for single items the sequence (startSeq) will be returned +func (s *SkippedSequenceListEntry) getStartSeq() uint64 { + return s.start } -// setLastSeq sets the last sequence in the range on the entry, for single items its a no-op -func (s *SingleSkippedSequence) setLastSeq(seq uint64) { - // no-op +// getLastSeq gets the last sequence in the range on the entry, for single items the sequence 0 will be returned +func (s *SkippedSequenceListEntry) getLastSeq() uint64 { + return s.end } -// getStartSeq gets the start sequence in the range on the entry, for single items the sequence will be returned -func (s *SingleSkippedSequence) getStartSeq() uint64 { - return s.seq +// setLastSeq sets the last sequence in the range on the entry +func (s *SkippedSequenceListEntry) setLastSeq(seq uint64) { + s.end = seq } -// isRange returns true if the entry is a sequence range entry, false if not -func (s *SingleSkippedSequence) isRange() bool { - return false +// setStartSeq sets the last sequence in the range on the entry +func (s *SkippedSequenceListEntry) setStartSeq(seq uint64) { + s.start = seq } -// getNumSequencesInEntry returns the number of sequences a entry in skipped slice holds, +// getNumSequencesInEntry returns the number of sequences an entry in skipped slice holds, // for single entries it will return just 1 -func (s *SingleSkippedSequence) getNumSequencesInEntry() int { - return 1 +func (s *SkippedSequenceListEntry) getNumSequencesInEntry() int64 { + if s.end == s.start { + return 1 + } + numSequences := (s.end - s.start) + 1 + return int64(numSequences) +} + +// extendRange will set the range last seq to the incoming contiguous entry's last seq +// + set the timestamp. +func (s *SkippedSequenceListEntry) extendRange(lastSeq uint64, timeStamp int64) { + s.timestamp = timeStamp + s.setLastSeq(lastSeq) } // Contains returns true if a given sequence exists in the skipped sequence slice func (s *SkippedSequenceSlice) Contains(x uint64) bool { s.lock.RLock() defer s.lock.RUnlock() - _, found := s.findSequence(x) + _, found := s._findSequence(x) return found } // SkippedSequenceCompact will compact the entries with timestamp old enough. It will also clip // the capacity of the slice to length + 100 if the current capacity is 2.5x the length -func (s *SkippedSequenceSlice) SkippedSequenceCompact(ctx context.Context, maxWait int64) (numSequencesCompacted int) { +func (s *SkippedSequenceSlice) SkippedSequenceCompact(ctx context.Context, maxWait int64) (numSequencesCompacted int64) { s.lock.Lock() defer s.lock.Unlock() @@ -128,13 +162,13 @@ func (s *SkippedSequenceSlice) SkippedSequenceCompact(ctx context.Context, maxWa s.list = slices.Delete(s.list, 0, indexToDelete+1) } // resize slice to reclaim memory if we need to - s.clip(ctx) + s._clip(ctx) return numSequencesCompacted } -// clip will clip the capacity of the slice to the current length plus the configured headroom in ClipCapacityHeadroom +// _clip will clip the capacity of the slice to the current length plus the configured headroom in ClipCapacityHeadroom // if the current capacity of the slice in 2.5x the length -func (s *SkippedSequenceSlice) clip(ctx context.Context) { +func (s *SkippedSequenceSlice) _clip(ctx context.Context) { // threshold is 2.5x the current length of the slice threshold := 2.5 * float64(len(s.list)) @@ -147,49 +181,76 @@ func (s *SkippedSequenceSlice) clip(ctx context.Context) { } } -// findSequence will use binary search to search the elements in the slice for a given sequence -func (s *SkippedSequenceSlice) findSequence(x uint64) (int, bool) { +// _findSequence will use binary search to search the elements in the slice for a given sequence +func (s *SkippedSequenceSlice) _findSequence(x uint64) (int, bool) { i, found := slices.BinarySearchFunc(s.list, x, binarySearchFunc) return i, found } -func binarySearchFunc(a SkippedSequenceListEntry, seq uint64) int { - singleSeq, ok := a.(*SingleSkippedSequence) - if ok { - if singleSeq.seq > seq { - return 1 - } - if singleSeq.seq == seq { - return 0 - } - return -1 +// binarySearchFunc contains the custom search function for searching the skipped sequence slice for a particular sequence +func binarySearchFunc(a *SkippedSequenceListEntry, seq uint64) int { + if a.getStartSeq() > seq { + return 1 + } + + if a.getLastSeq() >= seq { + return 0 } - // should never get here as it stands, will have extra handling here pending CBG-3853 - return 1 + return -1 } // removeSeq will remove a given sequence from the slice if it exists func (s *SkippedSequenceSlice) removeSeq(x uint64) error { s.lock.Lock() defer s.lock.Unlock() - index, found := s.findSequence(x) + index, found := s._findSequence(x) if !found { return fmt.Errorf("sequence %d not found in the skipped list", x) } - // handle border cases where x is equal to start or end sequence on range (or just sequence for single entries) - if !s.list[index].isRange() { + + // take the element at the index and handle cases required to removal of a sequence + rangeElem := s.list[index] + numSequences := rangeElem.getNumSequencesInEntry() + + if numSequences == 1 { // if not a range, we can just remove the single entry from the slice s.list = slices.Delete(s.list, index, index+1) return nil } - // more range handling here CBG-3853, temporarily error as we shouldn't get to this point at this time - return fmt.Errorf("entered range handling code") + + // range entry handling + // if x == startSeq set startSeq+1, if x == lastSeq set lastSeq-1 + if rangeElem.getStartSeq() == x { + rangeElem.setStartSeq(x + 1) + return nil + } + if rangeElem.getLastSeq() == x { + rangeElem.setLastSeq(x - 1) + return nil + } + + if numSequences == 3 { + // if we get here then x is in middle of the 3 sequence range. x being == startSeq or + // lastSeq is handled above + // add new single entry at elem + 1 then modify range + newElem := NewSingleSkippedSequenceEntryAt(rangeElem.getLastSeq(), rangeElem.getTimestamp()) + s._insert(index+1, newElem) + // make rangeElem a single entry + rangeElem.setLastSeq(rangeElem.getStartSeq()) + return nil + } + + // if we get here we can assume that startSeq < x < lastSeq + // split index range + newElem := NewSkippedSequenceRangeEntryAt(x+1, rangeElem.getLastSeq(), rangeElem.getTimestamp()) + s._insert(index+1, newElem) + rangeElem.setLastSeq(x - 1) + + return nil } -// insert will insert element in middle of slice maintaining order of rest of slice -func (s *SkippedSequenceSlice) insert(index int, entry SkippedSequenceListEntry) { - s.lock.Lock() - defer s.lock.Unlock() +// _insert will insert element in middle of slice maintaining order of rest of slice +func (s *SkippedSequenceSlice) _insert(index int, entry *SkippedSequenceListEntry) { s.list = append(s.list, nil) copy(s.list[index+1:], s.list[index:]) s.list[index] = entry @@ -197,7 +258,7 @@ func (s *SkippedSequenceSlice) insert(index int, entry SkippedSequenceListEntry) // PushSkippedSequenceEntry will append a new skipped sequence entry to the end of the slice, if adding a contiguous // sequence function will expand the last entry of the slice to reflect this -func (s *SkippedSequenceSlice) PushSkippedSequenceEntry(entry *SingleSkippedSequence) { +func (s *SkippedSequenceSlice) PushSkippedSequenceEntry(entry *SkippedSequenceListEntry) { s.lock.Lock() defer s.lock.Unlock() @@ -205,10 +266,20 @@ func (s *SkippedSequenceSlice) PushSkippedSequenceEntry(entry *SingleSkippedSequ s.list = append(s.list, entry) return } - // adding contiguous sequence handling here, pending CBG-3853 - s.list = append(s.list, entry) + // get index of last entry + last seq of entry + index := len(s.list) - 1 + lastEntryLastSeq := s.list[index].getLastSeq() + if (lastEntryLastSeq + 1) == entry.getStartSeq() { + // adding contiguous sequence + // set last seq in the range to the new arriving sequence + alter timestamp to incoming entries timestamp + s.list[index].extendRange(entry.getLastSeq(), entry.getTimestamp()) + } else { + s.list = append(s.list, entry) + } + } +// getOldest returns the start sequence of the first element in the skipped sequence slice func (s *SkippedSequenceSlice) getOldest() uint64 { s.lock.RLock() defer s.lock.RUnlock() diff --git a/db/skipped_sequence_test.go b/db/skipped_sequence_test.go index 83ec31ea72..6d76f6dce3 100644 --- a/db/skipped_sequence_test.go +++ b/db/skipped_sequence_test.go @@ -23,7 +23,7 @@ import ( // - Populate 10 single skipped sequence items in the slice // - Assert that each one is added in the correct order // - Assert that timestamp is increasing from the last entry (or equal to) -// - Pending CBG-3853 add contiguous sequence to slice and assert that it replaces the last element with a range entry +// - Add contiguous sequence to slice and assert that it extends the last element with a range func TestPushSingleSkippedSequence(t *testing.T) { skippedSlice := NewSkippedSequenceSlice(DefaultClipCapacityHeadroom) @@ -33,27 +33,91 @@ func TestPushSingleSkippedSequence(t *testing.T) { var prevTime int64 = 0 for j := 0; j < 10; j++ { - assert.Equal(t, uint64(j*2), skippedSlice.list[j].getLastSeq()) + assert.Equal(t, uint64(j*2), skippedSlice.list[j].getStartSeq()) + assert.GreaterOrEqual(t, skippedSlice.list[j].getTimestamp(), prevTime) + prevTime = skippedSlice.list[j].getTimestamp() + } + // add a new single entry that is contiguous with end of the slice which should replace last + // single entry with a range + skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntry(19)) + // grab last entry in list + index := len(skippedSlice.list) - 1 + entry := skippedSlice.list[index] + + // assert last entry is range entry and start + end sequence on range is as expected + assert.False(t, entry.singleEntry()) + assert.Equal(t, uint64(18), entry.getStartSeq()) + assert.Equal(t, uint64(19), entry.getLastSeq()) +} + +// TestPushSkippedSequenceRange: +// - Create slice of range sequence entries and assert contents of slice are as expected +// - Attempt to add a new range that is contiguous with entry at end of slice +// - Assert that the last entry of the slice is expended to include the new range +func TestPushSkippedSequenceRange(t *testing.T) { + skippedSlice := NewSkippedSequenceSlice(DefaultClipCapacityHeadroom) + + for i := 0; i < 10; i++ { + start := i * 10 + skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(uint64(start), uint64(start+5))) + } + + var prevTime int64 = 0 + for j := 0; j < 10; j++ { + start := j * 10 + end := start + 5 + assert.Equal(t, uint64(start), skippedSlice.list[j].getStartSeq()) + assert.Equal(t, uint64(end), skippedSlice.list[j].getLastSeq()) assert.GreaterOrEqual(t, skippedSlice.list[j].getTimestamp(), prevTime) prevTime = skippedSlice.list[j].getTimestamp() } - // Pending CBG-3853, add a new single entry that is contiguous with end of the slice which should repalce last - // entry with a range + // add a new range entry that is contiguous with end of the slice which should alter range last element in list + skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(96, 110)) + // grab last entry in list + index := len(skippedSlice.list) - 1 + entry := skippedSlice.list[index] + + // assert last entry is range entry and start + end sequence on range is as expected + assert.False(t, entry.singleEntry()) + assert.Equal(t, uint64(90), entry.getStartSeq()) + assert.Equal(t, uint64(110), entry.getLastSeq()) + + // add new single entry that is not contiguous with last element on slice + skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntry(500)) + + // add new range that is contiguous with the single entry on the last element of the slice + garbage timestamp + // for later assertion + newTimeStamp := time.Now().Unix() + 10000 + skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntryAt(501, 510, newTimeStamp)) + + index = len(skippedSlice.list) - 1 + entry = skippedSlice.list[index] + // assert that last element in list is a range and holds sequences we expect + timestamp + // is what the new pushed range above holds + assert.False(t, entry.singleEntry()) + assert.Equal(t, uint64(500), entry.getStartSeq()) + assert.Equal(t, uint64(510), entry.getLastSeq()) + assert.Equal(t, newTimeStamp, entry.getTimestamp()) } -func BenchmarkPushSingleSkippedSequence(b *testing.B) { +func BenchmarkPushSkippedSequenceEntry(b *testing.B) { benchmarks := []struct { name string rangeEntries bool }{ {name: "single_entries", rangeEntries: false}, + {name: "range_entries", rangeEntries: true}, } for _, bm := range benchmarks { skippedSlice := NewSkippedSequenceSlice(DefaultClipCapacityHeadroom) b.Run(bm.name, func(b *testing.B) { for i := 0; i < b.N; i++ { - skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntry(uint64(i * 2))) + if !bm.rangeEntries { + skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntry(uint64(i * 2))) + } else { + skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(uint64(i*10), uint64(i*10)+5)) + } } }) } @@ -62,46 +126,74 @@ func BenchmarkPushSingleSkippedSequence(b *testing.B) { // TestIsSequenceSkipped: // - Create a skipped slice // - Test each sequence added returns true for Contains +// - For range entries, assert that each boundary of the range returns true, in addition to a sequence that +// is in the middle of the range // - Assert that Contains returns false for a sequence that doesn't exist in the slice -// -// Will add more test cases to this as development continues (for sequence ranges + mixed single and ranges) pending CBG-3853 +// - Then add this sequence and search again for it, asserting Contains returns true now func TestIsSequenceSkipped(t *testing.T) { testCases := []struct { - name string - inputList []uint64 + name string + rangeItems bool + inputList []uint64 }{ { name: "list_full_single_items", inputList: []uint64{2, 6, 100, 200, 500}, }, + { + name: "list_full_range_items", + inputList: []uint64{5, 15, 25, 35, 45}, + rangeItems: true, + }, } for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { skippedSlice := NewSkippedSequenceSlice(DefaultClipCapacityHeadroom) - for _, input := range testCase.inputList { - skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntry(input)) - } - for _, v := range testCase.inputList { - assert.True(t, skippedSlice.Contains(v)) + if !testCase.rangeItems { + for _, input := range testCase.inputList { + skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntry(input)) + } + for _, v := range testCase.inputList { + assert.True(t, skippedSlice.Contains(v)) + } + } else { + for _, input := range testCase.inputList { + skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(input, input+5)) + } + for _, v := range testCase.inputList { + assert.True(t, skippedSlice.Contains(v)) + assert.True(t, skippedSlice.Contains(v+5)) + assert.True(t, skippedSlice.Contains(v+2)) + } } - // try a non existent sequence - assert.False(t, skippedSlice.Contains(150)) + // try a currently non-existent sequence + assert.False(t, skippedSlice.Contains(550)) + + // push this sequence and assert Contains returns true after + skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntry(550)) + assert.True(t, skippedSlice.Contains(550)) + // push another range much higher, assert Contains works as expected + skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(60000, 70000)) + assert.True(t, skippedSlice.Contains(60000)) + assert.True(t, skippedSlice.Contains(70000)) + assert.True(t, skippedSlice.Contains(65000)) }) } } -func BenchmarkIsSkippedFunction(b *testing.B) { +func BenchmarkContainsFunction(b *testing.B) { benchmarks := []struct { - name string - rangeEntries bool - inputSlice *SkippedSequenceSlice + name string + inputSlice *SkippedSequenceSlice }{ - {name: "single_entries_large_slice", rangeEntries: false, inputSlice: setupBenchmark(true, DefaultClipCapacityHeadroom)}, - {name: "single_entries_small_slice", rangeEntries: false, inputSlice: setupBenchmark(false, DefaultClipCapacityHeadroom)}, + {name: "single_entries_large_slice", inputSlice: setupBenchmark(true, false, DefaultClipCapacityHeadroom)}, + {name: "single_entries_small_slice", inputSlice: setupBenchmark(false, false, DefaultClipCapacityHeadroom)}, + {name: "range_entries_large_slice", inputSlice: setupBenchmark(true, true, DefaultClipCapacityHeadroom)}, + {name: "range_entries_small_slice", inputSlice: setupBenchmark(false, true, DefaultClipCapacityHeadroom)}, } for _, bm := range benchmarks { b.Run(bm.name, func(b *testing.B) { @@ -115,53 +207,220 @@ func BenchmarkIsSkippedFunction(b *testing.B) { // TestRemoveSeqFromSkipped: // - Create skipped sequence slice // - Remove a sequence from that slice and assert the resulting slice is as expected +// - Assert on timestamps being preserved throughout the slice // - Attempt to remove a non-existent sequence and assert it returns an error -// -// Will add more test cases to this as development continues (for sequence ranges + mixed single and ranges) pending CBG-3853 +// - Test features edge cases where we remove start or end seq on a range. Then another edge case of removing a +// sequence that is startSeq+1 or lastSeq-1 thus altering existing range and inserting a new single sequence in place func TestRemoveSeqFromSkipped(t *testing.T) { testCases := []struct { - name string - inputList []uint64 - expectedList []uint64 - remove uint64 - errorRemove uint64 + name string + inputList [][]uint64 + expected [][]uint64 + remove uint64 + errorRemove uint64 + rangeItems bool }{ { - name: "list_full_single_items", - inputList: []uint64{2, 6, 100, 200, 500}, - expectedList: []uint64{2, 6, 200, 500}, - remove: 100, - errorRemove: 150, + name: "list_full_single_items", + inputList: [][]uint64{{2}, {6}, {100}, {200}, {500}}, + expected: [][]uint64{{2, 2}, {6, 6}, {200, 200}, {500, 500}}, + remove: 100, + errorRemove: 150, + }, + { + name: "list_full_range_items", + inputList: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 40}, {45, 50}}, + expected: [][]uint64{{5, 10}, {15, 20}, {25, 26}, {28, 30}, {35, 40}, {45, 50}}, + remove: 27, + errorRemove: 500, + rangeItems: true, + }, + { + name: "list_full_range_items_remove_startSeq", + inputList: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 40}, {45, 50}}, + expected: [][]uint64{{5, 10}, {15, 20}, {26, 30}, {35, 40}, {45, 50}}, + remove: 25, + errorRemove: 500, + rangeItems: true, + }, + { + name: "list_full_range_items_remove_endSeq", + inputList: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 40}, {45, 50}}, + expected: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 39}, {45, 50}}, + remove: 40, + errorRemove: 500, + rangeItems: true, + }, + { + name: "list_full_range_items_remove_startSeq+1", + inputList: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 40}, {45, 50}}, + expected: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 35}, {37, 40}, {45, 50}}, + remove: 36, + errorRemove: 500, + rangeItems: true, + }, + { + name: "list_full_range_items_remove_endSeq-1", + inputList: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 40}, {45, 50}}, + expected: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 38}, {40, 40}, {45, 50}}, + remove: 39, + errorRemove: 500, + rangeItems: true, + }, + { + name: "list_with_length_1_range_removal", + inputList: [][]uint64{{5, 10}, {15, 20}, {22}, {25, 30}}, + expected: [][]uint64{{5, 10}, {15, 20}, {25, 30}}, + remove: 22, + errorRemove: 500, + rangeItems: true, + }, + { + name: "list_with_length_2_range_removal_startSeq", + inputList: [][]uint64{{5, 10}, {15, 20}, {22, 23}, {26, 27}, {35, 40}}, + expected: [][]uint64{{5, 10}, {15, 20}, {23, 23}, {26, 27}, {35, 40}}, + remove: 22, + errorRemove: 500, + rangeItems: true, + }, + { + name: "list_with_length_2_range_removal_lastSeq", + inputList: [][]uint64{{5, 10}, {15, 20}, {22, 23}, {26, 27}, {35, 40}}, + expected: [][]uint64{{5, 10}, {15, 20}, {22, 23}, {26, 26}, {35, 40}}, + remove: 27, + errorRemove: 500, + rangeItems: true, }, } for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { skippedSlice := NewSkippedSequenceSlice(DefaultClipCapacityHeadroom) - for _, input := range testCase.inputList { - skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntry(input)) + if !testCase.rangeItems { + for _, input := range testCase.inputList { + skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntry(input[0])) + } + } else { + for _, input := range testCase.inputList { + if len(input) == 1 { + skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntry(input[0])) + } else { + skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(input[0], input[1])) + } + } } err := skippedSlice.removeSeq(testCase.remove) require.NoError(t, err) for i := 0; i < len(skippedSlice.list); i++ { - assert.Equal(t, testCase.expectedList[i], skippedSlice.list[i].getLastSeq()) + // if we have expected entry of just {{x, x}}, then we expect this entry to be single skipped entry + if testCase.expected[i][0] == testCase.expected[i][1] { + assert.True(t, skippedSlice.list[i].singleEntry()) + } else { + assert.False(t, skippedSlice.list[i].singleEntry()) + } + assert.Equal(t, testCase.expected[i][0], skippedSlice.list[i].getStartSeq()) + assert.Equal(t, testCase.expected[i][1], skippedSlice.list[i].getLastSeq()) + } + + // assert on timestamps being preserved, all timestamps must be increasing or equal as we iterate through + // the slice proving after insertion of new elements in middle of slice timestamps are preserved + var prevTime int64 + for _, v := range skippedSlice.list { + assert.GreaterOrEqual(t, v.getTimestamp(), prevTime) + prevTime = v.getTimestamp() } + // attempt remove on non existent sequence err = skippedSlice.removeSeq(testCase.errorRemove) require.Error(t, err) }) } } +// TestRemoveSeqFromThreeSequenceRange: +// - Create slice of ranges with a range of just three in there too +// - Grab timestamp of the three sequence range entry +// - Attempt to remove middle sequence from the three sequence range +// - Assert that there are two single items inserted in the middle of the slice to preserve the order +// - Assert that timestamp is preserved +// - Add two more three sequence ranges and remove the start/last seq from those respectively +// - Assert the resulting slice is as expected +func TestRemoveSeqFromThreeSequenceRange(t *testing.T) { + + skippedSlice := NewSkippedSequenceSlice(DefaultClipCapacityHeadroom) + inputList := [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 40}, {45, 50}} + expected := [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 40}, {45, 50}, {60, 60}, {62, 62}, {70, 75}} + + for _, v := range inputList { + skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(v[0], v[1])) + } + skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(60, 62)) + skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(70, 75)) + + i, found := skippedSlice._findSequence(60) + assert.True(t, found) + + // grab timestamp from range that is getting split + timestampAtSequence := skippedSlice.list[i].getTimestamp() + + // remove seq in middle of above range + err := skippedSlice.removeSeq(61) + require.NoError(t, err) + + skippedLen := len(skippedSlice.list) + for i := 0; i < skippedLen; i++ { + if expected[i][0] == expected[i][1] { + assert.True(t, skippedSlice.list[i].singleEntry()) + } else { + assert.False(t, skippedSlice.list[i].singleEntry()) + } + assert.Equal(t, expected[i][0], skippedSlice.list[i].getStartSeq()) + assert.Equal(t, expected[i][1], skippedSlice.list[i].getLastSeq()) + } + + // assert that items second and third from last are single items now + assert.True(t, skippedSlice.list[skippedLen-3].singleEntry()) + assert.True(t, skippedSlice.list[skippedLen-2].singleEntry()) + // assert that items second and third from last timestamps are preserved + assert.Equal(t, timestampAtSequence, skippedSlice.list[skippedLen-3].getTimestamp()) + assert.Equal(t, timestampAtSequence, skippedSlice.list[skippedLen-2].getTimestamp()) + + // push two new three seq ranges and remove the start seq from one of those ranges, + // then last seq from the other range + expected = [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 40}, {45, 50}, {60, 60}, {62, 62}, {70, 75}, {81, 82}, {85, 86}} + skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(80, 82)) + + skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(85, 87)) + + // remove start seq from range 80-32 + err = skippedSlice.removeSeq(80) + require.NoError(t, err) + // remove last seq from range 85-87 + err = skippedSlice.removeSeq(87) + require.NoError(t, err) + + skippedLen = len(skippedSlice.list) + for j := 0; j < skippedLen; j++ { + if expected[j][0] == expected[j][1] { + assert.True(t, skippedSlice.list[j].singleEntry()) + } else { + assert.False(t, skippedSlice.list[j].singleEntry()) + } + assert.Equal(t, expected[j][0], skippedSlice.list[j].getStartSeq()) + assert.Equal(t, expected[j][1], skippedSlice.list[j].getLastSeq()) + } +} + func BenchmarkRemoveSeqFromSkippedList(b *testing.B) { benchmarks := []struct { - name string - rangeEntries bool - inputSlice *SkippedSequenceSlice + name string + inputSlice *SkippedSequenceSlice }{ - {name: "single_entries_large_slice", rangeEntries: false, inputSlice: setupBenchmark(true, DefaultClipCapacityHeadroom)}, - {name: "single_entries_small_slice", rangeEntries: false, inputSlice: setupBenchmark(false, DefaultClipCapacityHeadroom)}, + {name: "single_entries_large_slice", inputSlice: setupBenchmark(true, false, DefaultClipCapacityHeadroom)}, + {name: "single_entries_small_slice", inputSlice: setupBenchmark(false, false, DefaultClipCapacityHeadroom)}, + {name: "range_entries_large_slice", inputSlice: setupBenchmark(true, true, DefaultClipCapacityHeadroom)}, + {name: "range_entries_small_slice", inputSlice: setupBenchmark(false, true, DefaultClipCapacityHeadroom)}, } for _, bm := range benchmarks { b.Run(bm.name, func(b *testing.B) { @@ -174,40 +433,62 @@ func BenchmarkRemoveSeqFromSkippedList(b *testing.B) { // TestInsertItemInSlice: // - Create skipped sequence slice -// - Insert a new value in the slice at index 2 to maintain order +// - Insert a new value in the slice at index specified to maintain order // - Assert the resulting slice is correct -// -// Will add more test cases to this as development continues (for sequence ranges + mixed single and ranges) pending CBG-3853 +// - Assert on resulting slice func TestInsertItemInSlice(t *testing.T) { testCases := []struct { - name string - inputList []uint64 - expectedList []uint64 - insert uint64 - index int + name string + inputList [][]uint64 + expected [][]uint64 + insert uint64 + index int + rangeItems bool }{ { - name: "single_items", - inputList: []uint64{2, 6, 100, 200, 500}, - expectedList: []uint64{2, 6, 70, 100, 200, 500}, - insert: 70, - index: 2, + name: "single_items", + inputList: [][]uint64{{2}, {6}, {100}, {200}, {500}}, + expected: [][]uint64{{2, 2}, {6, 6}, {70, 70}, {100, 100}, {200, 200}, {500, 500}}, + insert: 70, + index: 2, + }, + { + name: "range_items", + inputList: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 40}, {45, 50}}, + expected: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {32, 32}, {35, 40}, {45, 50}}, + insert: 32, + index: 3, + rangeItems: true, }, } for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { skippedSlice := NewSkippedSequenceSlice(DefaultClipCapacityHeadroom) - for _, input := range testCase.inputList { - skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntry(input)) + if !testCase.rangeItems { + for _, input := range testCase.inputList { + skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntry(input[0])) + } + } else { + for _, input := range testCase.inputList { + skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(input[0], input[1])) + } } // attempt to insert at test case index to keep order - skippedSlice.insert(testCase.index, NewSingleSkippedSequenceEntry(testCase.insert)) + skippedSlice._insert(testCase.index, NewSingleSkippedSequenceEntry(testCase.insert)) for i := 0; i < len(skippedSlice.list); i++ { - assert.Equal(t, testCase.expectedList[i], skippedSlice.list[i].getLastSeq()) + if testCase.expected[i][0] == testCase.expected[i][1] { + assert.True(t, skippedSlice.list[i].singleEntry()) + } else { + assert.False(t, skippedSlice.list[i].singleEntry()) + } + // if we expect range at this index, assert on it + assert.Equal(t, testCase.expected[i][0], skippedSlice.list[i].getStartSeq()) + assert.Equal(t, testCase.expected[i][1], skippedSlice.list[i].getLastSeq()) } + }) } } @@ -218,14 +499,20 @@ func BenchmarkInsertSkippedItem(b *testing.B) { rangeEntries bool inputSlice *SkippedSequenceSlice }{ - {name: "single_entries_large_slice", rangeEntries: false, inputSlice: setupBenchmark(true, DefaultClipCapacityHeadroom)}, - {name: "single_entries_small_slice", rangeEntries: false, inputSlice: setupBenchmark(false, DefaultClipCapacityHeadroom)}, + {name: "single_entries_large_slice", rangeEntries: false, inputSlice: setupBenchmark(true, false, DefaultClipCapacityHeadroom)}, + {name: "single_entries_small_slice", rangeEntries: false, inputSlice: setupBenchmark(false, false, DefaultClipCapacityHeadroom)}, + {name: "range_entries_large_slice", rangeEntries: true, inputSlice: setupBenchmark(true, true, DefaultClipCapacityHeadroom)}, + {name: "range_entries_small_slice", rangeEntries: true, inputSlice: setupBenchmark(false, true, DefaultClipCapacityHeadroom)}, } for _, bm := range benchmarks { b.Run(bm.name, func(b *testing.B) { sequenceNum := 40000 for i := 0; i < b.N; i++ { - bm.inputSlice.insert(i, NewSingleSkippedSequenceEntry(uint64(sequenceNum*i))) + if !bm.rangeEntries { + bm.inputSlice._insert(i, NewSingleSkippedSequenceEntry(uint64(sequenceNum*i))) + } else { + bm.inputSlice._insert(i, NewSkippedSequenceRangeEntry(uint64(sequenceNum*i), uint64(sequenceNum*i)+5)) + } } }) } @@ -235,38 +522,63 @@ func BenchmarkInsertSkippedItem(b *testing.B) { // - Create skipped sequence slice with old timestamp // - Push new entry with future timestamp // - Run compact and assert that each item is compacted apart from the last added item -// -// Will add more test cases to this as development continues (for sequence ranges + mixed single and ranges) pending CBG-3853 +// - Assert on number sequences removed func TestCompactSkippedList(t *testing.T) { testCases := []struct { - name string - inputList []uint64 - expectedList []uint64 - numRemoved int + name string + inputList [][]uint64 + expected [][]uint64 + numRemoved int64 + rangeItems bool }{ { - name: "single_items", - inputList: []uint64{2, 6, 100, 200, 500}, - expectedList: []uint64{600}, - numRemoved: 5, + name: "single_items", + inputList: [][]uint64{{2}, {6}, {100}, {200}, {500}}, + expected: [][]uint64{{600, 600}}, + numRemoved: 5, + }, + { + name: "range_items", + inputList: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 40}, {45, 50}, {55, 60}}, + expected: [][]uint64{{600, 605}}, + numRemoved: 36, + rangeItems: true, }, } for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { skippedSlice := NewSkippedSequenceSlice(DefaultClipCapacityHeadroom) + inputTime := time.Now().Unix() - 1000 - for _, input := range testCase.inputList { - skippedSlice.PushSkippedSequenceEntry(testSingleSkippedEntryOldTimestamp(input)) + if !testCase.rangeItems { + for _, input := range testCase.inputList { + // add single entries with old timestamps for compaction + skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntryAt(input[0], inputTime)) + } + } else { + for _, input := range testCase.inputList { + // add range entries with old timestamps for compaction + skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntryAt(input[0], input[1], inputTime)) + } } + // alter timestamp so we don't compact this entry - entry := NewSingleSkippedSequenceEntry(600) - entry.timestamp = entry.timestamp + 10000 + var entry *SkippedSequenceListEntry + futureTime := time.Now().Unix() + 10000 + if !testCase.rangeItems { + entry = NewSingleSkippedSequenceEntryAt(600, futureTime) + } else { + entry = NewSkippedSequenceRangeEntryAt(600, 605, futureTime) + } skippedSlice.PushSkippedSequenceEntry(entry) numRemoved := skippedSlice.SkippedSequenceCompact(base.TestCtx(t), 1) require.Len(t, skippedSlice.list, 1) - assert.Equal(t, uint64(600), skippedSlice.list[0].getLastSeq()) + assert.Equal(t, testCase.expected[0][0], skippedSlice.list[0].getStartSeq()) + assert.Equal(t, testCase.expected[0][1], skippedSlice.list[0].getLastSeq()) + + // assert on num sequences removed assert.Equal(t, testCase.numRemoved, numRemoved) }) } @@ -275,30 +587,47 @@ func TestCompactSkippedList(t *testing.T) { // TestCompactSkippedListClipHandling: // - Create new skipped sequence slice with old timestamps + clip headroom defined at 100 // - Push new future entry on the slice -// - Run compact and assert that the capacity of the slice is correct after clip is run -// -// Will add more test cases to this as development continues (for sequence ranges + mixed single and ranges) pending CBG-3853 +// - Run compact and assert that the capacity of the slice is correct after _clip is run func TestCompactSkippedListClipHandling(t *testing.T) { testCases := []struct { name string expectedCap int + rangeItems bool }{ { name: "single_items", expectedCap: 100, }, + { + name: "range_items", + expectedCap: 100, + }, } for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { // define clip headroom at 100 for test skippedSlice := NewSkippedSequenceSlice(100) + inputTime := time.Now().Unix() - 1000 - for i := 0; i < 100; i++ { - skippedSlice.PushSkippedSequenceEntry(testSingleSkippedEntryOldTimestamp(uint64(i * 2))) + if !testCase.rangeItems { + for i := 0; i < 100; i++ { + // add single entries with old timestamps for compaction + skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntryAt(uint64(i*2), inputTime)) + } + } else { + for i := 0; i < 100; i++ { + // add range entries with old timestamps for compaction + skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntryAt(uint64(i*2), uint64(i*2)+5, inputTime)) + } } // alter timestamp so we don't compact this entry - entry := NewSingleSkippedSequenceEntry(600) - entry.timestamp = entry.timestamp + 10000 + var entry *SkippedSequenceListEntry + futureTime := time.Now().Unix() + 10000 + if !testCase.rangeItems { + entry = NewSingleSkippedSequenceEntryAt(600, futureTime) + } else { + entry = NewSkippedSequenceRangeEntryAt(600, 605, futureTime) + } skippedSlice.PushSkippedSequenceEntry(entry) skippedSlice.SkippedSequenceCompact(base.TestCtx(t), 1) @@ -314,8 +643,10 @@ func BenchmarkCompactSkippedList(b *testing.B) { rangeEntries bool inputSlice *SkippedSequenceSlice }{ - {name: "single_entries_large_slice", rangeEntries: false, inputSlice: setupBenchmarkToCompact(true, 100)}, - {name: "single_entries_small_slice", rangeEntries: false, inputSlice: setupBenchmarkToCompact(false, 100)}, + {name: "single_entries_large_slice", rangeEntries: false, inputSlice: setupBenchmarkToCompact(true, false, 100)}, + {name: "single_entries_small_slice", rangeEntries: false, inputSlice: setupBenchmarkToCompact(false, false, 100)}, + {name: "range_entries_large_slice", rangeEntries: true, inputSlice: setupBenchmarkToCompact(true, true, 100)}, + {name: "range_entries_small_slice", rangeEntries: true, inputSlice: setupBenchmarkToCompact(false, true, 100)}, } for _, bm := range benchmarks { b.Run(bm.name, func(b *testing.B) { @@ -326,26 +657,20 @@ func BenchmarkCompactSkippedList(b *testing.B) { } } -// testSingleSkippedEntryOldTimestamp is creating a new single skipped sequence entry with an old timestamp -func testSingleSkippedEntryOldTimestamp(seq uint64) *SingleSkippedSequence { - now := time.Now().Unix() - assignedTimestamp := now - 1000 - return &SingleSkippedSequence{ - seq: seq, - timestamp: assignedTimestamp, - } -} - +// TestGetOldestSkippedSequence: +// - Create slice of no items, single items and range items +// - Assert that getOldest() correctly returns the oldest sequence in the slice, 0 if the slice is empty func TestGetOldestSkippedSequence(t *testing.T) { testCases := []struct { - name string - inputList []uint64 - expected uint64 - empty bool + name string + inputList [][]uint64 + expected uint64 + empty bool + rangeItems bool }{ { name: "single_items", - inputList: []uint64{2, 6, 100, 200, 500}, + inputList: [][]uint64{{2}, {6}, {100}, {200}, {500}}, expected: 2, }, { @@ -353,13 +678,24 @@ func TestGetOldestSkippedSequence(t *testing.T) { empty: true, expected: 0, }, + { + name: "range_items", + inputList: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 40}, {45, 50}}, + expected: 5, + }, } for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { skippedSlice := NewSkippedSequenceSlice(DefaultClipCapacityHeadroom) if !testCase.empty { - for _, v := range testCase.inputList { - skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntry(v)) + if !testCase.rangeItems { + for _, v := range testCase.inputList { + skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntry(v[0])) + } + } else { + for _, v := range testCase.inputList { + skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(v[0], v[1])) + } } } assert.Equal(t, testCase.expected, skippedSlice.getOldest()) @@ -368,33 +704,60 @@ func TestGetOldestSkippedSequence(t *testing.T) { } // setupBenchmark sets up a skipped sequence slice for benchmark tests -func setupBenchmark(largeSlice bool, clipHeadroom int) *SkippedSequenceSlice { +func setupBenchmark(largeSlice bool, rangeEntries bool, clipHeadroom int) *SkippedSequenceSlice { skippedSlice := NewSkippedSequenceSlice(clipHeadroom) if largeSlice { for i := 0; i < 10000; i++ { - skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntry(uint64(i * 2))) + if rangeEntries { + skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(uint64(i*2), uint64(i*2)+5)) + } else { + skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntry(uint64(i * 2))) + } } } else { for i := 0; i < 100; i++ { - skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntry(uint64(i * 2))) + if rangeEntries { + skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(uint64(i*2), uint64(i*2)+5)) + } else { + skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntry(uint64(i * 2))) + } } } return skippedSlice } // setupBenchmarkToCompact sets up a skipped sequence slice for compaction based benchmark tests -func setupBenchmarkToCompact(largeSlice bool, clipHeadroom int) *SkippedSequenceSlice { +func setupBenchmarkToCompact(largeSlice bool, rangeEntries bool, clipHeadroom int) *SkippedSequenceSlice { skippedSlice := NewSkippedSequenceSlice(clipHeadroom) + inputTime := time.Now().Unix() - 1000 if largeSlice { for i := 0; i < 10000; i++ { - skippedSlice.PushSkippedSequenceEntry(testSingleSkippedEntryOldTimestamp(uint64(i * 2))) + if rangeEntries { + // add range entries with old timestamps for compaction + skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntryAt(uint64(i*2), uint64(i*2)+5, inputTime)) + } else { + // add single entries with old timestamps for compaction + skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntryAt(uint64(i*2), inputTime)) + } } } else { for i := 0; i < 100; i++ { - skippedSlice.PushSkippedSequenceEntry(testSingleSkippedEntryOldTimestamp(uint64(i * 2))) + if rangeEntries { + // add range entries with old timestamps for compaction + skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntryAt(uint64(i*2), uint64(i*2)+5, inputTime)) + } else { + // add single entries with old timestamps for compaction + skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntryAt(uint64(i*2), inputTime)) + } } } // have one entry to not be compacted skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntry(60000)) return skippedSlice } + +// singleEntry returns true if the entry is a single sequence entry, false if not. Used for testing purposes +func (s *SkippedSequenceListEntry) singleEntry() bool { + // if no star and end seq equal then it's a single entry + return s.start == s.end +}