diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go
index 9d62db12154..6539b977d23 100644
--- a/server/etcdserver/api/membership/cluster.go
+++ b/server/etcdserver/api/membership/cluster.go
@@ -899,7 +899,7 @@ func (c *RaftCluster) Store(store v2store.Store) {
 		if m.ClientURLs != nil {
 			mustUpdateMemberAttrInStore(c.lg, store, m)
 		}
-		c.lg.Info(
+		c.lg.Debug(
 			"snapshot storing member",
 			zap.String("id", m.ID.String()),
 			zap.Strings("peer-urls", m.PeerURLs),
diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go
index 7e329381935..446606431a4 100644
--- a/server/etcdserver/server.go
+++ b/server/etcdserver/server.go
@@ -109,6 +109,7 @@ const (
 	readyPercentThreshold = 0.9
 
 	DowngradeEnabledPath = "/downgrade/enabled"
+	memorySnapshotCount  = 100
 )
 
 var (
@@ -291,9 +292,10 @@ type EtcdServer struct {
 	clusterVersionChanged *notify.Notifier
 
 	*AccessController
-	// forceSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
+	// forceDiskSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
 	// Should only be set within apply code path. Used to force snapshot after cluster version downgrade.
-	forceSnapshot bool
+	// TODO: Replace with flush db in v3.7 assuming v3.6 bootstraps from db file.
+	forceDiskSnapshot bool
 	corruptionChecker CorruptionChecker
 }
 
@@ -741,10 +743,11 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
 }
 
 type etcdProgress struct {
-	confState raftpb.ConfState
-	snapi     uint64
-	appliedt  uint64
-	appliedi  uint64
+	confState           raftpb.ConfState
+	diskSnapshotIndex   uint64
+	memorySnapshotIndex uint64
+	appliedt            uint64
+	appliedi            uint64
 }
 
 // raftReadyHandler contains a set of EtcdServer operations to be called by raftNode,
@@ -809,10 +812,11 @@ func (s *EtcdServer) run() {
 	s.r.start(rh)
 
 	ep := etcdProgress{
-		confState: sn.Metadata.ConfState,
-		snapi:     sn.Metadata.Index,
-		appliedt:  sn.Metadata.Term,
-		appliedi:  sn.Metadata.Index,
+		confState:           sn.Metadata.ConfState,
+		diskSnapshotIndex:   sn.Metadata.Index,
+		memorySnapshotIndex: sn.Metadata.Index,
+		appliedt:            sn.Metadata.Term,
+		appliedi:            sn.Metadata.Index,
 	}
 
 	defer func() {
@@ -979,7 +983,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) {
 	// storage, since the raft routine might be slower than toApply routine.
 	<-apply.notifyc
 
-	s.triggerSnapshot(ep)
+	s.snapshotIfNeededAndCompactRaftLog(ep)
 	select {
 	// snapshot requested via send()
 	case m := <-s.r.msgSnapC:
@@ -998,7 +1002,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
 	lg := s.Logger()
 	lg.Info(
 		"applying snapshot",
-		zap.Uint64("current-snapshot-index", ep.snapi),
+		zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
 		zap.Uint64("current-applied-index", ep.appliedi),
 		zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
 		zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
@@ -1006,7 +1010,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
 	defer func() {
 		lg.Info(
 			"applied snapshot",
-			zap.Uint64("current-snapshot-index", ep.snapi),
+			zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
 			zap.Uint64("current-applied-index", ep.appliedi),
 			zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
 			zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
@@ -1017,7 +1021,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
 	if toApply.snapshot.Metadata.Index <= ep.appliedi {
 		lg.Panic(
 			"unexpected leader snapshot from outdated index",
-			zap.Uint64("current-snapshot-index", ep.snapi),
+			zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
 			zap.Uint64("current-applied-index", ep.appliedi),
 			zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
 			zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
@@ -1132,7 +1136,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
 
 	ep.appliedt = toApply.snapshot.Metadata.Term
 	ep.appliedi = toApply.snapshot.Metadata.Index
-	ep.snapi = ep.appliedi
+	ep.diskSnapshotIndex = ep.appliedi
+	ep.memorySnapshotIndex = ep.appliedi
 	ep.confState = toApply.snapshot.Metadata.ConfState
 
 	// As backends and implementations like alarmsStore changed, we need
@@ -1188,31 +1193,26 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *toApply) {
 }
 
 func (s *EtcdServer) ForceSnapshot() {
-	s.forceSnapshot = true
+	s.forceDiskSnapshot = true
 }
 
-func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
-	if !s.shouldSnapshot(ep) {
+func (s *EtcdServer) snapshotIfNeededAndCompactRaftLog(ep *etcdProgress) {
+	//TODO: Remove disk snapshot in v3.7
+	shouldSnapshotToDisk := s.shouldSnapshotToDisk(ep)
+	shouldSnapshotToMemory := s.shouldSnapshotToMemory(ep)
+	if !shouldSnapshotToDisk && !shouldSnapshotToMemory {
 		return
 	}
-	lg := s.Logger()
-	lg.Info(
-		"triggering snapshot",
-		zap.String("local-member-id", s.MemberID().String()),
-		zap.Uint64("local-member-applied-index", ep.appliedi),
-		zap.Uint64("local-member-snapshot-index", ep.snapi),
-		zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
-		zap.Bool("snapshot-forced", s.forceSnapshot),
-	)
-	s.forceSnapshot = false
-
-	s.snapshot(ep.appliedi, ep.confState)
+	s.snapshot(ep, shouldSnapshotToDisk)
 	s.compactRaftLog(ep.appliedi)
-	ep.snapi = ep.appliedi
 }
 
-func (s *EtcdServer) shouldSnapshot(ep *etcdProgress) bool {
-	return (s.forceSnapshot && ep.appliedi != ep.snapi) || (ep.appliedi-ep.snapi > s.Cfg.SnapshotCount)
+func (s *EtcdServer) shouldSnapshotToDisk(ep *etcdProgress) bool {
+	return (s.forceDiskSnapshot && ep.appliedi != ep.diskSnapshotIndex) || (ep.appliedi-ep.diskSnapshotIndex > s.Cfg.SnapshotCount)
+}
+
+func (s *EtcdServer) shouldSnapshotToMemory(ep *etcdProgress) bool {
+	return ep.appliedi > ep.memorySnapshotIndex+memorySnapshotCount
 }
 
 func (s *EtcdServer) hasMultipleVotingMembers() bool {
@@ -2128,23 +2128,33 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 }
 
 // TODO: non-blocking snapshot
-func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
-	d := GetMembershipInfoInV2Format(s.Logger(), s.cluster)
-	// commit kv to write metadata (for example: consistent index) to disk.
-	//
-	// This guarantees that Backend's consistent_index is >= index of last snapshot.
-	//
-	// KV().commit() updates the consistent index in backend.
-	// All operations that update consistent index must be called sequentially
-	// from applyAll function.
-	// So KV().Commit() cannot run in parallel with toApply. It has to be called outside
-	// the go routine created below.
-	s.KV().Commit()
-
+func (s *EtcdServer) snapshot(ep *etcdProgress, toDisk bool) {
 	lg := s.Logger()
+	d := GetMembershipInfoInV2Format(lg, s.cluster)
+	if toDisk {
+		s.Logger().Info(
+			"triggering snapshot",
+			zap.String("local-member-id", s.MemberID().String()),
+			zap.Uint64("local-member-applied-index", ep.appliedi),
+			zap.Uint64("local-member-snapshot-index", ep.diskSnapshotIndex),
+			zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
+			zap.Bool("snapshot-forced", s.forceDiskSnapshot),
+		)
+		s.forceDiskSnapshot = false
+		// commit kv to write metadata (for example: consistent index) to disk.
+		//
+		// This guarantees that Backend's consistent_index is >= index of last snapshot.
+		//
+		// KV().commit() updates the consistent index in backend.
+		// All operations that update consistent index must be called sequentially
+		// from applyAll function.
+		// So KV().Commit() cannot run in parallel with toApply. It has to be called outside
+		// the go routine created below.
+		s.KV().Commit()
+	}
 
 	// For backward compatibility, generate v2 snapshot from v3 state.
-	snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
+	snap, err := s.r.raftStorage.CreateSnapshot(ep.appliedi, &ep.confState, d)
 	if err != nil {
 		// the snapshot was done asynchronously with the progress of raft.
 		// raft might have already got a newer snapshot.
@@ -2153,21 +2163,25 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 		}
 		lg.Panic("failed to create snapshot", zap.Error(err))
 	}
+	ep.memorySnapshotIndex = ep.appliedi
 
 	verifyConsistentIndexIsLatest(lg, snap, s.consistIndex.ConsistentIndex())
 
-	// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
-	if err = s.r.storage.SaveSnap(snap); err != nil {
-		lg.Panic("failed to save snapshot", zap.Error(err))
-	}
-	if err = s.r.storage.Release(snap); err != nil {
-		lg.Panic("failed to release wal", zap.Error(err))
-	}
+	if toDisk {
+		// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
+		if err = s.r.storage.SaveSnap(snap); err != nil {
+			lg.Panic("failed to save snapshot", zap.Error(err))
+		}
+		ep.diskSnapshotIndex = ep.appliedi
+		if err = s.r.storage.Release(snap); err != nil {
+			lg.Panic("failed to release wal", zap.Error(err))
+		}
 
-	lg.Info(
-		"saved snapshot",
-		zap.Uint64("snapshot-index", snap.Metadata.Index),
-	)
+		lg.Info(
+			"saved snapshot to disk",
+			zap.Uint64("snapshot-index", snap.Metadata.Index),
+		)
+	}
 }
 
 func (s *EtcdServer) compactRaftLog(snapi uint64) {
@@ -2188,7 +2202,6 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {
 	if snapi > s.Cfg.SnapshotCatchUpEntries {
 		compacti = snapi - s.Cfg.SnapshotCatchUpEntries
 	}
-
 	err := s.r.raftStorage.Compact(compacti)
 	if err != nil {
 		// the compaction was done asynchronously with the progress of raft.
@@ -2198,7 +2211,7 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {
 		}
 		lg.Panic("failed to compact", zap.Error(err))
 	}
-	lg.Info(
+	lg.Debug(
 		"compacted Raft logs",
 		zap.Uint64("compact-index", compacti),
 	)
diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go
index 56775a70d22..265bce38f56 100644
--- a/server/etcdserver/server_test.go
+++ b/server/etcdserver/server_test.go
@@ -627,8 +627,8 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
 	}
 }
 
-// TestSnapshot should snapshot the store and cut the persistent
-func TestSnapshot(t *testing.T) {
+// TestSnapshotDisk should save the snapshot to disk and release old snapshots
+func TestSnapshotDisk(t *testing.T) {
 	revertFunc := verify.DisableVerifications()
 	defer revertFunc()
 
@@ -667,24 +667,65 @@ func TestSnapshot(t *testing.T) {
 		gaction, _ := p.Wait(2)
 		defer func() { ch <- struct{}{} }()
 
-		if len(gaction) != 2 {
-			t.Errorf("len(action) = %d, want 2", len(gaction))
-			return
-		}
-		if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "SaveSnap"}) {
-			t.Errorf("action = %s, want SaveSnap", gaction[0])
-		}
+		assert.Len(t, gaction, 2)
+		assert.Equal(t, testutil.Action{Name: "SaveSnap"}, gaction[0])
+		assert.Equal(t, testutil.Action{Name: "Release"}, gaction[1])
+	}()
+	ep := etcdProgress{appliedi: 1, confState: raftpb.ConfState{Voters: []uint64{1}}}
+	srv.snapshot(&ep, true)
+	<-ch
+	assert.Empty(t, st.Action())
+	assert.Equal(t, uint64(1), ep.diskSnapshotIndex)
+	assert.Equal(t, uint64(1), ep.memorySnapshotIndex)
+}
 
-		if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "Release"}) {
-			t.Errorf("action = %s, want Release", gaction[1])
-		}
+func TestSnapshotMemory(t *testing.T) {
+	revertFunc := verify.DisableVerifications()
+	defer revertFunc()
+
+	be, _ := betesting.NewDefaultTmpBackend(t)
+	defer betesting.Close(t, be)
+
+	s := raft.NewMemoryStorage()
+	s.Append([]raftpb.Entry{{Index: 1}})
+	st := mockstore.NewRecorderStream()
+	p := mockstorage.NewStorageRecorderStream("")
+	r := newRaftNode(raftNodeConfig{
+		lg:          zaptest.NewLogger(t),
+		Node:        newNodeNop(),
+		raftStorage: s,
+		storage:     p,
+	})
+	srv := &EtcdServer{
+		lgMu:         new(sync.RWMutex),
+		lg:           zaptest.NewLogger(t),
+		r:            *r,
+		v2store:      st,
+		consistIndex: cindex.NewConsistentIndex(be),
+	}
+	srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
+	defer func() {
+		assert.NoError(t, srv.kv.Close())
 	}()
+	srv.be = be
 
-	srv.snapshot(1, raftpb.ConfState{Voters: []uint64{1}})
+	cl := membership.NewCluster(zaptest.NewLogger(t))
+	srv.cluster = cl
+
+	ch := make(chan struct{}, 1)
+
+	go func() {
+		gaction, _ := p.Wait(1)
+		defer func() { ch <- struct{}{} }()
+
+		assert.Empty(t, gaction)
+	}()
+	ep := etcdProgress{appliedi: 1, confState: raftpb.ConfState{Voters: []uint64{1}}}
+	srv.snapshot(&ep, false)
 	<-ch
-	if len(st.Action()) != 0 {
-		t.Errorf("no action expected on v2store. Got %d actions", len(st.Action()))
-	}
+	assert.Empty(t, st.Action())
+	assert.Equal(t, uint64(0), ep.diskSnapshotIndex)
+	assert.Equal(t, uint64(1), ep.memorySnapshotIndex)
 }
 
 // TestSnapshotOrdering ensures raft persists snapshot onto disk before
diff --git a/tests/integration/v3_watch_restore_test.go b/tests/integration/v3_watch_restore_test.go
index b9b6afccb38..b3fc8236f78 100644
--- a/tests/integration/v3_watch_restore_test.go
+++ b/tests/integration/v3_watch_restore_test.go
@@ -111,7 +111,9 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
 	//
 	// Since there is no way to confirm server has compacted the log, we
 	// use log monitor to watch and expect "compacted Raft logs" content.
-	expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "compacted Raft logs", 2)
+	// In v3.6 we no longer generate the "compacted Raft logs" message, because raft compaction happens independently of snapshots.
+	// For now let's use the snapshot log, which should be equivalent to compaction.
+	expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "saved snapshot to disk", 2)
 
 	// After RecoverPartition, leader L will send snapshot to slow F_m0
 	// follower, because F_m0(index:8) is 'out of date' compared to