From fc3b208b5b255df290d9c118cb31152675da4a0b Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Sat, 7 Sep 2019 13:32:24 -0600 Subject: [PATCH] [FIXED] FileStore: handle possible sequence gap in index files Differentiate error reading from the index file as opposed to not find a given sequence. Gaps should not exist but if that happens it would prevent messages from being expired or next messages to be delivered. Resolves #928 Signed-off-by: Ivan Kozlovic --- .travis.yml | 2 + server/server_run_test.go | 25 ++++-- server/server_test.go | 4 + stores/filestore.go | 64 ++++++++++++++- stores/filestore_msg_test.go | 149 ++++++++++++++++++++++++++++++++++- 5 files changed, 232 insertions(+), 12 deletions(-) diff --git a/.travis.yml b/.travis.yml index dd1f4947..433ad358 100644 --- a/.travis.yml +++ b/.travis.yml @@ -28,9 +28,11 @@ before_script: - $(exit $(misspell -locale US README.md | wc -l)) - staticcheck $EXCLUDE_VENDOR_AND_PROTO_DIR script: +- set -e - mysql -u root -e "CREATE USER 'nss'@'localhost' IDENTIFIED BY 'password'; GRANT ALL PRIVILEGES ON *.* TO 'nss'@'localhost'; CREATE DATABASE test_nats_streaming;" - go test -i $EXCLUDE_VENDOR - if [[ "$TRAVIS_GO_VERSION" =~ 1.12 ]]; then ./scripts/cov.sh TRAVIS; else go test -failfast $EXCLUDE_VENDOR; fi +- set +e deploy: provider: script diff --git a/server/server_run_test.go b/server/server_run_test.go index c30fc19e..19f76659 100644 --- a/server/server_run_test.go +++ b/server/server_run_test.go @@ -486,20 +486,33 @@ func TestPersistentStoreNoPanicOnShutdown(t *testing.T) { defer s.Shutdown() // Start a go routine that keeps sending messages - sendQuit := make(chan bool) + sendQuit := make(chan bool, 1) wg := &sync.WaitGroup{} wg.Add(1) + nc, err := nats.Connect(nats.DefaultURL, nats.NoReconnect()) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + sc, err := stan.Connect(clusterName, clientName, + stan.NatsConn(nc), + stan.PubAckWait(250*time.Millisecond)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer sc.Close() go func() { defer wg.Done() - - sc, nc := createConnectionWithNatsOpts(t, clientName, nats.NoReconnect()) - defer sc.Close() - defer nc.Close() - payload := []byte("hello") for { select { case <-sendQuit: + // We know that the server is shutdown, so close + // the NATS connection first so that STAN does not + // try to send the close protocol (which will timeout + // with a 2sec by default). + nc.Close() + sc.Close() return default: sc.PublishAsync("foo", payload, nil) diff --git a/server/server_test.go b/server/server_test.go index 0327e30a..28159d84 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -432,6 +432,10 @@ func getTestDefaultOptsForPersistentStore() *Options { case stores.TypeFile: opts.FilestoreDir = defaultDataStore opts.FileStoreOpts.BufferSize = 1024 + // Go 1.12 on macOS is very slow at doing sync writes... + if runtime.GOOS == "darwin" && strings.HasPrefix(runtime.Version(), "go1.12") { + opts.FileStoreOpts.DoSync = false + } case stores.TypeSQL: opts.SQLStoreOpts.Driver = testSQLDriver opts.SQLStoreOpts.Source = testSQLSource diff --git a/stores/filestore.go b/stores/filestore.go index 0e19fcbc..dafd0172 100644 --- a/stores/filestore.go +++ b/stores/filestore.go @@ -672,6 +672,7 @@ var ( bkgTasksSleepDuration = defaultBkgTasksSleepDuration cacheTTL = int64(defaultCacheTTL) sliceCloseInterval = defaultSliceCloseInterval + fillGaps = true ) // FileStoreTestSetBackgroundTaskInterval is used by tests to reduce the interval @@ -2622,7 +2623,7 @@ func (ms *FileMsgStore) Store(m *pb.MsgProto) (uint64, error) { ms.needSync = true // Is there a gap in message sequence? - if ms.last > 0 && m.Sequence > ms.last+1 { + if fillGaps && ms.last > 0 && m.Sequence > ms.last+1 { if err := ms.fillGaps(fslice, m); err != nil { ms.unlockFiles(fslice) return 0, err @@ -2915,6 +2916,11 @@ func (ms *FileMsgStore) expireMsgs(now, maxAge int64) int64 { // Try again in 5 secs. ms.expiration = now + int64(5*time.Second) return ms.expiration + } else if m == nil { + ms.log.Warnf("Skip expiration of missing sequence %v for channel %q", + ms.first, ms.channelName) + ms.skipMissingMsg(slice) + continue } } } @@ -2998,14 +3004,53 @@ func (ms *FileMsgStore) readMsgIndex(slice *fileSlice, seq uint64) (*msgIndex, e // Read the index record and ensure we have what we expect seqInIndexFile, msgIndex, err := ms.readIndex(slice.idxFile.handle) if err != nil { + if err == io.EOF { + return ms.backtrackIndex(slice, seq, seqInIndexFile, idxFileOffset) + } return nil, err } if seqInIndexFile != seq { - return nil, fmt.Errorf("wrong sequence, wanted %v got %v", seq, seqInIndexFile) + return ms.backtrackIndex(slice, seq, seqInIndexFile, idxFileOffset) } return msgIndex, nil } +// Looks for the index record for the given `seq` going backward +// from the give `offset`. This is invoked when the recovered index +// at the given offset does not the requested sequence, as the result +// of gaps in the index file. +func (ms *FileMsgStore) backtrackIndex(slice *fileSlice, seq, wrongSeq uint64, offset int64) (*msgIndex, error) { + for offset -= msgIndexRecSize; offset >= 4; offset -= msgIndexRecSize { + if _, err := slice.idxFile.handle.Seek(offset, io.SeekStart); err != nil { + break + } + seqInIndexFile, msgIndex, err := ms.readIndex(slice.idxFile.handle) + if err == io.EOF { + continue + } + if err == nil && seqInIndexFile == seq { + return msgIndex, nil + } + if err != nil || seqInIndexFile < seq { + break + } + } + // Not found... + return nil, nil +} + +// When a message is not found due to unexpected gap (from older +// store since now we ensure there is no gap), bump the first +// message sequence and update slice. +func (ms *FileMsgStore) skipMissingMsg(slice *fileSlice) { + ms.first++ + ms.firstMsg = nil + if ms.first > ms.last { + ms.lastMsg = nil + } + slice.firstSeq = ms.first +} + // removeFirstMsg "removes" the first message of the first slice. // If the slice is "empty" the file slice is removed. func (ms *FileMsgStore) removeFirstMsg(mindex *msgIndex, lockFile bool) error { @@ -3024,6 +3069,15 @@ func (ms *FileMsgStore) removeFirstMsg(mindex *msgIndex, lockFile bool) error { if err != nil { return err } + // Here we are getting the first sequence from the first + // available slice, so ms.first and slice.firstSeq should + // be the same, but due to possible gaps, they may not. + // Adjust in case. + ms.first = slice.firstSeq + } + if mindex == nil { + ms.skipMissingMsg(slice) + return nil } // Size of the first message in this slice firstMsgSize := mindex.msgSize @@ -3274,8 +3328,10 @@ func (ms *FileMsgStore) lookup(seq uint64) (*pb.MsgProto, error) { } if ms.readBufSize > 0 && seq != fslice.lastSeq { msg, err = ms.readAheadMsgs(fslice, seq) - ms.unlockFiles(fslice) - return msg, err + if err == nil { + ms.unlockFiles(fslice) + return msg, err + } } msgIndex, err := ms.readMsgIndex(fslice, seq) if msgIndex != nil { diff --git a/stores/filestore_msg_test.go b/stores/filestore_msg_test.go index 6fdc2d84..7d65dc99 100644 --- a/stores/filestore_msg_test.go +++ b/stores/filestore_msg_test.go @@ -1799,7 +1799,7 @@ func TestFSNoPanicOnRemoveMsg(t *testing.T) { limits := testDefaultStoreLimits limits.MaxMsgs = 5 - s, err := NewFileStore(l, testFSDefaultDatastore, &limits, BufferSize(0), DoCRC(false)) + s, err := NewFileStore(l, testFSDefaultDatastore, &limits, BufferSize(0)) if err != nil { t.Fatalf("Error creating store: %v", err) } @@ -1820,7 +1820,7 @@ func TestFSNoPanicOnRemoveMsg(t *testing.T) { if err != nil { t.Fatalf("Error reading file: %v", err) } - util.ByteOrder.PutUint64(content[4:], 2) + copy(content[4:], []byte("xxx")) if err := ioutil.WriteFile(idxFile.name, content, 0600); err != nil { t.Fatalf("Error writing file: %v", err) } @@ -2092,3 +2092,148 @@ func TestFSReadMsgRecord(t *testing.T) { t.Fatalf("Expected error about wrong size, got %v", err) } } + +func TestFSGapsInSequenceWithoutFillAndExpiration(t *testing.T) { + opts := testFSGetOptionsForGapsTests() + // Add case with smaller slice + opts = append(opts, testFSGapsOption{ + name: "SmallFileSlice", + opt: SliceConfig(3, 0, 0, ""), + }) + for _, o := range opts { + t.Run(o.name, func(t *testing.T) { + fillGaps = false + defer func() { fillGaps = true }() + + cleanupFSDatastore(t) + defer cleanupFSDatastore(t) + + s := createDefaultFileStore(t, o.opt) + defer s.Close() + + limits := testDefaultStoreLimits + limits.MaxMsgs = 6 + s.SetLimits(&limits) + + c := storeCreateChannel(t, s, "foo") + ms := c.Msgs.(*FileMsgStore) + + seqs := []uint64{1, 2, 5, 8, 9, 10} + for _, seq := range seqs { + storeMsg(t, c, "foo", seq, []byte(fmt.Sprintf("msg%d", seq))) + } + ms.Flush() + ms.Lock() + ms.cache.empty() + ms.Unlock() + + for _, seq := range seqs { + msg, err := ms.Lookup(seq) + if err != nil { + t.Fatalf("Error on lookup: %v", err) + } + if msg.Sequence != seq || string(msg.Data) != fmt.Sprintf("msg%d", seq) { + t.Fatalf("Unexpected message for seq %v: %v", seq, msg) + } + } + + notfound := []uint64{3, 6, 7} + for _, seq := range notfound { + msg, err := ms.Lookup(seq) + if err != nil || msg != nil { + t.Fatalf("Unexpected result err=%v msg=%v", err, msg) + } + } + + // Add more messages to force 3 first to be removed, + // and in the case of the small slice option, should + // cause first slice to be removed. + // Add some more gaps.. + storeMsg(t, c, "foo", 12, []byte("msg12")) + storeMsg(t, c, "foo", 14, []byte("msg14")) + storeMsg(t, c, "foo", 16, []byte("msg16")) + + first, last := msgStoreFirstAndLastSequence(t, ms) + if first != 8 && last != 16 { + t.Fatalf("Expected first to be 8 and last to be 16, got %v and %v", first, last) + } + + expectFirsSliceToBe := func(t *testing.T, expected int) { + ms.Lock() + ffseq := ms.firstFSlSeq + ms.Unlock() + if ffseq != expected { + t.Fatalf("First slice expected to be %v, got %v", expected, ffseq) + } + } + if o.name == "SmallFileSlice" { + expectFirsSliceToBe(t, 2) + } else { + expectFirsSliceToBe(t, 1) + } + + storeMsg(t, c, "foo", 18, []byte("msg18")) + storeMsg(t, c, "foo", 19, []byte("msg19")) + storeMsg(t, c, "foo", 20, []byte("msg20")) + + // Force expiration.. + time.Sleep(50 * time.Millisecond) + ms.Lock() + ms.expireMsgs(time.Now().UnixNano(), int64(time.Millisecond)) + ms.Unlock() + + n, b, err := ms.State() + if err != nil { + t.Fatalf("Error getting state: %v", err) + } + if n != 0 || b != 0 { + t.Fatalf("Unexpected number of msgs: %v bytes: %v", n, b) + } + + if o.name == "SmallFileSlice" { + expectFirsSliceToBe(t, 4) + } else { + expectFirsSliceToBe(t, 1) + } + }) + } +} + +func TestFSExpirationError(t *testing.T) { + cleanupFSDatastore(t) + defer cleanupFSDatastore(t) + + s := createDefaultFileStore(t) + defer s.Close() + + c := storeCreateChannel(t, s, "foo") + ms := c.Msgs.(*FileMsgStore) + + storeMsg(t, c, "foo", 1, []byte("msg")) + ms.Flush() + ms.Lock() + ms.cache.empty() + ms.Unlock() + + ms.Lock() + idxFile := ms.files[1].idxFile + ms.fm.closeLockedOrOpenedFile(idxFile) + content, err := ioutil.ReadFile(idxFile.name) + if err != nil { + t.Fatalf("Error reading file: %v", err) + } + copy(content[4:], []byte("xxx")) + if err := ioutil.WriteFile(idxFile.name, content, 0600); err != nil { + t.Fatalf("Error writing file: %v", err) + } + ms.Unlock() + + time.Sleep(50 * time.Millisecond) + ms.Lock() + now := time.Now().UnixNano() + nextExpiration := ms.expireMsgs(time.Now().UnixNano(), int64(time.Millisecond)) + ms.Unlock() + if nextExpiration < now+int64(4500*time.Millisecond) { + t.Fatalf("Expected next expiration to be set to 5secs from now, got %v", time.Duration(nextExpiration)) + } +}