Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: use leader lease to determine tso service validity #1676 #2117

Merged
merged 3 commits into from
Feb 14, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@ func (s *Server) Tso(stream pdpb.PD_TsoServer) error {
return errors.WithStack(err)
}
start := time.Now()
if err = s.validateRequest(request.GetHeader()); err != nil {
return err
// TSO uses leader lease to determine validity. No need to check leader here.
if request.GetHeader().GetClusterId() != s.clusterID {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, request.GetHeader().GetClusterId())
}

count := request.GetCount()
ts, err := s.getRespTS(count)
if err != nil {
Expand Down
75 changes: 39 additions & 36 deletions server/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ import (
"go.uber.org/zap"
)

// The timeout to wait transfer etcd leader to complete.
const moveLeaderTimeout = 5 * time.Second
const (
// The timeout for waiting for the etcd leader transfer to complete.
moveLeaderTimeout = 5 * time.Second
leaderTickInterval = 50 * time.Millisecond
)

// IsLeader returns whether the server is leader or not.
func (s *Server) IsLeader() bool {
Expand Down Expand Up @@ -214,30 +217,22 @@ func (s *Server) memberInfo() (member *pdpb.Member, marshalStr string) {
func (s *Server) campaignLeader() error {
log.Info("start to campaign leader", zap.String("campaign-leader-name", s.Name()))

lessor := clientv3.NewLease(s.client)
lease := NewLeaderLease(s.client)
defer func() {
lessor.Close()
defer lease.Close()
log.Info("exit campaign leader")
}()

start := time.Now()
ctx, cancel := context.WithTimeout(s.client.Ctx(), requestTimeout)
leaseResp, err := lessor.Grant(ctx, s.cfg.LeaderLease)
cancel()

if cost := time.Since(start); cost > slowRequestTime {
log.Warn("lessor grants too slow", zap.Duration("cost", cost))
}

err := lease.Grant(s.cfg.LeaderLease)
if err != nil {
return errors.WithStack(err)
return err
}

leaderKey := s.getLeaderPath()
// The leader key must not exist, so the CreateRevision is 0.
resp, err := s.txn().
If(clientv3.Compare(clientv3.CreateRevision(leaderKey), "=", 0)).
Then(clientv3.OpPut(leaderKey, s.memberValue, clientv3.WithLease(leaseResp.ID))).
Then(clientv3.OpPut(leaderKey, s.memberValue, clientv3.WithLease(lease.ID))).
Commit()
if err != nil {
return errors.WithStack(err)
Expand All @@ -246,35 +241,40 @@ func (s *Server) campaignLeader() error {
return errors.New("failed to campaign leader, other server may campaign ok")
}

// Start keepalive and enable TSO service.
// TSO service is strictly enabled/disabled by leader lease for 2 reasons:
// 1. lease based approach is not affected by thread pause, slow runtime schedule, etc.
// 2. load region could be slow. Based on lease we can recover TSO service faster.
// Make the leader keepalived.
ctx, cancel = context.WithCancel(s.serverLoopCtx)
ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()

ch, err := lessor.KeepAlive(ctx, leaseResp.ID)
if err != nil {
return errors.WithStack(err)
}
go lease.KeepAlive(ctx)
log.Info("campaign leader ok", zap.String("campaign-leader-name", s.Name()))

// sync timestamp.
log.Debug("sync timestamp for tso")
if err = s.syncTimestamp(lease); err != nil {
return err
}

defer s.ts.Store(&atomicObject{
physical: zeroTime,
})

// reload config.
err = s.reloadConfigFromKV()
if err != nil {
return err
}

// Try to create raft cluster.
err = s.createRaftCluster()
if err != nil {
return err
}
defer s.stopRaftCluster()

log.Debug("sync timestamp for tso")
if err = s.syncTimestamp(); err != nil {
return err
}
defer s.ts.Store(&atomicObject{
physical: zeroTime,
})

s.enableLeader()
defer s.disableLeader()

Expand All @@ -284,23 +284,26 @@ func (s *Server) campaignLeader() error {
tsTicker := time.NewTicker(updateTimestampStep)
defer tsTicker.Stop()

leaderTicker := time.NewTicker(leaderTickInterval)
defer leaderTicker.Stop()
for {
select {
case _, ok := <-ch:
if !ok {
log.Info("keep alive channel is closed")
case <-leaderTicker.C:
if lease.IsExpired() {
log.Info("lease expired, leader step down")
return nil
}
case <-tsTicker.C:
if err = s.updateTimestamp(); err != nil {
log.Info("failed to update timestamp")
return err
}
etcdLeader := s.GetEtcdLeader()
if etcdLeader != s.ID() {
log.Info("etcd leader changed, resigns leadership", zap.String("old-leader-name", s.Name()))
return nil
}
case <-tsTicker.C:
if err = s.updateTimestamp(); err != nil {
log.Info("failed to update timestamp")
return err
}

case <-ctx.Done():
// Server is closed and it should return nil.
log.Info("server is closed")
Expand Down
140 changes: 140 additions & 0 deletions server/lease.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"context"
"sync/atomic"
"time"

"github.com/pingcap/log"
"github.com/pkg/errors"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

// LeaderLease is used for renewing leadership of PD server.
// It wraps an etcd lease together with a locally tracked expire time, so
// that IsExpired can be answered without a round trip to etcd.
type LeaderLease struct {
// client is the etcd client; its context is the parent of the Grant and
// Revoke request contexts.
client *clientv3.Client
// lease is the etcd lease API handle created from client.
lease clientv3.Lease
// ID is the etcd lease ID, filled in by Grant.
ID clientv3.LeaseID
// leaseTimeout is the TTL requested in Grant, converted to a duration.
// KeepAlive renews every leaseTimeout/3 and gives up after one full
// leaseTimeout without a renewal.
leaseTimeout time.Duration

// expireTime holds a time.Time: the local estimate of when the lease
// expires. It is written by Grant, KeepAlive and Close and read by IsExpired.
expireTime atomic.Value
}

// NewLeaderLease returns a LeaderLease bound to the given etcd client.
// The lease is not usable for leadership until Grant has been called.
func NewLeaderLease(client *clientv3.Client) *LeaderLease {
	l := new(LeaderLease)
	l.client = client
	l.lease = clientv3.NewLease(client)
	return l
}

// Grant asks etcd for a new lease with a TTL of leaseTimeout seconds and, on
// success, records the lease ID, the requested timeout and the initial
// expire time. The request is bounded by requestTimeout, and a warning is
// logged when it takes longer than slowRequestTime.
//
// It returns the (stack-wrapped) etcd error if the grant fails; in that case
// no field of l is modified.
func (l *LeaderLease) Grant(leaseTimeout int64) error {
	// Taken before the request is sent; the expire time below is based on it.
	begin := time.Now()

	grantCtx, cancelGrant := context.WithTimeout(l.client.Ctx(), requestTimeout)
	resp, err := l.lease.Grant(grantCtx, leaseTimeout)
	cancelGrant()
	if err != nil {
		return errors.WithStack(err)
	}

	if elapsed := time.Since(begin); elapsed > slowRequestTime {
		log.Warn("lease grants too slow", zap.Duration("cost", elapsed))
	}

	// The expire time uses the TTL reported by etcd, not the requested one.
	grantedTTL := time.Duration(resp.TTL) * time.Second
	l.ID = resp.ID
	l.leaseTimeout = time.Duration(leaseTimeout) * time.Second
	l.expireTime.Store(begin.Add(grantedTTL))
	return nil
}

// revokeLeaseTimeout bounds how long Close waits for the lease revoke request.
const revokeLeaseTimeout = time.Second

// Close releases the lease. It first marks the lease as expired locally, then
// tries to revoke it in etcd (bounded by revokeLeaseTimeout) and finally
// closes the underlying lease handle.
//
// Revocation is best-effort: a failure is logged and does not prevent the
// handle from being closed. The returned error is the one from closing the
// lease handle.
func (l *LeaderLease) Close() error {
	// Reset expire time so IsExpired reports true from now on.
	l.expireTime.Store(time.Time{})

	// Try to revoke lease to make subsequent elections faster.
	ctx, cancel := context.WithTimeout(l.client.Ctx(), revokeLeaseTimeout)
	defer cancel()
	if _, err := l.lease.Revoke(ctx, l.ID); err != nil {
		log.Warn("revoke leader lease failed", zap.Error(err))
	}

	return l.lease.Close()
}

// IsExpired checks if the lease is expired. If it returns true, current PD
// server should step down and try to re-elect again.
//
// A lease whose expire time was never recorded (Grant has not succeeded yet)
// is reported as expired instead of panicking on the type assertion.
func (l *LeaderLease) IsExpired() bool {
	expire, ok := l.expireTime.Load().(time.Time)
	if !ok {
		// Nothing stored yet: there is no lease to rely on.
		return true
	}
	return time.Now().After(expire)
}

// KeepAlive auto renews the lease and updates expireTime. It starts a renewal
// worker that reports a candidate expire time for every successful renewal,
// and stores the latest one seen so far. It blocks until ctx is done or no
// renewal has arrived for one full leaseTimeout.
func (l *LeaderLease) KeepAlive(ctx context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	// Stops the renewal worker whenever this loop exits.
	defer cancel()
	timeCh := l.keepAliveWorker(ctx, l.leaseTimeout/3)

	// A single reusable timer replaces a fresh time.After per iteration.
	timer := time.NewTimer(l.leaseTimeout)
	defer timer.Stop()

	var maxExpire time.Time
	for {
		select {
		case t := <-timeCh:
			// Renewals may complete out of order; only move the expire time forward.
			if t.After(maxExpire) {
				maxExpire = t
				// select chooses randomly among ready cases, so a renewal can
				// be picked even though ctx is already cancelled. Re-check
				// before publishing so a late renewal does not overwrite the
				// zero expire time written by Close.
				select {
				case <-ctx.Done():
					return
				default:
				}
				l.expireTime.Store(t)
			}
			// Restart the timeout window: stop the timer, drain a tick that
			// may already be pending, then re-arm it.
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(l.leaseTimeout)
		case <-timer.C:
			// No renewal within one lease timeout: give up.
			return
		case <-ctx.Done():
			return
		}
	}
}

// keepAliveWorker starts a background goroutine that issues a
// `lease.KeepAliveOnce` request right away and then once per interval until
// ctx is done. Every successful renewal with a positive TTL is converted to
// an expire time (request start plus TTL) and posted to the returned channel.
func (l *LeaderLease) keepAliveWorker(ctx context.Context, interval time.Duration) <-chan time.Time {
	expireCh := make(chan time.Time)

	// renewOnce performs a single renewal and reports the resulting expire time.
	renewOnce := func() {
		// Measured before the request so the expire time is based on when
		// the renewal was sent.
		sentAt := time.Now()
		reqCtx, cancelReq := context.WithTimeout(ctx, l.leaseTimeout)
		defer cancelReq()

		resp, err := l.lease.KeepAliveOnce(reqCtx, l.ID)
		if err != nil {
			log.Warn("leader lease keep alive failed", zap.Error(err))
			return
		}
		if resp.TTL <= 0 {
			return
		}
		// Give up on delivering the result once the request context ends.
		select {
		case expireCh <- sentAt.Add(time.Duration(resp.TTL) * time.Second):
		case <-reqCtx.Done():
		}
	}

	go func() {
		tick := time.NewTicker(interval)
		defer tick.Stop()

		for {
			// Each renewal runs in its own goroutine, so a slow request does
			// not delay the next tick.
			go renewOnce()

			select {
			case <-ctx.Done():
				return
			case <-tick.C:
			}
		}
	}()

	return expireCh
}
4 changes: 3 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ type Server struct {
// for raft cluster
cluster *RaftCluster
// For tso, set after pd becomes leader.
ts atomic.Value
ts atomic.Value
// lease is leadership of PD server.
lease *LeaderLease
lastSavedTime time.Time
// For async region heartbeat.
hbStreams *heartbeatStreams
Expand Down
6 changes: 5 additions & 1 deletion server/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (s *Server) saveTimestamp(ts time.Time) error {
return nil
}

func (s *Server) syncTimestamp() error {
func (s *Server) syncTimestamp(lease *LeaderLease) error {
tsoCounter.WithLabelValues("sync").Inc()

last, err := s.loadTimestamp()
Expand Down Expand Up @@ -107,6 +107,7 @@ func (s *Server) syncTimestamp() error {
current := &atomicObject{
physical: next,
}
s.lease = lease
s.ts.Store(current)

return nil
Expand Down Expand Up @@ -205,6 +206,9 @@ func (s *Server) getRespTS(count uint32) (pdpb.Timestamp, error) {
time.Sleep(updateTimestampStep)
continue
}
if s.lease == nil || s.lease.IsExpired() {
return pdpb.Timestamp{}, errors.New("alloc timestamp failed, lease expired")
}
return resp, nil
}
return resp, errors.New("can not get timestamp")
Expand Down