diff --git a/server/grpc_service.go b/server/grpc_service.go
index f3a8bade797..9aa54abbdf4 100644
--- a/server/grpc_service.go
+++ b/server/grpc_service.go
@@ -75,9 +75,14 @@ func (s *Server) Tso(stream pdpb.PD_TsoServer) error {
 			return errors.WithStack(err)
 		}
 		start := time.Now()
-		if err = s.validateRequest(request.GetHeader()); err != nil {
-			return err
+		// TSO uses the leader lease to determine validity. No need to check the leader here.
+		if s.isClosed() {
+			return status.Errorf(codes.Unknown, "server not started")
 		}
+		if request.GetHeader().GetClusterId() != s.clusterID {
+			return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, request.GetHeader().GetClusterId())
+		}
+
 		count := request.GetCount()
 		ts, err := s.getRespTS(count)
 		if err != nil {
diff --git a/server/leader.go b/server/leader.go
index e5e39d36554..89d4d78ef9e 100644
--- a/server/leader.go
+++ b/server/leader.go
@@ -31,8 +31,11 @@ import (
 	"go.uber.org/zap"
 )
 
-// The timeout to wait transfer etcd leader to complete.
-const moveLeaderTimeout = 5 * time.Second
+const (
+	// The timeout to wait for the etcd leader transfer to complete.
+	moveLeaderTimeout  = 5 * time.Second
+	leaderTickInterval = 50 * time.Millisecond
+)
 
 // IsLeader returns whether the server is leader or not.
 func (s *Server) IsLeader() bool {
@@ -214,30 +217,22 @@ func (s *Server) memberInfo() (member *pdpb.Member, marshalStr string) {
 func (s *Server) campaignLeader() error {
 	log.Info("start to campaign leader", zap.String("campaign-leader-name", s.Name()))
 
-	lessor := clientv3.NewLease(s.client)
+	lease := NewLeaderLease(s.client)
 	defer func() {
-		lessor.Close()
+		lease.Close()
 		log.Info("exit campaign leader")
 	}()
 
-	start := time.Now()
-	ctx, cancel := context.WithTimeout(s.client.Ctx(), requestTimeout)
-	leaseResp, err := lessor.Grant(ctx, s.cfg.LeaderLease)
-	cancel()
-
-	if cost := time.Since(start); cost > slowRequestTime {
-		log.Warn("lessor grants too slow", zap.Duration("cost", cost))
-	}
-
+	err := lease.Grant(s.cfg.LeaderLease)
 	if err != nil {
-		return errors.WithStack(err)
+		return err
 	}
 
 	leaderKey := s.getLeaderPath()
 	// The leader key must not exist, so the CreateRevision is 0.
 	resp, err := s.txn().
 		If(clientv3.Compare(clientv3.CreateRevision(leaderKey), "=", 0)).
-		Then(clientv3.OpPut(leaderKey, s.memberValue, clientv3.WithLease(leaseResp.ID))).
+		Then(clientv3.OpPut(leaderKey, s.memberValue, clientv3.WithLease(lease.ID))).
 		Commit()
 	if err != nil {
 		return errors.WithStack(err)
@@ -246,20 +241,33 @@ func (s *Server) campaignLeader() error {
 		return errors.New("failed to campaign leader, other server may campaign ok")
 	}
 
+	// Start the keepalive and enable the TSO service.
+	// The TSO service is strictly enabled/disabled by the leader lease for two reasons:
+	//   1. The lease-based approach is not affected by thread pauses, slow runtime schedules, etc.
+	//   2. Loading regions could be slow; based on the lease, the TSO service can recover faster.
 	// Make the leader keepalived.
-	ctx, cancel = context.WithCancel(s.serverLoopCtx)
+	ctx, cancel := context.WithCancel(s.serverLoopCtx)
 	defer cancel()
-	ch, err := lessor.KeepAlive(ctx, leaseResp.ID)
-	if err != nil {
-		return errors.WithStack(err)
-	}
+	go lease.KeepAlive(ctx)
 	log.Info("campaign leader ok", zap.String("campaign-leader-name", s.Name()))
 
+	// sync timestamp.
+	log.Debug("sync timestamp for tso")
+	if err = s.syncTimestamp(lease); err != nil {
+		return err
+	}
+
+	defer s.ts.Store(&atomicObject{
+		physical: zeroTime,
+	})
+
+	// reload config.
 	err = s.reloadConfigFromKV()
 	if err != nil {
 		return err
 	}
+	// Try to create raft cluster.
 	err = s.createRaftCluster()
 	if err != nil {
@@ -267,14 +275,6 @@ func (s *Server) campaignLeader() error {
 	}
 	defer s.stopRaftCluster()
 
-	log.Debug("sync timestamp for tso")
-	if err = s.syncTimestamp(); err != nil {
-		return err
-	}
-	defer s.ts.Store(&atomicObject{
-		physical: zeroTime,
-	})
-
 	s.enableLeader()
 	defer s.disableLeader()
 
@@ -284,23 +284,26 @@ func (s *Server) campaignLeader() error {
 	tsTicker := time.NewTicker(updateTimestampStep)
 	defer tsTicker.Stop()
 
+	leaderTicker := time.NewTicker(leaderTickInterval)
+	defer leaderTicker.Stop()
+
 	for {
 		select {
-		case _, ok := <-ch:
-			if !ok {
-				log.Info("keep alive channel is closed")
+		case <-leaderTicker.C:
+			if lease.IsExpired() {
+				log.Info("lease expired, leader step down")
 				return nil
 			}
-		case <-tsTicker.C:
-			if err = s.updateTimestamp(); err != nil {
-				log.Info("failed to update timestamp")
-				return err
-			}
 			etcdLeader := s.GetEtcdLeader()
 			if etcdLeader != s.ID() {
 				log.Info("etcd leader changed, resigns leadership", zap.String("old-leader-name", s.Name()))
 				return nil
 			}
+		case <-tsTicker.C:
+			if err = s.updateTimestamp(); err != nil {
+				log.Info("failed to update timestamp")
+				return err
+			}
 		case <-ctx.Done():
 			// Server is closed and it should return nil.
 			log.Info("server is closed")
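The loop above now judges leadership by wall-clock lease expiry (`lease.IsExpired()`) rather than by etcd's keepalive channel, so a paused or slow process simply observes an already-expired lease when it resumes. A minimal, runnable sketch of that polling shape follows; `expiringLease` and the timings are hypothetical stand-ins for `LeaderLease` and `leaderTickInterval`, not code from this PR.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// expiringLease is a hypothetical stand-in for LeaderLease: leadership is
// valid only while the stored wall-clock expire time lies in the future.
type expiringLease struct {
	expireTime atomic.Value // holds a time.Time
}

func (l *expiringLease) IsExpired() bool {
	return time.Now().After(l.expireTime.Load().(time.Time))
}

func main() {
	lease := &expiringLease{}
	// Pretend Grant succeeded with a short TTL and no keepalive ever lands.
	lease.expireTime.Store(time.Now().Add(150 * time.Millisecond))

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Same shape as the new campaignLeader loop: a fast ticker polls the lease.
	leaderTicker := time.NewTicker(50 * time.Millisecond)
	defer leaderTicker.Stop()

	for {
		select {
		case <-leaderTicker.C:
			if lease.IsExpired() {
				fmt.Println("lease expired, leader steps down")
				return
			}
		case <-ctx.Done():
			return
		}
	}
}
```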
diff --git a/server/lease.go b/server/lease.go
new file mode 100644
index 00000000000..8281919c7f5
--- /dev/null
+++ b/server/lease.go
@@ -0,0 +1,140 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+	"context"
+	"sync/atomic"
+	"time"
+
+	"github.com/pingcap/log"
+	"github.com/pkg/errors"
+	"go.etcd.io/etcd/clientv3"
+	"go.uber.org/zap"
+)
+
+// LeaderLease is used for renewing leadership of the PD server.
+type LeaderLease struct {
+	client       *clientv3.Client
+	lease        clientv3.Lease
+	ID           clientv3.LeaseID
+	leaseTimeout time.Duration
+
+	expireTime atomic.Value
+}
+
+// NewLeaderLease creates a lease.
+func NewLeaderLease(client *clientv3.Client) *LeaderLease {
+	return &LeaderLease{
+		client: client,
+		lease:  clientv3.NewLease(client),
+	}
+}
+
+// Grant uses `lease.Grant` to initialize the lease and expireTime.
+func (l *LeaderLease) Grant(leaseTimeout int64) error {
+	start := time.Now()
+	ctx, cancel := context.WithTimeout(l.client.Ctx(), requestTimeout)
+	leaseResp, err := l.lease.Grant(ctx, leaseTimeout)
+	cancel()
+	if err != nil {
+		return errors.WithStack(err)
+	}
+	if cost := time.Since(start); cost > slowRequestTime {
+		log.Warn("lease grants too slow", zap.Duration("cost", cost))
+	}
+	l.ID = leaseResp.ID
+	l.leaseTimeout = time.Duration(leaseTimeout) * time.Second
+	l.expireTime.Store(start.Add(time.Duration(leaseResp.TTL) * time.Second))
+	return nil
+}
+
+const revokeLeaseTimeout = time.Second
+
+// Close releases the lease.
+func (l *LeaderLease) Close() error {
+	// Reset the expire time.
+	l.expireTime.Store(time.Time{})
+	// Try to revoke the lease to make subsequent elections faster.
+	ctx, cancel := context.WithTimeout(l.client.Ctx(), revokeLeaseTimeout)
+	defer cancel()
+	l.lease.Revoke(ctx, l.ID)
+	return l.lease.Close()
+}
+
+// IsExpired checks if the lease is expired. If it returns true, the current
+// PD server should step down and try to re-elect.
+func (l *LeaderLease) IsExpired() bool {
+	return time.Now().After(l.expireTime.Load().(time.Time))
+}
+
+// KeepAlive auto-renews the lease and updates expireTime.
+func (l *LeaderLease) KeepAlive(ctx context.Context) {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	timeCh := l.keepAliveWorker(ctx, l.leaseTimeout/3)
+
+	var maxExpire time.Time
+	for {
+		select {
+		case t := <-timeCh:
+			if t.After(maxExpire) {
+				maxExpire = t
+				l.expireTime.Store(t)
+			}
+		case <-time.After(l.leaseTimeout):
+			return
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// keepAliveWorker periodically calls `lease.KeepAliveOnce` and posts the latest received expire time back through the channel.
+func (l *LeaderLease) keepAliveWorker(ctx context.Context, interval time.Duration) <-chan time.Time {
+	ch := make(chan time.Time)
+
+	go func() {
+		ticker := time.NewTicker(interval)
+		defer ticker.Stop()
+
+		for {
+			go func() {
+				start := time.Now()
+				ctx1, cancel := context.WithTimeout(ctx, l.leaseTimeout)
+				defer cancel()
+				res, err := l.lease.KeepAliveOnce(ctx1, l.ID)
+				if err != nil {
+					log.Warn("leader lease keep alive failed", zap.Error(err))
+					return
+				}
+				if res.TTL > 0 {
+					expire := start.Add(time.Duration(res.TTL) * time.Second)
+					select {
+					case ch <- expire:
+					case <-ctx1.Done():
+					}
+				}
+			}()
+
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+			}
+		}
+	}()
+
+	return ch
+}
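For context, this is roughly how the new type is meant to be driven. It is a sketch only: it assumes a reachable etcd at `127.0.0.1:2379` and pretends the package-private `NewLeaderLease` were callable from an example program.

```go
package main

import (
	"context"
	"log"
	"time"

	"go.etcd.io/etcd/clientv3"
)

func main() {
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"}, // assumed local etcd
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	lease := NewLeaderLease(client) // as defined in server/lease.go above
	if err := lease.Grant(3 /* seconds, like cfg.LeaderLease */); err != nil {
		log.Fatal(err)
	}
	defer lease.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go lease.KeepAlive(ctx)

	// Poll IsExpired the way campaignLeader's leaderTicker does.
	for !lease.IsExpired() {
		time.Sleep(50 * time.Millisecond)
	}
	log.Println("lease expired; step down and campaign again")
}
```

Note that `Close` revokes the lease explicitly, so after a clean shutdown a follower can win the next election without waiting out the full TTL.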
diff --git a/server/server.go b/server/server.go
index 226d20a222f..b688e90bae2 100644
--- a/server/server.go
+++ b/server/server.go
@@ -96,7 +96,9 @@ type Server struct {
 	// for raft cluster
 	cluster *RaftCluster
 	// For tso, set after pd becomes leader.
-	ts atomic.Value
+	ts atomic.Value
+	// lease is the leader lease of the PD server.
+	lease *LeaderLease
 	lastSavedTime time.Time
 	// For async region heartbeat.
 	hbStreams *heartbeatStreams
diff --git a/server/tso.go b/server/tso.go
index 17d2ebaad18..a8ed19e10ec 100644
--- a/server/tso.go
+++ b/server/tso.go
@@ -76,7 +76,7 @@ func (s *Server) saveTimestamp(ts time.Time) error {
 	return nil
 }
 
-func (s *Server) syncTimestamp() error {
+func (s *Server) syncTimestamp(lease *LeaderLease) error {
 	tsoCounter.WithLabelValues("sync").Inc()
 
 	last, err := s.loadTimestamp()
@@ -107,6 +107,7 @@ func (s *Server) syncTimestamp() error {
 	current := &atomicObject{
 		physical: next,
 	}
+	s.lease = lease
 	s.ts.Store(current)
 
 	return nil
@@ -205,6 +206,9 @@ func (s *Server) getRespTS(count uint32) (pdpb.Timestamp, error) {
 			time.Sleep(updateTimestampStep)
 			continue
 		}
+		if s.lease == nil || s.lease.IsExpired() {
+			return pdpb.Timestamp{}, errors.New("alloc timestamp failed, lease expired")
+		}
 		return resp, nil
 	}
 	return resp, errors.New("can not get timestamp")
diff --git a/tests/server/region_syncer/region_syncer_test.go b/tests/server/region_syncer/region_syncer_test.go
index bdba10ce1bc..3f047dfa512 100644
--- a/tests/server/region_syncer/region_syncer_test.go
+++ b/tests/server/region_syncer/region_syncer_test.go
@@ -105,7 +105,7 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) {
 	regionLen = len(regions)
 
 	// ensure flush to region kv
-	time.Sleep(3 * time.Second)
+	time.Sleep(4 * time.Second)
 	err = leaderServer.Stop()
 	c.Assert(err, IsNil)
 	cluster.WaitLeader()
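One design note on `keepAliveWorker` in server/lease.go above: each tick launches an independent renewal goroutine, so a single slow or stuck `KeepAliveOnce` cannot delay later renewals; `KeepAlive` then only ever moves `expireTime` forward via `maxExpire`, and the expiry is computed from the time the request was *sent* (`start.Add(TTL)`), which stays conservative under network delay. A minimal sketch of that per-tick-goroutine pattern in isolation, where `renewOnce` is a hypothetical stand-in for `lease.KeepAliveOnce`:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// keepAliveWorker fires one independent renewal attempt per tick, so a slow
// attempt only delays its own result, never the next tick.
func keepAliveWorker(ctx context.Context, interval time.Duration,
	renewOnce func(context.Context) (time.Time, error)) <-chan time.Time {
	ch := make(chan time.Time)
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			go func() {
				// Independent attempt: a stuck renewal only idles this
				// goroutine until ctx ends, instead of stalling the loop.
				if expire, err := renewOnce(ctx); err == nil {
					select {
					case ch <- expire:
					case <-ctx.Done():
					}
				}
			}()
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
		}
	}()
	return ch
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()

	ch := keepAliveWorker(ctx, 100*time.Millisecond,
		func(context.Context) (time.Time, error) {
			return time.Now().Add(3 * time.Second), nil // pretend the TTL came back as 3s
		})

	for {
		select {
		case t := <-ch:
			fmt.Println("renewed; new expire time:", t.Format(time.RFC3339Nano))
		case <-ctx.Done():
			return
		}
	}
}
```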