Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] FileStore: handle possible sequence gap in index files #940

Merged
merged 1 commit into from
Sep 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ before_script:
- $(exit $(misspell -locale US README.md | wc -l))
- staticcheck $EXCLUDE_VENDOR_AND_PROTO_DIR
script:
- set -e
- mysql -u root -e "CREATE USER 'nss'@'localhost' IDENTIFIED BY 'password'; GRANT ALL PRIVILEGES ON *.* TO 'nss'@'localhost'; CREATE DATABASE test_nats_streaming;"
- go test -i $EXCLUDE_VENDOR
- if [[ "$TRAVIS_GO_VERSION" =~ 1.12 ]]; then ./scripts/cov.sh TRAVIS; else go test -failfast $EXCLUDE_VENDOR; fi
- set +e

deploy:
provider: script
Expand Down
25 changes: 19 additions & 6 deletions server/server_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,20 +486,33 @@ func TestPersistentStoreNoPanicOnShutdown(t *testing.T) {
defer s.Shutdown()

// Start a go routine that keeps sending messages
sendQuit := make(chan bool)
sendQuit := make(chan bool, 1)
wg := &sync.WaitGroup{}
wg.Add(1)
nc, err := nats.Connect(nats.DefaultURL, nats.NoReconnect())
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
sc, err := stan.Connect(clusterName, clientName,
stan.NatsConn(nc),
stan.PubAckWait(250*time.Millisecond))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer sc.Close()
go func() {
defer wg.Done()

sc, nc := createConnectionWithNatsOpts(t, clientName, nats.NoReconnect())
defer sc.Close()
defer nc.Close()

payload := []byte("hello")
for {
select {
case <-sendQuit:
// We know that the server is shutdown, so close
// the NATS connection first so that STAN does not
// try to send the close protocol (which will timeout
// with a 2sec by default).
nc.Close()
sc.Close()
return
default:
sc.PublishAsync("foo", payload, nil)
Expand Down
4 changes: 4 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,10 @@ func getTestDefaultOptsForPersistentStore() *Options {
case stores.TypeFile:
opts.FilestoreDir = defaultDataStore
opts.FileStoreOpts.BufferSize = 1024
// Go 1.12 on macOS is very slow at doing sync writes...
if runtime.GOOS == "darwin" && strings.HasPrefix(runtime.Version(), "go1.12") {
opts.FileStoreOpts.DoSync = false
}
case stores.TypeSQL:
opts.SQLStoreOpts.Driver = testSQLDriver
opts.SQLStoreOpts.Source = testSQLSource
Expand Down
64 changes: 60 additions & 4 deletions stores/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,7 @@ var (
bkgTasksSleepDuration = defaultBkgTasksSleepDuration
cacheTTL = int64(defaultCacheTTL)
sliceCloseInterval = defaultSliceCloseInterval
fillGaps = true
)

// FileStoreTestSetBackgroundTaskInterval is used by tests to reduce the interval
Expand Down Expand Up @@ -2622,7 +2623,7 @@ func (ms *FileMsgStore) Store(m *pb.MsgProto) (uint64, error) {
ms.needSync = true

// Is there a gap in message sequence?
if ms.last > 0 && m.Sequence > ms.last+1 {
if fillGaps && ms.last > 0 && m.Sequence > ms.last+1 {
if err := ms.fillGaps(fslice, m); err != nil {
ms.unlockFiles(fslice)
return 0, err
Expand Down Expand Up @@ -2915,6 +2916,11 @@ func (ms *FileMsgStore) expireMsgs(now, maxAge int64) int64 {
// Try again in 5 secs.
ms.expiration = now + int64(5*time.Second)
return ms.expiration
} else if m == nil {
ms.log.Warnf("Skip expiration of missing sequence %v for channel %q",
ms.first, ms.channelName)
ms.skipMissingMsg(slice)
continue
}
}
}
Expand Down Expand Up @@ -2998,14 +3004,53 @@ func (ms *FileMsgStore) readMsgIndex(slice *fileSlice, seq uint64) (*msgIndex, e
// Read the index record and ensure we have what we expect
seqInIndexFile, msgIndex, err := ms.readIndex(slice.idxFile.handle)
if err != nil {
if err == io.EOF {
return ms.backtrackIndex(slice, seq, seqInIndexFile, idxFileOffset)
}
return nil, err
}
if seqInIndexFile != seq {
return nil, fmt.Errorf("wrong sequence, wanted %v got %v", seq, seqInIndexFile)
return ms.backtrackIndex(slice, seq, seqInIndexFile, idxFileOffset)
}
return msgIndex, nil
}

// backtrackIndex searches the slice's index file for the record of the
// given `seq`, walking backward one index record at a time starting with
// the record that precedes `offset`. It is invoked when the record read at
// `offset` does not match the requested sequence (or could not be read
// because of EOF), which happens when there are gaps in the index file.
//
// The scan never reads below offset 4 and stops as soon as a record with a
// sequence lower than `seq` is found. A seek failure or a read error other
// than io.EOF also ends the scan. In all those cases, as well as when the
// record is simply not present, the function returns (nil, nil): callers
// must treat a nil index with a nil error as "message not found".
//
// `wrongSeq` is the sequence that was found at `offset`; it is currently
// not used by the search.
func (ms *FileMsgStore) backtrackIndex(slice *fileSlice, seq, wrongSeq uint64, offset int64) (*msgIndex, error) {
	idxHandle := slice.idxFile.handle
	for pos := offset - msgIndexRecSize; pos >= 4; pos -= msgIndexRecSize {
		if _, seekErr := idxHandle.Seek(pos, io.SeekStart); seekErr != nil {
			// Cannot position the file: report "not found".
			return nil, nil
		}
		recSeq, rec, readErr := ms.readIndex(idxHandle)
		switch {
		case readErr == io.EOF:
			// Nothing readable at this position, try the previous record.
			continue
		case readErr != nil:
			// Any other read error ends the search.
			return nil, nil
		case recSeq == seq:
			return rec, nil
		case recSeq < seq:
			// We went past where `seq` would be, so it is not in the file.
			return nil, nil
		}
		// recSeq > seq: keep walking backward.
	}
	// Not found...
	return nil, nil
}

// skipMissingMsg is used when the message at ms.first cannot be found
// because of an unexpected gap in the sequences. It moves the store's
// first sequence forward by one, drops the cached first message (and the
// cached last message when the first sequence moved past the last one),
// and records the new first sequence in the given slice.
func (ms *FileMsgStore) skipMissingMsg(slice *fileSlice) {
	// The cached first message no longer applies.
	ms.firstMsg = nil
	ms.first++
	if ms.last < ms.first {
		ms.lastMsg = nil
	}
	slice.firstSeq = ms.first
}

// removeFirstMsg "removes" the first message of the first slice.
// If the slice is "empty" the file slice is removed.
func (ms *FileMsgStore) removeFirstMsg(mindex *msgIndex, lockFile bool) error {
Expand All @@ -3024,6 +3069,15 @@ func (ms *FileMsgStore) removeFirstMsg(mindex *msgIndex, lockFile bool) error {
if err != nil {
return err
}
// Here we are getting the first sequence from the first
// available slice, so ms.first and slice.firstSeq should
// be the same, but due to possible gaps, they may not.
// Adjust in case.
ms.first = slice.firstSeq
}
if mindex == nil {
ms.skipMissingMsg(slice)
return nil
}
// Size of the first message in this slice
firstMsgSize := mindex.msgSize
Expand Down Expand Up @@ -3274,8 +3328,10 @@ func (ms *FileMsgStore) lookup(seq uint64) (*pb.MsgProto, error) {
}
if ms.readBufSize > 0 && seq != fslice.lastSeq {
msg, err = ms.readAheadMsgs(fslice, seq)
ms.unlockFiles(fslice)
return msg, err
if err == nil {
ms.unlockFiles(fslice)
return msg, err
}
}
msgIndex, err := ms.readMsgIndex(fslice, seq)
if msgIndex != nil {
Expand Down
149 changes: 147 additions & 2 deletions stores/filestore_msg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1799,7 +1799,7 @@ func TestFSNoPanicOnRemoveMsg(t *testing.T) {

limits := testDefaultStoreLimits
limits.MaxMsgs = 5
s, err := NewFileStore(l, testFSDefaultDatastore, &limits, BufferSize(0), DoCRC(false))
s, err := NewFileStore(l, testFSDefaultDatastore, &limits, BufferSize(0))
if err != nil {
t.Fatalf("Error creating store: %v", err)
}
Expand All @@ -1820,7 +1820,7 @@ func TestFSNoPanicOnRemoveMsg(t *testing.T) {
if err != nil {
t.Fatalf("Error reading file: %v", err)
}
util.ByteOrder.PutUint64(content[4:], 2)
copy(content[4:], []byte("xxx"))
if err := ioutil.WriteFile(idxFile.name, content, 0600); err != nil {
t.Fatalf("Error writing file: %v", err)
}
Expand Down Expand Up @@ -2092,3 +2092,148 @@ func TestFSReadMsgRecord(t *testing.T) {
t.Fatalf("Expected error about wrong size, got %v", err)
}
}

// TestFSGapsInSequenceWithoutFillAndExpiration stores messages whose
// sequences have gaps while gap filling is turned off, then verifies that:
// stored sequences can be looked up, missing sequences are reported as
// (nil, nil), the MaxMsgs limit removes the oldest messages, and expiration
// empties the store. It runs once per option returned by
// testFSGetOptionsForGapsTests plus a "SmallFileSlice" variant.
func TestFSGapsInSequenceWithoutFillAndExpiration(t *testing.T) {
opts := testFSGetOptionsForGapsTests()
// Add case with smaller slice
opts = append(opts, testFSGapsOption{
name: "SmallFileSlice",
opt: SliceConfig(3, 0, 0, ""),
})
for _, o := range opts {
t.Run(o.name, func(t *testing.T) {
// Turn off the package-level gap filling switch for the duration
// of this sub-test so that the store keeps the gaps; the deferred
// function turns it back on.
fillGaps = false
defer func() { fillGaps = true }()

cleanupFSDatastore(t)
defer cleanupFSDatastore(t)

s := createDefaultFileStore(t, o.opt)
defer s.Close()

limits := testDefaultStoreLimits
limits.MaxMsgs = 6
s.SetLimits(&limits)

c := storeCreateChannel(t, s, "foo")
ms := c.Msgs.(*FileMsgStore)

// Store exactly MaxMsgs messages, leaving gaps at 3, 4, 6 and 7.
seqs := []uint64{1, 2, 5, 8, 9, 10}
for _, seq := range seqs {
storeMsg(t, c, "foo", seq, []byte(fmt.Sprintf("msg%d", seq)))
}
ms.Flush()
// Empty the cache, presumably so that the lookups below are served
// from the files rather than from memory.
ms.Lock()
ms.cache.empty()
ms.Unlock()

// Every stored sequence must be found with the expected payload.
for _, seq := range seqs {
msg, err := ms.Lookup(seq)
if err != nil {
t.Fatalf("Error on lookup: %v", err)
}
if msg.Sequence != seq || string(msg.Data) != fmt.Sprintf("msg%d", seq) {
t.Fatalf("Unexpected message for seq %v: %v", seq, msg)
}
}

// Sequences that fall in a gap must produce neither a message nor
// an error.
notfound := []uint64{3, 6, 7}
for _, seq := range notfound {
msg, err := ms.Lookup(seq)
if err != nil || msg != nil {
t.Fatalf("Unexpected result err=%v msg=%v", err, msg)
}
}

// Add more messages to force 3 first to be removed,
// and in the case of the small slice option, should
// cause first slice to be removed.
// Add some more gaps..
storeMsg(t, c, "foo", 12, []byte("msg12"))
storeMsg(t, c, "foo", 14, []byte("msg14"))
storeMsg(t, c, "foo", 16, []byte("msg16"))

first, last := msgStoreFirstAndLastSequence(t, ms)
// NOTE(review): with `&&` this check only fails when BOTH values are
// wrong, while the failure message reads as if each value were
// checked on its own (which would be `||`). Confirm the value that
// `first` really has at this point before changing the operator.
if first != 8 && last != 16 {
t.Fatalf("Expected first to be 8 and last to be 16, got %v and %v", first, last)
}

// Helper that checks the sequence of the store's first file slice,
// reading it under the store's lock.
expectFirsSliceToBe := func(t *testing.T, expected int) {
ms.Lock()
ffseq := ms.firstFSlSeq
ms.Unlock()
if ffseq != expected {
t.Fatalf("First slice expected to be %v, got %v", expected, ffseq)
}
}
if o.name == "SmallFileSlice" {
expectFirsSliceToBe(t, 2)
} else {
expectFirsSliceToBe(t, 1)
}

storeMsg(t, c, "foo", 18, []byte("msg18"))
storeMsg(t, c, "foo", 19, []byte("msg19"))
storeMsg(t, c, "foo", 20, []byte("msg20"))

// Force expiration..
// Wait longer than the 1ms max age passed below so that every
// message is old enough to expire.
time.Sleep(50 * time.Millisecond)
ms.Lock()
ms.expireMsgs(time.Now().UnixNano(), int64(time.Millisecond))
ms.Unlock()

// After expiration the store must report no message and no byte.
n, b, err := ms.State()
if err != nil {
t.Fatalf("Error getting state: %v", err)
}
if n != 0 || b != 0 {
t.Fatalf("Unexpected number of msgs: %v bytes: %v", n, b)
}

if o.name == "SmallFileSlice" {
expectFirsSliceToBe(t, 4)
} else {
expectFirsSliceToBe(t, 1)
}
})
}
}

// TestFSExpirationError verifies that when the first message cannot be
// read during expiration (here because the index file content has been
// overwritten), expireMsgs does not give up but schedules the next
// expiration attempt about 5 seconds later.
func TestFSExpirationError(t *testing.T) {
	cleanupFSDatastore(t)
	defer cleanupFSDatastore(t)

	s := createDefaultFileStore(t)
	defer s.Close()

	c := storeCreateChannel(t, s, "foo")
	ms := c.Msgs.(*FileMsgStore)

	storeMsg(t, c, "foo", 1, []byte("msg"))
	ms.Flush()
	ms.Lock()
	ms.cache.empty()
	ms.Unlock()

	// Overwrite the beginning of the first index record while holding the
	// store's lock. This is done in a function with a deferred Unlock so
	// that a t.Fatalf (which exits the goroutine after running deferred
	// calls) never leaves the store locked while the deferred s.Close()
	// above runs.
	func() {
		ms.Lock()
		defer ms.Unlock()

		idxFile := ms.files[1].idxFile
		ms.fm.closeLockedOrOpenedFile(idxFile)
		content, err := ioutil.ReadFile(idxFile.name)
		if err != nil {
			t.Fatalf("Error reading file: %v", err)
		}
		// Skip the first 4 bytes and clobber what follows.
		copy(content[4:], []byte("xxx"))
		if err := ioutil.WriteFile(idxFile.name, content, 0600); err != nil {
			t.Fatalf("Error writing file: %v", err)
		}
	}()

	// Make sure the stored message is older than the 1ms max age used below.
	time.Sleep(50 * time.Millisecond)

	// Use a single clock reading both as the "now" given to expireMsgs and
	// as the reference for the check, so the two cannot drift apart.
	now := time.Now().UnixNano()
	ms.Lock()
	nextExpiration := ms.expireMsgs(now, int64(time.Millisecond))
	ms.Unlock()
	if nextExpiration < now+int64(4500*time.Millisecond) {
		t.Fatalf("Expected next expiration to be set to 5secs from now, got %v", time.Duration(nextExpiration))
	}
}