Skip to content

Commit

Permalink
Merge branch 'main' of github.com:nats-io/nats-server into GH-5471-le…
Browse files Browse the repository at this point in the history
…ak-1
  • Loading branch information
levb committed Jun 4, 2024
2 parents 7016c87 + f405dcd commit 6a09bae
Show file tree
Hide file tree
Showing 8 changed files with 362 additions and 63 deletions.
32 changes: 29 additions & 3 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2344,7 +2344,8 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
// If we do not think we should do a linear scan check how many fss we
// would need to scan vs the full range of the linear walk. Optimize for
// 25th quantile of a match in a linear walk. Filter should be a wildcard.
if !doLinearScan && wc {
// We should consult fss if our cache is not loaded and we only have fss loaded.
if !doLinearScan && wc && mb.cacheAlreadyLoaded() {
doLinearScan = len(mb.fss)*4 > int(lseq-fseq)
}

Expand Down Expand Up @@ -6341,6 +6342,11 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
if start <= fs.state.FirstSeq {
var ss SimpleState
fs.numFilteredPending(filter, &ss)
// Nothing available.
if ss.Msgs == 0 {
return nil, fs.state.LastSeq, ErrStoreEOF
}
// We can skip ahead.
if ss.First > start {
start = ss.First
}
Expand All @@ -6356,8 +6362,28 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
return sm, sm.seq, nil
} else if err != ErrStoreMsgNotFound {
return nil, 0, err
} else if expireOk {
mb.tryForceExpireCache()
} else {
// Nothing found in this block. We missed, if first block (bi) check psim.
// Similar to above if start <= first seq.
// TODO(dlc) - For v2 track these by filter subject since they will represent filtered consumers.
if i == bi {
var ss SimpleState
fs.numFilteredPending(filter, &ss)
// Nothing available.
if ss.Msgs == 0 {
return nil, fs.state.LastSeq, ErrStoreEOF
}
// See if we can jump ahead here.
// Right now we can only spin on first, so if we have interior sparseness need to favor checking per block fss if loaded.
// For v2 will track all blocks that have matches for psim.
if nbi, _ := fs.selectMsgBlockWithIndex(ss.First); nbi > i {
i = nbi - 1 // For the iterator condition i++
}
}
// Check if we can expire.
if expireOk {
mb.tryForceExpireCache()
}
}
}
}
Expand Down
180 changes: 171 additions & 9 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6953,9 +6953,7 @@ func Benchmark_FileStoreSelectMsgBlock(b *testing.B) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: b.TempDir(), BlockSize: 128},
StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
if err != nil {
b.Fatalf("Unexpected error: %v", err)
}
require_NoError(b, err)
defer fs.Stop()

subj, msg := "A", bytes.Repeat([]byte("ABC"), 33) // ~100bytes
Expand Down Expand Up @@ -6985,9 +6983,7 @@ func Benchmark_FileStoreLoadNextMsgSameFilterAsStream(b *testing.B) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: b.TempDir()},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
if err != nil {
b.Fatalf("Unexpected error: %v", err)
}
require_NoError(b, err)
defer fs.Stop()

// Small on purpose.
Expand All @@ -7013,9 +7009,7 @@ func Benchmark_FileStoreLoadNextMsgLiteralSubject(b *testing.B) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: b.TempDir()},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
if err != nil {
b.Fatalf("Unexpected error: %v", err)
}
require_NoError(b, err)
defer fs.Stop()

// Small on purpose.
Expand All @@ -7039,3 +7033,171 @@ func Benchmark_FileStoreLoadNextMsgLiteralSubject(b *testing.B) {
require_NoError(b, err)
}
}

