diff --git a/etcdserver/server.go b/etcdserver/server.go index a341625dccb..0733fd28547 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -1059,37 +1059,7 @@ func (s *EtcdServer) run() { f := func(context.Context) { s.applyAll(&ep, &ap) } sched.Schedule(f) case leases := <-expiredLeaseC: - s.goAttach(func() { - // Increases throughput of expired leases deletion process through parallelization - c := make(chan struct{}, maxPendingRevokes) - for _, lease := range leases { - select { - case c <- struct{}{}: - case <-s.stopping: - return - } - lid := lease.ID - s.goAttach(func() { - ctx := s.authStore.WithRoot(s.ctx) - _, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: int64(lid)}) - if lerr == nil { - leaseExpired.Inc() - } else { - if lg != nil { - lg.Warn( - "failed to revoke lease", - zap.String("lease-id", fmt.Sprintf("%016x", lid)), - zap.Error(lerr), - ) - } else { - plog.Warningf("failed to revoke %016x (%q)", lid, lerr.Error()) - } - } - - <-c - }) - } - }) + s.revokeExpiredLeases(leases) case err := <-s.errorc: if lg != nil { lg.Warn("server error", zap.Error(err)) @@ -1109,6 +1079,45 @@ func (s *EtcdServer) run() { } } +func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) { + s.goAttach(func() { + lg := s.Logger() + // Increases throughput of expired leases deletion process through parallelization + c := make(chan struct{}, maxPendingRevokes) + for _, curLease := range leases { + select { + case c <- struct{}{}: + case <-s.stopping: + return + } + + f := func(lid int64) { + s.goAttach(func() { + ctx := s.authStore.WithRoot(s.ctx) + _, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lid}) + if lerr == nil { + leaseExpired.Inc() + } else { + if lg != nil { + lg.Warn( + "failed to revoke lease", + zap.String("lease-id", fmt.Sprintf("%016x", lid)), + zap.Error(lerr), + ) + } else { + plog.Warningf("failed to revoke %016x (%q)", lid, lerr.Error()) + } + } + + <-c + }) + } + + f(int64(curLease.ID)) + } + }) +} + func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) { s.applySnapshot(ep, apply) s.applyEntries(ep, apply) diff --git a/lease/lessor.go b/lease/lessor.go index b16099fbf1f..9612ed88fc9 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -626,10 +626,9 @@ func (le *lessor) revokeExpiredLeases() { // checkpointScheduledLeases finds all scheduled lease checkpoints that are due and // submits them to the checkpointer to persist them to the consensus log. func (le *lessor) checkpointScheduledLeases() { - var cps []*pb.LeaseCheckpoint - // rate limit for i := 0; i < leaseCheckpointRate/2; i++ { + var cps []*pb.LeaseCheckpoint le.mu.Lock() if le.isPrimary() { cps = le.findDueScheduledCheckpoints(maxLeaseCheckpointBatchSize)