Skip to content

Commit

Permalink
Merge pull request #1088 from nats-io/fix_log_cache_issue
Browse files Browse the repository at this point in the history
[FIXED] Clustering: possible panic on restart with "log not found"
  • Loading branch information
kozlovic authored Sep 15, 2020
2 parents 363954a + e5e9dab commit 18cff06
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 9 deletions.
11 changes: 3 additions & 8 deletions server/clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ func (s *StanServer) createRaftNode(name string) (bool, error) {
if err != nil {
return false, err
}
store.setCacheSize(s.opts.Clustering.LogCacheSize)

// Go through the list of channels that we have recovered from streaming store
// and set their corresponding UID.
Expand All @@ -311,12 +312,6 @@ func (s *StanServer) createRaftNode(name string) (bool, error) {
}
s.channels.Unlock()

cacheStore, err := raft.NewLogCache(s.opts.Clustering.LogCacheSize, store)
if err != nil {
store.Close()
return false, err
}

addr := s.getClusteringAddr(name)
config := raft.DefaultConfig()
// For tests
Expand Down Expand Up @@ -383,7 +378,7 @@ func (s *StanServer) createRaftNode(name string) (bool, error) {
fsm.Unlock()
}
s.raft.fsm = fsm
node, err := raft.NewRaft(config, fsm, cacheStore, store, snapshotStore, transport)
node, err := raft.NewRaft(config, fsm, store, store, snapshotStore, transport)
if err != nil {
transport.Close()
store.Close()
Expand All @@ -392,7 +387,7 @@ func (s *StanServer) createRaftNode(name string) (bool, error) {
if testPauseAfterNewRaftCalled {
time.Sleep(time.Second)
}
existingState, err := raft.HasExistingState(cacheStore, store, snapshotStore)
existingState, err := raft.HasExistingState(store, store, snapshotStore)
if err != nil {
node.Shutdown()
transport.Close()
Expand Down
30 changes: 30 additions & 0 deletions server/raft_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ type raftLog struct {
codec *codec.MsgpackHandle
closed bool

// Our cache
cache []*raft.Log
cacheSize uint64 // Save size as uint64 to not have to cast during store/load

// If the store is using encryption
encryption bool
eds *stores.EDStore
Expand Down Expand Up @@ -91,6 +95,13 @@ func newRaftLog(log logger.Logger, fileName string, sync bool, _ int, encrypt bo
return r, nil
}

// setCacheSize initializes (or re-initializes) the in-memory ring cache used
// by GetLog to serve recent entries without hitting the underlying store.
//
// A non-positive size disables caching entirely: the cache slice is left nil
// so that readers' `r.cache != nil` checks bypass the lookup. This matters
// because make([]*raft.Log, 0) is non-nil in Go, and a zero cacheSize would
// then make `idx % r.cacheSize` in GetLog panic with a division by zero
// (a negative size would also wrap to a huge uint64).
func (r *raftLog) setCacheSize(cacheSize int) {
	r.Lock()
	defer r.Unlock()
	if cacheSize <= 0 {
		// Disable the cache; GetLog/StoreLogs check r.cache for nil.
		r.cache = nil
		r.cacheSize = 0
		return
	}
	r.cacheSize = uint64(cacheSize)
	r.cache = make([]*raft.Log, cacheSize)
}

func (r *raftLog) init() error {
tx, err := r.conn.Begin(true)
if err != nil {
Expand Down Expand Up @@ -213,6 +224,14 @@ func (r *raftLog) getIndex(first bool) (uint64, error) {
// GetLog implements the LogStore interface
func (r *raftLog) GetLog(idx uint64, log *raft.Log) error {
r.RLock()
if r.cache != nil {
cached := r.cache[idx%r.cacheSize]
if cached != nil && cached.Index == idx {
*log = *cached
r.RUnlock()
return nil
}
}
tx, err := r.conn.Begin(false)
if err != nil {
r.RUnlock()
Expand Down Expand Up @@ -265,6 +284,12 @@ func (r *raftLog) StoreLogs(logs []*raft.Log) error {
tx.Rollback()
} else {
err = tx.Commit()
if err == nil && r.cache != nil {
// Cache only on success
for _, l := range logs {
r.cache[l.Index%r.cacheSize] = l
}
}
}
r.Unlock()
return err
Expand All @@ -275,6 +300,11 @@ func (r *raftLog) DeleteRange(min, max uint64) (retErr error) {
r.Lock()
defer r.Unlock()

if r.cacheSize > 0 {
// Reset cache
r.cache = make([]*raft.Log, int(r.cacheSize))
}

start := time.Now()
r.log.Noticef("Deleting raft logs from %v to %v", min, max)
err := r.deleteRange(min, max)
Expand Down
90 changes: 90 additions & 0 deletions server/raft_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,3 +512,93 @@ func TestRaftLogChannelID(t *testing.T) {
getID("bar", 0)
getID("baz", 0)
}

// TestRaftLogCache exercises the raft log's in-memory cache: sizing via
// setCacheSize, population on StoreLog/StoreLogs, lookup through GetLog,
// reset on DeleteRange, and the guarantee that an entry is not cached when
// the underlying store operation fails.
func TestRaftLogCache(t *testing.T) {
	cleanupRaftLog(t)
	defer cleanupRaftLog(t)

	rl := createTestRaftLog(t, false, 0)
	defer rl.Close()

	rl.setCacheSize(10)

	rl.RLock()
	gotLen, gotSize := len(rl.cache), rl.cacheSize
	rl.RUnlock()
	if gotLen != 10 {
		t.Fatalf("Expected cache len to be 10, got %v", gotLen)
	}
	if gotSize != 10 {
		t.Fatalf("Expected cacheSize to be 10, got %v", gotSize)
	}

	// A single store should land in the cache...
	first := &raft.Log{Index: 1, Data: []byte("msg1")}
	if err := rl.StoreLog(first); err != nil {
		t.Fatalf("Error on store: %v", err)
	}

	// ...and so should a batched store.
	second := &raft.Log{Index: 2, Data: []byte("msg2")}
	third := &raft.Log{Index: 3, Data: []byte("msg3")}
	if err := rl.StoreLogs([]*raft.Log{second, third}); err != nil {
		t.Fatalf("Error on store: %v", err)
	}

	rl.RLock()
	c1, c2, c3 := rl.cache[1%10], rl.cache[2%10], rl.cache[3%10]
	rl.RUnlock()
	if c1 == nil || c2 == nil || c3 == nil || c1.Index != 1 || c2.Index != 2 || c3.Index != 3 {
		t.Fatalf("Wrong content: l1=%v l2=%v l3=%v", c1, c2, c3)
	}

	// Index 11 wraps onto slot 1 (11 % 10); GetLog must still return it.
	eleventh := &raft.Log{Index: 11, Data: []byte("msg11")}
	if err := rl.StoreLog(eleventh); err != nil {
		t.Fatalf("Error on store: %v", err)
	}
	var fetched raft.Log
	if err := rl.GetLog(11, &fetched); err != nil || fetched.Index != 11 || string(fetched.Data) != "msg11" {
		t.Fatalf("Unexpected err=%v msg=%v", err, fetched)
	}

	// DeleteRange resets the cache but keeps its configured size.
	if err := rl.DeleteRange(1, 2); err != nil {
		t.Fatalf("Error on delete range: %v", err)
	}
	var leftover error
	rl.RLock()
	gotLen, gotSize = len(rl.cache), rl.cacheSize
	for _, entry := range rl.cache {
		if entry != nil {
			leftover = fmt.Errorf("Log still in cache: %v", entry)
			break
		}
	}
	rl.RUnlock()
	if gotLen != 10 {
		t.Fatalf("Expected cache len to be 10, got %v", gotLen)
	}
	if gotSize != 10 {
		t.Fatalf("Expected cacheSize to be 10, got %v", gotSize)
	}
	if leftover != nil {
		t.Fatal(leftover.Error())
	}

	// Close the underlying connection so the next store fails, then verify
	// that the failed entry was not added to the cache.
	rl.Lock()
	rl.conn.Close()
	rl.Unlock()
	fourth := &raft.Log{Index: 4, Data: []byte("msg4")}
	if err := rl.StoreLog(fourth); err == nil {
		t.Fatal("Expected error on store")
	}
	rl.RLock()
	c4 := rl.cache[4%10]
	rl.RUnlock()
	if c4 != nil {
		t.Fatalf("Expected log 4 not to be cached, got %v", c4)
	}
}
2 changes: 1 addition & 1 deletion stores/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1781,7 +1781,7 @@ func TestFSFilesClosedOnRecovery(t *testing.T) {
cleanupFSDatastore(t)
defer cleanupFSDatastore(t)

s := createDefaultFileStore(t, SliceConfig(1, 0, 0, ""))
s := createDefaultFileStore(t, SliceConfig(1, 0, 0, ""), DoSync(false))
defer s.Close()

limits := testDefaultStoreLimits
Expand Down

0 comments on commit 18cff06

Please sign in to comment.