From 428b77afc3c39e19038460d053318d09420e8fba Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 28 Feb 2015 20:33:28 -0800 Subject: [PATCH] etcdserver: keep a min number of entries in memory Do not aggressively compact raft log entries. After a snapshot, etcd server can compact the raft log upto snapshot index. etcd server compacts to an index smaller than snapshot to keep some entries in memory. The leader can still read out the in memory entries to send to a slightly slow follower. If all the entries are compacted, the leader will send the whole snapshot or read entries from disk if possible. --- etcdserver/raft.go | 9 +++++++++ etcdserver/server.go | 10 ++++++++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 02e8d61ec30..a2525844f96 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -32,6 +32,15 @@ import ( "github.com/coreos/etcd/wal/walpb" ) +const ( + // Number of entries for slow follower to catch-up after compacting + // the raft storage entries. + // We expect the follower has a millisecond level latency with the leader. + // The max throughput is around 10K. Keep a 5K entries is enough for helping + // follower to catch up. + numberOfCatchUpEntries = 5000 +) + var ( // indirection for expvar func interface // expvar panics when publishing duplicate name diff --git a/etcdserver/server.go b/etcdserver/server.go index 3cefd27ff69..48862b0efd3 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -836,8 +836,14 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { if err := s.r.storage.SaveSnap(snap); err != nil { log.Fatalf("etcdserver: save snapshot error: %v", err) } + log.Printf("etcdserver: saved snapshot at index %d", snap.Metadata.Index) - err = s.r.raftStorage.Compact(snapi) + // keep some in memory log entries for slow followers. + compacti := uint64(1) + if snapi > numberOfCatchUpEntries { + compacti = snapi - numberOfCatchUpEntries + } + err = s.r.raftStorage.Compact(compacti) if err != nil { // the compaction was done asynchronously with the progress of raft. // raft log might already been compact. @@ -846,7 +852,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { } log.Panicf("etcdserver: unexpected compaction error %v", err) } - log.Printf("etcdserver: saved snapshot at index %d", snap.Metadata.Index) + log.Printf("etcdserver: compacted raft log at %d", compacti) }() }