From acb1ee993af5f694c3dae8d99954b583b7dce390 Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Sat, 4 Jun 2022 14:01:08 +0800 Subject: [PATCH] Backport two lease related bug fixes to 3.5 The first bug fix is to resolve the race condition between goroutine and channel on the same leases to be revoked. It's a classic mistake in using Golang channel + goroutine. Please refer to https://go.dev/doc/effective_go#channels The second bug fix is to resolve the issue that etcd lessor may continue to schedule checkpoint after stepping down the leader role. --- server/etcdserver/server.go | 63 +++++++++++++++++++++---------------- server/lease/lessor.go | 3 +- 2 files changed, 37 insertions(+), 29 deletions(-) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 4529a68bb20..33bad9f6ce2 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -1119,33 +1119,7 @@ func (s *EtcdServer) run() { f := func(context.Context) { s.applyAll(&ep, &ap) } sched.Schedule(f) case leases := <-expiredLeaseC: - s.GoAttach(func() { - // Increases throughput of expired leases deletion process through parallelization - c := make(chan struct{}, maxPendingRevokes) - for _, lease := range leases { - select { - case c <- struct{}{}: - case <-s.stopping: - return - } - lid := lease.ID - s.GoAttach(func() { - ctx := s.authStore.WithRoot(s.ctx) - _, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: int64(lid)}) - if lerr == nil { - leaseExpired.Inc() - } else { - lg.Warn( - "failed to revoke lease", - zap.String("lease-id", fmt.Sprintf("%016x", lid)), - zap.Error(lerr), - ) - } - - <-c - }) - } - }) + s.revokeExpiredLeases(leases) case err := <-s.errorc: lg.Warn("server error", zap.Error(err)) lg.Warn("data-dir used by this member must be removed") @@ -1160,6 +1134,41 @@ func (s *EtcdServer) run() { } } +func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) { + s.GoAttach(func() { + lg := s.Logger() + // Increases throughput of expired leases deletion process through parallelization + c := make(chan struct{}, maxPendingRevokes) + for _, curLease := range leases { + select { + case c <- struct{}{}: + case <-s.stopping: + return + } + + f := func(lid int64) { + s.GoAttach(func() { + ctx := s.authStore.WithRoot(s.ctx) + _, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lid}) + if lerr == nil { + leaseExpired.Inc() + } else { + lg.Warn( + "failed to revoke lease", + zap.String("lease-id", fmt.Sprintf("%016x", lid)), + zap.Error(lerr), + ) + } + + <-c + }) + } + + f(int64(curLease.ID)) + } + }) +} + // Cleanup removes allocated objects by EtcdServer.NewServer in // situation that EtcdServer::Start was not called (that takes care of cleanup). func (s *EtcdServer) Cleanup() { diff --git a/server/lease/lessor.go b/server/lease/lessor.go index 02ee77f5053..4c6bf3c3ce7 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -654,10 +654,9 @@ func (le *lessor) revokeExpiredLeases() { // checkpointScheduledLeases finds all scheduled lease checkpoints that are due and // submits them to the checkpointer to persist them to the consensus log. func (le *lessor) checkpointScheduledLeases() { - var cps []*pb.LeaseCheckpoint - // rate limit for i := 0; i < leaseCheckpointRate/2; i++ { + var cps []*pb.LeaseCheckpoint le.mu.Lock() if le.isPrimary() { cps = le.findDueScheduledCheckpoints(maxLeaseCheckpointBatchSize)