diff --git a/server/server_test.go b/server/server_test.go index 8378e7c8f0b..bef4cca3d03 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -49,45 +49,6 @@ func mustWaitLeader(c *C, svrs []*Server) *Server { return leader } -var _ = Suite(&testLeaderServerSuite{}) - -type testLeaderServerSuite struct { - svrs map[string]*Server - leaderPath string -} - -func (s *testLeaderServerSuite) SetUpSuite(c *C) { - s.svrs = make(map[string]*Server) - - cfgs := NewTestMultiConfig(c, 3) - - ch := make(chan *Server, 3) - for i := 0; i < 3; i++ { - cfg := cfgs[i] - - go func() { - svr, err := CreateServer(cfg, nil) - c.Assert(err, IsNil) - err = svr.Run(context.TODO()) - c.Assert(err, IsNil) - ch <- svr - }() - } - - for i := 0; i < 3; i++ { - svr := <-ch - s.svrs[svr.GetAddr()] = svr - s.leaderPath = svr.getLeaderPath() - } -} - -func (s *testLeaderServerSuite) TearDownSuite(c *C) { - for _, svr := range s.svrs { - svr.Close() - cleanServer(svr.cfg) - } -} - var _ = Suite(&testServerSuite{}) type testServerSuite struct{} @@ -100,6 +61,7 @@ func newTestServersWithCfgs(c *C, cfgs []*Config) ([]*Server, CleanupFunc) { go func(cfg *Config) { svr, err := CreateServer(cfg, nil) c.Assert(err, IsNil) + c.Assert(svr, NotNil) err = svr.Run(context.TODO()) c.Assert(err, IsNil) ch <- svr diff --git a/server/tso.go b/server/tso.go index 27259914110..37e39260a30 100644 --- a/server/tso.go +++ b/server/tso.go @@ -214,7 +214,7 @@ func (s *Server) updateTimestamp() error { return nil } -const maxRetryCount = 100 +var maxRetryCount = 100 func (s *Server) getRespTS(count uint32) (pdpb.Timestamp, error) { var resp pdpb.Timestamp @@ -225,7 +225,7 @@ func (s *Server) getRespTS(count uint32) (pdpb.Timestamp, error) { for i := 0; i < maxRetryCount; i++ { current := (*atomicObject)(atomic.LoadPointer(&s.ts)) - if current.physical == zeroTime { + if current == nil || current.physical == zeroTime { log.Error("we haven't synced timestamp ok, wait and retry", zap.Int("retry-count", i)) time.Sleep(200 * time.Millisecond) continue diff --git a/server/tso_test.go b/server/tso_test.go index 7084b3e16ad..32d7d2103e4 100644 --- a/server/tso_test.go +++ b/server/tso_test.go @@ -15,6 +15,7 @@ package server import ( "context" + "strings" "sync" "time" @@ -193,3 +194,68 @@ func mustGetLeader(c *C, client *clientv3.Client, leaderPath string) *pdpb.Membe c.Fatal("get leader error") return nil } + +var _ = Suite(&testFollowerTsoSuite{}) + +type testFollowerTsoSuite struct { + ctx context.Context + cancel context.CancelFunc + svrs []*Server +} + +func (s *testFollowerTsoSuite) SetUpSuite(c *C) { + s.svrs = make([]*Server, 0, 2) + + cfgs := NewTestMultiConfig(c, 2) + ch := make(chan *Server, 2) + for i := 0; i < 2; i++ { + cfg := cfgs[i] + go func() { + svr, err := CreateServer(cfg, nil) + c.Assert(err, IsNil) + c.Assert(svr, NotNil) + err = svr.Run(context.TODO()) + c.Assert(err, IsNil) + ch <- svr + }() + } + + for i := 0; i < 2; i++ { + svr := <-ch + s.svrs = append(s.svrs, svr) + } + mustWaitLeader(c, s.svrs) +} + +func (s *testFollowerTsoSuite) TearDownSuite(c *C) { + for _, svr := range s.svrs { + svr.Close() + cleanServer(svr.cfg) + } +} + +func (s *testFollowerTsoSuite) TestRequest(c *C) { + var err error + + var followerServer *Server + for _, s := range s.svrs { + if !s.IsLeader() { + followerServer = s + } + } + c.Assert(followerServer, NotNil) + grpcPDClient := mustNewGrpcClient(c, followerServer.GetAddr()) + clusterID := followerServer.ClusterID() + + req := &pdpb.TsoRequest{Header: newRequestHeader(clusterID), Count: 1} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + tsoClient, err := grpcPDClient.Tso(ctx) + c.Assert(err, IsNil) + defer tsoClient.CloseSend() + err = tsoClient.Send(req) + c.Assert(err, IsNil) + _, err = tsoClient.Recv() + c.Assert(err, NotNil) + c.Assert(strings.Contains(err.Error(), "not leader"), IsTrue) +}