// Benchmark_FileStoreLoadNextMsgNoMsgsFirstSeq measures LoadNextMsg with a
// literal filter ("foo.baz") that matches none of the stored messages, starting
// the search at sequence 1. Every call is expected to return ErrStoreEOF.
func Benchmark_FileStoreLoadNextMsgNoMsgsFirstSeq(b *testing.B) {
	fs, err := newFileStore(
		FileStoreConfig{StoreDir: b.TempDir(), BlockSize: 8192},
		StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
	require_NoError(b, err)
	defer fs.Stop()

	// Small on purpose.
	msg := []byte("ok")

	// Add in a bunch of msgs, none of which match the filter used below.
	// Fail fast on a store error so we never benchmark a partially filled store.
	for i := 0; i < 1_000_000; i++ {
		_, _, err = fs.StoreMsg("foo.bar", nil, msg)
		require_NoError(b, err)
	}

	b.ResetTimer()

	var smv StoreMsg
	for i := 0; i < b.N; i++ {
		// This should error with EOF.
		if _, _, err := fs.LoadNextMsg("foo.baz", false, 1, &smv); err != ErrStoreEOF {
			b.Fatalf("Wrong error, expected EOF got %v", err)
		}
	}
}

// Benchmark_FileStoreLoadNextMsgNoMsgsNotFirstSeq measures LoadNextMsg with a
// literal filter ("foo.baz") that matches none of the stored messages, starting
// the search at sequence 10 rather than 1. Every call is expected to return
// ErrStoreEOF.
func Benchmark_FileStoreLoadNextMsgNoMsgsNotFirstSeq(b *testing.B) {
	fs, err := newFileStore(
		FileStoreConfig{StoreDir: b.TempDir(), BlockSize: 8192},
		StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
	require_NoError(b, err)
	defer fs.Stop()

	// Small on purpose.
	msg := []byte("ok")

	// Add in a bunch of msgs, none of which match the filter used below.
	// Fail fast on a store error so we never benchmark a partially filled store.
	for i := 0; i < 1_000_000; i++ {
		_, _, err = fs.StoreMsg("foo.bar", nil, msg)
		require_NoError(b, err)
	}

	b.ResetTimer()

	var smv StoreMsg
	for i := 0; i < b.N; i++ {
		// This should error with EOF.
		// Make sure the sequence is not first seq of 1.
		if _, _, err := fs.LoadNextMsg("foo.baz", false, 10, &smv); err != ErrStoreEOF {
			b.Fatalf("Wrong error, expected EOF got %v", err)
		}
	}
}

// Benchmark_FileStoreLoadNextMsgVerySparseMsgsFirstSeq measures LoadNextMsg
// with a literal filter ("foo.baz") whose only match is the very last stored
// message, starting the search at sequence 1.
func Benchmark_FileStoreLoadNextMsgVerySparseMsgsFirstSeq(b *testing.B) {
	fs, err := newFileStore(
		FileStoreConfig{StoreDir: b.TempDir(), BlockSize: 8192},
		StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
	require_NoError(b, err)
	defer fs.Stop()

	// Small on purpose.
	msg := []byte("ok")

	// Add in a bunch of msgs that do not match the filter used below.
	// Fail fast on a store error so we never benchmark a partially filled store.
	for i := 0; i < 1_000_000; i++ {
		_, _, err = fs.StoreMsg("foo.bar", nil, msg)
		require_NoError(b, err)
	}
	// Make last msg one that would match. The benchmark is meaningless
	// without it, so verify the store succeeded.
	_, _, err = fs.StoreMsg("foo.baz", nil, msg)
	require_NoError(b, err)

	b.ResetTimer()

	var smv StoreMsg
	for i := 0; i < b.N; i++ {
		_, _, err := fs.LoadNextMsg("foo.baz", false, 1, &smv)
		require_NoError(b, err)
	}
}

// Benchmark_FileStoreLoadNextMsgVerySparseMsgsNotFirstSeq measures LoadNextMsg
// with a literal filter ("foo.baz") whose only match is the very last stored
// message, starting the search at sequence 10 rather than 1.
func Benchmark_FileStoreLoadNextMsgVerySparseMsgsNotFirstSeq(b *testing.B) {
	fs, err := newFileStore(
		FileStoreConfig{StoreDir: b.TempDir(), BlockSize: 8192},
		StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
	require_NoError(b, err)
	defer fs.Stop()

	// Small on purpose.
	msg := []byte("ok")

	// Add in a bunch of msgs that do not match the filter used below.
	// Fail fast on a store error so we never benchmark a partially filled store.
	for i := 0; i < 1_000_000; i++ {
		_, _, err = fs.StoreMsg("foo.bar", nil, msg)
		require_NoError(b, err)
	}
	// Make last msg one that would match. The benchmark is meaningless
	// without it, so verify the store succeeded.
	_, _, err = fs.StoreMsg("foo.baz", nil, msg)
	require_NoError(b, err)

	b.ResetTimer()

	var smv StoreMsg
	for i := 0; i < b.N; i++ {
		// Make sure not first seq.
		_, _, err := fs.LoadNextMsg("foo.baz", false, 10, &smv)
		require_NoError(b, err)
	}
}

// Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetween measures LoadNextMsg
// with a literal filter ("foo.baz") when matching messages exist only as the
// first and last stored messages, and the search starts at sequence 2, i.e.
// after the first stored message.
func Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetween(b *testing.B) {
	fs, err := newFileStore(
		FileStoreConfig{StoreDir: b.TempDir(), BlockSize: 8192},
		StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
	require_NoError(b, err)
	defer fs.Stop()

	// Small on purpose.
	msg := []byte("ok")

	// Make first msg one that would match as well.
	_, _, err = fs.StoreMsg("foo.baz", nil, msg)
	require_NoError(b, err)
	// Add in a bunch of msgs that do not match the filter used below.
	// Fail fast on a store error so we never benchmark a partially filled store.
	for i := 0; i < 1_000_000; i++ {
		_, _, err = fs.StoreMsg("foo.bar", nil, msg)
		require_NoError(b, err)
	}
	// Make last msg one that would match as well.
	_, _, err = fs.StoreMsg("foo.baz", nil, msg)
	require_NoError(b, err)

	b.ResetTimer()

	var smv StoreMsg
	for i := 0; i < b.N; i++ {
		// Start past the first stored msg so the search has to continue forward.
		_, _, err := fs.LoadNextMsg("foo.baz", false, 2, &smv)
		require_NoError(b, err)
	}
}

// Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetweenWithWildcard measures
// LoadNextMsg with a wildcard filter ("foo.*.baz") when matching messages exist
// only as the first and last stored messages, with many messages on randomly
// chosen non-matching subjects in between. The search starts at sequence 2,
// i.e. after the first stored message.
func Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetweenWithWildcard(b *testing.B) {
	fs, err := newFileStore(
		FileStoreConfig{StoreDir: b.TempDir()},
		StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage})
	require_NoError(b, err)
	defer fs.Stop()

	// Small on purpose.
	msg := []byte("ok")

	// Make first msg one that would match as well.
	_, _, err = fs.StoreMsg("foo.1.baz", nil, msg)
	require_NoError(b, err)
	// Add in a bunch of msgs.
	// We need to make sure we have a range of subjects that could kick in a linear scan.
	// Fail fast on a store error so we never benchmark a partially filled store.
	for i := 0; i < 1_000_000; i++ {
		subj := fmt.Sprintf("foo.%d.bar", rand.Intn(100_000)+2)
		_, _, err = fs.StoreMsg(subj, nil, msg)
		require_NoError(b, err)
	}
	// Make last msg one that would match as well.
	_, _, err = fs.StoreMsg("foo.1.baz", nil, msg)
	require_NoError(b, err)

	b.ResetTimer()

	var smv StoreMsg
	for i := 0; i < b.N; i++ {
		// Start past the first stored msg so the search has to continue forward.
		_, _, err := fs.LoadNextMsg("foo.*.baz", true, 2, &smv)
		require_NoError(b, err)
	}
}
3 changes: 2 additions & 1 deletion server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2496,7 +2496,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
}

case <-cistc:
mset.checkInterestState()
// We may be adjusting some things with consumers so do this in its own go routine.
go mset.checkInterestState()

case <-datc:
if mset == nil || isRecovering {
Expand Down
85 changes: 64 additions & 21 deletions server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,14 +465,28 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje
}
if toScan < toExclude {
ss.Msgs, ss.First = 0, 0
for seq := first; seq <= last; seq++ {
if sm, ok := ms.msgs[seq]; ok && !seen[sm.subj] && isMatch(sm.subj) {
ss.Msgs++
if ss.First == 0 {
ss.First = seq

update := func(sm *StoreMsg) {
ss.Msgs++
if ss.First == 0 {
ss.First = sm.seq
}
if seen != nil {
seen[sm.subj] = true
}
}
// Check if easier to just scan msgs vs the sequence range.
// This can happen with lots of interior deletes.
if last-first > uint64(len(ms.msgs)) {
for _, sm := range ms.msgs {
if sm.seq >= first && sm.seq <= last && !seen[sm.subj] && isMatch(sm.subj) {
update(sm)
}
if seen != nil {
seen[sm.subj] = true
}
} else {
for seq := first; seq <= last; seq++ {
if sm, ok := ms.msgs[seq]; ok && !seen[sm.subj] && isMatch(sm.subj) {
update(sm)
}
}
}
Expand All @@ -482,17 +496,29 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje
var adjust uint64
var tss *SimpleState

for seq := ms.state.FirstSeq; seq < first; seq++ {
if sm, ok := ms.msgs[seq]; ok && !seen[sm.subj] && isMatch(sm.subj) {
if lastPerSubject {
tss, _ = ms.fss.Find(stringToBytes(sm.subj))
}
// If we are last per subject, make sure to only adjust if all messages are before our first.
if tss == nil || tss.Last < first {
adjust++
update := func(sm *StoreMsg) {
if lastPerSubject {
tss, _ = ms.fss.Find(stringToBytes(sm.subj))
}
// If we are last per subject, make sure to only adjust if all messages are before our first.
if tss == nil || tss.Last < first {
adjust++
}
if seen != nil {
seen[sm.subj] = true
}
}
// Check if easier to just scan msgs vs the sequence range.
if first-ms.state.FirstSeq > uint64(len(ms.msgs)) {
for _, sm := range ms.msgs {
if sm.seq < first && !seen[sm.subj] && isMatch(sm.subj) {
update(sm)
}
if seen != nil {
seen[sm.subj] = true
}
} else {
for seq := ms.state.FirstSeq; seq < first; seq++ {
if sm, ok := ms.msgs[seq]; ok && !seen[sm.subj] && isMatch(sm.subj) {
update(sm)
}
}
}
Expand All @@ -507,10 +533,27 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje
}
ss.Msgs -= adjust
if needScanFirst {
for seq := first; seq < last; seq++ {
if sm, ok := ms.msgs[seq]; ok && isMatch(sm.subj) {
ss.First = seq
break
// Check if easier to just scan msgs vs the sequence range.
// Since we will need to scan all of the msgs vs below where we break on the first match,
// we will only do so if a few orders of magnitude lower.
if last-first > 100*uint64(len(ms.msgs)) {
low := ms.state.LastSeq
for _, sm := range ms.msgs {
if sm.seq >= first && sm.seq < last && isMatch(sm.subj) {
if sm.seq < low {
low = sm.seq
}
}
}
if low < ms.state.LastSeq {
ss.First = low
}
} else {
for seq := first; seq < last; seq++ {
if sm, ok := ms.msgs[seq]; ok && isMatch(sm.subj) {
ss.First = seq
break
}
}
}
}
Expand Down
Loading

0 comments on commit 6a09bae

Please sign in to comment.