diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go
index e35375a22c6..e81908d0e74 100644
--- a/server/etcdserver/api/membership/cluster.go
+++ b/server/etcdserver/api/membership/cluster.go
@@ -899,12 +899,6 @@ func (c *RaftCluster) Store(store v2store.Store) {
 		if m.ClientURLs != nil {
 			mustUpdateMemberAttrInStore(c.lg, store, m)
 		}
-		c.lg.Info(
-			"snapshot storing member",
-			zap.String("id", m.ID.String()),
-			zap.Strings("peer-urls", m.PeerURLs),
-			zap.Bool("is-learner", m.IsLearner),
-		)
 	}
 	for id := range c.removed {
 		//We do not need to delete the member since the store is empty.
diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go
index 3e5c6b619d2..fc3d216ec5d 100644
--- a/server/etcdserver/server.go
+++ b/server/etcdserver/server.go
@@ -293,6 +293,7 @@ type EtcdServer struct {
 	*AccessController
 	// forceSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
 	// Should only be set within apply code path. Used to force snapshot after cluster version downgrade.
+	// TODO: Replace with flush db in v3.7 assuming v3.6 bootstraps from db file.
 	forceSnapshot     bool
 	corruptionChecker CorruptionChecker
 }
@@ -741,10 +742,11 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
 }
 
 type etcdProgress struct {
-	confState raftpb.ConfState
-	snapi     uint64
-	appliedt  uint64
-	appliedi  uint64
+	confState           raftpb.ConfState
+	diskSnapshotIndex   uint64
+	memorySnapshotIndex uint64
+	appliedt            uint64
+	appliedi            uint64
 }
 
 // raftReadyHandler contains a set of EtcdServer operations to be called by raftNode,
@@ -809,10 +811,11 @@ func (s *EtcdServer) run() {
 	s.r.start(rh)
 
 	ep := etcdProgress{
-		confState: sn.Metadata.ConfState,
-		snapi:     sn.Metadata.Index,
-		appliedt:  sn.Metadata.Term,
-		appliedi:  sn.Metadata.Index,
+		confState:           sn.Metadata.ConfState,
+		diskSnapshotIndex:   sn.Metadata.Index,
+		memorySnapshotIndex: sn.Metadata.Index,
+		appliedt:            sn.Metadata.Term,
+		appliedi:            sn.Metadata.Index,
 	}
 
 	defer func() {
@@ -998,7 +1001,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
 	lg := s.Logger()
 	lg.Info(
 		"applying snapshot",
-		zap.Uint64("current-snapshot-index", ep.snapi),
+		zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
 		zap.Uint64("current-applied-index", ep.appliedi),
 		zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
 		zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
@@ -1006,7 +1009,7 @@
 	defer func() {
 		lg.Info(
 			"applied snapshot",
-			zap.Uint64("current-snapshot-index", ep.snapi),
+			zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
 			zap.Uint64("current-applied-index", ep.appliedi),
 			zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
 			zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
@@ -1017,7 +1020,7 @@
 	if toApply.snapshot.Metadata.Index <= ep.appliedi {
 		lg.Panic(
 			"unexpected leader snapshot from outdated index",
-			zap.Uint64("current-snapshot-index", ep.snapi),
+			zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
 			zap.Uint64("current-applied-index", ep.appliedi),
 			zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
 			zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
@@ -1132,7 +1135,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
 
 	ep.appliedt = toApply.snapshot.Metadata.Term
 	ep.appliedi = toApply.snapshot.Metadata.Index
-	ep.snapi = ep.appliedi
+	ep.diskSnapshotIndex = ep.appliedi
+	ep.memorySnapshotIndex = ep.appliedi
 	ep.confState = toApply.snapshot.Metadata.ConfState
 
 	// As backends and implementations like alarmsStore changed, we need
@@ -1192,27 +1196,35 @@ func (s *EtcdServer) ForceSnapshot() {
 }
 
 func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
-	if !s.shouldSnapshot(ep) {
+	if !s.shouldSnapshotToDisk(ep) {
+		// Cannot snapshot/compact on indexes 1 and 2; the original code was not designed to handle low index numbers.
+		// Before merging, we need to decide the best place to handle this case.
+		if ep.appliedi > 2 && ep.appliedi > ep.memorySnapshotIndex {
+			s.snapshotToMemory(ep.appliedi, ep.confState)
+			s.compactRaftLog(ep.appliedi)
+			ep.memorySnapshotIndex = ep.appliedi
+		}
 		return
 	}
+	// TODO: Remove disk snapshot in v3.7.
 	lg := s.Logger()
 	lg.Info(
 		"triggering snapshot",
 		zap.String("local-member-id", s.MemberID().String()),
 		zap.Uint64("local-member-applied-index", ep.appliedi),
-		zap.Uint64("local-member-snapshot-index", ep.snapi),
+		zap.Uint64("local-member-snapshot-index", ep.diskSnapshotIndex),
 		zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
 		zap.Bool("snapshot-forced", s.forceSnapshot),
 	)
 	s.forceSnapshot = false
 
-	s.snapshot(ep.appliedi, ep.confState)
+	s.snapshotToDisk(ep.appliedi, ep.confState)
 	s.compactRaftLog(ep.appliedi)
-	ep.snapi = ep.appliedi
+	ep.diskSnapshotIndex = ep.appliedi
 }
 
-func (s *EtcdServer) shouldSnapshot(ep *etcdProgress) bool {
-	return (s.forceSnapshot && ep.appliedi != ep.snapi) || (ep.appliedi-ep.snapi > s.Cfg.SnapshotCount)
+func (s *EtcdServer) shouldSnapshotToDisk(ep *etcdProgress) bool {
+	return (s.forceSnapshot && ep.appliedi != ep.diskSnapshotIndex) || (ep.appliedi-ep.diskSnapshotIndex > s.Cfg.SnapshotCount)
 }
 
 func (s *EtcdServer) hasMultipleVotingMembers() bool {
@@ -2132,7 +2144,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 }
 
 // TODO: non-blocking snapshot
-func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
+func (s *EtcdServer) snapshotToDisk(snapi uint64, confState raftpb.ConfState) {
 	d := GetMembershipInfoInV2Format(s.Logger(), s.cluster)
 	// commit kv to write metadata (for example: consistent index) to disk.
 	//
@@ -2174,6 +2186,25 @@ func (s *EtcdServer) snapshotToDisk(snapi uint64, confState raftpb.ConfState) {
 	)
 }
 
+func (s *EtcdServer) snapshotToMemory(snapi uint64, confState raftpb.ConfState) {
+	d := GetMembershipInfoInV2Format(s.Logger(), s.cluster)
+
+	lg := s.Logger()
+
+	// For backward compatibility, generate v2 snapshot from v3 state.
+	snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
+	if err != nil {
+		// the snapshot was done asynchronously with the progress of raft.
+		// raft might have already got a newer snapshot.
+		if errorspkg.Is(err, raft.ErrSnapOutOfDate) {
+			return
+		}
+		lg.Panic("failed to create snapshot", zap.Error(err))
+	}
+
+	verifyConsistentIndexIsLatest(lg, snap, s.consistIndex.ConsistentIndex())
+}
+
 func (s *EtcdServer) compactRaftLog(snapi uint64) {
 	lg := s.Logger()
 
@@ -2189,10 +2220,10 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {
 
 	// keep some in memory log entries for slow followers.
-	compacti := uint64(1)
-	if snapi > s.Cfg.SnapshotCatchUpEntries {
-		compacti = snapi - s.Cfg.SnapshotCatchUpEntries
+	if snapi <= s.Cfg.SnapshotCatchUpEntries {
+		return
 	}
-
+
+	compacti := snapi - s.Cfg.SnapshotCatchUpEntries
 	err := s.r.raftStorage.Compact(compacti)
 	if err != nil {
 		// the compaction was done asynchronously with the progress of raft.
@@ -2202,10 +2233,6 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {
 		}
 		lg.Panic("failed to compact", zap.Error(err))
 	}
-	lg.Info(
-		"compacted Raft logs",
-		zap.Uint64("compact-index", compacti),
-	)
 }
 
 // CutPeer drops messages to the specified peer.
diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go
index 688d71be264..7ee3b4b8a16 100644
--- a/server/etcdserver/server_test.go
+++ b/server/etcdserver/server_test.go
@@ -649,7 +649,7 @@ func TestSnapshot(t *testing.T) {
 		}
 	}()
 
-	srv.snapshot(1, raftpb.ConfState{Voters: []uint64{1}})
+	srv.snapshotToDisk(1, raftpb.ConfState{Voters: []uint64{1}})
 	<-ch
 	if len(st.Action()) != 0 {
 		t.Errorf("no action expected on v2store. Got %d actions", len(st.Action()))
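
For review context, here is a minimal, self-contained sketch (not part of the patch) of the CreateSnapshot/Compact sequence that the new snapshotToMemory and compactRaftLog paths drive on raft's MemoryStorage, using go.etcd.io/raft/v3 directly. The values appliedIndex, catchUpEntries, and the single-voter ConfState are illustrative stand-ins for ep.appliedi, s.Cfg.SnapshotCatchUpEntries, and the server's real configuration state.

```go
// Sketch: snapshot the in-memory raft storage at the applied index, then
// compact the log while retaining catch-up entries for slow followers.
// Mirrors the snapshotToMemory/compactRaftLog flow in the patch above;
// all values are illustrative stand-ins, not etcd configuration.
package main

import (
	"errors"
	"fmt"

	"go.etcd.io/raft/v3"
	"go.etcd.io/raft/v3/raftpb"
)

func main() {
	storage := raft.NewMemoryStorage()

	// Simulate an applied log: entries 1..100 in term 1.
	ents := make([]raftpb.Entry, 0, 100)
	for i := uint64(1); i <= 100; i++ {
		ents = append(ents, raftpb.Entry{Term: 1, Index: i})
	}
	if err := storage.Append(ents); err != nil {
		panic(err)
	}

	appliedIndex := uint64(100)                        // stand-in for ep.appliedi
	catchUpEntries := uint64(10)                       // stand-in for s.Cfg.SnapshotCatchUpEntries
	confState := raftpb.ConfState{Voters: []uint64{1}} // stand-in for ep.confState

	// snapshotToMemory equivalent: record a snapshot in MemoryStorage.
	if _, err := storage.CreateSnapshot(appliedIndex, &confState, nil); err != nil {
		if errors.Is(err, raft.ErrSnapOutOfDate) {
			return // raft already has a newer snapshot
		}
		panic(err)
	}

	// compactRaftLog equivalent: keep catch-up entries for slow followers.
	if appliedIndex <= catchUpEntries {
		return
	}
	compactIndex := appliedIndex - catchUpEntries
	if err := storage.Compact(compactIndex); err != nil && !errors.Is(err, raft.ErrCompacted) {
		panic(err)
	}

	first, _ := storage.FirstIndex()
	fmt.Println("first retained index:", first) // 91, i.e. compactIndex+1
}
```

Compact discards entries up to and including compactIndex, so a follower lagging by more than SnapshotCatchUpEntries must be caught up via snapshot rather than log replay; that is also why compactRaftLog now returns early at low indexes instead of compacting to index 1.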