Skip to content

Commit

Permalink
server: set timeout for MoveLeader (tikv#1533)
Browse files Browse the repository at this point in the history
* server: set timeout for MoveLeader

Signed-off-by: disksing <i@disksing.com>
  • Loading branch information
disksing committed May 16, 2019
1 parent 67549be commit b8055f5
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 3 deletions.
15 changes: 12 additions & 3 deletions server/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import (
"go.uber.org/zap"
)

// moveLeaderTimeout is the maximum time to wait for an etcd leader transfer
// to complete; MoveEtcdLeader applies it as a deadline on the context it
// hands to etcd.
const moveLeaderTimeout = 5 * time.Second

// IsLeader returns whether the server is leader or not.
func (s *Server) IsLeader() bool {
// If server is not started. Both leaderID and ID could be 0.
Expand Down Expand Up @@ -144,7 +147,7 @@ func (s *Server) etcdLeaderLoop() {
break
}
if myPriority > leaderPriority {
err := s.etcd.Server.MoveLeader(ctx, etcdLeader, s.ID())
err := s.MoveEtcdLeader(ctx, etcdLeader, s.ID())
if err != nil {
log.Error("failed to transfer etcd leader", zap.Error(err))
} else {
Expand All @@ -160,6 +163,13 @@ func (s *Server) etcdLeaderLoop() {
}
}

// MoveEtcdLeader asks the embedded etcd server to transfer leadership from
// member old to member new. The request runs under a context derived from ctx
// that expires after moveLeaderTimeout, so the call cannot block indefinitely.
// Any error reported by etcd is returned with a stack trace attached.
func (s *Server) MoveEtcdLeader(ctx context.Context, old, new uint64) error {
	timeoutCtx, release := context.WithTimeout(ctx, moveLeaderTimeout)
	// Always release the timer held by the derived context.
	defer release()

	if err := s.etcd.Server.MoveLeader(timeoutCtx, old, new); err != nil {
		return errors.WithStack(err)
	}
	return nil
}

// getLeader gets server leader from etcd.
func getLeader(c *clientv3.Client, leaderPath string) (*pdpb.Member, int64, error) {
leader := &pdpb.Member{}
Expand Down Expand Up @@ -373,8 +383,7 @@ func (s *Server) ResignLeader(nextLeader string) error {
return errors.New("no valid pd to transfer leader")
}
nextLeaderID := leaderIDs[rand.Intn(len(leaderIDs))]
err = s.etcd.Server.MoveLeader(s.serverLoopCtx, s.ID(), nextLeaderID)
return errors.WithStack(err)
return s.MoveEtcdLeader(s.serverLoopCtx, s.ID(), nextLeaderID)
}

func (s *Server) deleteLeaderKey() error {
Expand Down
19 changes: 19 additions & 0 deletions tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,25 @@ func (s *TestServer) GetEtcdLeader() (string, error) {
return members.GetEtcdLeader().GetName(), nil
}

// GetEtcdLeaderID returns the member ID of the builtin etcd leader, as
// reported by the wrapped server's GetMembers call. It holds the read lock
// for the duration of the call and returns 0 together with a stack-annotated
// error when GetMembers fails.
func (s *TestServer) GetEtcdLeaderID() (uint64, error) {
	s.RLock()
	defer s.RUnlock()

	header := &pdpb.RequestHeader{ClusterId: s.server.ClusterID()}
	request := &pdpb.GetMembersRequest{Header: header}

	resp, err := s.server.GetMembers(context.TODO(), request)
	if err != nil {
		return 0, errors.WithStack(err)
	}

	etcdLeader := resp.GetEtcdLeader()
	return etcdLeader.GetMemberId(), nil
}

// MoveEtcdLeader requests that etcd leadership be moved from member old to
// member new by delegating to the wrapped server's MoveEtcdLeader with a
// background context. The read lock is held while the request is in flight.
func (s *TestServer) MoveEtcdLeader(old, new uint64) error {
	s.RLock()
	defer s.RUnlock()

	ctx := context.Background()
	return s.server.MoveEtcdLeader(ctx, old, new)
}

// GetEtcdClient returns the builtin etcd client.
func (s *TestServer) GetEtcdClient() *clientv3.Client {
s.RLock()
Expand Down
60 changes: 60 additions & 0 deletions tests/server/move_leader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package server_test

import (
"sync"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/pd/tests"
)

// TestMoveLeader starts a five-member cluster and has every member try to
// change leadership at the same time: the current PD leader resigns, while
// each other member tries to move the etcd leader to itself. The test passes
// as long as all of those calls return within 10 seconds; it does not check
// whether any individual transfer succeeds.
func (s *serverTestSuite) TestMoveLeader(c *C) {
	c.Parallel()

	cluster, err := tests.NewTestCluster(5)
	c.Assert(err, IsNil)
	defer cluster.Destroy()

	err = cluster.RunInitialServers()
	c.Assert(err, IsNil)
	cluster.WaitLeader()

	// Size the WaitGroup from the collection we actually range over, so the
	// counter can never disagree with the number of goroutines started.
	servers := cluster.GetServers()
	var wg sync.WaitGroup
	wg.Add(len(servers))
	for _, srv := range servers {
		go func(srv *tests.TestServer) {
			defer wg.Done()
			// Results are deliberately ignored below: the concurrent requests
			// race with each other and some are expected to fail. Only the
			// fact that each call returns matters here.
			if srv.IsLeader() {
				srv.ResignLeader()
			} else {
				old, _ := srv.GetEtcdLeaderID()
				srv.MoveEtcdLeader(old, srv.GetServerID())
			}
		}(srv)
	}

	// Turn wg.Wait into a channel so it can be raced against a timeout.
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case <-done:
	case <-time.After(10 * time.Second):
		c.Fatal("move etcd leader does not return in 10 seconds")
	}
}

0 comments on commit b8055f5

Please sign in to comment.