Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Avoid loading last blks on LoadNextMsg() miss. #5584

Merged
merged 1 commit into from
Jun 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2596,28 +2596,38 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState {

// This is used to see if we can selectively jump start blocks based on filter subject and a floor block index.
// Will return -1 if no matches at all.
func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool) int {
start := uint32(math.MaxUint32)
func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool) (int, int) {
start, stop := uint32(math.MaxUint32), uint32(0)
if wc {
fs.psim.Match(stringToBytes(filter), func(_ []byte, psi *psi) {
if psi.fblk < start {
start = psi.fblk
}
if psi.lblk > stop {
stop = psi.lblk
}
})
} else if psi, ok := fs.psim.Find(stringToBytes(filter)); ok {
start = psi.fblk
start, stop = psi.fblk, psi.lblk
}
// Nothing found.
if start == uint32(math.MaxUint32) {
return -1
return -1, -1
}
// Here we need to translate this to index into fs.blks.
// Here we need to translate this to index into fs.blks properly.
mb := fs.bim[start]
if mb == nil {
return -1
return -1, -1
}
bi, _ := fs.selectMsgBlockWithIndex(atomic.LoadUint64(&mb.last.seq))
return bi
fi, _ := fs.selectMsgBlockWithIndex(atomic.LoadUint64(&mb.last.seq))
derekcollison marked this conversation as resolved.
Show resolved Hide resolved

mb = fs.bim[stop]
if mb == nil {
return -1, -1
}
li, _ := fs.selectMsgBlockWithIndex(atomic.LoadUint64(&mb.last.seq))

return fi, li
}

// Optimized way for getting all num pending matching a filter subject.
Expand Down Expand Up @@ -6475,9 +6485,9 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
// Similar to above if start <= first seq.
// TODO(dlc) - For v2 track these by filter subject since they will represent filtered consumers.
if i == bi {
nbi := fs.checkSkipFirstBlock(filter, wc)
nbi, lbi := fs.checkSkipFirstBlock(filter, wc)
// Nothing available.
if nbi < 0 {
if nbi < 0 || lbi <= bi {
return nil, fs.state.LastSeq, ErrStoreEOF
}
// See if we can jump ahead here.
Expand Down
81 changes: 81 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7263,6 +7263,58 @@ func TestFileStoreFilteredPendingPSIMFirstBlockUpdateNextBlock(t *testing.T) {
require_Equal(t, psi.lblk, 4)
}

func TestFileStoreLargeSparseMsgsDoNotLoadAfterLast(t *testing.T) {
sd := t.TempDir()
fs, err := newFileStore(
FileStoreConfig{StoreDir: sd, BlockSize: 128},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

msg := []byte("hello")
// Create 2 blocks with each, each block holds 2 msgs
for i := 0; i < 2; i++ {
fs.StoreMsg("foo.22.bar", nil, msg)
fs.StoreMsg("foo.22.baz", nil, msg)
}
// Now create 8 more blocks with just baz. So no matches for these 8 blocks
// for "foo.22.bar".
for i := 0; i < 8; i++ {
fs.StoreMsg("foo.22.baz", nil, msg)
fs.StoreMsg("foo.22.baz", nil, msg)
}
require_Equal(t, fs.numMsgBlocks(), 10)

// Remove all blk cache and fss.
fs.mu.RLock()
for _, mb := range fs.blks {
mb.mu.Lock()
mb.fss, mb.cache = nil, nil
mb.mu.Unlock()
}
fs.mu.RUnlock()

// "foo.22.bar" is at sequence 1 and 3.
// Make sure if we do a LoadNextMsg() starting at 4 that we do not load
// all the tail blocks.
_, _, err = fs.LoadNextMsg("foo.*.bar", true, 4, nil)
require_Error(t, err, ErrStoreEOF)

// Now make sure we did not load fss and cache.
var loaded int
fs.mu.RLock()
for _, mb := range fs.blks {
mb.mu.RLock()
if mb.cache != nil || mb.fss != nil {
loaded++
}
mb.mu.RUnlock()
}
fs.mu.RUnlock()
// We will load first block for starting seq 4, but no others should have loaded.
require_Equal(t, loaded, 1)
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -7520,3 +7572,32 @@ func Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetweenWithWildcard(b *testin
require_NoError(b, err)
}
}

func Benchmark_FileStoreLoadNextMsgVerySparseMsgsLargeTail(b *testing.B) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: b.TempDir()},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage})
require_NoError(b, err)
defer fs.Stop()

// Small om purpose.
msg := []byte("ok")

// Make first msg one that would match as well.
fs.StoreMsg("foo.1.baz", nil, msg)
// Add in a bunch of msgs.
// We need to make sure we have a range of subjects that could kick in a linear scan.
for i := 0; i < 1_000_000; i++ {
subj := fmt.Sprintf("foo.%d.bar", rand.Intn(100_000)+2)
fs.StoreMsg(subj, nil, msg)
}

b.ResetTimer()

var smv StoreMsg
for i := 0; i < b.N; i++ {
// Make sure not first seq.
_, _, err := fs.LoadNextMsg("foo.*.baz", true, 2, &smv)
require_Error(b, err, ErrStoreEOF)
}
}