diff --git a/server/clustering.go b/server/clustering.go
index 9ae5c449..1f2dec07 100644
--- a/server/clustering.go
+++ b/server/clustering.go
@@ -20,6 +20,7 @@ import (
 	"os"
 	"path/filepath"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/hashicorp/raft"
@@ -446,6 +447,32 @@ func (s *StanServer) getClusteringPeerAddr(raftName, nodeID string) string {
 	return fmt.Sprintf("%s.%s.%s", s.opts.ID, nodeID, raftName)
 }
 
+// Returns the message store's first and last sequence.
+// In clustered mode, if both first and last are 0, fall back to the sequence
+// recorded from the last snapshot. For example, a node may restore a snapshot
+// saying first=1 and last=100, but if the leader does not send those messages
+// back because they have all expired, the node will not store anything.
+// If we relied only on the store's first/last, this node would use and report
+// 0 for the channel's first and last, whereas when all messages have expired
+// it should report last+1/last.
+func (s *StanServer) getChannelFirstAndlLastSeq(c *channel) (uint64, uint64, error) {
+	first, last, err := c.store.Msgs.FirstAndLastSequence()
+	if !s.isClustered {
+		return first, last, err
+	}
+	if err != nil {
+		return 0, 0, err
+	}
+	if first == 0 && last == 0 {
+		if fseq := atomic.LoadUint64(&c.firstSeq); fseq != 0 {
+			first = fseq
+			last = fseq - 1
+		}
+	}
+	return first, last, nil
+}
+
 // Apply log is invoked once a log entry is committed.
 // It returns a value which will be made available in the
 // ApplyFuture returned by Raft.Apply method if that
@@ -456,13 +483,16 @@ func (r *raftFSM) Apply(l *raft.Log) interface{} {
 	if err := op.Unmarshal(l.Data); err != nil {
 		panic(err)
 	}
+	// We don't want snapshot Persist() and Apply() to execute concurrently,
+	// so use a common lock.
+	r.Lock()
+	defer r.Unlock()
 	switch op.OpType {
 	case spb.RaftOperation_Publish:
 		// Message replication.
 		var (
-			c       *channel
-			err     error
-			lastSeq uint64
+			c   *channel
+			err error
 		)
 		for _, msg := range op.PublishBatch.Messages {
 			// This is a batch for a given channel, so lookup channel once.
@@ -472,18 +502,25 @@ func (r *raftFSM) Apply(l *raft.Log) interface{} {
 			// just bail out.
 			if err == ErrChanDelInProgress {
 				return nil
+			} else if err == nil && !c.lSeqChecked {
+				// If msg.Sequence is > 1, then make sure we have no gap.
+				if msg.Sequence > 1 {
+					// We pass `1` for the `first` sequence. The function we call
+					// will do the right thing when it comes to restoring possibly
+					// missing messages.
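+					// When invoked from Apply() (last argument true), restoreMsgsFromSnapshot
+					// starts at the store's last sequence + 1 regardless of the `first` passed
+					// in, so only sequences that are actually missing are requested from the
+					// leader. For example, if the local store ends at sequence 97 and
+					// msg.Sequence is 101, at most sequences 98 to 100 are fetched before
+					// 101 is stored.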
+ err = s.raft.fsm.restoreMsgsFromSnapshot(c, 1, msg.Sequence-1, true) + } + if err == nil { + c.lSeqChecked = true + } } - lastSeq, err = c.store.Msgs.LastSequence() - } - if err == nil && lastSeq < msg.Sequence-1 { - err = s.raft.fsm.restoreMsgsFromSnapshot(c, lastSeq+1, msg.Sequence-1) } if err == nil { _, err = c.store.Msgs.Store(msg) } if err != nil { - return fmt.Errorf("failed to store replicated message %d on channel %s: %v", - msg.Sequence, msg.Subject, err) + panic(fmt.Errorf("failed to store replicated message %d on channel %s: %v", + msg.Sequence, msg.Subject, err)) } } return nil diff --git a/server/clustering_test.go b/server/clustering_test.go index e0d52e54..d70f638a 100644 --- a/server/clustering_test.go +++ b/server/clustering_test.go @@ -68,7 +68,7 @@ func shutdownAndCleanupState(t *testing.T, s *StanServer, nodeID string) { os.RemoveAll(filepath.Join(defaultDataStore, nodeID)) os.RemoveAll(filepath.Join(defaultRaftLog, nodeID)) case stores.TypeSQL: - test.CleanupSQLDatastore(t, testSQLDriver, testSQLSource+nodeID) + test.CleanupSQLDatastore(t, testSQLDriver, testSQLSource+"_"+nodeID) default: t.Fatalf("This test needs to be updated for store type: %v", persistentStoreType) } @@ -4713,18 +4713,19 @@ func TestClusteringSubDontStallDueToMsgExpiration(t *testing.T) { } } - checkReceived := func() { + checkReceived := func(t *testing.T) { + t.Helper() select { case <-ch: case <-time.After(2 * time.Second): t.Fatalf("Failed to receive messages") } } - checkReceived() + checkReceived(t) - // We aremove all state from node "b", but even if we didn't, on restart, + // We remove all state from node "b", but even if we didn't, on restart, // since "b" store would be behind the rest, it would be emptied because - // the current first message is move than the "b"'s last sequence. + // the current first message is more than the "b"'s last sequence. shutdownAndCleanupState(t, s2, "b") for i := 0; i < secondBatch; i++ { @@ -4733,7 +4734,7 @@ func TestClusteringSubDontStallDueToMsgExpiration(t *testing.T) { } } - checkReceived() + checkReceived(t) // Wait for messages to expire time.Sleep(100 * time.Millisecond) @@ -4795,12 +4796,133 @@ func TestClusteringSubDontStallDueToMsgExpiration(t *testing.T) { if err := sc.Publish("foo", []byte("hello")); err != nil { t.Fatalf("Error on publish: %v", err) } - checkReceived() + checkReceived(t) sc.Close() s3.Shutdown() } +func TestClusteringStoreFirstLastDontFallToZero(t *testing.T) { + resetPreviousHTTPConnections() + cleanupDatastore(t) + defer cleanupDatastore(t) + cleanupRaftLog(t) + defer cleanupRaftLog(t) + + // For this test, use a central NATS server. + ns := natsdTest.RunDefaultServer() + defer ns.Shutdown() + + ttl := 100 * time.Millisecond + + // Configure first server + s1sOpts := getTestDefaultOptsForClustering("a", false) + s1sOpts.Clustering.Peers = []string{"a", "b", "c"} + s1sOpts.MaxAge = ttl + s1 := runServerWithOpts(t, s1sOpts, nil) + defer s1.Shutdown() + + // Configure second server. 
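+	// Note: the third peer "c" is intentionally not started yet; it is brought up
+	// later in the test, after the published messages have expired and snapshots
+	// have been taken, so it has to rebuild the channel from a snapshot alone.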
+ s2sOpts := getTestDefaultOptsForClustering("b", false) + s2sOpts.Clustering.Peers = []string{"a", "b", "c"} + s2sOpts.MaxAge = ttl + s2 := runServerWithOpts(t, s2sOpts, nil) + defer s2.Shutdown() + + getLeader(t, 10*time.Second, s1, s2) + + sc := NewDefaultConnection(t) + defer sc.Close() + for i := 0; i < 10; i++ { + if err := sc.Publish("foo", []byte("hello")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + } + sc.Close() + + time.Sleep(200 * time.Millisecond) + + if err := s1.raft.Snapshot().Error(); err != nil { + t.Fatalf("Error on snapshot: %v", err) + } + if err := s2.raft.Snapshot().Error(); err != nil { + t.Fatalf("Error on snapshot: %v", err) + } + + // Configure third server. + s3sOpts := getTestDefaultOptsForClustering("c", false) + s3sOpts.Clustering.Peers = []string{"a", "b", "c"} + s3sOpts.MaxAge = ttl + s3 := runServerWithOpts(t, s3sOpts, nil) + defer s3.Shutdown() + + // Wait for "foo" to be re-created on node "c" and we get + // the proper first/last + waitFor(t, 5*time.Second, 15*time.Millisecond, func() error { + s3.channels.RLock() + c, ok := s3.channels.channels["foo"] + s3.channels.RUnlock() + if !ok { + return fmt.Errorf("Channel foo still not created") + } + // We need to grab the FSM lock to safely access this field. + if _, lastSeq, _ := s3.getChannelFirstAndlLastSeq(c); lastSeq != 10 { + t.Fatalf("Expected lastSeq to be 10, got %v", lastSeq) + } + return nil + }) + + if err := s3.raft.Snapshot().Error(); err != nil { + t.Fatalf("Error on snapshot: %v", err) + } + + shutdownAndCleanupState(t, s1, "a") + shutdownAndCleanupState(t, s2, "b") + + // Restart s1 + s1nOpts := defaultMonitorOptions + s1 = runServerWithOpts(t, s1sOpts, &s1nOpts) + defer s1.Shutdown() + + leader := getLeader(t, 10*time.Second, s1, s3) + if leader != s3 { + t.Fatalf("s3 should have been leader") + } + + waitFor(t, 5*time.Second, 15*time.Millisecond, func() error { + s1.channels.RLock() + _, ok := s1.channels.channels["foo"] + s1.channels.RUnlock() + if !ok { + return fmt.Errorf("Channel foo still not created") + } + return nil + }) + + shutdownAndCleanupState(t, s3, "c") + s3 = runServerWithOpts(t, s3sOpts, nil) + defer s3.Shutdown() + + leader = getLeader(t, 10*time.Second, s1, s3) + if leader != s1 { + t.Fatalf("s1 should have been leader") + } + + resp, body := getBody(t, ChannelsPath+"?channel=foo", expectedJSON) + defer resp.Body.Close() + + cz := Channelz{} + if err := json.Unmarshal(body, &cz); err != nil { + t.Fatalf("Got an error unmarshalling the body: %v", err) + } + resp.Body.Close() + if cz.FirstSeq != 11 || cz.LastSeq != 10 { + t.Fatalf("Expected first/last seq to be 11, 10, got %v, %v", + cz.FirstSeq, cz.LastSeq) + } + leader.Shutdown() +} + func TestClusteringNoRaceOnChannelMonitor(t *testing.T) { resetPreviousHTTPConnections() cleanupDatastore(t) @@ -5541,16 +5663,50 @@ func TestClusteringSubSentAckReplResumeOnClusterRestart(t *testing.T) { sc.Close() } -type msgStoreDoesntFlush struct { +// This mocked MsgStore servers two purposes, to make sure +// that Flush() is invoked when a Snapshot is done and will +// simulate not recovering all messages (done by skipping +// storing some). +type msgStoreCaptureFlush struct { + sync.Mutex stores.MsgStore + firstSeq uint64 + lastSeq uint64 + ch chan struct{} } -func (s *msgStoreDoesntFlush) Store(m *pb.MsgProto) (uint64, error) { +func (s *msgStoreCaptureFlush) Store(m *pb.MsgProto) (uint64, error) { // To simulate a no flush, we are actually skipping storing // the message. 
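+	// We still track the sequence range the caller believes was stored, so that
+	// FirstAndLastSequence() and LastSequence() below report what a properly
+	// flushed store would have reported.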
+ if s.firstSeq == 0 || m.Sequence < s.firstSeq { + s.firstSeq = m.Sequence + } + if m.Sequence > s.lastSeq { + s.lastSeq = m.Sequence + } return m.Sequence, nil } +func (s *msgStoreCaptureFlush) FirstAndLastSequence() (uint64, uint64, error) { + return s.firstSeq, s.lastSeq, nil +} + +func (s *msgStoreCaptureFlush) LastSequence() (uint64, error) { + return s.lastSeq, nil +} + +func (s *msgStoreCaptureFlush) Flush() error { + s.Lock() + if s.ch != nil { + select { + case s.ch <- struct{}{}: + default: + } + } + s.Unlock() + return s.MsgStore.Flush() +} + func TestClusteringGapsAfterSnapshotAndNoFlush(t *testing.T) { cleanupDatastore(t) defer cleanupDatastore(t) @@ -5592,7 +5748,10 @@ func TestClusteringGapsAfterSnapshotAndNoFlush(t *testing.T) { c := s2.channels.get("foo") c.store.Msgs.Flush() // Replace with a store that does not write messages - c.store.Msgs = &msgStoreDoesntFlush{c.store.Msgs} + ms := &msgStoreCaptureFlush{MsgStore: c.store.Msgs, firstSeq: 1, lastSeq: 1} + s2.raft.fsm.Lock() + c.store.Msgs = ms + s2.raft.fsm.Unlock() for i := 0; i < 100; i++ { if err := sc.Publish("foo", []byte("hello")); err != nil { @@ -5600,9 +5759,21 @@ func TestClusteringGapsAfterSnapshotAndNoFlush(t *testing.T) { } } + ms.Lock() + fch := make(chan struct{}, 1) + ms.ch = fch + ms.Unlock() + if err := s2.raft.Snapshot().Error(); err != nil { t.Fatalf("Error on snapshot: %v", err) } + // Make sure that store was flushed + select { + case <-fch: + case <-time.After(2 * time.Second): + t.Fatalf("MsgStore was not flushed during snapshot") + } + s2.Shutdown() for i := 0; i < 10; i++ { @@ -5627,6 +5798,8 @@ func TestClusteringGapsAfterSnapshotAndNoFlush(t *testing.T) { s1.Shutdown() s3.Shutdown() + // Restart in non cluster mode and consume all messages from S2. + // Make sure that there is not one with empty content. s2.Shutdown() s2sOpts.Clustering.Clustered = false s2 = runServerWithOpts(t, s2sOpts, nil) @@ -5744,9 +5917,111 @@ func TestClusteringRestoreSnapshotWithSomeMsgsNoLongerAvail(t *testing.T) { s2 := runServerWithOpts(t, s2sOpts, nil) defer s2.Shutdown() + getLeader(t, 10*time.Second, s1, s2) + + sc := NewDefaultConnection(t) + defer sc.Close() + + for i := 0; i < 10; i++ { + if err := sc.Publish("foo", []byte("hello")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + } + + // Create a snapshot that will indicate that there is messages from 1 to 10. + if err := s1.raft.Snapshot().Error(); err != nil { + t.Fatalf("Error on snapshot: %v", err) + } + + // We are going to check many different cases. + // 1- all messages in the snapshot are restored + // 2- some messages in the snapshot are restored + // 3- no message in the snapshot are restored + + check := func(t *testing.T, expectedFirst, expectedLast uint64) { + t.Helper() + s3sOpts := getTestDefaultOptsForClustering("c", false) + s3sOpts.MaxMsgs = 10 + s3 := runServerWithOpts(t, s3sOpts, nil) + defer func() { + // Shutdown s3 and cleanup its state so it will have nothing in its stores + // and will restore from the snapshot. 
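+			// Doing this in a defer lets check() be called several times, each time
+			// starting a brand new "c" node that must restore the expected range
+			// from the leader.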
+ shutdownAndCleanupState(t, s3, "c") + }() + + waitFor(t, 5*time.Second, 15*time.Millisecond, func() error { + c := s3.channels.get("foo") + if c == nil { + return fmt.Errorf("Channel foo not recreated yet") + } + first, last, err := s3.getChannelFirstAndlLastSeq(c) + if err != nil { + return fmt.Errorf("Error getting first/last seq: %v", err) + } + if first != expectedFirst { + return fmt.Errorf("Expected first to be %v, got %v", expectedFirst, first) + } + if last != expectedLast { + return fmt.Errorf("Expected last to be %v, got %v", expectedLast, last) + } + return nil + }) + } + + // 1- all messages in the snapshot are restored + check(t, 1, 10) + + // 2- some messages in the snapshot are restored + // To do so, send 2 more messages what will make messages 1 and 2 disappear + for i := 0; i < 2; i++ { + if err := sc.Publish("foo", []byte("hello")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + } + check(t, 3, 12) + + // 3- no message in the snapshot are restored + // To do so, send enough messages so that the original 10 are gone. + for i := 0; i < 10; i++ { + if err := sc.Publish("foo", []byte("hello")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + } + check(t, 13, 22) + + sc.Close() +} + +func TestClusteringRestoreSnapshotCreateSnapshotAfterMsgsExpired(t *testing.T) { + // This test checks that when a node restores from a snapshot + // that channel has messages from 1 to 10, but when fetching them + // realize that they have expired, it will create its own snapshot + // to reflect the new state so that if it were to restart, it + // would not need to try to restore those known expired messages. + cleanupDatastore(t) + defer cleanupDatastore(t) + cleanupRaftLog(t) + defer cleanupRaftLog(t) + + // For this test, use a central NATS server. + ns := natsdTest.RunDefaultServer() + defer ns.Shutdown() + + // Configure first server + s1sOpts := getTestDefaultOptsForClustering("a", true) + s1sOpts.MaxAge = 100 * time.Millisecond + s1 := runServerWithOpts(t, s1sOpts, nil) + defer s1.Shutdown() + + // Configure second server. + s2sOpts := getTestDefaultOptsForClustering("b", false) + s2sOpts.MaxAge = 100 * time.Millisecond + s2 := runServerWithOpts(t, s2sOpts, nil) + defer s2.Shutdown() + // Configure third server. s3sOpts := getTestDefaultOptsForClustering("c", false) - s3sOpts.MaxMsgs = 10 + s3sOpts.MaxAge = 100 * time.Millisecond s3 := runServerWithOpts(t, s3sOpts, nil) defer s3.Shutdown() @@ -5760,46 +6035,172 @@ func TestClusteringRestoreSnapshotWithSomeMsgsNoLongerAvail(t *testing.T) { t.Fatalf("Error on publish: %v", err) } } + sc.Close() // Create a snapshot that will indicate that there is messages from 1 to 10. if err := s1.raft.Snapshot().Error(); err != nil { t.Fatalf("Error on snapshot: %v", err) } - // Shutdown s3 and cleanup its state so it will have nothing in its stores - // and will restore from the snapshot. 
- shutdownAndCleanupState(t, s3, "c") + // Wait for msgs to expire + waitFor(t, time.Second, 50*time.Millisecond, func() error { + c := s1.channels.get("foo") + n, _, _ := c.store.Msgs.State() + if n != 0 { + return fmt.Errorf("Not all messages expired, still %v", n) + } + return nil + }) - // Send 2 more messages that will make messages 1 and 2 disappear - for i := 0; i < 2; i++ { + // Restart s3 server, wait for it to report that there are no message (all expired) + s3.Shutdown() + s3 = runServerWithOpts(t, s3sOpts, nil) + defer s3.Shutdown() + + waitFor(t, 5*time.Second, 15*time.Millisecond, func() error { + c := s3.channels.get("foo") + if c == nil { + return fmt.Errorf("channel still not created") + } + first, last, _ := s3.getChannelFirstAndlLastSeq(c) + if first != 11 || last != 10 { + return fmt.Errorf("first and last should be 11 - 10, got %v - %v", first, last) + } + return nil + }) + + // Now stop all servers, and restart s3. If s3 has performed its own + // snapshot, it should be able to start on its own. + s1.Shutdown() + s2.Shutdown() + s3.Shutdown() + + errCh := make(chan error, 1) + go func() { + s, err := RunServerWithOpts(s3sOpts, nil) + if s != nil { + s.Shutdown() + } + errCh <- err + }() + + select { + case <-time.After(5 * time.Second): + t.Fatalf("Server is stuck starting") + case e := <-errCh: + if e != nil { + t.Fatalf(e.Error()) + } + } +} + +func TestClusteringRestoreSnapshotMsgsBailIfNoLeader(t *testing.T) { + if persistentStoreType != stores.TypeFile { + t.Skip("test works only for file stores") + } + restoreMsgsAttempts = 5 + restoreMsgsRcvTimeout = 50 * time.Millisecond + restoreMsgsSleepBetweenAttempts = 10 * time.Millisecond + defer func() { + restoreMsgsAttempts = defaultRestoreMsgsAttempts + restoreMsgsRcvTimeout = defaultRestoreMsgsRcvTimeout + restoreMsgsSleepBetweenAttempts = defaultRestoreMsgsSleepBetweenAttempts + }() + + cleanupDatastore(t) + defer cleanupDatastore(t) + cleanupRaftLog(t) + defer cleanupRaftLog(t) + + // For this test, use a central NATS server. + ns := natsdTest.RunDefaultServer() + defer ns.Shutdown() + + // Configure first server + s1sOpts := getTestDefaultOptsForClustering("a", false) + s1sOpts.Clustering.Peers = []string{"a", "b", "c"} + s1 := runServerWithOpts(t, s1sOpts, nil) + defer s1.Shutdown() + + // Configure second server. + s2sOpts := getTestDefaultOptsForClustering("b", false) + s2sOpts.Clustering.Peers = []string{"a", "b", "c"} + s2 := runServerWithOpts(t, s2sOpts, nil) + defer s2.Shutdown() + + leader := getLeader(t, 10*time.Second, s1, s2) + + sc := NewDefaultConnection(t) + defer sc.Close() + + for i := 0; i < 10; i++ { if err := sc.Publish("foo", []byte("hello")); err != nil { t.Fatalf("Error on publish: %v", err) } } sc.Close() - // Start s3. It will restore from the snapshot that says that - // channel has message 1 to 10, and then should receive 2 raft - // logs with messages 11 and 12. - // When s3 will ask the leader for messages 1 and 2, it should - // get empty responses indicating that these messages are gone, - // but should be able to request messages 3 to 10, then 11 and - // 12 will be replayed from raft logs. - s3 = runServerWithOpts(t, s3sOpts, nil) + // Create a snapshot that will indicate that there is messages from 1 to 10. 
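+	// The snapshot records the channel's sequence range, not the messages
+	// themselves; a follower restoring it has to fetch the actual messages from
+	// the leader, which is what the rest of this test exercises.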
+ if err := leader.raft.Snapshot().Error(); err != nil { + t.Fatalf("Error on snapshot: %v", err) + } + + s3sOpts := getTestDefaultOptsForClustering("c", false) + s3sOpts.Clustering.Peers = []string{"a", "b", "c"} + s3 := runServerWithOpts(t, s3sOpts, nil) defer s3.Shutdown() + // Wait for this snapshot to be sent to s3 waitFor(t, 5*time.Second, 15*time.Millisecond, func() error { c := s3.channels.get("foo") if c == nil { - return fmt.Errorf("Channel foo not recreated yet") - } - first, last, err := c.store.Msgs.FirstAndLastSequence() - if err != nil { - return fmt.Errorf("Error getting first/last seq: %v", err) + return fmt.Errorf("channel still not created") } - if first != 3 || last != 12 { - return fmt.Errorf("Expected first=3 last=12, got %v and %v", first, last) + first, last, _ := s3.getChannelFirstAndlLastSeq(c) + if first != 1 || last != 10 { + return fmt.Errorf("first and last should be 1 - 10, got %v - %v", first, last) } return nil }) + + snaps, err := ioutil.ReadDir(filepath.Join(defaultRaftLog, "c", clusterName, "snapshots")) + if err != nil { + t.Fatalf("Error reading snapshots directory: %v", err) + } + if len(snaps) == 0 { + t.Skip("Snapshot was not installed, skipping test") + } + + // Shutdown all servers, then restart s3. + s1.Shutdown() + s2.Shutdown() + s3.Shutdown() + + // Since s3 will have made a snapshot of its own, the only way it + // would get stuck waiting for a leader is if its store is not + // consistent with the snapshot info. So remove s3's message dat file. + if err := os.Remove(filepath.Join(defaultDataStore, "c", "foo", "msgs.1.dat")); err != nil { + t.Fatalf("error removing file: %v", err) + } + + errCh := make(chan error, 1) + go func() { + // We are expecting this to fail to start + s, err := RunServerWithOpts(s3sOpts, nil) + if err == nil { + s.Shutdown() + errCh <- fmt.Errorf("server did not fail to start") + return + } + errCh <- nil + }() + + select { + case <-time.After(time.Duration(restoreMsgsAttempts+2) * (restoreMsgsSleepBetweenAttempts + restoreMsgsRcvTimeout)): + t.Fatalf("Server should have exited after a certain number of failed attempts") + case e := <-errCh: + if e != nil { + t.Fatalf(e.Error()) + } + } } diff --git a/server/monitor.go b/server/monitor.go index a1b7b544..4aca994c 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -21,7 +21,6 @@ import ( "runtime" "sort" "strconv" - "sync/atomic" "time" gnatsd "github.com/nats-io/gnatsd/server" @@ -501,18 +500,12 @@ func (s *StanServer) updateChannelz(cz *Channelz, c *channel, subsOption int) er if err != nil { return fmt.Errorf("unable to get message state: %v", err) } - fseq, lseq, err := c.store.Msgs.FirstAndLastSequence() + fseq, lseq, err := s.getChannelFirstAndlLastSeq(c) if err != nil { return fmt.Errorf("unable to get first and last sequence: %v", err) } cz.Msgs = msgs cz.Bytes = bytes - if fseq == 0 && lseq == 0 && s.isClustered { - fseq = atomic.LoadUint64(&c.firstSeq) - if fseq > 1 { - lseq = fseq - 1 - } - } cz.FirstSeq = fseq cz.LastSeq = lseq if subsOption == 1 { diff --git a/server/server.go b/server/server.go index 9003a4a6..934da41c 100644 --- a/server/server.go +++ b/server/server.go @@ -436,6 +436,11 @@ type channel struct { stan *StanServer activity *channelActivity nextSubID uint64 + + // Used in cluster mode. This is to know if the message store + // last sequence should be checked before storing a message in + // Apply(). Protected by the raft's FSM lock. 
+ lSeqChecked bool } type channelActivity struct { @@ -2002,12 +2007,10 @@ func (s *StanServer) leadershipAcquired() error { channels := s.channels.getAll() for _, c := range channels { // Update next sequence to assign. - lastSequence, err := c.store.Msgs.LastSequence() + _, lastSequence, err := s.getChannelFirstAndlLastSeq(c) if err != nil { return err } - // It is possible that nextSequence be set when restoring - // from snapshots. Set it to the max value. if c.nextSequence <= lastSequence { c.nextSequence = lastSequence + 1 } diff --git a/server/snapshot.go b/server/snapshot.go index 868d611d..1398f26f 100644 --- a/server/snapshot.go +++ b/server/snapshot.go @@ -26,6 +26,18 @@ import ( "github.com/nats-io/nats-streaming-server/util" ) +const ( + defaultRestoreMsgsAttempts = 10 + defaultRestoreMsgsRcvTimeout = 2 * time.Second + defaultRestoreMsgsSleepBetweenAttempts = time.Second +) + +var ( + restoreMsgsAttempts = defaultRestoreMsgsAttempts + restoreMsgsRcvTimeout = defaultRestoreMsgsRcvTimeout + restoreMsgsSleepBetweenAttempts = defaultRestoreMsgsSleepBetweenAttempts +) + // serverSnapshot implements the raft.FSMSnapshot interface by snapshotting // StanServer state. type serverSnapshot struct { @@ -53,6 +65,11 @@ func (s *serverSnapshot) Persist(sink raft.SnapshotSink) (err error) { snap := &spb.RaftSnapshot{} + // We don't want Persit() and Apply() to be invoked concurrently, + // so use common lock. + s.raft.fsm.Lock() + defer s.raft.fsm.Unlock() + s.snapshotClients(snap, sink) if err := s.snapshotChannels(snap); err != nil { @@ -150,7 +167,7 @@ func (s *serverSnapshot) snapshotChannels(snap *spb.RaftSnapshot) error { if err := c.store.Msgs.Flush(); err != nil { return err } - first, last, err := c.store.Msgs.FirstAndLastSequence() + first, last, err := s.getChannelFirstAndlLastSeq(c) if err != nil { return err } @@ -229,6 +246,27 @@ func (r *raftFSM) Restore(snapshot io.ReadCloser) (retErr error) { r.Lock() defer r.Unlock() + shouldSnapshot := false + defer func() { + if retErr == nil && shouldSnapshot { + s.wg.Add(1) + go func() { + defer s.wg.Done() + // s.raft.Snapshot() is actually s.raft.Raft.Snapshot() + // (our raft structure embeds Raft). But it is set after + // the call to NewRaft() - which invokes this function on + // startup. However, this runs under the server's lock, so + // simply grab the lock to ensure that Snapshot() will be + // able to execute. + // Also don't use startGoRoutine() here since it needs the + // server lock, which we already have. + s.mu.Lock() + s.raft.Snapshot().Error() + s.mu.Unlock() + }() + } + }() + // This function may be invoked directly from raft.NewRaft() when // the node is initialized and if there were exisiting local snapshots, // or later, when catching up with a leader. We behave differently @@ -256,7 +294,13 @@ func (r *raftFSM) Restore(snapshot io.ReadCloser) (retErr error) { }() } else { s.log.Noticef("restoring from snapshot") - defer s.log.Noticef("done restoring from snapshot") + defer func() { + if retErr == nil { + s.log.Noticef("done restoring from snapshot") + } else { + s.log.Errorf("error restoring from snapshot: %v", retErr) + } + }() } // We need to drop current state. 
The server will recover from snapshot @@ -301,7 +345,9 @@ func (r *raftFSM) Restore(snapshot io.ReadCloser) (retErr error) { if err := r.restoreClientsFromSnapshot(serverSnap); err != nil { return err } - return r.restoreChannelsFromSnapshot(serverSnap, inNewRaftCall) + var err error + shouldSnapshot, err = r.restoreChannelsFromSnapshot(serverSnap, inNewRaftCall) + return err } func (r *raftFSM) restoreClientsFromSnapshot(serverSnap *spb.RaftSnapshot) error { @@ -314,9 +360,11 @@ func (r *raftFSM) restoreClientsFromSnapshot(serverSnap *spb.RaftSnapshot) error return nil } -func (r *raftFSM) restoreChannelsFromSnapshot(serverSnap *spb.RaftSnapshot, inNewRaftCall bool) error { +func (r *raftFSM) restoreChannelsFromSnapshot(serverSnap *spb.RaftSnapshot, inNewRaftCall bool) (bool, error) { s := r.server + shouldSnapshot := false + var channelsBeforeRestore map[string]*channel if !inNewRaftCall { channelsBeforeRestore = s.channels.getAll() @@ -324,33 +372,52 @@ func (r *raftFSM) restoreChannelsFromSnapshot(serverSnap *spb.RaftSnapshot, inNe for _, sc := range serverSnap.Channels { c, err := s.lookupOrCreateChannel(sc.Channel) if err != nil { - return err + return false, err } - // Do not restore messages from snapshot if the server - // just started and is recovering from its own snapshot. - if !inNewRaftCall { - if err := r.restoreMsgsFromSnapshot(c, sc.First, sc.Last); err != nil { - return err + // Keep track of first/last sequence in this channel's snapshot. + // It can help to detect that all messages in the leader have expired + // and this is what we should report/use as our store first/last. + atomic.StoreUint64(&c.firstSeq, sc.First) + + // Even on startup (when inNewRaftCall==true), we need to call + // restoreMsgsFromSnapshot() to make sure our store is consistent. + // If we skip it (like we used to), then it is possible that this + // node misses some messages from the snapshot and will then start + // to apply remaining logs. It could even then become leader in + // the process with missing messages. So if we need to restore some + // messages and fail, keep trying until a leader is avail and + // able to serve this node. + ok := false + for i := 0; i < restoreMsgsAttempts; i++ { + if err := r.restoreMsgsFromSnapshot(c, sc.First, sc.Last, false); err != nil { + s.log.Errorf("channel %q - unable to restore messages, can't start until leader is available", + sc.Channel) + time.Sleep(restoreMsgsSleepBetweenAttempts) + } else { + ok = true + break } - delete(channelsBeforeRestore, sc.Channel) } - // Only set nextSequence if new value is greater than what - // is currently set. - if c.nextSequence <= sc.Last { - c.nextSequence = sc.Last + 1 - if sc.Last > 0 { - // If store's seq is 0 but sc.Last is not, it likely means - // that all messages expired and our store is empty. We - // need to report to monitoring page the value of nextSequence - // instead of 0. - // We use a different field in the channel structure that - // uses atomic since it can be read concurrently in monitor's - // channelsz endpoint. - if seq, _ := c.store.Msgs.FirstSequence(); seq == 0 { - atomic.StoreUint64(&c.firstSeq, c.nextSequence) - } - } + if !ok { + err = fmt.Errorf("channel %q - unable to restore messages, aborting", sc.Channel) + s.log.Fatalf(err.Error()) + // In tests, we use a "dummy" logger, so process will not exit + // (and we would not want that anyway), so make sure we return + // an error. 
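+			// With the default values, a node gives up after roughly
+			// restoreMsgsAttempts * (restoreMsgsRcvTimeout + restoreMsgsSleepBetweenAttempts),
+			// i.e. about 30 seconds, assuming every attempt fails on its first fetch.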
+ return false, err } + if !inNewRaftCall { + delete(channelsBeforeRestore, sc.Channel) + } + // restoreMsgsFromSnapshot may have updated c.firstSeq. If that is the + // case it means that when restoring, we realized that some messages + // have expired/removed. If so, try to do a snapshot once we have + // finished with restoring. + shouldSnapshot = atomic.LoadUint64(&c.firstSeq) != sc.First + + // Ensure we check store last sequence before storing msgs in Apply(). + c.lSeqChecked = false + for _, ss := range sc.Subscriptions { s.recoverOneSub(c, ss.State, nil, ss.AcksPending) c.ss.Lock() @@ -375,33 +442,61 @@ func (r *raftFSM) restoreChannelsFromSnapshot(serverSnap *spb.RaftSnapshot, inNe s.processDeleteChannel(name) } } - return nil + return shouldSnapshot, nil } -func (r *raftFSM) restoreMsgsFromSnapshot(c *channel, first, last uint64) error { - storeFirst, storeLast, err := c.store.Msgs.FirstAndLastSequence() +func (r *raftFSM) restoreMsgsFromSnapshot(c *channel, first, last uint64, fromApply bool) error { + storeLast, err := c.store.Msgs.LastSequence() if err != nil { return err } - // If the leader's first sequence is more than our lastSequence+1, - // then we need to empty the store. We don't want to have gaps. - // Same if our first is strictly greater than the leader, or our - // last sequence is more than the leader - if first > storeLast+1 || storeFirst > first || storeLast > last { - if err := c.store.Msgs.Empty(); err != nil { - return err + + // We are here in several situations: + // - on startup, we may have a snapshot with a channel first/last index. + // It does not mean that there should not be more messages in the channel, + // and there may be raft logs to apply following this snapshot. But + // we still want to make sure that our store is consistent. + // - at runtime, we have fallen behind and the leader sent us a snapshot. + // we care about restoring/fetching only messages past `storeLast`+1 up + // to `last` + // - in Apply(), we are storing a message and this function is invoked with + // `last` equal to that message sequence - 1. We need to possibly fetch + // messages from `storeLast`+1 up to `last`. + + // If we are invoked from Apply(), we are about to store a message + // at sequence `last`+1, so we want to ensure that we have all messages + // from `storeLast`+1 to `last` included. Set `first` to `storeLast`+1. + if fromApply { + first = storeLast + 1 + } + + // If the first sequence in the channel is past our store last sequence + 1, + // then we need to empty our store. + if first > storeLast+1 { + if storeLast != 0 { + if err := c.store.Msgs.Empty(); err != nil { + return err + } } - } else if storeLast == last { - // We may have a message with lower sequence than the leader, - // but our last sequence is the same, so nothing to do. - return nil - } else if storeLast > 0 { - // first is less than what we already have, just started - // at our next sequence. + } else if first <= storeLast { + // Start to store at storeLast+1 first = storeLast + 1 } - // All messages were expired, we still need to save some information - // that will allow us to remember the last sequence. + + // With the above done, we now need to make sure that we don't try + // to restore from a too old position. If `first` is smaller than + // snapFSeq, then set the floor at that value. + // Note that c.firstSeq is only set from a thread invoking this + // function or within this function itself. 
However, it is read + // from other threads, so use atomic operation here for consistency, + // but not really required. + if fseq := atomic.LoadUint64(&c.firstSeq); fseq > first { + first = fseq + } + + // Now check if we have anything to do. This condition will be true + // if we try to restore from a channel where all messages have expired + // or if we have all messages we need. if first > last { return nil } @@ -423,6 +518,7 @@ func (r *raftFSM) restoreMsgsFromSnapshot(c *channel, first, last uint64) error reqEnd uint64 batch = uint64(100) halfBatch = batch / 2 + stored = false ) for seq := first; seq <= last; seq++ { if seq == reqNext { @@ -440,7 +536,7 @@ func (r *raftFSM) restoreMsgsFromSnapshot(c *channel, first, last uint64) error reqStart = reqEnd + 1 } } - resp, err := sub.NextMsg(2 * time.Second) + resp, err := sub.NextMsg(restoreMsgsRcvTimeout) if err != nil { return err } @@ -455,6 +551,19 @@ func (r *raftFSM) restoreMsgsFromSnapshot(c *channel, first, last uint64) error if _, err := c.store.Msgs.Store(msg); err != nil { return err } + stored = true + } else { + // Since this message is not in the leader, it means that + // messages may have expired or be removed due to limit, + // so we need to empty our store before storing the next + // valid message. + if seq == first || stored { + if err := c.store.Msgs.Empty(); err != nil { + return err + } + stored = false + } + atomic.StoreUint64(&c.firstSeq, seq+1) } select { case <-r.server.shutdownCh: