From efe70b2cecde8b3b3227dff3d36e4203e445f898 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 8 Aug 2018 11:16:07 -0700 Subject: [PATCH 1/2] Make snapshot compaction relative to number of nodes --- serf/snapshot.go | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/serf/snapshot.go b/serf/snapshot.go index 92ae35a53..b3feffc9a 100644 --- a/serf/snapshot.go +++ b/serf/snapshot.go @@ -45,6 +45,13 @@ const ( // shutdownFlushTimeout is the time limit to write pending events to the snapshot during a shutdown shutdownFlushTimeout = 250 * time.Millisecond + + // snapshotBytesPerNode is an estimated bytes per node to snapshot + snapshotBytesPerNode = 128 + + // snapshotCompactionThreshold is the threshold we apply to + // the snapshot size estimate (nodes * bytes per node) before compacting. + snapshotCompactionThreshold = 3 ) // Snapshotter is responsible for ingesting events and persisting @@ -63,7 +70,7 @@ type Snapshotter struct { leaveCh chan struct{} leaving bool logger *log.Logger - maxSize int64 + minCompactSize int64 path string offset int64 outCh chan<- Event @@ -90,7 +97,7 @@ func (p PreviousNode) String() string { // Setting rejoinAfterLeave makes leave not clear the state, and can be used // if you intend to rejoin the same cluster after a leave. func NewSnapshotter(path string, - maxSize int, + minCompactSize int, rejoinAfterLeave bool, logger *log.Logger, clock *LamportClock, @@ -126,7 +133,7 @@ func NewSnapshotter(path string, lastQueryClock: 0, leaveCh: make(chan struct{}), logger: logger, - maxSize: int64(maxSize), + minCompactSize: int64(minCompactSize), path: path, offset: offset, outCh: outCh, @@ -401,12 +408,25 @@ func (s *Snapshotter) appendLine(l string) error { // Check if a compaction is necessary s.offset += int64(n) - if s.offset > s.maxSize { + if s.offset > s.snapshotMaxSize() { return s.compact() } return nil } +// snapshotMaxSize computes the maximum size and is used to force periodic compaction. +func (s *Snapshotter) snapshotMaxSize() int64 { + nodes := int64(len(s.aliveNodes)) + estSize := nodes * snapshotBytesPerNode + threshold := estSize * snapshotCompactionThreshold + + // Apply a minimum threshold to avoid frequent compaction + if threshold < s.minCompactSize { + threshold = s.minCompactSize + } + return threshold +} + // Compact is used to compact the snapshot once it is too large func (s *Snapshotter) compact() error { defer metrics.MeasureSince([]string{"serf", "snapshot", "compact"}, time.Now()) From e3e1db683fcbc46c025036a87fd822239f04eb28 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 8 Aug 2018 12:47:03 -0700 Subject: [PATCH 2/2] Less conservative compaction threshold --- serf/snapshot.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/serf/snapshot.go b/serf/snapshot.go index b3feffc9a..d2eda0ea2 100644 --- a/serf/snapshot.go +++ b/serf/snapshot.go @@ -51,7 +51,7 @@ const ( // snapshotCompactionThreshold is the threshold we apply to // the snapshot size estimate (nodes * bytes per node) before compacting. - snapshotCompactionThreshold = 3 + snapshotCompactionThreshold = 2 ) // Snapshotter is responsible for ingesting events and persisting