diff --git a/server/id.go b/server/id.go index 4e4b6145589..38785f3a782 100644 --- a/server/id.go +++ b/server/id.go @@ -39,13 +39,9 @@ func (alloc *idAllocator) Alloc() (uint64, error) { defer alloc.mu.Unlock() if alloc.base == alloc.end { - end, err := alloc.generate() - if err != nil { + if err := alloc.generateLocked(); err != nil { return 0, err } - - alloc.end = end - alloc.base = alloc.end - allocStep } alloc.base++ @@ -53,11 +49,19 @@ func (alloc *idAllocator) Alloc() (uint64, error) { return alloc.base, nil } -func (alloc *idAllocator) generate() (uint64, error) { +// Generate synchronizes and generates id range. +func (alloc *idAllocator) Generate() error { + alloc.mu.Lock() + defer alloc.mu.Unlock() + + return alloc.generateLocked() +} + +func (alloc *idAllocator) generateLocked() error { key := alloc.s.getAllocIDPath() value, err := getValue(alloc.s.client, key) if err != nil { - return 0, err + return err } var ( @@ -72,7 +76,7 @@ func (alloc *idAllocator) generate() (uint64, error) { // update the key end, err = bytesToUint64(value) if err != nil { - return 0, err + return err } cmp = clientv3.Compare(clientv3.Value(key), "=", string(value)) @@ -82,13 +86,15 @@ func (alloc *idAllocator) generate() (uint64, error) { value = uint64ToBytes(end) resp, err := alloc.s.leaderTxn(cmp).Then(clientv3.OpPut(key, string(value))).Commit() if err != nil { - return 0, err + return err } if !resp.Succeeded { - return 0, errors.New("generate id failed, we may not leader") + return errors.New("generate id failed, we may not leader") } log.Info("idAllocator allocates a new id", zap.Uint64("alloc-id", end)) metadataGauge.WithLabelValues("idalloc").Set(float64(end)) - return end, nil + alloc.end = end + alloc.base = end - allocStep + return nil } diff --git a/server/leader.go b/server/leader.go index 419fe8c0612..a34cdfaf048 100644 --- a/server/leader.go +++ b/server/leader.go @@ -275,6 +275,11 @@ func (s *Server) campaignLeader() error { } defer s.stopRaftCluster() + log.Info("sync id from etcd") + if err = s.idAlloc.Generate(); err != nil { + return err + } + s.enableLeader() defer s.disableLeader() diff --git a/server/server.go b/server/server.go index 63e978b9aba..6c200fdb825 100644 --- a/server/server.go +++ b/server/server.go @@ -495,6 +495,11 @@ func (s *Server) GetClassifier() namespace.Classifier { return s.classifier } +// GetAllocator returns the id allocator of this server. +func (s *Server) GetAllocator() core.IDAllocator { + return s.idAlloc +} + // txn returns an etcd client transaction wrapper. // The wrapper will set a request timeout to the context and log slow transactions. func (s *Server) txn() clientv3.Txn { diff --git a/tests/server/server_test.go b/tests/server/server_test.go index 4553a51a774..5004b7487c3 100644 --- a/tests/server/server_test.go +++ b/tests/server/server_test.go @@ -14,6 +14,7 @@ package server_test import ( + "context" "testing" . "github.com/pingcap/check" @@ -125,3 +126,79 @@ func (s *serverTestSuite) TestLeader(c *C) { return leader != leader1 }) } + +func (s *serverTestSuite) TestMonotonicID(c *C) { + var err error + cluster, err := tests.NewTestCluster(2) + defer cluster.Destroy() + c.Assert(err, IsNil) + + err = cluster.RunInitialServers() + c.Assert(err, IsNil) + cluster.WaitLeader() + + leaderServer := cluster.GetServer(cluster.GetLeader()).GetServer() + var last1 uint64 + for i := uint64(0); i < 10; i++ { + id, err := leaderServer.GetAllocator().Alloc() + c.Assert(err, IsNil) + c.Assert(id, Greater, last1) + last1 = id + } + err = cluster.ResignLeader() + c.Assert(err, IsNil) + cluster.WaitLeader() + leaderServer = cluster.GetServer(cluster.GetLeader()).GetServer() + var last2 uint64 + for i := uint64(0); i < 10; i++ { + id, err := leaderServer.GetAllocator().Alloc() + c.Assert(err, IsNil) + c.Assert(id, Greater, last2) + last2 = id + } + err = cluster.ResignLeader() + c.Assert(err, IsNil) + cluster.WaitLeader() + leaderServer = cluster.GetServer(cluster.GetLeader()).GetServer() + id, err := leaderServer.GetAllocator().Alloc() + c.Assert(err, IsNil) + c.Assert(id, Greater, last2) + var last3 uint64 + for i := uint64(0); i < 1000; i++ { + id, err := leaderServer.GetAllocator().Alloc() + c.Assert(err, IsNil) + c.Assert(id, Greater, last3) + last3 = id + } +} + +func (s *serverTestSuite) TestPDRestart(c *C) { + cluster, err := tests.NewTestCluster(1) + c.Assert(err, IsNil) + defer cluster.Destroy() + + err = cluster.RunInitialServers() + c.Assert(err, IsNil) + cluster.WaitLeader() + leaderServer := cluster.GetServer(cluster.GetLeader()) + leader := leaderServer.GetServer() + + var last uint64 + for i := uint64(0); i < 10; i++ { + id, err := leader.GetAllocator().Alloc() + c.Assert(err, IsNil) + c.Assert(id, Greater, last) + last = id + } + + c.Assert(leaderServer.Stop(), IsNil) + c.Assert(leaderServer.Run(context.TODO()), IsNil) + cluster.WaitLeader() + + for i := uint64(0); i < 10; i++ { + id, err := leader.GetAllocator().Alloc() + c.Assert(err, IsNil) + c.Assert(id, Greater, last) + last = id + } +}