diff --git a/server/id/id.go b/server/id/id.go new file mode 100644 index 000000000000..6360a8a0ff6f --- /dev/null +++ b/server/id/id.go @@ -0,0 +1,118 @@ +// Copyright 2016 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package id + +import ( + "path" + "sync" + + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/etcdutil" + "github.com/tikv/pd/pkg/typeutil" + "github.com/tikv/pd/server/kv" + "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" +) + +// Allocator is the allocator to generate unique ID. +type Allocator interface { + Alloc() (uint64, error) +} + +const allocStep = uint64(1000) + +// AllocatorImpl is used to allocate ID. +type AllocatorImpl struct { + mu sync.Mutex + base uint64 + end uint64 + + client *clientv3.Client + rootPath string + member string +} + +// NewAllocatorImpl creates a new IDAllocator. +func NewAllocatorImpl(client *clientv3.Client, rootPath string, member string) *AllocatorImpl { + return &AllocatorImpl{client: client, rootPath: rootPath, member: member} +} + +// Alloc returns a new id. +func (alloc *AllocatorImpl) Alloc() (uint64, error) { + alloc.mu.Lock() + defer alloc.mu.Unlock() + + if alloc.base == alloc.end { + err := alloc.Generate() + if err != nil { + return 0, err + } + + alloc.base = alloc.end - allocStep + } + + alloc.base++ + + return alloc.base, nil +} + +// Generate synchronizes and generates id range. +func (alloc *AllocatorImpl) Generate() error { + key := alloc.getAllocIDPath() + value, err := etcdutil.GetValue(alloc.client, key) + if err != nil { + return err + } + + var ( + cmp clientv3.Cmp + end uint64 + ) + + if value == nil { + // create the key + cmp = clientv3.Compare(clientv3.CreateRevision(key), "=", 0) + } else { + // update the key + end, err = typeutil.BytesToUint64(value) + if err != nil { + return err + } + + cmp = clientv3.Compare(clientv3.Value(key), "=", string(value)) + } + + end += allocStep + value = typeutil.Uint64ToBytes(end) + txn := kv.NewSlowLogTxn(alloc.client) + leaderPath := path.Join(alloc.rootPath, "leader") + t := txn.If(append([]clientv3.Cmp{cmp}, clientv3.Compare(clientv3.Value(leaderPath), "=", alloc.member))...) + resp, err := t.Then(clientv3.OpPut(key, string(value))).Commit() + if err != nil { + return errs.ErrEtcdTxn.Wrap(err).GenWithStackByArgs() + } + if !resp.Succeeded { + return errs.ErrEtcdTxn.FastGenByArgs() + } + + log.Info("idAllocator allocates a new id", zap.Uint64("alloc-id", end)) + idGauge.WithLabelValues("idalloc").Set(float64(end)) + alloc.end = end + return nil +} + +func (alloc *AllocatorImpl) getAllocIDPath() string { + return path.Join(alloc.rootPath, "alloc_id") +} diff --git a/server/id_test.go b/server/id_test.go index 4590da04cea1..12414f708656 100644 --- a/server/id_test.go +++ b/server/id_test.go @@ -95,3 +95,48 @@ func (s *testAllocIDSuite) TestCommand(c *C) { last = resp.GetId() } } + +func (s *testAllocIDSuite) TestMonotonicID(c *C) { + var err error + cluster, err := tests.NewTestCluster(s.ctx, 2) + defer cluster.Destroy() + c.Assert(err, IsNil) + + err = cluster.RunInitialServers() + c.Assert(err, IsNil) + cluster.WaitLeader() + + leaderServer := cluster.GetServer(cluster.GetLeader()) + var last1 uint64 + for i := uint64(0); i < 10; i++ { + id, err := leaderServer.GetAllocator().Alloc() + c.Assert(err, IsNil) + c.Assert(id, Greater, last1) + last1 = id + } + err = cluster.ResignLeader() + c.Assert(err, IsNil) + cluster.WaitLeader() + leaderServer = cluster.GetServer(cluster.GetLeader()) + var last2 uint64 + for i := uint64(0); i < 10; i++ { + id, err := leaderServer.GetAllocator().Alloc() + c.Assert(err, IsNil) + c.Assert(id, Greater, last2) + last2 = id + } + err = cluster.ResignLeader() + c.Assert(err, IsNil) + cluster.WaitLeader() + leaderServer = cluster.GetServer(cluster.GetLeader()) + id, err := leaderServer.GetAllocator().Alloc() + c.Assert(err, IsNil) + c.Assert(id, Greater, last2) + var last3 uint64 + for i := uint64(0); i < 1000; i++ { + id, err := leaderServer.GetAllocator().Alloc() + c.Assert(err, IsNil) + c.Assert(id, Greater, last3) + last3 = id + } +} diff --git a/server/server.go b/server/server.go index 4219627c73dd..bcb586f8e1fa 100644 --- a/server/server.go +++ b/server/server.go @@ -680,10 +680,158 @@ func (s *Server) SetMemberLeaderPriority(id uint64, priority int) error { return nil } +<<<<<<< HEAD // DeleteMemberLeaderPriority removes a member's priority config. func (s *Server) DeleteMemberLeaderPriority(id uint64) error { key := s.getMemberLeaderPriorityPath(id) res, err := s.leaderTxn().Then(clientv3.OpDelete(key)).Commit() +======= +func (s *Server) leaderLoop() { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + for { + if s.IsClosed() { + log.Info("server is closed, return pd leader loop") + return + } + + leader, rev, checkAgain := s.member.CheckLeader() + if checkAgain { + continue + } + if leader != nil { + err := s.reloadConfigFromKV() + if err != nil { + log.Error("reload config failed", errs.ZapError(err)) + continue + } + // Check the cluster dc-location after the PD leader is elected + go s.tsoAllocatorManager.ClusterDCLocationChecker() + syncer := s.cluster.GetRegionSyncer() + if s.persistOptions.IsUseRegionStorage() { + syncer.StartSyncWithLeader(leader.GetClientUrls()[0]) + } + log.Info("start to watch pd leader", zap.Stringer("pd-leader", leader)) + // WatchLeader will keep looping and never return unless the PD leader has changed. + s.member.WatchLeader(s.serverLoopCtx, leader, rev) + syncer.StopSyncWithLeader() + log.Info("pd leader has changed, try to re-campaign a pd leader") + } + + // To make sure the etcd leader and PD leader are on the same server. + etcdLeader := s.member.GetEtcdLeader() + if etcdLeader != s.member.ID() { + log.Info("skip campaigning of pd leader and check later", + zap.String("server-name", s.Name()), + zap.Uint64("etcd-leader-id", etcdLeader), + zap.Uint64("member-id", s.member.ID())) + time.Sleep(200 * time.Millisecond) + continue + } + s.campaignLeader() + } +} + +func (s *Server) campaignLeader() { + log.Info("start to campaign pd leader", zap.String("campaign-pd-leader-name", s.Name())) + if err := s.member.CampaignLeader(s.cfg.LeaderLease); err != nil { + log.Error("campaign pd leader meet error", errs.ZapError(err)) + return + } + + // Start keepalive the leadership and enable TSO service. + // TSO service is strictly enabled/disabled by PD leader lease for 2 reasons: + // 1. lease based approach is not affected by thread pause, slow runtime schedule, etc. + // 2. load region could be slow. Based on lease we can recover TSO service faster. + + ctx, cancel := context.WithCancel(s.serverLoopCtx) + defer cancel() + defer s.member.ResetLeader() + // maintain the PD leader + go s.member.KeepLeader(ctx) + log.Info("campaign pd leader ok", zap.String("campaign-pd-leader-name", s.Name())) + + log.Info("setting up the global TSO allocator") + if err := s.tsoAllocatorManager.SetUpAllocator(ctx, config.GlobalDCLocation, s.member.GetLeadership()); err != nil { + log.Error("failed to set up the global TSO allocator", errs.ZapError(err)) + return + } + // Check the cluster dc-location after the PD leader is elected + go s.tsoAllocatorManager.ClusterDCLocationChecker() + + if err := s.reloadConfigFromKV(); err != nil { + log.Error("failed to reload configuration", errs.ZapError(err)) + return + } + + if err := s.encryptionKeyManager.SetLeadership(s.member.GetLeadership()); err != nil { + log.Error("failed to initialize encryption", errs.ZapError(err)) + return + } + + // Try to create raft cluster. + if err := s.createRaftCluster(); err != nil { + log.Error("failed to create raft cluster", errs.ZapError(err)) + return + } + defer s.stopRaftCluster() + if err := s.persistOptions.LoadTTLFromEtcd(s.ctx, s.client); err != nil { + log.Error("failed to load persistOptions from etcd", errs.ZapError(err)) + return + } + if err := s.idAllocator.Generate(); err != nil { + log.Error("failed to sync id from etcd", errs.ZapError(err)) + return + } + s.member.EnableLeader() + + CheckPDVersion(s.persistOptions) + log.Info("PD cluster leader is ready to serve", zap.String("pd-leader-name", s.Name())) + + leaderTicker := time.NewTicker(leaderTickInterval) + defer leaderTicker.Stop() + + for { + select { + case <-leaderTicker.C: + if !s.member.IsLeader() { + log.Info("no longer a leader because lease has expired, pd leader will step down") + return + } + etcdLeader := s.member.GetEtcdLeader() + if etcdLeader != s.member.ID() { + log.Info("etcd leader changed, resigns pd leadership", zap.String("old-pd-leader-name", s.Name())) + return + } + case <-ctx.Done(): + // Server is closed and it should return nil. + log.Info("server is closed") + return + } + } +} + +func (s *Server) etcdLeaderLoop() { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + ctx, cancel := context.WithCancel(s.serverLoopCtx) + defer cancel() + for { + select { + case <-time.After(s.cfg.LeaderPriorityCheckInterval.Duration): + s.member.CheckPriority(ctx) + case <-ctx.Done(): + log.Info("server is closed, exit etcd leader loop") + return + } + } +} + +func (s *Server) reloadConfigFromKV() error { + err := s.persistOptions.Reload(s.storage) +>>>>>>> 65e08e2b... fix the id allocator is not monotonic (#3305) if err != nil { return errors.WithStack(err) }