Skip to content

Commit

Permalink
Backport two lease related bug fixes to 3.4
Browse files Browse the repository at this point in the history
The first bug fix is to resolve the race condition between goroutine
and channel on the same leases to be revoked. It's a classic mistake
in using Golang channel + goroutine. Please refer to
https://go.dev/doc/effective_go#channels

The second bug fix is to resolve the issue that etcd lessor may
continue to schedule checkpoint after stepping down the leader role.

Signed-off-by: Benjamin Wang <wachao@vmware.com>
  • Loading branch information
ahrtr committed Jun 24, 2022
1 parent 953376e commit f036529
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 33 deletions.
71 changes: 40 additions & 31 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,37 +1059,7 @@ func (s *EtcdServer) run() {
f := func(context.Context) { s.applyAll(&ep, &ap) }
sched.Schedule(f)
case leases := <-expiredLeaseC:
s.goAttach(func() {
// Increases throughput of expired leases deletion process through parallelization
c := make(chan struct{}, maxPendingRevokes)
for _, lease := range leases {
select {
case c <- struct{}{}:
case <-s.stopping:
return
}
lid := lease.ID
s.goAttach(func() {
ctx := s.authStore.WithRoot(s.ctx)
_, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})
if lerr == nil {
leaseExpired.Inc()
} else {
if lg != nil {
lg.Warn(
"failed to revoke lease",
zap.String("lease-id", fmt.Sprintf("%016x", lid)),
zap.Error(lerr),
)
} else {
plog.Warningf("failed to revoke %016x (%q)", lid, lerr.Error())
}
}

<-c
})
}
})
s.revokeExpiredLeases(leases)
case err := <-s.errorc:
if lg != nil {
lg.Warn("server error", zap.Error(err))
Expand All @@ -1109,6 +1079,45 @@ func (s *EtcdServer) run() {
}
}

func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) {
s.goAttach(func() {
lg := s.Logger()
// Increases throughput of expired leases deletion process through parallelization
c := make(chan struct{}, maxPendingRevokes)
for _, curLease := range leases {
select {
case c <- struct{}{}:
case <-s.stopping:
return
}

f := func(lid int64) {
s.goAttach(func() {
ctx := s.authStore.WithRoot(s.ctx)
_, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lid})
if lerr == nil {
leaseExpired.Inc()
} else {
if lg != nil {
lg.Warn(
"failed to revoke lease",
zap.String("lease-id", fmt.Sprintf("%016x", lid)),
zap.Error(lerr),
)
} else {
plog.Warningf("failed to revoke %016x (%q)", lid, lerr.Error())
}
}

<-c
})
}

f(int64(curLease.ID))
}
})
}

func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
s.applySnapshot(ep, apply)
s.applyEntries(ep, apply)
Expand Down
3 changes: 1 addition & 2 deletions lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,10 +626,9 @@ func (le *lessor) revokeExpiredLeases() {
// checkpointScheduledLeases finds all scheduled lease checkpoints that are due and
// submits them to the checkpointer to persist them to the consensus log.
func (le *lessor) checkpointScheduledLeases() {
var cps []*pb.LeaseCheckpoint

// rate limit
for i := 0; i < leaseCheckpointRate/2; i++ {
var cps []*pb.LeaseCheckpoint
le.mu.Lock()
if le.isPrimary() {
cps = le.findDueScheduledCheckpoints(maxLeaseCheckpointBatchSize)
Expand Down

0 comments on commit f036529

Please sign in to comment.