diff --git a/pkg/member/member.go b/pkg/member/member.go index 24f6eea4b54..80332a65f94 100644 --- a/pkg/member/member.go +++ b/pkg/member/member.go @@ -58,6 +58,8 @@ type EmbeddedEtcdMember struct { // etcd leader key when the PD node is successfully elected as the PD leader // of the cluster. Every write will use it to check PD leadership. memberValue string + // lastLeaderUpdatedTime is the last time when the leader is updated. + lastLeaderUpdatedTime atomic.Value } // NewMember create a new Member. @@ -140,11 +142,13 @@ func (m *EmbeddedEtcdMember) GetLeader() *pdpb.Member { // setLeader sets the member's PD leader. func (m *EmbeddedEtcdMember) setLeader(member *pdpb.Member) { m.leader.Store(member) + m.lastLeaderUpdatedTime.Store(time.Now()) } // unsetLeader unsets the member's PD leader. func (m *EmbeddedEtcdMember) unsetLeader() { m.leader.Store(&pdpb.Member{}) + m.lastLeaderUpdatedTime.Store(time.Now()) } // EnableLeader sets the member itself to a PD leader. @@ -162,6 +166,15 @@ func (m *EmbeddedEtcdMember) GetLeadership() *election.Leadership { return m.leadership } +// GetLastLeaderUpdatedTime returns the last time when the leader is updated. +func (m *EmbeddedEtcdMember) GetLastLeaderUpdatedTime() time.Time { + lastLeaderUpdatedTime := m.lastLeaderUpdatedTime.Load() + if lastLeaderUpdatedTime == nil { + return time.Time{} + } + return lastLeaderUpdatedTime.(time.Time) +} + // CampaignLeader is used to campaign a PD member's leadership // and make it become a PD leader. func (m *EmbeddedEtcdMember) CampaignLeader(leaseTimeout int64) error { diff --git a/pkg/member/participant.go b/pkg/member/participant.go index 41cdef77004..102bfcbce5f 100644 --- a/pkg/member/participant.go +++ b/pkg/member/participant.go @@ -51,6 +51,8 @@ type Participant struct { // campaignChecker is used to check whether the additional constraints for a // campaign are satisfied. If it returns false, the campaign will fail. campaignChecker atomic.Value // Store as leadershipCheckFunc + // lastLeaderUpdatedTime is the last time when the leader is updated. + lastLeaderUpdatedTime atomic.Value } // NewParticipant create a new Participant. @@ -78,6 +80,7 @@ func (m *Participant) InitInfo(name string, id uint64, rootPath string, leaderNa m.rootPath = rootPath m.leaderPath = path.Join(rootPath, leaderName) m.leadership = election.NewLeadership(m.client, m.GetLeaderPath(), purpose) + m.lastLeaderUpdatedTime.Store(time.Now()) log.Info("participant joining election", zap.Stringer("participant-info", m.member), zap.String("leader-path", m.leaderPath)) } @@ -143,11 +146,13 @@ func (m *Participant) GetLeader() *tsopb.Participant { // setLeader sets the member's leader. func (m *Participant) setLeader(member *tsopb.Participant) { m.leader.Store(member) + m.lastLeaderUpdatedTime.Store(time.Now()) } // unsetLeader unsets the member's leader. func (m *Participant) unsetLeader() { m.leader.Store(&tsopb.Participant{}) + m.lastLeaderUpdatedTime.Store(time.Now()) } // EnableLeader declares the member itself to be the leader. @@ -160,6 +165,15 @@ func (m *Participant) GetLeaderPath() string { return m.leaderPath } +// GetLastLeaderUpdatedTime returns the last time when the leader is updated. +func (m *Participant) GetLastLeaderUpdatedTime() time.Time { + lastLeaderUpdatedTime := m.lastLeaderUpdatedTime.Load() + if lastLeaderUpdatedTime == nil { + return time.Time{} + } + return lastLeaderUpdatedTime.(time.Time) +} + // GetLeadership returns the leadership of the member. func (m *Participant) GetLeadership() *election.Leadership { return m.leadership diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 464019d63c7..a780e7da74e 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -136,6 +136,8 @@ type ElectionMember interface { GetLeaderPath() string // GetLeadership returns the leadership of the election member. GetLeadership() *election.Leadership + // GetLastLeaderUpdatedTime returns the last time when the leader is updated. + GetLastLeaderUpdatedTime() time.Time // GetDCLocationPathPrefix returns the dc-location path prefix of the cluster. GetDCLocationPathPrefix() string // GetDCLocationPath returns the dc-location path of a member with the given member ID. diff --git a/server/server.go b/server/server.go index 4e49faa6b24..619049cac79 100644 --- a/server/server.go +++ b/server/server.go @@ -18,6 +18,7 @@ import ( "bytes" "context" "fmt" + "math/rand" "net/http" "os" "path" @@ -105,6 +106,9 @@ const ( maxRetryTimesGetServicePrimary = 25 // retryIntervalGetServicePrimary is the retry interval for getting primary addr. retryIntervalGetServicePrimary = 100 * time.Millisecond + + lostPDLeaderMaxTimeoutSecs = 10 + lostPDLeaderReElectionFactor = 10 ) // EtcdStartTimeout the timeout of the startup etcd. @@ -1457,6 +1461,14 @@ func (s *Server) leaderLoop() { } leader, checkAgain := s.member.CheckLeader() + // add failpoint to test leader check go to stuck. + failpoint.Inject("leaderLoopCheckAgain", func(val failpoint.Value) { + memberString := val.(string) + memberID, _ := strconv.ParseUint(memberString, 10, 64) + if s.member.ID() == memberID { + checkAgain = true + } + }) if checkAgain { continue } @@ -1484,6 +1496,25 @@ func (s *Server) leaderLoop() { // To make sure the etcd leader and PD leader are on the same server. etcdLeader := s.member.GetEtcdLeader() if etcdLeader != s.member.ID() { + if s.member.GetLeader() == nil { + lastUpdated := s.member.GetLastLeaderUpdatedTime() + // use random timeout to avoid leader campaigning storm. + randomTimeout := time.Duration(rand.Intn(int(lostPDLeaderMaxTimeoutSecs)))*time.Second + lostPDLeaderMaxTimeoutSecs*time.Second + lostPDLeaderReElectionFactor*s.cfg.ElectionInterval.Duration + // add failpoint to test the campaign leader logic. + failpoint.Inject("timeoutWaitPDLeader", func() { + log.Info("timeoutWaitPDLeader is injected, skip wait other etcd leader be etcd leader") + randomTimeout = time.Duration(rand.Intn(10))*time.Millisecond + 100*time.Millisecond + }) + if lastUpdated.Add(randomTimeout).Before(time.Now()) && !lastUpdated.IsZero() && etcdLeader != 0 { + log.Info("the pd leader is lost for a long time, try to re-campaign a pd leader with resign etcd leader", + zap.Duration("timeout", randomTimeout), + zap.Time("last-updated", lastUpdated), + zap.String("current-leader-member-id", types.ID(etcdLeader).String()), + zap.String("transferee-member-id", types.ID(s.member.ID()).String()), + ) + s.member.MoveEtcdLeader(s.ctx, etcdLeader, s.member.ID()) + } + } log.Info("skip campaigning of pd leader and check later", zap.String("server-name", s.Name()), zap.Uint64("etcd-leader-id", etcdLeader), @@ -1600,6 +1631,16 @@ func (s *Server) campaignLeader() { log.Info("no longer a leader because lease has expired, pd leader will step down") return } + // add failpoint to test exit leader, failpoint judge the member is the give value, then break + failpoint.Inject("exitCampaignLeader", func(val failpoint.Value) { + memberString := val.(string) + memberID, _ := strconv.ParseUint(memberString, 10, 64) + if s.member.ID() == memberID { + log.Info("exit PD leader") + failpoint.Return() + } + }) + etcdLeader := s.member.GetEtcdLeader() if etcdLeader != s.member.ID() { log.Info("etcd leader changed, resigns pd leadership", zap.String("old-pd-leader-name", s.Name())) diff --git a/tests/server/member/member_test.go b/tests/server/member/member_test.go index 1cdc267a99a..ca89e66a041 100644 --- a/tests/server/member/member_test.go +++ b/tests/server/member/member_test.go @@ -248,6 +248,30 @@ func TestLeaderResignWithBlock(t *testing.T) { re.NoError(failpoint.Disable("github.com/tikv/pd/server/raftclusterIsBusy")) } +func TestPDLeaderLostWhileEtcdLeaderIntact(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := tests.NewTestCluster(ctx, 2) + defer cluster.Destroy() + re.NoError(err) + + err = cluster.RunInitialServers() + re.NoError(err) + + leader1 := cluster.WaitLeader() + memberID := cluster.GetServer(leader1).GetLeader().GetMemberId() + + re.NoError(failpoint.Enable("github.com/tikv/pd/server/leaderLoopCheckAgain", fmt.Sprintf("return(\"%d\")", memberID))) + re.NoError(failpoint.Enable("github.com/tikv/pd/server/exitCampaignLeader", fmt.Sprintf("return(\"%d\")", memberID))) + re.NoError(failpoint.Enable("github.com/tikv/pd/server/timeoutWaitPDLeader", `return(true)`)) + leader2 := waitLeaderChange(re, cluster, leader1) + re.NotEqual(leader1, leader2) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/leaderLoopCheckAgain")) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/exitCampaignLeader")) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/timeoutWaitPDLeader")) +} + func waitLeaderChange(re *require.Assertions, cluster *tests.TestCluster, old string) string { var leader string testutil.Eventually(re, func() bool {