Skip to content

Commit

Permalink
Merge pull request #14150 from ahrtr/lease_revoke_race_3.4
Browse files Browse the repository at this point in the history
[3.4] Backport two lease related bug fixes to 3.4
  • Loading branch information
serathius authored Jun 24, 2022
2 parents 953376e + f036529 commit 17fc680
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 33 deletions.
71 changes: 40 additions & 31 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,37 +1059,7 @@ func (s *EtcdServer) run() {
f := func(context.Context) { s.applyAll(&ep, &ap) }
sched.Schedule(f)
case leases := <-expiredLeaseC:
s.goAttach(func() {
// Increases throughput of expired leases deletion process through parallelization
c := make(chan struct{}, maxPendingRevokes)
for _, lease := range leases {
select {
case c <- struct{}{}:
case <-s.stopping:
return
}
lid := lease.ID
s.goAttach(func() {
ctx := s.authStore.WithRoot(s.ctx)
_, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})
if lerr == nil {
leaseExpired.Inc()
} else {
if lg != nil {
lg.Warn(
"failed to revoke lease",
zap.String("lease-id", fmt.Sprintf("%016x", lid)),
zap.Error(lerr),
)
} else {
plog.Warningf("failed to revoke %016x (%q)", lid, lerr.Error())
}
}

<-c
})
}
})
s.revokeExpiredLeases(leases)
case err := <-s.errorc:
if lg != nil {
lg.Warn("server error", zap.Error(err))
Expand All @@ -1109,6 +1079,45 @@ func (s *EtcdServer) run() {
}
}

func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) {
s.goAttach(func() {
lg := s.Logger()
// Increases throughput of expired leases deletion process through parallelization
c := make(chan struct{}, maxPendingRevokes)
for _, curLease := range leases {
select {
case c <- struct{}{}:
case <-s.stopping:
return
}

f := func(lid int64) {
s.goAttach(func() {
ctx := s.authStore.WithRoot(s.ctx)
_, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lid})
if lerr == nil {
leaseExpired.Inc()
} else {
if lg != nil {
lg.Warn(
"failed to revoke lease",
zap.String("lease-id", fmt.Sprintf("%016x", lid)),
zap.Error(lerr),
)
} else {
plog.Warningf("failed to revoke %016x (%q)", lid, lerr.Error())
}
}

<-c
})
}

f(int64(curLease.ID))
}
})
}

func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
s.applySnapshot(ep, apply)
s.applyEntries(ep, apply)
Expand Down
3 changes: 1 addition & 2 deletions lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,10 +626,9 @@ func (le *lessor) revokeExpiredLeases() {
// checkpointScheduledLeases finds all scheduled lease checkpoints that are due and
// submits them to the checkpointer to persist them to the consensus log.
func (le *lessor) checkpointScheduledLeases() {
var cps []*pb.LeaseCheckpoint

// rate limit
for i := 0; i < leaseCheckpointRate/2; i++ {
var cps []*pb.LeaseCheckpoint
le.mu.Lock()
if le.isPrimary() {
cps = le.findDueScheduledCheckpoints(maxLeaseCheckpointBatchSize)
Expand Down

0 comments on commit 17fc680

Please sign in to comment.