From 5b04088775df0ed076790cb05826980b20b81acc Mon Sep 17 00:00:00 2001 From: nolouch Date: Wed, 26 Dec 2018 12:01:35 +0800 Subject: [PATCH 1/2] server/leader: use the compact revision to watch leader Signed-off-by: nolouch --- pkg/integration_test/leader_watch_test.go | 2 +- server/leader.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/integration_test/leader_watch_test.go b/pkg/integration_test/leader_watch_test.go index 350a4f12477..9c88c814de9 100644 --- a/pkg/integration_test/leader_watch_test.go +++ b/pkg/integration_test/leader_watch_test.go @@ -25,7 +25,7 @@ import ( func (s *integrationTestSuite) TestWatcher(c *C) { c.Parallel() - cluster, err := newTestCluster(1) + cluster, err := newTestCluster(1, func(conf *server.Config) { conf.AutoCompactionRetention = "1s" }) c.Assert(err, IsNil) defer cluster.Destroy() diff --git a/server/leader.go b/server/leader.go index 2b4f8a98643..7fc58ee2e25 100644 --- a/server/leader.go +++ b/server/leader.go @@ -314,8 +314,8 @@ func (s *Server) watchLeader(leader *pdpb.Member, revision int64) { for wresp := range rch { // meet compacted error, use current revision. if wresp.CompactRevision != 0 { - log.Warnf("required revision %d has been compacted, use current revision", revision) - revision = 0 + log.Warnf("required revision %d has been compacted, use the compact revision %d", revision, wresp.CompactRevision) + revision = wresp.CompactRevision break } if wresp.Canceled { From b61e0d61d94f8fa98e207d94c5b330c2fc19e30a Mon Sep 17 00:00:00 2001 From: nolouch Date: Thu, 27 Dec 2018 11:57:29 +0800 Subject: [PATCH 2/2] address comment Signed-off-by: nolouch --- server/leader.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/leader.go b/server/leader.go index 7fc58ee2e25..7e04bb16188 100644 --- a/server/leader.go +++ b/server/leader.go @@ -308,11 +308,14 @@ func (s *Server) watchLeader(leader *pdpb.Member, revision int64) { defer s.cluster.regionSyncer.StopSyncWithLeader() } + // The revision is the revision of last modification on this key. + // If the revision is compacted, will meet required revision has been compacted error. + // In this case, use the compact revision to re-watch the key. for { // gofail: var delayWatcher struct{} rch := watcher.Watch(ctx, s.getLeaderPath(), clientv3.WithRev(revision)) for wresp := range rch { - // meet compacted error, use current revision. + // meet compacted error, use the compact revision. if wresp.CompactRevision != 0 { log.Warnf("required revision %d has been compacted, use the compact revision %d", revision, wresp.CompactRevision) revision = wresp.CompactRevision