Skip to content

Commit

Permalink
server/leader: use the compact revision to watch leader (#1396) (#1404)
Browse files Browse the repository at this point in the history
* server/leader: use the compact revision to watch leader

Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch authored and rleungx committed Jan 9, 2019
1 parent 1fdf717 commit b1a419a
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 4 deletions.
2 changes: 1 addition & 1 deletion pkg/integration_test/leader_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

func (s *integrationTestSuite) TestWatcher(c *C) {
c.Parallel()
cluster, err := newTestCluster(1)
cluster, err := newTestCluster(1, func(conf *server.Config) { conf.AutoCompactionRetention = "1s" })
c.Assert(err, IsNil)
defer cluster.Destroy()

Expand Down
9 changes: 6 additions & 3 deletions server/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,14 +299,17 @@ func (s *Server) watchLeader(leader *pdpb.Member, revision int64) {
ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()

// The revision is the revision of last modification on this key.
// If the revision is compacted, will meet required revision has been compacted error.
// In this case, use the compact revision to re-watch the key.
for {
// gofail: var delayWatcher struct{}
rch := watcher.Watch(ctx, s.getLeaderPath(), clientv3.WithRev(revision))
for wresp := range rch {
// meet compacted error, use current revision.
// meet compacted error, use the compact revision.
if wresp.CompactRevision != 0 {
log.Warnf("required revision %d has been compacted, use current revision", revision)
revision = 0
log.Warnf("required revision %d has been compacted, use the compact revision %d", revision, wresp.CompactRevision)
revision = wresp.CompactRevision
break
}
if wresp.Canceled {
Expand Down

0 comments on commit b1a419a

Please sign in to comment.