Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#6447
Browse files Browse the repository at this point in the history
close tikv#6403

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
nolouch authored and ti-chi-bot committed May 12, 2023
1 parent e77b5ad commit 3478768
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 0 deletions.
13 changes: 13 additions & 0 deletions pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type EmbeddedEtcdMember struct {
// etcd leader key when the PD node is successfully elected as the PD leader
// of the cluster. Every write will use it to check PD leadership.
memberValue string
// lastLeaderUpdatedTime is the last time when the leader is updated.
lastLeaderUpdatedTime atomic.Value
}

// NewMember create a new Member.
Expand Down Expand Up @@ -140,11 +142,13 @@ func (m *EmbeddedEtcdMember) GetLeader() *pdpb.Member {
// setLeader sets the member's PD leader.
func (m *EmbeddedEtcdMember) setLeader(member *pdpb.Member) {
m.leader.Store(member)
m.lastLeaderUpdatedTime.Store(time.Now())
}

// unsetLeader unsets the member's PD leader.
func (m *EmbeddedEtcdMember) unsetLeader() {
m.leader.Store(&pdpb.Member{})
m.lastLeaderUpdatedTime.Store(time.Now())
}

// EnableLeader sets the member itself to a PD leader.
Expand All @@ -162,6 +166,15 @@ func (m *EmbeddedEtcdMember) GetLeadership() *election.Leadership {
return m.leadership
}

// GetLastLeaderUpdatedTime returns the last time when the leader is updated.
func (m *EmbeddedEtcdMember) GetLastLeaderUpdatedTime() time.Time {
lastLeaderUpdatedTime := m.lastLeaderUpdatedTime.Load()
if lastLeaderUpdatedTime == nil {
return time.Time{}
}
return lastLeaderUpdatedTime.(time.Time)
}

// CampaignLeader is used to campaign a PD member's leadership
// and make it become a PD leader.
func (m *EmbeddedEtcdMember) CampaignLeader(leaseTimeout int64) error {
Expand Down
24 changes: 24 additions & 0 deletions pkg/member/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,17 @@ type Participant struct {
// leader key when this participant is successfully elected as the leader of
// the group. Every write will use it to check the leadership.
memberValue string
<<<<<<< HEAD
// preCampaignChecker is called before the campaign. If it returns false, the
// campaign will be skipped.
preCampaignChecker leadershipCheckFunc
=======
// campaignChecker is used to check whether the additional constraints for a
// campaign are satisfied. If it returns false, the campaign will fail.
campaignChecker atomic.Value // Store as leadershipCheckFunc
// lastLeaderUpdatedTime is the last time when the leader is updated.
lastLeaderUpdatedTime atomic.Value
>>>>>>> 3e4056406 (server: fix the leader cannot election after pd leader lost while etcd leader intact (#6447))
}

// NewParticipant create a new Participant.
Expand Down Expand Up @@ -78,7 +86,12 @@ func (m *Participant) InitInfo(name string, id uint64, rootPath string, leaderNa
m.rootPath = rootPath
m.leaderPath = path.Join(rootPath, leaderName)
m.leadership = election.NewLeadership(m.client, m.GetLeaderPath(), purpose)
<<<<<<< HEAD
log.Info("Participant joining election", zap.Stringer("participant-info", m.member), zap.String("leader-path", m.leaderPath))
=======
m.lastLeaderUpdatedTime.Store(time.Now())
log.Info("participant joining election", zap.Stringer("participant-info", m.member), zap.String("leader-path", m.leaderPath))
>>>>>>> 3e4056406 (server: fix the leader cannot election after pd leader lost while etcd leader intact (#6447))
}

// ID returns the unique ID for this participant in the election group
Expand Down Expand Up @@ -143,11 +156,13 @@ func (m *Participant) GetLeader() *tsopb.Participant {
// setLeader sets the member's leader.
func (m *Participant) setLeader(member *tsopb.Participant) {
m.leader.Store(member)
m.lastLeaderUpdatedTime.Store(time.Now())
}

// unsetLeader unsets the member's leader.
func (m *Participant) unsetLeader() {
m.leader.Store(&tsopb.Participant{})
m.lastLeaderUpdatedTime.Store(time.Now())
}

// EnableLeader declares the member itself to be the leader.
Expand All @@ -160,6 +175,15 @@ func (m *Participant) GetLeaderPath() string {
return m.leaderPath
}

// GetLastLeaderUpdatedTime returns the last time when the leader is updated.
func (m *Participant) GetLastLeaderUpdatedTime() time.Time {
lastLeaderUpdatedTime := m.lastLeaderUpdatedTime.Load()
if lastLeaderUpdatedTime == nil {
return time.Time{}
}
return lastLeaderUpdatedTime.(time.Time)
}

// GetLeadership returns the leadership of the member.
func (m *Participant) GetLeadership() *election.Leadership {
return m.leadership
Expand Down
2 changes: 2 additions & 0 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ type ElectionMember interface {
GetLeaderPath() string
// GetLeadership returns the leadership of the PD member.
GetLeadership() *election.Leadership
// GetLastLeaderUpdatedTime returns the last time when the leader is updated.
GetLastLeaderUpdatedTime() time.Time
// GetDCLocationPathPrefix returns the dc-location path prefix of the cluster.
GetDCLocationPathPrefix() string
// GetDCLocationPath returns the dc-location path of a member with the given member ID.
Expand Down
44 changes: 44 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"fmt"
"math/rand"
"net/http"
"os"
"path"
Expand Down Expand Up @@ -104,8 +105,14 @@ const (
maxRetryTimesGetServicePrimary = 25
// retryIntervalGetServicePrimary is the retry interval for getting primary addr.
retryIntervalGetServicePrimary = 100 * time.Millisecond
<<<<<<< HEAD
// TODO: move it to etcdutil
watchEtcdChangeRetryInterval = 1 * time.Second
=======

lostPDLeaderMaxTimeoutSecs = 10
lostPDLeaderReElectionFactor = 10
>>>>>>> 3e4056406 (server: fix the leader cannot election after pd leader lost while etcd leader intact (#6447))
)

// EtcdStartTimeout the timeout of the startup etcd.
Expand Down Expand Up @@ -1408,6 +1415,14 @@ func (s *Server) leaderLoop() {
}

leader, checkAgain := s.member.CheckLeader()
// add failpoint to test leader check go to stuck.
failpoint.Inject("leaderLoopCheckAgain", func(val failpoint.Value) {
memberString := val.(string)
memberID, _ := strconv.ParseUint(memberString, 10, 64)
if s.member.ID() == memberID {
checkAgain = true
}
})
if checkAgain {
continue
}
Expand Down Expand Up @@ -1435,6 +1450,25 @@ func (s *Server) leaderLoop() {
// To make sure the etcd leader and PD leader are on the same server.
etcdLeader := s.member.GetEtcdLeader()
if etcdLeader != s.member.ID() {
if s.member.GetLeader() == nil {
lastUpdated := s.member.GetLastLeaderUpdatedTime()
// use random timeout to avoid leader campaigning storm.
randomTimeout := time.Duration(rand.Intn(int(lostPDLeaderMaxTimeoutSecs)))*time.Second + lostPDLeaderMaxTimeoutSecs*time.Second + lostPDLeaderReElectionFactor*s.cfg.ElectionInterval.Duration
// add failpoint to test the campaign leader logic.
failpoint.Inject("timeoutWaitPDLeader", func() {
log.Info("timeoutWaitPDLeader is injected, skip wait other etcd leader be etcd leader")
randomTimeout = time.Duration(rand.Intn(10))*time.Millisecond + 100*time.Millisecond
})
if lastUpdated.Add(randomTimeout).Before(time.Now()) && !lastUpdated.IsZero() && etcdLeader != 0 {
log.Info("the pd leader is lost for a long time, try to re-campaign a pd leader with resign etcd leader",
zap.Duration("timeout", randomTimeout),
zap.Time("last-updated", lastUpdated),
zap.String("current-leader-member-id", types.ID(etcdLeader).String()),
zap.String("transferee-member-id", types.ID(s.member.ID()).String()),
)
s.member.MoveEtcdLeader(s.ctx, etcdLeader, s.member.ID())
}
}
log.Info("skip campaigning of pd leader and check later",
zap.String("server-name", s.Name()),
zap.Uint64("etcd-leader-id", etcdLeader),
Expand Down Expand Up @@ -1551,6 +1585,16 @@ func (s *Server) campaignLeader() {
log.Info("no longer a leader because lease has expired, pd leader will step down")
return
}
// add failpoint to test exit leader, failpoint judge the member is the give value, then break
failpoint.Inject("exitCampaignLeader", func(val failpoint.Value) {
memberString := val.(string)
memberID, _ := strconv.ParseUint(memberString, 10, 64)
if s.member.ID() == memberID {
log.Info("exit PD leader")
failpoint.Return()
}
})

etcdLeader := s.member.GetEtcdLeader()
if etcdLeader != s.member.ID() {
log.Info("etcd leader changed, resigns pd leadership", zap.String("old-pd-leader-name", s.Name()))
Expand Down
24 changes: 24 additions & 0 deletions tests/server/member/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,30 @@ func TestLeaderResignWithBlock(t *testing.T) {
re.NoError(failpoint.Disable("github.com/tikv/pd/server/raftclusterIsBusy"))
}

func TestPDLeaderLostWhileEtcdLeaderIntact(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestCluster(ctx, 2)
defer cluster.Destroy()
re.NoError(err)

err = cluster.RunInitialServers()
re.NoError(err)

leader1 := cluster.WaitLeader()
memberID := cluster.GetServer(leader1).GetLeader().GetMemberId()

re.NoError(failpoint.Enable("github.com/tikv/pd/server/leaderLoopCheckAgain", fmt.Sprintf("return(\"%d\")", memberID)))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/exitCampaignLeader", fmt.Sprintf("return(\"%d\")", memberID)))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/timeoutWaitPDLeader", `return(true)`))
leader2 := waitLeaderChange(re, cluster, leader1)
re.NotEqual(leader1, leader2)
re.NoError(failpoint.Disable("github.com/tikv/pd/server/leaderLoopCheckAgain"))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/exitCampaignLeader"))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/timeoutWaitPDLeader"))
}

func waitLeaderChange(re *require.Assertions, cluster *tests.TestCluster, old string) string {
var leader string
testutil.Eventually(re, func() bool {
Expand Down

0 comments on commit 3478768

Please sign in to comment.