Run a separate in-memory snapshot to reduce the number of entries stored in raft memory storage #18825

Merged: 6 commits, Nov 29, 2024
Changes from all commits
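
For orientation before the diff: the change keeps the existing, relatively expensive disk snapshot (v2-format membership data, .snap file, WAL record) on its large snapshot-count threshold, and additionally cuts a cheap snapshot that lives only in raft's MemoryStorage every memorySnapshotCount (100) applied entries, so the raft log can be compacted far more often. Below is a minimal, hedged sketch of that idea with illustrative names; the real fields and methods (etcdProgress, shouldSnapshotToDisk, shouldSnapshotToMemory) appear in the diff that follows.

package sketch

// progress mirrors the two snapshot indices this PR adds to etcdProgress.
type progress struct {
	appliedIndex  uint64 // last applied raft index
	diskSnapIndex uint64 // index of the last snapshot persisted to disk
	memSnapIndex  uint64 // index of the last snapshot kept only in raft MemoryStorage
}

const memorySnapshotCount = 100 // same constant the PR introduces in server.go

// shouldSnapshotToDisk keeps the old, large threshold and the forced path.
func shouldSnapshotToDisk(p progress, snapshotCount uint64, forced bool) bool {
	return (forced && p.appliedIndex != p.diskSnapIndex) ||
		p.appliedIndex-p.diskSnapIndex > snapshotCount
}

// shouldSnapshotToMemory fires roughly every 100 applied entries, enabling
// frequent raft log compaction without any disk I/O.
func shouldSnapshotToMemory(p progress) bool {
	return p.appliedIndex > p.memSnapIndex+memorySnapshotCount
}
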
2 changes: 1 addition & 1 deletion server/etcdserver/api/membership/cluster.go
@@ -899,7 +899,7 @@ func (c *RaftCluster) Store(store v2store.Store) {
if m.ClientURLs != nil {
mustUpdateMemberAttrInStore(c.lg, store, m)
}
c.lg.Info(
c.lg.Debug(
"snapshot storing member",
zap.String("id", m.ID.String()),
zap.Strings("peer-urls", m.PeerURLs),
133 changes: 73 additions & 60 deletions server/etcdserver/server.go
@@ -109,6 +109,7 @@
readyPercentThreshold = 0.9

DowngradeEnabledPath = "/downgrade/enabled"
memorySnapshotCount = 100
)

var (
@@ -291,9 +292,10 @@
clusterVersionChanged *notify.Notifier

*AccessController
// forceSnapshot can force a snapshot to be triggered after apply, independent of the snapshotCount.
// forceDiskSnapshot can force a snapshot to be triggered after apply, independent of the snapshotCount.
// Should only be set within the apply code path. Used to force a snapshot after a cluster version downgrade.
forceSnapshot bool
// TODO: Replace with flush db in v3.7 assuming v3.6 bootstraps from db file.
forceDiskSnapshot bool
corruptionChecker CorruptionChecker
}

@@ -741,10 +743,11 @@
}

type etcdProgress struct {
confState raftpb.ConfState
snapi uint64
appliedt uint64
appliedi uint64
confState raftpb.ConfState
diskSnapshotIndex uint64
memorySnapshotIndex uint64
appliedt uint64
appliedi uint64
}

// raftReadyHandler contains a set of EtcdServer operations to be called by raftNode,
@@ -809,10 +812,11 @@
s.r.start(rh)

ep := etcdProgress{
confState: sn.Metadata.ConfState,
snapi: sn.Metadata.Index,
appliedt: sn.Metadata.Term,
appliedi: sn.Metadata.Index,
confState: sn.Metadata.ConfState,
diskSnapshotIndex: sn.Metadata.Index,
memorySnapshotIndex: sn.Metadata.Index,
appliedt: sn.Metadata.Term,
appliedi: sn.Metadata.Index,
}

defer func() {
@@ -979,7 +983,7 @@
// storage, since the raft routine might be slower than toApply routine.
<-apply.notifyc

s.triggerSnapshot(ep)
s.snapshotIfNeededAndCompactRaftLog(ep)
select {
// snapshot requested via send()
case m := <-s.r.msgSnapC:
@@ -998,15 +1002,15 @@
lg := s.Logger()
lg.Info(
"applying snapshot",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
)
defer func() {
lg.Info(
"applied snapshot",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
@@ -1017,7 +1021,7 @@
if toApply.snapshot.Metadata.Index <= ep.appliedi {
lg.Panic(
"unexpected leader snapshot from outdated index",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),

Check warning on line 1024 in server/etcdserver/server.go

View check run for this annotation

Codecov / codecov/patch

server/etcdserver/server.go#L1024

Added line #L1024 was not covered by tests
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
@@ -1132,7 +1136,8 @@

ep.appliedt = toApply.snapshot.Metadata.Term
ep.appliedi = toApply.snapshot.Metadata.Index
ep.snapi = ep.appliedi
ep.diskSnapshotIndex = ep.appliedi
ep.memorySnapshotIndex = ep.appliedi
ep.confState = toApply.snapshot.Metadata.ConfState

// As backends and implementations like alarmsStore changed, we need
@@ -1188,31 +1193,26 @@
}

func (s *EtcdServer) ForceSnapshot() {
s.forceSnapshot = true
s.forceDiskSnapshot = true
}

func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
if !s.shouldSnapshot(ep) {
func (s *EtcdServer) snapshotIfNeededAndCompactRaftLog(ep *etcdProgress) {
//TODO: Remove disk snapshot in v3.7
shouldSnapshotToDisk := s.shouldSnapshotToDisk(ep)
shouldSnapshotToMemory := s.shouldSnapshotToMemory(ep)
if !shouldSnapshotToDisk && !shouldSnapshotToMemory {
return
}
lg := s.Logger()
lg.Info(
"triggering snapshot",
zap.String("local-member-id", s.MemberID().String()),
zap.Uint64("local-member-applied-index", ep.appliedi),
zap.Uint64("local-member-snapshot-index", ep.snapi),
zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
zap.Bool("snapshot-forced", s.forceSnapshot),
)
s.forceSnapshot = false

s.snapshot(ep.appliedi, ep.confState)
s.snapshot(ep, shouldSnapshotToDisk)
s.compactRaftLog(ep.appliedi)
ep.snapi = ep.appliedi
}

func (s *EtcdServer) shouldSnapshot(ep *etcdProgress) bool {
return (s.forceSnapshot && ep.appliedi != ep.snapi) || (ep.appliedi-ep.snapi > s.Cfg.SnapshotCount)
func (s *EtcdServer) shouldSnapshotToDisk(ep *etcdProgress) bool {
return (s.forceDiskSnapshot && ep.appliedi != ep.diskSnapshotIndex) || (ep.appliedi-ep.diskSnapshotIndex > s.Cfg.SnapshotCount)
}

func (s *EtcdServer) shouldSnapshotToMemory(ep *etcdProgress) bool {
return ep.appliedi > ep.memorySnapshotIndex+memorySnapshotCount
}
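
A quick worked example of the two predicates above (numbers are illustrative, not defaults): with s.Cfg.SnapshotCount = 10000, diskSnapshotIndex = 5000, memorySnapshotIndex = 10000, and appliedi = 10101, shouldSnapshotToMemory returns true (10101 > 10000 + 100) while shouldSnapshotToDisk returns false (10101 - 5000 = 5101, not > 10000). Only a raft-memory snapshot is cut and the log compacted; the disk snapshot waits until appliedi exceeds 15000.
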

func (s *EtcdServer) hasMultipleVotingMembers() bool {
@@ -2128,23 +2128,33 @@
}

// TODO: non-blocking snapshot
func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
d := GetMembershipInfoInV2Format(s.Logger(), s.cluster)
// commit kv to write metadata (for example: consistent index) to disk.
//
// This guarantees that Backend's consistent_index is >= index of last snapshot.
//
// KV().commit() updates the consistent index in backend.
// All operations that update consistent index must be called sequentially
// from applyAll function.
// So KV().Commit() cannot run in parallel with toApply. It has to be called outside
// the go routine created below.
s.KV().Commit()

func (s *EtcdServer) snapshot(ep *etcdProgress, toDisk bool) {
lg := s.Logger()
d := GetMembershipInfoInV2Format(lg, s.cluster)
if toDisk {
s.Logger().Info(
"triggering snapshot",
zap.String("local-member-id", s.MemberID().String()),
zap.Uint64("local-member-applied-index", ep.appliedi),
zap.Uint64("local-member-snapshot-index", ep.diskSnapshotIndex),
zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
zap.Bool("snapshot-forced", s.forceDiskSnapshot),
)
s.forceDiskSnapshot = false
// commit kv to write metadata (for example: consistent index) to disk.
//
// This guarantees that Backend's consistent_index is >= index of last snapshot.
//
// KV().commit() updates the consistent index in backend.
// All operations that update consistent index must be called sequentially
// from applyAll function.
// So KV().Commit() cannot run in parallel with toApply. It has to be called outside
// the go routine created below.
s.KV().Commit()
}

// For backward compatibility, generate v2 snapshot from v3 state.
snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
snap, err := s.r.raftStorage.CreateSnapshot(ep.appliedi, &ep.confState, d)
if err != nil {
// the snapshot was done asynchronously with the progress of raft.
// raft might have already got a newer snapshot.
@@ -2153,21 +2163,25 @@
}
lg.Panic("failed to create snapshot", zap.Error(err))
}
ep.memorySnapshotIndex = ep.appliedi

verifyConsistentIndexIsLatest(lg, snap, s.consistIndex.ConsistentIndex())

// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
if err = s.r.storage.SaveSnap(snap); err != nil {
lg.Panic("failed to save snapshot", zap.Error(err))
}
if err = s.r.storage.Release(snap); err != nil {
lg.Panic("failed to release wal", zap.Error(err))
}
if toDisk {
// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
if err = s.r.storage.SaveSnap(snap); err != nil {
lg.Panic("failed to save snapshot", zap.Error(err))

Check warning on line 2173 in server/etcdserver/server.go

View check run for this annotation

Codecov / codecov/patch

server/etcdserver/server.go#L2173

Added line #L2173 was not covered by tests
}
ep.diskSnapshotIndex = ep.appliedi
if err = s.r.storage.Release(snap); err != nil {
lg.Panic("failed to release wal", zap.Error(err))

Check warning on line 2177 in server/etcdserver/server.go

View check run for this annotation

Codecov / codecov/patch

server/etcdserver/server.go#L2177

Added line #L2177 was not covered by tests
}

lg.Info(
"saved snapshot",
zap.Uint64("snapshot-index", snap.Metadata.Index),
)
lg.Info(
"saved snapshot to disk",
zap.Uint64("snapshot-index", snap.Metadata.Index),
)
}
}
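
The comment block above carries the key invariant: for a disk snapshot, the backend commit must land before the snapshot is cut, so that the persisted consistent_index is >= the snapshot index, while a memory-only snapshot skips all disk interaction. A hedged outline of that control flow, with placeholder function parameters standing in for KV().Commit, raftStorage.CreateSnapshot, storage.SaveSnap, and storage.Release:

// doSnapshot is an illustrative outline, not etcd's API; the real logic is
// in (*EtcdServer).snapshot above.
func doSnapshot(toDisk bool, commit func(), create, save, release func() error) error {
	if toDisk {
		commit() // 1. persist consistent_index (and pending writes) to the backend
	}
	if err := create(); err != nil { // 2. always cut the raft in-memory snapshot
		return err
	}
	if !toDisk {
		return nil // memory-only: nothing is written to the WAL or .snap files
	}
	if err := save(); err != nil { // 3. write the .snap file and WAL snapshot record
		return err
	}
	return release() // 4. drop snapshot files obsoleted by the new one
}
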

func (s *EtcdServer) compactRaftLog(snapi uint64) {
@@ -2188,7 +2202,6 @@
if snapi > s.Cfg.SnapshotCatchUpEntries {
compacti = snapi - s.Cfg.SnapshotCatchUpEntries
}

err := s.r.raftStorage.Compact(compacti)
if err != nil {
// the compaction was done asynchronously with the progress of raft.
Expand All @@ -2198,7 +2211,7 @@
}
lg.Panic("failed to compact", zap.Error(err))
}
lg.Info(
lg.Debug(
"compacted Raft logs",
zap.Uint64("compact-index", compacti),
)
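
To make the retention arithmetic above concrete (values are illustrative, not defaults): with snapi = 30000 and s.Cfg.SnapshotCatchUpEntries = 5000, compacti = 25000, so Compact(25000) discards entries up to index 25000 and keeps 25001..30000 in raft MemoryStorage, letting slow followers catch up from the log instead of needing a full snapshot. A hedged helper mirroring that calculation:

// compactIndex is an illustrative stand-in for the computation above; fallback
// is whatever compacti was initialized to before the guard (not shown in the hunk).
func compactIndex(snapi, catchUpEntries, fallback uint64) uint64 {
	if snapi > catchUpEntries {
		return snapi - catchUpEntries
	}
	return fallback // fewer entries applied than we want to retain: no extra compaction
}
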
73 changes: 57 additions & 16 deletions server/etcdserver/server_test.go
@@ -627,8 +627,8 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
}
}

// TestSnapshot should snapshot the store and cut the persistent
func TestSnapshot(t *testing.T) {
// TestSnapshotDisk should save the snapshot to disk and release old snapshots
func TestSnapshotDisk(t *testing.T) {
revertFunc := verify.DisableVerifications()
defer revertFunc()

@@ -667,24 +667,65 @@ func TestSnapshot(t *testing.T) {
gaction, _ := p.Wait(2)
defer func() { ch <- struct{}{} }()

if len(gaction) != 2 {
t.Errorf("len(action) = %d, want 2", len(gaction))
return
}
if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "SaveSnap"}) {
t.Errorf("action = %s, want SaveSnap", gaction[0])
}
assert.Len(t, gaction, 2)
assert.Equal(t, testutil.Action{Name: "SaveSnap"}, gaction[0])
assert.Equal(t, testutil.Action{Name: "Release"}, gaction[1])
}()
ep := etcdProgress{appliedi: 1, confState: raftpb.ConfState{Voters: []uint64{1}}}
srv.snapshot(&ep, true)
<-ch
assert.Empty(t, st.Action())
assert.Equal(t, uint64(1), ep.diskSnapshotIndex)
assert.Equal(t, uint64(1), ep.memorySnapshotIndex)
}

if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "Release"}) {
t.Errorf("action = %s, want Release", gaction[1])
}
func TestSnapshotMemory(t *testing.T) {
revertFunc := verify.DisableVerifications()
defer revertFunc()

be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)

s := raft.NewMemoryStorage()
s.Append([]raftpb.Entry{{Index: 1}})
st := mockstore.NewRecorderStream()
p := mockstorage.NewStorageRecorderStream("")
r := newRaftNode(raftNodeConfig{
lg: zaptest.NewLogger(t),
Node: newNodeNop(),
raftStorage: s,
storage: p,
})
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zaptest.NewLogger(t),
r: *r,
v2store: st,
consistIndex: cindex.NewConsistentIndex(be),
}
srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
defer func() {
assert.NoError(t, srv.kv.Close())
}()
srv.be = be

srv.snapshot(1, raftpb.ConfState{Voters: []uint64{1}})
cl := membership.NewCluster(zaptest.NewLogger(t))
srv.cluster = cl

ch := make(chan struct{}, 1)

go func() {
gaction, _ := p.Wait(1)
defer func() { ch <- struct{}{} }()

assert.Empty(t, gaction)
}()
ep := etcdProgress{appliedi: 1, confState: raftpb.ConfState{Voters: []uint64{1}}}
srv.snapshot(&ep, false)
<-ch
if len(st.Action()) != 0 {
t.Errorf("no action expected on v2store. Got %d actions", len(st.Action()))
}
assert.Empty(t, st.Action())
assert.Equal(t, uint64(0), ep.diskSnapshotIndex)
assert.Equal(t, uint64(1), ep.memorySnapshotIndex)
}
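
Taken together, the two tests encode the new contract: TestSnapshotDisk expects the storage recorder to see SaveSnap followed by Release and both indices to advance to 1, whereas TestSnapshotMemory expects no storage actions at all, with memorySnapshotIndex advancing to 1 and diskSnapshotIndex staying at 0.
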

// TestSnapshotOrdering ensures raft persists snapshot onto disk before
4 changes: 3 additions & 1 deletion tests/integration/v3_watch_restore_test.go
@@ -111,7 +111,9 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
//
// Since there is no way to confirm server has compacted the log, we
// use log monitor to watch and expect "compacted Raft logs" content.
expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "compacted Raft logs", 2)
// In v3.6 we no longer generate the "compacted Raft logs" log line, as raft compaction happens independently of snapshots.
// For now, let's use the snapshot log line, which should be equivalent to compaction.
expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "saved snapshot to disk", 2)

// After RecoverPartition, leader L will send snapshot to slow F_m0
// follower, because F_m0(index:8) is 'out of date' compared to
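
The expectMemberLog assertion above ultimately amounts to counting occurrences of a substring in the member's captured log output within a timeout; a rough, standalone sketch of just the counting part (hypothetical helper, not the integration framework's API):

import "strings"

// countLogOccurrences counts captured log lines containing the expected substring.
func countLogOccurrences(lines []string, substr string) int {
	n := 0
	for _, line := range lines {
		if strings.Contains(line, substr) {
			n++
		}
	}
	return n
}
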