From 72b0ebb115b74c9396c8c1a6eda95d13f2c4ac5e Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 29 Apr 2019 16:11:41 -0600 Subject: [PATCH] [FIXED] Clustering: possibly getting empty messages Resolves #814 Signed-off-by: Ivan Kozlovic --- server/clustering.go | 13 +++-- server/clustering_test.go | 112 ++++++++++++++++++++++++++++++++++++++ server/snapshot.go | 7 +++ 3 files changed, 128 insertions(+), 4 deletions(-) diff --git a/server/clustering.go b/server/clustering.go index 0edaf755..6373f8f8 100644 --- a/server/clustering.go +++ b/server/clustering.go @@ -465,8 +465,9 @@ func (r *raftFSM) Apply(l *raft.Log) interface{} { case spb.RaftOperation_Publish: // Message replication. var ( - c *channel - err error + c *channel + err error + lastSeq uint64 ) for _, msg := range op.PublishBatch.Messages { // This is a batch for a given channel, so lookup channel once. @@ -477,13 +478,17 @@ func (r *raftFSM) Apply(l *raft.Log) interface{} { if err == ErrChanDelInProgress { return nil } + lastSeq, err = c.store.Msgs.LastSequence() + } + if err == nil && lastSeq < msg.Sequence-1 { + err = s.raft.fsm.restoreMsgsFromSnapshot(c, lastSeq+1, msg.Sequence-1) } if err == nil { _, err = c.store.Msgs.Store(msg) } if err != nil { - panic(fmt.Errorf("failed to store replicated message %d on channel %s: %v", - msg.Sequence, msg.Subject, err)) + return fmt.Errorf("failed to store replicated message %d on channel %s: %v", + msg.Sequence, msg.Subject, err) } } return nil diff --git a/server/clustering_test.go b/server/clustering_test.go index a2fa8f15..3e429086 100644 --- a/server/clustering_test.go +++ b/server/clustering_test.go @@ -5210,3 +5210,115 @@ func TestClusteringReplSubSentAckWhileClosing(t *testing.T) { waitForNumClients(t, s1, 0) s1.Shutdown() } + +type msgStoreDoesntFlush struct { + stores.MsgStore +} + +func (s *msgStoreDoesntFlush) Store(m *pb.MsgProto) (uint64, error) { + // To simulate a no flush, we are actually skipping storing + // the message. + return m.Sequence, nil +} + +func TestClusteringGapsAfterSnapshotAndNoFlush(t *testing.T) { + cleanupDatastore(t) + defer cleanupDatastore(t) + cleanupRaftLog(t) + defer cleanupRaftLog(t) + + // For this test, use a central NATS server. + ns := natsdTest.RunDefaultServer() + defer ns.Shutdown() + + // Configure first server + s1sOpts := getTestDefaultOptsForClustering("a", true) + s1 := runServerWithOpts(t, s1sOpts, nil) + defer s1.Shutdown() + + // Configure second server. + s2sOpts := getTestDefaultOptsForClustering("b", false) + s2sOpts.FileStoreOpts.BufferSize = 1024 * 1024 + s2 := runServerWithOpts(t, s2sOpts, nil) + defer s2.Shutdown() + + // Configure third server. + s3sOpts := getTestDefaultOptsForClustering("c", false) + s3 := runServerWithOpts(t, s3sOpts, nil) + defer s3.Shutdown() + + getLeader(t, 10*time.Second, s1, s2, s3) + + sc := NewDefaultConnection(t) + defer sc.Close() + + if err := sc.Publish("foo", []byte("hello")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + + verifyChannelExist(t, s2, "foo", true, 2*time.Second) + + // Flush the first message + c := s2.channels.get("foo") + c.store.Msgs.Flush() + // Replace with a store that does not write messages + c.store.Msgs = &msgStoreDoesntFlush{c.store.Msgs} + + for i := 0; i < 100; i++ { + if err := sc.Publish("foo", []byte("hello")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + } + + if err := s2.raft.Snapshot().Error(); err != nil { + t.Fatalf("Error on snapshot: %v", err) + } + s2.Shutdown() + + for i := 0; i < 10; i++ { + if err := sc.Publish("foo", []byte("hello")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + } + + s2 = runServerWithOpts(t, s2sOpts, nil) + defer s2.Shutdown() + + waitForCount(t, 111, func() (string, int) { + var last uint64 + c := s2.channels.get("foo") + if c != nil { + last, _ = c.store.Msgs.LastSequence() + } + return "last sequence for channel foo", int(last) + }) + + sc.Close() + s1.Shutdown() + s3.Shutdown() + + s2.Shutdown() + s2sOpts.Clustering.Clustered = false + s2 = runServerWithOpts(t, s2sOpts, nil) + defer s2.Shutdown() + + sc = NewDefaultConnection(t) + defer sc.Close() + + ch := make(chan *stan.Msg, 110) + if _, err := sc.Subscribe("foo", func(msg *stan.Msg) { + ch <- msg + }, stan.DeliverAllAvailable()); err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + for i := 0; i < 110; i++ { + select { + case m := <-ch: + if len(m.Data) == 0 { + t.Fatalf("Received empty message: %+v", m) + } + case <-time.After(2 * time.Second): + t.Fatalf("Did not get all messages") + } + } +} diff --git a/server/snapshot.go b/server/snapshot.go index 4ef22181..776a506e 100644 --- a/server/snapshot.go +++ b/server/snapshot.go @@ -143,6 +143,13 @@ func (s *serverSnapshot) snapshotChannels(snap *spb.RaftSnapshot) error { snap.Channels = make([]*spb.ChannelSnapshot, numChannels) numChannel := 0 for _, c := range s.channels.channels { + // Flush msg and sub stores before persisting snapshot + if err := c.store.Subs.Flush(); err != nil { + return err + } + if err := c.store.Msgs.Flush(); err != nil { + return err + } first, last, err := c.store.Msgs.FirstAndLastSequence() if err != nil { return err