Skip to content

Commit

Permalink
cherry pick tikv#3305 to release-3.0
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Jan 25, 2021
1 parent 635fef2 commit d6aad11
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 11 deletions.
28 changes: 17 additions & 11 deletions server/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,29 @@ func (alloc *idAllocator) Alloc() (uint64, error) {
defer alloc.mu.Unlock()

if alloc.base == alloc.end {
end, err := alloc.generate()
if err != nil {
if err := alloc.generateLocked(); err != nil {
return 0, err
}

alloc.end = end
alloc.base = alloc.end - allocStep
}

alloc.base++

return alloc.base, nil
}

func (alloc *idAllocator) generate() (uint64, error) {
// Generate synchronizes and generates id range.
func (alloc *idAllocator) Generate() error {
alloc.mu.Lock()
defer alloc.mu.Unlock()

return alloc.generateLocked()
}

func (alloc *idAllocator) generateLocked() error {
key := alloc.s.getAllocIDPath()
value, err := getValue(alloc.s.client, key)
if err != nil {
return 0, err
return err
}

var (
Expand All @@ -72,7 +76,7 @@ func (alloc *idAllocator) generate() (uint64, error) {
// update the key
end, err = bytesToUint64(value)
if err != nil {
return 0, err
return err
}

cmp = clientv3.Compare(clientv3.Value(key), "=", string(value))
Expand All @@ -82,13 +86,15 @@ func (alloc *idAllocator) generate() (uint64, error) {
value = uint64ToBytes(end)
resp, err := alloc.s.leaderTxn(cmp).Then(clientv3.OpPut(key, string(value))).Commit()
if err != nil {
return 0, err
return err
}
if !resp.Succeeded {
return 0, errors.New("generate id failed, we may not leader")
return errors.New("generate id failed, we may not leader")
}

log.Info("idAllocator allocates a new id", zap.Uint64("alloc-id", end))
metadataGauge.WithLabelValues("idalloc").Set(float64(end))
return end, nil
alloc.end = end
alloc.base = end - allocStep
return nil
}
5 changes: 5 additions & 0 deletions server/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,11 @@ func (s *Server) campaignLeader() error {
}
defer s.stopRaftCluster()

log.Info("sync id from etcd")
if err = s.idAlloc.Generate(); err != nil {
return err
}

s.enableLeader()
defer s.disableLeader()

Expand Down
5 changes: 5 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,11 @@ func (s *Server) GetClassifier() namespace.Classifier {
return s.classifier
}

// GetAllocator returns the id allocator of this server.
func (s *Server) GetAllocator() core.IDAllocator {
return s.idAlloc
}

// txn returns an etcd client transaction wrapper.
// The wrapper will set a request timeout to the context and log slow transactions.
func (s *Server) txn() clientv3.Txn {
Expand Down
77 changes: 77 additions & 0 deletions tests/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package server_test

import (
"context"
"testing"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -125,3 +126,79 @@ func (s *serverTestSuite) TestLeader(c *C) {
return leader != leader1
})
}

func (s *serverTestSuite) TestMonotonicID(c *C) {
var err error
cluster, err := tests.NewTestCluster(2)
defer cluster.Destroy()
c.Assert(err, IsNil)

err = cluster.RunInitialServers()
c.Assert(err, IsNil)
cluster.WaitLeader()

leaderServer := cluster.GetServer(cluster.GetLeader()).GetServer()
var last1 uint64
for i := uint64(0); i < 10; i++ {
id, err := leaderServer.GetAllocator().Alloc()
c.Assert(err, IsNil)
c.Assert(id, Greater, last1)
last1 = id
}
err = cluster.ResignLeader()
c.Assert(err, IsNil)
cluster.WaitLeader()
leaderServer = cluster.GetServer(cluster.GetLeader()).GetServer()
var last2 uint64
for i := uint64(0); i < 10; i++ {
id, err := leaderServer.GetAllocator().Alloc()
c.Assert(err, IsNil)
c.Assert(id, Greater, last2)
last2 = id
}
err = cluster.ResignLeader()
c.Assert(err, IsNil)
cluster.WaitLeader()
leaderServer = cluster.GetServer(cluster.GetLeader()).GetServer()
id, err := leaderServer.GetAllocator().Alloc()
c.Assert(err, IsNil)
c.Assert(id, Greater, last2)
var last3 uint64
for i := uint64(0); i < 1000; i++ {
id, err := leaderServer.GetAllocator().Alloc()
c.Assert(err, IsNil)
c.Assert(id, Greater, last3)
last3 = id
}
}

func (s *serverTestSuite) TestPDRestart(c *C) {
cluster, err := tests.NewTestCluster(1)
c.Assert(err, IsNil)
defer cluster.Destroy()

err = cluster.RunInitialServers()
c.Assert(err, IsNil)
cluster.WaitLeader()
leaderServer := cluster.GetServer(cluster.GetLeader())
leader := leaderServer.GetServer()

var last uint64
for i := uint64(0); i < 10; i++ {
id, err := leader.GetAllocator().Alloc()
c.Assert(err, IsNil)
c.Assert(id, Greater, last)
last = id
}

c.Assert(leaderServer.Stop(), IsNil)
c.Assert(leaderServer.Run(context.TODO()), IsNil)
cluster.WaitLeader()

for i := uint64(0); i < 10; i++ {
id, err := leader.GetAllocator().Alloc()
c.Assert(err, IsNil)
c.Assert(id, Greater, last)
last = id
}
}

0 comments on commit d6aad11

Please sign in to comment.