Skip to content

Commit

Permalink
server/tso: fix the panic issue when getTS in the follower (#1930) (#…
Browse files Browse the repository at this point in the history
…1931)

Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch authored Nov 15, 2019
1 parent d14bd3d commit 6b7dc03
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 41 deletions.
40 changes: 1 addition & 39 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,45 +49,6 @@ func mustWaitLeader(c *C, svrs []*Server) *Server {
return leader
}

var _ = Suite(&testLeaderServerSuite{})

type testLeaderServerSuite struct {
svrs map[string]*Server
leaderPath string
}

func (s *testLeaderServerSuite) SetUpSuite(c *C) {
s.svrs = make(map[string]*Server)

cfgs := NewTestMultiConfig(c, 3)

ch := make(chan *Server, 3)
for i := 0; i < 3; i++ {
cfg := cfgs[i]

go func() {
svr, err := CreateServer(cfg, nil)
c.Assert(err, IsNil)
err = svr.Run(context.TODO())
c.Assert(err, IsNil)
ch <- svr
}()
}

for i := 0; i < 3; i++ {
svr := <-ch
s.svrs[svr.GetAddr()] = svr
s.leaderPath = svr.getLeaderPath()
}
}

func (s *testLeaderServerSuite) TearDownSuite(c *C) {
for _, svr := range s.svrs {
svr.Close()
cleanServer(svr.cfg)
}
}

var _ = Suite(&testServerSuite{})

type testServerSuite struct{}
Expand All @@ -100,6 +61,7 @@ func newTestServersWithCfgs(c *C, cfgs []*Config) ([]*Server, CleanupFunc) {
go func(cfg *Config) {
svr, err := CreateServer(cfg, nil)
c.Assert(err, IsNil)
c.Assert(svr, NotNil)
err = svr.Run(context.TODO())
c.Assert(err, IsNil)
ch <- svr
Expand Down
4 changes: 2 additions & 2 deletions server/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (s *Server) updateTimestamp() error {
return nil
}

const maxRetryCount = 100
var maxRetryCount = 100

func (s *Server) getRespTS(count uint32) (pdpb.Timestamp, error) {
var resp pdpb.Timestamp
Expand All @@ -225,7 +225,7 @@ func (s *Server) getRespTS(count uint32) (pdpb.Timestamp, error) {

for i := 0; i < maxRetryCount; i++ {
current := (*atomicObject)(atomic.LoadPointer(&s.ts))
if current.physical == zeroTime {
if current == nil || current.physical == zeroTime {
log.Error("we haven't synced timestamp ok, wait and retry", zap.Int("retry-count", i))
time.Sleep(200 * time.Millisecond)
continue
Expand Down
66 changes: 66 additions & 0 deletions server/tso_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package server

import (
"context"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -193,3 +194,68 @@ func mustGetLeader(c *C, client *clientv3.Client, leaderPath string) *pdpb.Membe
c.Fatal("get leader error")
return nil
}

var _ = Suite(&testFollowerTsoSuite{})

type testFollowerTsoSuite struct {
ctx context.Context
cancel context.CancelFunc
svrs []*Server
}

func (s *testFollowerTsoSuite) SetUpSuite(c *C) {
s.svrs = make([]*Server, 0, 2)

cfgs := NewTestMultiConfig(c, 2)
ch := make(chan *Server, 2)
for i := 0; i < 2; i++ {
cfg := cfgs[i]
go func() {
svr, err := CreateServer(cfg, nil)
c.Assert(err, IsNil)
c.Assert(svr, NotNil)
err = svr.Run(context.TODO())
c.Assert(err, IsNil)
ch <- svr
}()
}

for i := 0; i < 2; i++ {
svr := <-ch
s.svrs = append(s.svrs, svr)
}
mustWaitLeader(c, s.svrs)
}

func (s *testFollowerTsoSuite) TearDownSuite(c *C) {
for _, svr := range s.svrs {
svr.Close()
cleanServer(svr.cfg)
}
}

func (s *testFollowerTsoSuite) TestRequest(c *C) {
var err error

var followerServer *Server
for _, s := range s.svrs {
if !s.IsLeader() {
followerServer = s
}
}
c.Assert(followerServer, NotNil)
grpcPDClient := mustNewGrpcClient(c, followerServer.GetAddr())
clusterID := followerServer.ClusterID()

req := &pdpb.TsoRequest{Header: newRequestHeader(clusterID), Count: 1}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tsoClient, err := grpcPDClient.Tso(ctx)
c.Assert(err, IsNil)
defer tsoClient.CloseSend()
err = tsoClient.Send(req)
c.Assert(err, IsNil)
_, err = tsoClient.Recv()
c.Assert(err, NotNil)
c.Assert(strings.Contains(err.Error(), "not leader"), IsTrue)
}

0 comments on commit 6b7dc03

Please sign in to comment.