From d42951d0947a006bed7269acc85e8f5031953320 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 15 May 2019 18:14:18 -0600 Subject: [PATCH 1/2] [FIXED] Clustering: channel first/last sequence may fall to zero This could happen if the leader took a snapshot while messages were not yet expired, then a node is started without state and tries to restore from this snapshot. If the messages have expired by then, no message would be stored. If that node later did a snapshot itself, it would persist in it the first/last being zero. If no message are published and this node becomes leader, it would start storing messages at the wrong sequence and would also send the bad snapshot to other nodes. Resolves #833 Signed-off-by: Ivan Kozlovic --- server/clustering.go | 55 ++++++-- server/clustering_test.go | 278 ++++++++++++++++++++++++++++++++------ server/monitor.go | 9 +- server/server.go | 9 +- server/snapshot.go | 141 +++++++++++++------ 5 files changed, 385 insertions(+), 107 deletions(-) diff --git a/server/clustering.go b/server/clustering.go index 9ae5c449..1f2dec07 100644 --- a/server/clustering.go +++ b/server/clustering.go @@ -20,6 +20,7 @@ import ( "os" "path/filepath" "sync" + "sync/atomic" "time" "github.com/hashicorp/raft" @@ -446,6 +447,32 @@ func (s *StanServer) getClusteringPeerAddr(raftName, nodeID string) string { return fmt.Sprintf("%s.%s.%s", s.opts.ID, nodeID, raftName) } +// Returns the message store first and last sequence. +// When in clustered mode, if the first and last are 0, returns the value of +// the last sequence that we possibly got from the last snapshot. If a node +// restores a snapshot that let's say has first=1 and last=100, but when it +// tries to get these messages from the leader, the leader does not send them +// back because they have all expired, the node will not store anything. +// If we just rely on store's first/last, this node would use and report 0 +// for channel's first and last while when all messages have expired, it should +// be last+1/last. +func (s *StanServer) getChannelFirstAndlLastSeq(c *channel) (uint64, uint64, error) { + first, last, err := c.store.Msgs.FirstAndLastSequence() + if !s.isClustered { + return first, last, err + } + if err != nil { + return 0, 0, err + } + if first == 0 && last == 0 { + if fseq := atomic.LoadUint64(&c.firstSeq); fseq != 0 { + first = fseq + last = fseq - 1 + } + } + return first, last, nil +} + // Apply log is invoked once a log entry is committed. // It returns a value which will be made available in the // ApplyFuture returned by Raft.Apply method if that @@ -456,13 +483,16 @@ func (r *raftFSM) Apply(l *raft.Log) interface{} { if err := op.Unmarshal(l.Data); err != nil { panic(err) } + // We don't want snapshot Persist() and Apply() to execute concurrently, + // so use common lock. + r.Lock() + defer r.Unlock() switch op.OpType { case spb.RaftOperation_Publish: // Message replication. var ( - c *channel - err error - lastSeq uint64 + c *channel + err error ) for _, msg := range op.PublishBatch.Messages { // This is a batch for a given channel, so lookup channel once. @@ -472,18 +502,25 @@ func (r *raftFSM) Apply(l *raft.Log) interface{} { // just bail out. if err == ErrChanDelInProgress { return nil + } else if err == nil && !c.lSeqChecked { + // If msg.Sequence is > 1, then make sure we have no gap. + if msg.Sequence > 1 { + // We pass `1` for the `first` sequence. The function we call + // will do the right thing when it comes to restore possible + // missing messages. 
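+ // With `fromApply` set to true, the call below recomputes the
+ // actual starting point as the store's last sequence + 1, so only
+ // the gap (if any) between what this node already has and
+ // msg.Sequence-1 is requested from the leader.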
+ err = s.raft.fsm.restoreMsgsFromSnapshot(c, 1, msg.Sequence-1, true) + } + if err == nil { + c.lSeqChecked = true + } } - lastSeq, err = c.store.Msgs.LastSequence() - } - if err == nil && lastSeq < msg.Sequence-1 { - err = s.raft.fsm.restoreMsgsFromSnapshot(c, lastSeq+1, msg.Sequence-1) } if err == nil { _, err = c.store.Msgs.Store(msg) } if err != nil { - return fmt.Errorf("failed to store replicated message %d on channel %s: %v", - msg.Sequence, msg.Subject, err) + panic(fmt.Errorf("failed to store replicated message %d on channel %s: %v", + msg.Sequence, msg.Subject, err)) } } return nil diff --git a/server/clustering_test.go b/server/clustering_test.go index e0d52e54..5334d43f 100644 --- a/server/clustering_test.go +++ b/server/clustering_test.go @@ -68,7 +68,7 @@ func shutdownAndCleanupState(t *testing.T, s *StanServer, nodeID string) { os.RemoveAll(filepath.Join(defaultDataStore, nodeID)) os.RemoveAll(filepath.Join(defaultRaftLog, nodeID)) case stores.TypeSQL: - test.CleanupSQLDatastore(t, testSQLDriver, testSQLSource+nodeID) + test.CleanupSQLDatastore(t, testSQLDriver, testSQLSource+"_"+nodeID) default: t.Fatalf("This test needs to be updated for store type: %v", persistentStoreType) } @@ -4713,18 +4713,19 @@ func TestClusteringSubDontStallDueToMsgExpiration(t *testing.T) { } } - checkReceived := func() { + checkReceived := func(t *testing.T) { + t.Helper() select { case <-ch: case <-time.After(2 * time.Second): t.Fatalf("Failed to receive messages") } } - checkReceived() + checkReceived(t) - // We aremove all state from node "b", but even if we didn't, on restart, + // We remove all state from node "b", but even if we didn't, on restart, // since "b" store would be behind the rest, it would be emptied because - // the current first message is move than the "b"'s last sequence. + // the current first message is more than the "b"'s last sequence. shutdownAndCleanupState(t, s2, "b") for i := 0; i < secondBatch; i++ { @@ -4733,7 +4734,7 @@ func TestClusteringSubDontStallDueToMsgExpiration(t *testing.T) { } } - checkReceived() + checkReceived(t) // Wait for messages to expire time.Sleep(100 * time.Millisecond) @@ -4795,12 +4796,133 @@ func TestClusteringSubDontStallDueToMsgExpiration(t *testing.T) { if err := sc.Publish("foo", []byte("hello")); err != nil { t.Fatalf("Error on publish: %v", err) } - checkReceived() + checkReceived(t) sc.Close() s3.Shutdown() } +func TestClusteringStoreFirstLastDontFallToZero(t *testing.T) { + resetPreviousHTTPConnections() + cleanupDatastore(t) + defer cleanupDatastore(t) + cleanupRaftLog(t) + defer cleanupRaftLog(t) + + // For this test, use a central NATS server. + ns := natsdTest.RunDefaultServer() + defer ns.Shutdown() + + ttl := 100 * time.Millisecond + + // Configure first server + s1sOpts := getTestDefaultOptsForClustering("a", false) + s1sOpts.Clustering.Peers = []string{"a", "b", "c"} + s1sOpts.MaxAge = ttl + s1 := runServerWithOpts(t, s1sOpts, nil) + defer s1.Shutdown() + + // Configure second server. 
+ s2sOpts := getTestDefaultOptsForClustering("b", false) + s2sOpts.Clustering.Peers = []string{"a", "b", "c"} + s2sOpts.MaxAge = ttl + s2 := runServerWithOpts(t, s2sOpts, nil) + defer s2.Shutdown() + + getLeader(t, 10*time.Second, s1, s2) + + sc := NewDefaultConnection(t) + defer sc.Close() + for i := 0; i < 10; i++ { + if err := sc.Publish("foo", []byte("hello")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + } + sc.Close() + + time.Sleep(200 * time.Millisecond) + + if err := s1.raft.Snapshot().Error(); err != nil { + t.Fatalf("Error on snapshot: %v", err) + } + if err := s2.raft.Snapshot().Error(); err != nil { + t.Fatalf("Error on snapshot: %v", err) + } + + // Configure third server. + s3sOpts := getTestDefaultOptsForClustering("c", false) + s3sOpts.Clustering.Peers = []string{"a", "b", "c"} + s3sOpts.MaxAge = ttl + s3 := runServerWithOpts(t, s3sOpts, nil) + defer s3.Shutdown() + + // Wait for "foo" to be re-created on node "c" and we get + // the proper first/last + waitFor(t, 5*time.Second, 15*time.Millisecond, func() error { + s3.channels.RLock() + c, ok := s3.channels.channels["foo"] + s3.channels.RUnlock() + if !ok { + return fmt.Errorf("Channel foo still not created") + } + // We need to grab the FSM lock to safely access this field. + if _, lastSeq, _ := s3.getChannelFirstAndlLastSeq(c); lastSeq != 10 { + t.Fatalf("Expected lastSeq to be 10, got %v", lastSeq) + } + return nil + }) + + if err := s3.raft.Snapshot().Error(); err != nil { + t.Fatalf("Error on snapshot: %v", err) + } + + shutdownAndCleanupState(t, s1, "a") + shutdownAndCleanupState(t, s2, "b") + + // Restart s1 + s1nOpts := defaultMonitorOptions + s1 = runServerWithOpts(t, s1sOpts, &s1nOpts) + defer s1.Shutdown() + + leader := getLeader(t, 10*time.Second, s1, s3) + if leader != s3 { + t.Fatalf("s3 should have been leader") + } + + waitFor(t, 5*time.Second, 15*time.Millisecond, func() error { + s1.channels.RLock() + _, ok := s1.channels.channels["foo"] + s1.channels.RUnlock() + if !ok { + return fmt.Errorf("Channel foo still not created") + } + return nil + }) + + shutdownAndCleanupState(t, s3, "c") + s3 = runServerWithOpts(t, s3sOpts, nil) + defer s3.Shutdown() + + leader = getLeader(t, 10*time.Second, s1, s3) + if leader != s1 { + t.Fatalf("s1 should have been leader") + } + + resp, body := getBody(t, ChannelsPath+"?channel=foo", expectedJSON) + defer resp.Body.Close() + + cz := Channelz{} + if err := json.Unmarshal(body, &cz); err != nil { + t.Fatalf("Got an error unmarshalling the body: %v", err) + } + resp.Body.Close() + if cz.FirstSeq != 11 || cz.LastSeq != 10 { + t.Fatalf("Expected first/last seq to be 11, 10, got %v, %v", + cz.FirstSeq, cz.LastSeq) + } + leader.Shutdown() +} + func TestClusteringNoRaceOnChannelMonitor(t *testing.T) { resetPreviousHTTPConnections() cleanupDatastore(t) @@ -5541,16 +5663,50 @@ func TestClusteringSubSentAckReplResumeOnClusterRestart(t *testing.T) { sc.Close() } -type msgStoreDoesntFlush struct { +// This mocked MsgStore servers two purposes, to make sure +// that Flush() is invoked when a Snapshot is done and will +// simulate not recovering all messages (done by skipping +// storing some). +type msgStoreCaptureFlush struct { + sync.Mutex stores.MsgStore + firstSeq uint64 + lastSeq uint64 + ch chan struct{} } -func (s *msgStoreDoesntFlush) Store(m *pb.MsgProto) (uint64, error) { +func (s *msgStoreCaptureFlush) Store(m *pb.MsgProto) (uint64, error) { // To simulate a no flush, we are actually skipping storing // the message. 
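+ // We still track the first/last sequence the caller believes was
+ // stored, so that FirstAndLastSequence() and LastSequence() below
+ // report them even though nothing is written to the wrapped store.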
+ if s.firstSeq == 0 || m.Sequence < s.firstSeq { + s.firstSeq = m.Sequence + } + if m.Sequence > s.lastSeq { + s.lastSeq = m.Sequence + } return m.Sequence, nil } +func (s *msgStoreCaptureFlush) FirstAndLastSequence() (uint64, uint64, error) { + return s.firstSeq, s.lastSeq, nil +} + +func (s *msgStoreCaptureFlush) LastSequence() (uint64, error) { + return s.lastSeq, nil +} + +func (s *msgStoreCaptureFlush) Flush() error { + s.Lock() + if s.ch != nil { + select { + case s.ch <- struct{}{}: + default: + } + } + s.Unlock() + return s.MsgStore.Flush() +} + func TestClusteringGapsAfterSnapshotAndNoFlush(t *testing.T) { cleanupDatastore(t) defer cleanupDatastore(t) @@ -5592,7 +5748,10 @@ func TestClusteringGapsAfterSnapshotAndNoFlush(t *testing.T) { c := s2.channels.get("foo") c.store.Msgs.Flush() // Replace with a store that does not write messages - c.store.Msgs = &msgStoreDoesntFlush{c.store.Msgs} + ms := &msgStoreCaptureFlush{MsgStore: c.store.Msgs, firstSeq: 1, lastSeq: 1} + s2.raft.fsm.Lock() + c.store.Msgs = ms + s2.raft.fsm.Unlock() for i := 0; i < 100; i++ { if err := sc.Publish("foo", []byte("hello")); err != nil { @@ -5600,9 +5759,21 @@ func TestClusteringGapsAfterSnapshotAndNoFlush(t *testing.T) { } } + ms.Lock() + fch := make(chan struct{}, 1) + ms.ch = fch + ms.Unlock() + if err := s2.raft.Snapshot().Error(); err != nil { t.Fatalf("Error on snapshot: %v", err) } + // Make sure that store was flushed + select { + case <-fch: + case <-time.After(2 * time.Second): + t.Fatalf("MsgStore was not flushed during snapshot") + } + s2.Shutdown() for i := 0; i < 10; i++ { @@ -5627,6 +5798,8 @@ func TestClusteringGapsAfterSnapshotAndNoFlush(t *testing.T) { s1.Shutdown() s3.Shutdown() + // Restart in non cluster mode and consume all messages from S2. + // Make sure that there is not one with empty content. s2.Shutdown() s2sOpts.Clustering.Clustered = false s2 = runServerWithOpts(t, s2sOpts, nil) @@ -5744,13 +5917,7 @@ func TestClusteringRestoreSnapshotWithSomeMsgsNoLongerAvail(t *testing.T) { s2 := runServerWithOpts(t, s2sOpts, nil) defer s2.Shutdown() - // Configure third server. - s3sOpts := getTestDefaultOptsForClustering("c", false) - s3sOpts.MaxMsgs = 10 - s3 := runServerWithOpts(t, s3sOpts, nil) - defer s3.Shutdown() - - getLeader(t, 10*time.Second, s1, s2, s3) + getLeader(t, 10*time.Second, s1, s2) sc := NewDefaultConnection(t) defer sc.Close() @@ -5766,40 +5933,61 @@ func TestClusteringRestoreSnapshotWithSomeMsgsNoLongerAvail(t *testing.T) { t.Fatalf("Error on snapshot: %v", err) } - // Shutdown s3 and cleanup its state so it will have nothing in its stores - // and will restore from the snapshot. - shutdownAndCleanupState(t, s3, "c") + // We are going to check many different cases. + // 1- all messages in the snapshot are restored + // 2- some messages in the snapshot are restored + // 3- no message in the snapshot are restored - // Send 2 more messages that will make messages 1 and 2 disappear + check := func(t *testing.T, expectedFirst, expectedLast uint64) { + t.Helper() + s3sOpts := getTestDefaultOptsForClustering("c", false) + s3sOpts.MaxMsgs = 10 + s3 := runServerWithOpts(t, s3sOpts, nil) + defer func() { + // Shutdown s3 and cleanup its state so it will have nothing in its stores + // and will restore from the snapshot. 
+ shutdownAndCleanupState(t, s3, "c") + }() + + waitFor(t, 5*time.Second, 15*time.Millisecond, func() error { + c := s3.channels.get("foo") + if c == nil { + return fmt.Errorf("Channel foo not recreated yet") + } + first, last, err := s3.getChannelFirstAndlLastSeq(c) + if err != nil { + return fmt.Errorf("Error getting first/last seq: %v", err) + } + if first != expectedFirst { + return fmt.Errorf("Expected first to be %v, got %v", expectedFirst, first) + } + if last != expectedLast { + return fmt.Errorf("Expected last to be %v, got %v", expectedLast, last) + } + return nil + }) + } + + // 1- all messages in the snapshot are restored + check(t, 1, 10) + + // 2- some messages in the snapshot are restored + // To do so, send 2 more messages what will make messages 1 and 2 disappear for i := 0; i < 2; i++ { if err := sc.Publish("foo", []byte("hello")); err != nil { t.Fatalf("Error on publish: %v", err) } } - sc.Close() + check(t, 3, 12) - // Start s3. It will restore from the snapshot that says that - // channel has message 1 to 10, and then should receive 2 raft - // logs with messages 11 and 12. - // When s3 will ask the leader for messages 1 and 2, it should - // get empty responses indicating that these messages are gone, - // but should be able to request messages 3 to 10, then 11 and - // 12 will be replayed from raft logs. - s3 = runServerWithOpts(t, s3sOpts, nil) - defer s3.Shutdown() - - waitFor(t, 5*time.Second, 15*time.Millisecond, func() error { - c := s3.channels.get("foo") - if c == nil { - return fmt.Errorf("Channel foo not recreated yet") - } - first, last, err := c.store.Msgs.FirstAndLastSequence() - if err != nil { - return fmt.Errorf("Error getting first/last seq: %v", err) - } - if first != 3 || last != 12 { - return fmt.Errorf("Expected first=3 last=12, got %v and %v", first, last) + // 3- no message in the snapshot are restored + // To do so, send enough messages so that the original 10 are gone. + for i := 0; i < 10; i++ { + if err := sc.Publish("foo", []byte("hello")); err != nil { + t.Fatalf("Error on publish: %v", err) } - return nil - }) + } + check(t, 13, 22) + + sc.Close() } diff --git a/server/monitor.go b/server/monitor.go index a1b7b544..4aca994c 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -21,7 +21,6 @@ import ( "runtime" "sort" "strconv" - "sync/atomic" "time" gnatsd "github.com/nats-io/gnatsd/server" @@ -501,18 +500,12 @@ func (s *StanServer) updateChannelz(cz *Channelz, c *channel, subsOption int) er if err != nil { return fmt.Errorf("unable to get message state: %v", err) } - fseq, lseq, err := c.store.Msgs.FirstAndLastSequence() + fseq, lseq, err := s.getChannelFirstAndlLastSeq(c) if err != nil { return fmt.Errorf("unable to get first and last sequence: %v", err) } cz.Msgs = msgs cz.Bytes = bytes - if fseq == 0 && lseq == 0 && s.isClustered { - fseq = atomic.LoadUint64(&c.firstSeq) - if fseq > 1 { - lseq = fseq - 1 - } - } cz.FirstSeq = fseq cz.LastSeq = lseq if subsOption == 1 { diff --git a/server/server.go b/server/server.go index 9003a4a6..934da41c 100644 --- a/server/server.go +++ b/server/server.go @@ -436,6 +436,11 @@ type channel struct { stan *StanServer activity *channelActivity nextSubID uint64 + + // Used in cluster mode. This is to know if the message store + // last sequence should be checked before storing a message in + // Apply(). Protected by the raft's FSM lock. 
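+ // It is reset to false when a snapshot is restored for the channel,
+ // so that the next replicated message triggers a gap check against
+ // the leader before being stored.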
+ lSeqChecked bool } type channelActivity struct { @@ -2002,12 +2007,10 @@ func (s *StanServer) leadershipAcquired() error { channels := s.channels.getAll() for _, c := range channels { // Update next sequence to assign. - lastSequence, err := c.store.Msgs.LastSequence() + _, lastSequence, err := s.getChannelFirstAndlLastSeq(c) if err != nil { return err } - // It is possible that nextSequence be set when restoring - // from snapshots. Set it to the max value. if c.nextSequence <= lastSequence { c.nextSequence = lastSequence + 1 } diff --git a/server/snapshot.go b/server/snapshot.go index 868d611d..3e501da3 100644 --- a/server/snapshot.go +++ b/server/snapshot.go @@ -53,6 +53,11 @@ func (s *serverSnapshot) Persist(sink raft.SnapshotSink) (err error) { snap := &spb.RaftSnapshot{} + // We don't want Persit() and Apply() to be invoked concurrently, + // so use common lock. + s.raft.fsm.Lock() + defer s.raft.fsm.Unlock() + s.snapshotClients(snap, sink) if err := s.snapshotChannels(snap); err != nil { @@ -150,7 +155,7 @@ func (s *serverSnapshot) snapshotChannels(snap *spb.RaftSnapshot) error { if err := c.store.Msgs.Flush(); err != nil { return err } - first, last, err := c.store.Msgs.FirstAndLastSequence() + first, last, err := s.getChannelFirstAndlLastSeq(c) if err != nil { return err } @@ -256,7 +261,13 @@ func (r *raftFSM) Restore(snapshot io.ReadCloser) (retErr error) { }() } else { s.log.Noticef("restoring from snapshot") - defer s.log.Noticef("done restoring from snapshot") + defer func() { + if retErr == nil { + s.log.Noticef("done restoring from snapshot") + } else { + s.log.Errorf("error restoring from snapshot: %v", retErr) + } + }() } // We need to drop current state. The server will recover from snapshot @@ -326,31 +337,35 @@ func (r *raftFSM) restoreChannelsFromSnapshot(serverSnap *spb.RaftSnapshot, inNe if err != nil { return err } - // Do not restore messages from snapshot if the server - // just started and is recovering from its own snapshot. - if !inNewRaftCall { - if err := r.restoreMsgsFromSnapshot(c, sc.First, sc.Last); err != nil { - return err + // Keep track of first/last sequence in this channel's snapshot. + // It can help to detect that all messages in the leader have expired + // and this is what we should report/use as our store first/last. + // This is running under the FSM lock which protects this field. + atomic.StoreUint64(&c.firstSeq, sc.First) + + // Even on startup (when inNewRaftCall==true), we need to call + // restoreMsgsFromSnapshot() to make sure our store is consistent. + // If we skip it (like we use to), then it is possible that this + // node misses some messages from the snapshot and will then start + // to apply remaining logs. It could even then become leader in + // the process with missing messages. So if we need to restore some + // messages and fail, keep trying until a leader is avail and + // able to serve this node. + for { + if err := r.restoreMsgsFromSnapshot(c, sc.First, sc.Last, false); err != nil { + s.log.Errorf("unable to restore messages, can't start until leader is available") + time.Sleep(time.Second) + } else { + break } - delete(channelsBeforeRestore, sc.Channel) } - // Only set nextSequence if new value is greater than what - // is currently set. - if c.nextSequence <= sc.Last { - c.nextSequence = sc.Last + 1 - if sc.Last > 0 { - // If store's seq is 0 but sc.Last is not, it likely means - // that all messages expired and our store is empty. We - // need to report to monitoring page the value of nextSequence - // instead of 0. 
- // We use a different field in the channel structure that - // uses atomic since it can be read concurrently in monitor's - // channelsz endpoint. - if seq, _ := c.store.Msgs.FirstSequence(); seq == 0 { - atomic.StoreUint64(&c.firstSeq, c.nextSequence) - } - } + if !inNewRaftCall { + delete(channelsBeforeRestore, sc.Channel) } + + // Ensure we check store last sequence before storing msgs in Apply(). + c.lSeqChecked = false + for _, ss := range sc.Subscriptions { s.recoverOneSub(c, ss.State, nil, ss.AcksPending) c.ss.Lock() @@ -378,30 +393,58 @@ func (r *raftFSM) restoreChannelsFromSnapshot(serverSnap *spb.RaftSnapshot, inNe return nil } -func (r *raftFSM) restoreMsgsFromSnapshot(c *channel, first, last uint64) error { - storeFirst, storeLast, err := c.store.Msgs.FirstAndLastSequence() +func (r *raftFSM) restoreMsgsFromSnapshot(c *channel, first, last uint64, fromApply bool) error { + storeLast, err := c.store.Msgs.LastSequence() if err != nil { return err } - // If the leader's first sequence is more than our lastSequence+1, - // then we need to empty the store. We don't want to have gaps. - // Same if our first is strictly greater than the leader, or our - // last sequence is more than the leader - if first > storeLast+1 || storeFirst > first || storeLast > last { - if err := c.store.Msgs.Empty(); err != nil { - return err + + // We are here in several situations: + // - on startup, we may have a snapshot with a channel first/last index. + // It does not mean that there should not be more messages in the channel, + // and there may be raft logs to apply following this snapshot. But + // we still want to make sure that our store is consistent. + // - at runtime, we have fallen behind and the leader sent us a snapshot. + // we care about restoring/fetching only messages past `storeLast`+1 up + // to `last` + // - in Apply(), we are storing a message and this function is invoked with + // `last` equal to that message sequence - 1. We need to possibly fetch + // messages from `storeLast`+1 up to `last`. + + // If we are invoked from Apply(), we are about to store a message + // at sequence `last`+1, so we want to ensure that we have all messages + // from `storeLast`+1 to `last` included. Set `first` to `storeLast`+1. + if fromApply { + first = storeLast + 1 + } + + // If the first sequence in the channel is past our store last sequence + 1, + // then we need to empty our store. + if first > storeLast+1 { + if storeLast != 0 { + if err := c.store.Msgs.Empty(); err != nil { + return err + } } - } else if storeLast == last { - // We may have a message with lower sequence than the leader, - // but our last sequence is the same, so nothing to do. - return nil - } else if storeLast > 0 { - // first is less than what we already have, just started - // at our next sequence. + } else if first <= storeLast { + // Start to store at storeLast+1 first = storeLast + 1 } - // All messages were expired, we still need to save some information - // that will allow us to remember the last sequence. + + // With the above done, we now need to make sure that we don't try + // to restore from a too old position. If `first` is smaller than + // snapFSeq, then set the floor at that value. + // Note that c.firstSeq is only set from a thread invoking this + // function or within this function itself. However, it is read + // from other threads, so use atomic operation here for consistency, + // but not really required. 
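+ // c.firstSeq was recorded from the last snapshot (and is bumped
+ // further below when the leader reports messages as gone), so any
+ // sequence before it is known to no longer exist and is not requested.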
+ if fseq := atomic.LoadUint64(&c.firstSeq); fseq > first { + first = fseq + } + + // Now check if we have anything to do. This condition will be true + // if we try to restore from a channel where all messages have expired + // or if we have all messages we need. if first > last { return nil } @@ -423,6 +466,7 @@ func (r *raftFSM) restoreMsgsFromSnapshot(c *channel, first, last uint64) error reqEnd uint64 batch = uint64(100) halfBatch = batch / 2 + stored = false ) for seq := first; seq <= last; seq++ { if seq == reqNext { @@ -455,6 +499,19 @@ func (r *raftFSM) restoreMsgsFromSnapshot(c *channel, first, last uint64) error if _, err := c.store.Msgs.Store(msg); err != nil { return err } + stored = true + } else { + // Since this message is not in the leader, it means that + // messages may have expired or be removed due to limit, + // so we need to empty our store before storing the next + // valid message. + if seq == first || stored { + if err := c.store.Msgs.Empty(); err != nil { + return err + } + stored = false + } + atomic.StoreUint64(&c.firstSeq, seq+1) } select { case <-r.server.shutdownCh: From cff1dfe4932501d8181a69452495c8bcdf24a18a Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Sun, 19 May 2019 14:28:20 -0600 Subject: [PATCH 2/2] Updates based on code review - bail out after a number of failed attempts to restore msgs - create snapshot on success if restore msgs indicates that first in channel has moved. Signed-off-by: Ivan Kozlovic --- server/clustering_test.go | 213 ++++++++++++++++++++++++++++++++++++++ server/snapshot.go | 72 +++++++++++-- 2 files changed, 275 insertions(+), 10 deletions(-) diff --git a/server/clustering_test.go b/server/clustering_test.go index 5334d43f..d70f638a 100644 --- a/server/clustering_test.go +++ b/server/clustering_test.go @@ -5991,3 +5991,216 @@ func TestClusteringRestoreSnapshotWithSomeMsgsNoLongerAvail(t *testing.T) { sc.Close() } + +func TestClusteringRestoreSnapshotCreateSnapshotAfterMsgsExpired(t *testing.T) { + // This test checks that when a node restores from a snapshot + // that channel has messages from 1 to 10, but when fetching them + // realize that they have expired, it will create its own snapshot + // to reflect the new state so that if it were to restart, it + // would not need to try to restore those known expired messages. + cleanupDatastore(t) + defer cleanupDatastore(t) + cleanupRaftLog(t) + defer cleanupRaftLog(t) + + // For this test, use a central NATS server. + ns := natsdTest.RunDefaultServer() + defer ns.Shutdown() + + // Configure first server + s1sOpts := getTestDefaultOptsForClustering("a", true) + s1sOpts.MaxAge = 100 * time.Millisecond + s1 := runServerWithOpts(t, s1sOpts, nil) + defer s1.Shutdown() + + // Configure second server. + s2sOpts := getTestDefaultOptsForClustering("b", false) + s2sOpts.MaxAge = 100 * time.Millisecond + s2 := runServerWithOpts(t, s2sOpts, nil) + defer s2.Shutdown() + + // Configure third server. + s3sOpts := getTestDefaultOptsForClustering("c", false) + s3sOpts.MaxAge = 100 * time.Millisecond + s3 := runServerWithOpts(t, s3sOpts, nil) + defer s3.Shutdown() + + getLeader(t, 10*time.Second, s1, s2, s3) + + sc := NewDefaultConnection(t) + defer sc.Close() + + for i := 0; i < 10; i++ { + if err := sc.Publish("foo", []byte("hello")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + } + sc.Close() + + // Create a snapshot that will indicate that there is messages from 1 to 10. 
+ if err := s1.raft.Snapshot().Error(); err != nil { + t.Fatalf("Error on snapshot: %v", err) + } + + // Wait for msgs to expire + waitFor(t, time.Second, 50*time.Millisecond, func() error { + c := s1.channels.get("foo") + n, _, _ := c.store.Msgs.State() + if n != 0 { + return fmt.Errorf("Not all messages expired, still %v", n) + } + return nil + }) + + // Restart s3 server, wait for it to report that there are no message (all expired) + s3.Shutdown() + s3 = runServerWithOpts(t, s3sOpts, nil) + defer s3.Shutdown() + + waitFor(t, 5*time.Second, 15*time.Millisecond, func() error { + c := s3.channels.get("foo") + if c == nil { + return fmt.Errorf("channel still not created") + } + first, last, _ := s3.getChannelFirstAndlLastSeq(c) + if first != 11 || last != 10 { + return fmt.Errorf("first and last should be 11 - 10, got %v - %v", first, last) + } + return nil + }) + + // Now stop all servers, and restart s3. If s3 has performed its own + // snapshot, it should be able to start on its own. + s1.Shutdown() + s2.Shutdown() + s3.Shutdown() + + errCh := make(chan error, 1) + go func() { + s, err := RunServerWithOpts(s3sOpts, nil) + if s != nil { + s.Shutdown() + } + errCh <- err + }() + + select { + case <-time.After(5 * time.Second): + t.Fatalf("Server is stuck starting") + case e := <-errCh: + if e != nil { + t.Fatalf(e.Error()) + } + } +} + +func TestClusteringRestoreSnapshotMsgsBailIfNoLeader(t *testing.T) { + if persistentStoreType != stores.TypeFile { + t.Skip("test works only for file stores") + } + restoreMsgsAttempts = 5 + restoreMsgsRcvTimeout = 50 * time.Millisecond + restoreMsgsSleepBetweenAttempts = 10 * time.Millisecond + defer func() { + restoreMsgsAttempts = defaultRestoreMsgsAttempts + restoreMsgsRcvTimeout = defaultRestoreMsgsRcvTimeout + restoreMsgsSleepBetweenAttempts = defaultRestoreMsgsSleepBetweenAttempts + }() + + cleanupDatastore(t) + defer cleanupDatastore(t) + cleanupRaftLog(t) + defer cleanupRaftLog(t) + + // For this test, use a central NATS server. + ns := natsdTest.RunDefaultServer() + defer ns.Shutdown() + + // Configure first server + s1sOpts := getTestDefaultOptsForClustering("a", false) + s1sOpts.Clustering.Peers = []string{"a", "b", "c"} + s1 := runServerWithOpts(t, s1sOpts, nil) + defer s1.Shutdown() + + // Configure second server. + s2sOpts := getTestDefaultOptsForClustering("b", false) + s2sOpts.Clustering.Peers = []string{"a", "b", "c"} + s2 := runServerWithOpts(t, s2sOpts, nil) + defer s2.Shutdown() + + leader := getLeader(t, 10*time.Second, s1, s2) + + sc := NewDefaultConnection(t) + defer sc.Close() + + for i := 0; i < 10; i++ { + if err := sc.Publish("foo", []byte("hello")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + } + sc.Close() + + // Create a snapshot that will indicate that there is messages from 1 to 10. 
+ if err := leader.raft.Snapshot().Error(); err != nil { + t.Fatalf("Error on snapshot: %v", err) + } + + s3sOpts := getTestDefaultOptsForClustering("c", false) + s3sOpts.Clustering.Peers = []string{"a", "b", "c"} + s3 := runServerWithOpts(t, s3sOpts, nil) + defer s3.Shutdown() + + // Wait for this snapshot to be sent to s3 + waitFor(t, 5*time.Second, 15*time.Millisecond, func() error { + c := s3.channels.get("foo") + if c == nil { + return fmt.Errorf("channel still not created") + } + first, last, _ := s3.getChannelFirstAndlLastSeq(c) + if first != 1 || last != 10 { + return fmt.Errorf("first and last should be 1 - 10, got %v - %v", first, last) + } + return nil + }) + + snaps, err := ioutil.ReadDir(filepath.Join(defaultRaftLog, "c", clusterName, "snapshots")) + if err != nil { + t.Fatalf("Error reading snapshots directory: %v", err) + } + if len(snaps) == 0 { + t.Skip("Snapshot was not installed, skipping test") + } + + // Shutdown all servers, then restart s3. + s1.Shutdown() + s2.Shutdown() + s3.Shutdown() + + // Since s3 will have made a snapshot of its own, the only way it + // would get stuck waiting for a leader is if its store is not + // consistent with the snapshot info. So remove s3's message dat file. + if err := os.Remove(filepath.Join(defaultDataStore, "c", "foo", "msgs.1.dat")); err != nil { + t.Fatalf("error removing file: %v", err) + } + + errCh := make(chan error, 1) + go func() { + // We are expecting this to fail to start + s, err := RunServerWithOpts(s3sOpts, nil) + if err == nil { + s.Shutdown() + errCh <- fmt.Errorf("server did not fail to start") + return + } + errCh <- nil + }() + + select { + case <-time.After(time.Duration(restoreMsgsAttempts+2) * (restoreMsgsSleepBetweenAttempts + restoreMsgsRcvTimeout)): + t.Fatalf("Server should have exited after a certain number of failed attempts") + case e := <-errCh: + if e != nil { + t.Fatalf(e.Error()) + } + } +} diff --git a/server/snapshot.go b/server/snapshot.go index 3e501da3..1398f26f 100644 --- a/server/snapshot.go +++ b/server/snapshot.go @@ -26,6 +26,18 @@ import ( "github.com/nats-io/nats-streaming-server/util" ) +const ( + defaultRestoreMsgsAttempts = 10 + defaultRestoreMsgsRcvTimeout = 2 * time.Second + defaultRestoreMsgsSleepBetweenAttempts = time.Second +) + +var ( + restoreMsgsAttempts = defaultRestoreMsgsAttempts + restoreMsgsRcvTimeout = defaultRestoreMsgsRcvTimeout + restoreMsgsSleepBetweenAttempts = defaultRestoreMsgsSleepBetweenAttempts +) + // serverSnapshot implements the raft.FSMSnapshot interface by snapshotting // StanServer state. type serverSnapshot struct { @@ -234,6 +246,27 @@ func (r *raftFSM) Restore(snapshot io.ReadCloser) (retErr error) { r.Lock() defer r.Unlock() + shouldSnapshot := false + defer func() { + if retErr == nil && shouldSnapshot { + s.wg.Add(1) + go func() { + defer s.wg.Done() + // s.raft.Snapshot() is actually s.raft.Raft.Snapshot() + // (our raft structure embeds Raft). But it is set after + // the call to NewRaft() - which invokes this function on + // startup. However, this runs under the server's lock, so + // simply grab the lock to ensure that Snapshot() will be + // able to execute. + // Also don't use startGoRoutine() here since it needs the + // server lock, which we already have. + s.mu.Lock() + s.raft.Snapshot().Error() + s.mu.Unlock() + }() + } + }() + // This function may be invoked directly from raft.NewRaft() when // the node is initialized and if there were exisiting local snapshots, // or later, when catching up with a leader. 
We behave differently @@ -312,7 +345,9 @@ func (r *raftFSM) Restore(snapshot io.ReadCloser) (retErr error) { if err := r.restoreClientsFromSnapshot(serverSnap); err != nil { return err } - return r.restoreChannelsFromSnapshot(serverSnap, inNewRaftCall) + var err error + shouldSnapshot, err = r.restoreChannelsFromSnapshot(serverSnap, inNewRaftCall) + return err } func (r *raftFSM) restoreClientsFromSnapshot(serverSnap *spb.RaftSnapshot) error { @@ -325,9 +360,11 @@ func (r *raftFSM) restoreClientsFromSnapshot(serverSnap *spb.RaftSnapshot) error return nil } -func (r *raftFSM) restoreChannelsFromSnapshot(serverSnap *spb.RaftSnapshot, inNewRaftCall bool) error { +func (r *raftFSM) restoreChannelsFromSnapshot(serverSnap *spb.RaftSnapshot, inNewRaftCall bool) (bool, error) { s := r.server + shouldSnapshot := false + var channelsBeforeRestore map[string]*channel if !inNewRaftCall { channelsBeforeRestore = s.channels.getAll() @@ -335,33 +372,48 @@ func (r *raftFSM) restoreChannelsFromSnapshot(serverSnap *spb.RaftSnapshot, inNe for _, sc := range serverSnap.Channels { c, err := s.lookupOrCreateChannel(sc.Channel) if err != nil { - return err + return false, err } // Keep track of first/last sequence in this channel's snapshot. // It can help to detect that all messages in the leader have expired // and this is what we should report/use as our store first/last. - // This is running under the FSM lock which protects this field. atomic.StoreUint64(&c.firstSeq, sc.First) // Even on startup (when inNewRaftCall==true), we need to call // restoreMsgsFromSnapshot() to make sure our store is consistent. - // If we skip it (like we use to), then it is possible that this + // If we skip it (like we used to), then it is possible that this // node misses some messages from the snapshot and will then start // to apply remaining logs. It could even then become leader in // the process with missing messages. So if we need to restore some // messages and fail, keep trying until a leader is avail and // able to serve this node. - for { + ok := false + for i := 0; i < restoreMsgsAttempts; i++ { if err := r.restoreMsgsFromSnapshot(c, sc.First, sc.Last, false); err != nil { - s.log.Errorf("unable to restore messages, can't start until leader is available") - time.Sleep(time.Second) + s.log.Errorf("channel %q - unable to restore messages, can't start until leader is available", + sc.Channel) + time.Sleep(restoreMsgsSleepBetweenAttempts) } else { + ok = true break } } + if !ok { + err = fmt.Errorf("channel %q - unable to restore messages, aborting", sc.Channel) + s.log.Fatalf(err.Error()) + // In tests, we use a "dummy" logger, so process will not exit + // (and we would not want that anyway), so make sure we return + // an error. + return false, err + } if !inNewRaftCall { delete(channelsBeforeRestore, sc.Channel) } + // restoreMsgsFromSnapshot may have updated c.firstSeq. If that is the + // case it means that when restoring, we realized that some messages + // have expired/removed. If so, try to do a snapshot once we have + // finished with restoring. + shouldSnapshot = atomic.LoadUint64(&c.firstSeq) != sc.First // Ensure we check store last sequence before storing msgs in Apply(). 
c.lSeqChecked = false @@ -390,7 +442,7 @@ func (r *raftFSM) restoreChannelsFromSnapshot(serverSnap *spb.RaftSnapshot, inNe s.processDeleteChannel(name) } } - return nil + return shouldSnapshot, nil } func (r *raftFSM) restoreMsgsFromSnapshot(c *channel, first, last uint64, fromApply bool) error { @@ -484,7 +536,7 @@ func (r *raftFSM) restoreMsgsFromSnapshot(c *channel, first, last uint64, fromAp reqStart = reqEnd + 1 } } - resp, err := sub.NextMsg(2 * time.Second) + resp, err := sub.NextMsg(restoreMsgsRcvTimeout) if err != nil { return err }
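
For readers following the fix, here is a minimal standalone sketch (not part of the patch) of the first/last fallback that getChannelFirstAndlLastSeq introduces for clustered mode. The helper name firstAndLast and its parameters are illustrative only.

// firstAndLast mirrors the fallback: when the local store reports (0, 0)
// but the channel's tracked first sequence (recorded from the last snapshot
// and bumped as the leader reports messages gone) is snapFirstSeq, report
// (snapFirstSeq, snapFirstSeq-1) instead of (0, 0).
package main

import "fmt"

func firstAndLast(storeFirst, storeLast, snapFirstSeq uint64) (uint64, uint64) {
	if storeFirst == 0 && storeLast == 0 && snapFirstSeq != 0 {
		return snapFirstSeq, snapFirstSeq - 1
	}
	return storeFirst, storeLast
}

func main() {
	// All 10 original messages expired before this node could fetch them;
	// the restore left the tracked first sequence at 11, so the channel
	// reports first=11, last=10 rather than falling to zero.
	fmt.Println(firstAndLast(0, 0, 11))
	// The store still holds messages 3..12: report them as-is.
	fmt.Println(firstAndLast(3, 12, 1))
}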