Skip to content

Commit

Permalink
Run a separate in memory snapshot to reduce number of entries stored …
Browse files Browse the repository at this point in the history
…in raft memory storage

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
  • Loading branch information
serathius committed Nov 2, 2024
1 parent 3de0018 commit 3b00627
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 33 deletions.
6 changes: 0 additions & 6 deletions server/etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,12 +899,6 @@ func (c *RaftCluster) Store(store v2store.Store) {
if m.ClientURLs != nil {
mustUpdateMemberAttrInStore(c.lg, store, m)
}
c.lg.Info(
"snapshot storing member",
zap.String("id", m.ID.String()),
zap.Strings("peer-urls", m.PeerURLs),
zap.Bool("is-learner", m.IsLearner),
)
}
for id := range c.removed {
//We do not need to delete the member since the store is empty.
Expand Down
79 changes: 53 additions & 26 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ type EtcdServer struct {
*AccessController
// forceSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
// Should only be set within apply code path. Used to force snapshot after cluster version downgrade.
// TODO: Replace with flush db in v3.7 assuming v3.6 bootstraps from db file.
forceSnapshot bool
corruptionChecker CorruptionChecker
}
Expand Down Expand Up @@ -741,10 +742,11 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
}

type etcdProgress struct {
confState raftpb.ConfState
snapi uint64
appliedt uint64
appliedi uint64
confState raftpb.ConfState
diskSnapshotIndex uint64
memorySnapshotIndex uint64
appliedt uint64
appliedi uint64
}

// raftReadyHandler contains a set of EtcdServer operations to be called by raftNode,
Expand Down Expand Up @@ -809,10 +811,11 @@ func (s *EtcdServer) run() {
s.r.start(rh)

ep := etcdProgress{
confState: sn.Metadata.ConfState,
snapi: sn.Metadata.Index,
appliedt: sn.Metadata.Term,
appliedi: sn.Metadata.Index,
confState: sn.Metadata.ConfState,
diskSnapshotIndex: sn.Metadata.Index,
memorySnapshotIndex: sn.Metadata.Index,
appliedt: sn.Metadata.Term,
appliedi: sn.Metadata.Index,
}

defer func() {
Expand Down Expand Up @@ -998,15 +1001,15 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
lg := s.Logger()
lg.Info(
"applying snapshot",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
)
defer func() {
lg.Info(
"applied snapshot",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
Expand All @@ -1017,7 +1020,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
if toApply.snapshot.Metadata.Index <= ep.appliedi {
lg.Panic(
"unexpected leader snapshot from outdated index",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
Expand Down Expand Up @@ -1132,7 +1135,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {

ep.appliedt = toApply.snapshot.Metadata.Term
ep.appliedi = toApply.snapshot.Metadata.Index
ep.snapi = ep.appliedi
ep.diskSnapshotIndex = ep.appliedi
ep.memorySnapshotIndex = ep.appliedi
ep.confState = toApply.snapshot.Metadata.ConfState

// As backends and implementations like alarmsStore changed, we need
Expand Down Expand Up @@ -1192,27 +1196,35 @@ func (s *EtcdServer) ForceSnapshot() {
}

func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
if !s.shouldSnapshot(ep) {
if !s.shouldSnapshotToDisk(ep) {
// Cannot snapshot/compact on index 1 and 2. Original code was not designed to handle low index numbers.
// Before merge need to decide the best place to handle this case.
if ep.appliedi > 2 && ep.appliedi > ep.memorySnapshotIndex {
s.snapshotToMemory(ep.appliedi, ep.confState)
s.compactRaftLog(ep.appliedi)
ep.memorySnapshotIndex = ep.appliedi
}
return
}
//TODO: Remove disk snapshot in v3.7
lg := s.Logger()
lg.Info(
"triggering snapshot",
zap.String("local-member-id", s.MemberID().String()),
zap.Uint64("local-member-applied-index", ep.appliedi),
zap.Uint64("local-member-snapshot-index", ep.snapi),
zap.Uint64("local-member-snapshot-index", ep.diskSnapshotIndex),
zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
zap.Bool("snapshot-forced", s.forceSnapshot),
)
s.forceSnapshot = false

s.snapshot(ep.appliedi, ep.confState)
s.snapshotToDisk(ep.appliedi, ep.confState)
s.compactRaftLog(ep.appliedi)
ep.snapi = ep.appliedi
ep.diskSnapshotIndex = ep.appliedi
}

func (s *EtcdServer) shouldSnapshot(ep *etcdProgress) bool {
return (s.forceSnapshot && ep.appliedi != ep.snapi) || (ep.appliedi-ep.snapi > s.Cfg.SnapshotCount)
func (s *EtcdServer) shouldSnapshotToDisk(ep *etcdProgress) bool {
return (s.forceSnapshot && ep.appliedi != ep.diskSnapshotIndex) || (ep.appliedi-ep.diskSnapshotIndex > s.Cfg.SnapshotCount)
}

func (s *EtcdServer) hasMultipleVotingMembers() bool {
Expand Down Expand Up @@ -2132,7 +2144,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
}

// TODO: non-blocking snapshot
func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
func (s *EtcdServer) snapshotToDisk(snapi uint64, confState raftpb.ConfState) {
d := GetMembershipInfoInV2Format(s.Logger(), s.cluster)
// commit kv to write metadata (for example: consistent index) to disk.
//
Expand Down Expand Up @@ -2174,6 +2186,25 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
)
}

func (s *EtcdServer) snapshotToMemory(snapi uint64, confState raftpb.ConfState) {
d := GetMembershipInfoInV2Format(s.Logger(), s.cluster)

lg := s.Logger()

// For backward compatibility, generate v2 snapshot from v3 state.
snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
if err != nil {
// the snapshot was done asynchronously with the progress of raft.
// raft might have already got a newer snapshot.
if errorspkg.Is(err, raft.ErrSnapOutOfDate) {
return
}
lg.Panic("failed to create snapshot", zap.Error(err))
}

verifyConsistentIndexIsLatest(lg, snap, s.consistIndex.ConsistentIndex())
}

func (s *EtcdServer) compactRaftLog(snapi uint64) {
lg := s.Logger()

Expand All @@ -2189,10 +2220,10 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {

// keep some in memory log entries for slow followers.
compacti := uint64(1)
if snapi > s.Cfg.SnapshotCatchUpEntries {
compacti = snapi - s.Cfg.SnapshotCatchUpEntries
if snapi <= s.Cfg.SnapshotCatchUpEntries {
return
}

compacti = snapi - s.Cfg.SnapshotCatchUpEntries
err := s.r.raftStorage.Compact(compacti)
if err != nil {
// the compaction was done asynchronously with the progress of raft.
Expand All @@ -2202,10 +2233,6 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {
}
lg.Panic("failed to compact", zap.Error(err))
}
lg.Info(
"compacted Raft logs",
zap.Uint64("compact-index", compacti),
)
}

// CutPeer drops messages to the specified peer.
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ func TestSnapshot(t *testing.T) {
}
}()

srv.snapshot(1, raftpb.ConfState{Voters: []uint64{1}})
srv.snapshotToDisk(1, raftpb.ConfState{Voters: []uint64{1}})
<-ch
if len(st.Action()) != 0 {
t.Errorf("no action expected on v2store. Got %d actions", len(st.Action()))
Expand Down

0 comments on commit 3b00627

Please sign in to comment.