Skip to content

Commit

Permalink
id: fix the id allocator is not monotonic (#3305) (#3306)
Browse files Browse the repository at this point in the history
* cherry pick #3305 to release-5.0-rc

Signed-off-by: Ryan Leung <rleungx@gmail.com>

* cherry pick #3322 to release-5.0-rc

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: Ryan Leung <rleungx@gmail.com>
Co-authored-by: ShuNing <nolouch@gmail.com>
Co-authored-by: Ti Prow Robot <71242396+ti-community-prow-bot@users.noreply.github.com>
  • Loading branch information
4 people authored Jan 5, 2021
1 parent 9efcf3a commit f4e7924
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 15 deletions.
28 changes: 17 additions & 11 deletions server/id/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,29 @@ func (alloc *AllocatorImpl) Alloc() (uint64, error) {
defer alloc.mu.Unlock()

if alloc.base == alloc.end {
end, err := alloc.generate()
if err != nil {
if err := alloc.generateLocked(); err != nil {
return 0, err
}

alloc.end = end
alloc.base = alloc.end - allocStep
}

alloc.base++

return alloc.base, nil
}

func (alloc *AllocatorImpl) generate() (uint64, error) {
// Generate synchronizes and generates id range.
func (alloc *AllocatorImpl) Generate() error {
alloc.mu.Lock()
defer alloc.mu.Unlock()

return alloc.generateLocked()
}

func (alloc *AllocatorImpl) generateLocked() error {
key := alloc.getAllocIDPath()
value, err := etcdutil.GetValue(alloc.client, key)
if err != nil {
return 0, err
return err
}

var (
Expand All @@ -88,7 +92,7 @@ func (alloc *AllocatorImpl) generate() (uint64, error) {
// update the key
end, err = typeutil.BytesToUint64(value)
if err != nil {
return 0, err
return err
}

cmp = clientv3.Compare(clientv3.Value(key), "=", string(value))
Expand All @@ -101,15 +105,17 @@ func (alloc *AllocatorImpl) generate() (uint64, error) {
t := txn.If(append([]clientv3.Cmp{cmp}, clientv3.Compare(clientv3.Value(leaderPath), "=", alloc.member))...)
resp, err := t.Then(clientv3.OpPut(key, string(value))).Commit()
if err != nil {
return 0, errs.ErrEtcdTxn.Wrap(err).GenWithStackByArgs()
return errs.ErrEtcdTxn.Wrap(err).GenWithStackByArgs()
}
if !resp.Succeeded {
return 0, errs.ErrEtcdTxn.FastGenByArgs()
return errs.ErrEtcdTxn.FastGenByArgs()
}

log.Info("idAllocator allocates a new id", zap.Uint64("alloc-id", end))
idGauge.WithLabelValues("idalloc").Set(float64(end))
return end, nil
alloc.end = end
alloc.base = end - allocStep
return nil
}

func (alloc *AllocatorImpl) getAllocIDPath() string {
Expand Down
4 changes: 4 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1223,6 +1223,10 @@ func (s *Server) campaignLeader() {
log.Error("failed to load persistOptions from etcd", errs.ZapError(err))
return
}
if err := s.idAllocator.Generate(); err != nil {
log.Error("failed to sync id from etcd", errs.ZapError(err))
return
}
s.member.EnableLeader()

CheckPDVersion(s.persistOptions)
Expand Down
81 changes: 77 additions & 4 deletions tests/server/id/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ func (s *testAllocIDSuite) SetUpSuite(c *C) {
func (s *testAllocIDSuite) TearDownSuite(c *C) {
s.cancel()
}

func (s *testAllocIDSuite) TestID(c *C) {
var err error
cluster, err := tests.NewTestCluster(s.ctx, 1)
defer cluster.Destroy()
c.Assert(err, IsNil)
defer cluster.Destroy()

err = cluster.RunInitialServers()
c.Assert(err, IsNil)
Expand Down Expand Up @@ -96,10 +96,9 @@ func (s *testAllocIDSuite) TestID(c *C) {
}

func (s *testAllocIDSuite) TestCommand(c *C) {
var err error
cluster, err := tests.NewTestCluster(s.ctx, 1)
defer cluster.Destroy()
c.Assert(err, IsNil)
defer cluster.Destroy()

err = cluster.RunInitialServers()
c.Assert(err, IsNil)
Expand All @@ -119,3 +118,77 @@ func (s *testAllocIDSuite) TestCommand(c *C) {
last = resp.GetId()
}
}

func (s *testAllocIDSuite) TestMonotonicID(c *C) {
cluster, err := tests.NewTestCluster(s.ctx, 2)
c.Assert(err, IsNil)
defer cluster.Destroy()

err = cluster.RunInitialServers()
c.Assert(err, IsNil)
cluster.WaitLeader()

leaderServer := cluster.GetServer(cluster.GetLeader())
var last1 uint64
for i := uint64(0); i < 10; i++ {
id, err := leaderServer.GetAllocator().Alloc()
c.Assert(err, IsNil)
c.Assert(id, Greater, last1)
last1 = id
}
err = cluster.ResignLeader()
c.Assert(err, IsNil)
cluster.WaitLeader()
leaderServer = cluster.GetServer(cluster.GetLeader())
var last2 uint64
for i := uint64(0); i < 10; i++ {
id, err := leaderServer.GetAllocator().Alloc()
c.Assert(err, IsNil)
c.Assert(id, Greater, last2)
last2 = id
}
err = cluster.ResignLeader()
c.Assert(err, IsNil)
cluster.WaitLeader()
leaderServer = cluster.GetServer(cluster.GetLeader())
id, err := leaderServer.GetAllocator().Alloc()
c.Assert(err, IsNil)
c.Assert(id, Greater, last2)
var last3 uint64
for i := uint64(0); i < 1000; i++ {
id, err := leaderServer.GetAllocator().Alloc()
c.Assert(err, IsNil)
c.Assert(id, Greater, last3)
last3 = id
}
}

func (s *testAllocIDSuite) TestPDRestart(c *C) {
cluster, err := tests.NewTestCluster(s.ctx, 1)
c.Assert(err, IsNil)
defer cluster.Destroy()

err = cluster.RunInitialServers()
c.Assert(err, IsNil)
cluster.WaitLeader()
leaderServer := cluster.GetServer(cluster.GetLeader())

var last uint64
for i := uint64(0); i < 10; i++ {
id, err := leaderServer.GetAllocator().Alloc()
c.Assert(err, IsNil)
c.Assert(id, Greater, last)
last = id
}

c.Assert(leaderServer.Stop(), IsNil)
c.Assert(leaderServer.Run(), IsNil)
cluster.WaitLeader()

for i := uint64(0); i < 10; i++ {
id, err := leaderServer.GetAllocator().Alloc()
c.Assert(err, IsNil)
c.Assert(id, Greater, last)
last = id
}
}

0 comments on commit f4e7924

Please sign in to comment.