From b1f626a81b006dee0fa2e158b19cc8ea3f724049 Mon Sep 17 00:00:00 2001 From: Masha Schneider Date: Wed, 18 Oct 2017 17:13:26 -0400 Subject: [PATCH] [WIP] storage: mark replicas added to replica GC queue as destroyed Previously, replicas added to the GC queue weren't fully considered destroyed. This led to redirectOnOrAcquireLease waiting for the replica to be GCed before returning a NotLeaseHolderError. Now redirectOnOrAcquireLease will respond faster, and anything depending on it, such as an RPC, will not hang. --- pkg/storage/queue.go | 2 +- pkg/storage/replica.go | 27 +++++++++++++++------------ pkg/storage/replica_test.go | 4 ++-- pkg/storage/store.go | 12 +++++++----- pkg/storage/store_test.go | 2 +- 5 files changed, 26 insertions(+), 21 deletions(-) diff --git a/pkg/storage/queue.go b/pkg/storage/queue.go index 5481d438d1c9..637a55ed53b9 100644 --- a/pkg/storage/queue.go +++ b/pkg/storage/queue.go @@ -599,7 +599,7 @@ func (bq *baseQueue) processReplica( return errors.New("cannot process uninitialized replica") } - if err := repl.IsDestroyed(); err != nil { + if err, pending := repl.IsDestroyed(); err != nil && pending == false { if log.V(3) { log.Infof(queueCtx, "replica destroyed (%s); skipping", err) } diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index c861c2a4773a..d89808002b10 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -285,7 +285,10 @@ type Replica struct { // Protects all fields in the mu struct. syncutil.RWMutex // Has the replica been destroyed. - destroyed error + destroyed struct { + destroyedErr error + pending bool + } // Corrupted persistently (across process restarts) indicates whether the // replica has been corrupted. 
// @@ -478,7 +481,7 @@ var _ KeyRange = &Replica{} func (r *Replica) withRaftGroupLocked( shouldCampaignOnCreation bool, f func(r *raft.RawNode) (unquiesceAndWakeLeader bool, _ error), ) error { - if r.mu.destroyed != nil { + if r.mu.destroyed.destroyedErr != nil { // Silently ignore all operations on destroyed replicas. We can't return an // error here as all errors returned from this method are considered fatal. return nil @@ -682,8 +685,8 @@ func (r *Replica) initRaftMuLockedReplicaMuLocked( if err != nil { return err } - r.mu.destroyed = pErr.GetDetail() - r.mu.corrupted = r.mu.destroyed != nil + r.mu.destroyed.destroyedErr = pErr.GetDetail() + r.mu.corrupted = r.mu.destroyed.destroyedErr != nil if replicaID == 0 { repDesc, ok := desc.GetReplicaDescriptor(r.store.StoreID()) @@ -962,7 +965,7 @@ func (r *Replica) updateProposalQuotaRaftMuLocked( // replica can get destroyed is an option, alternatively we can clear // our leader status and close the proposalQuota whenever the replica is // destroyed. - if r.mu.destroyed != nil { + if r.mu.destroyed.destroyedErr != nil { if r.mu.proposalQuota != nil { r.mu.proposalQuota.close() } @@ -1142,10 +1145,10 @@ func (r *Replica) IsFirstRange() bool { } // IsDestroyed returns a non-nil error if the replica has been destroyed. 
-func (r *Replica) IsDestroyed() error { +func (r *Replica) IsDestroyed() (error, bool) { r.mu.RLock() defer r.mu.RUnlock() - return r.mu.destroyed + return r.mu.destroyed.destroyedErr, r.mu.destroyed.pending } // getLease returns the current lease, and the tentative next one, if a lease @@ -2435,7 +2438,7 @@ func (r *Replica) executeReadOnlyBatch( endCmds.done(br, pErr, proposalNoRetry) }() - if err := r.IsDestroyed(); err != nil { + if err, _ := r.IsDestroyed(); err != nil { return nil, roachpb.NewError(err) } @@ -2871,7 +2874,7 @@ func (r *Replica) propose( spans *SpanSet, ) (chan proposalResult, func() bool, func(), *roachpb.Error) { noop := func() {} - if err := r.IsDestroyed(); err != nil { + if err, _ := r.IsDestroyed(); err != nil { return nil, nil, noop, roachpb.NewError(err) } @@ -2964,7 +2967,7 @@ func (r *Replica) propose( // been destroyed between the initial check at the beginning of this method // and the acquisition of Replica.mu. Failure to do so will leave pending // proposals that never get cleared. - if err := r.mu.destroyed; err != nil { + if err := r.mu.destroyed.destroyedErr; err != nil { return nil, nil, undoQuotaAcquisition, roachpb.NewError(err) } @@ -4527,7 +4530,7 @@ func (r *Replica) acquireSplitLock( // then presumably it was alive for some reason other than a concurrent // split and shouldn't be destroyed. 
rightRng.mu.Lock() - rightRng.mu.destroyed = errors.Errorf("%s: failed to initialize", rightRng) + rightRng.mu.destroyed.destroyedErr = errors.Errorf("%s: failed to initialize", rightRng) rightRng.mu.Unlock() r.store.mu.Lock() r.store.mu.replicas.Delete(int64(rightRng.RangeID)) @@ -5376,7 +5379,7 @@ func (r *Replica) maybeSetCorrupt(ctx context.Context, pErr *roachpb.Error) *roa log.Errorf(ctx, "stalling replica due to: %s", cErr.ErrorMsg) cErr.Processed = true - r.mu.destroyed = cErr + r.mu.destroyed.destroyedErr = cErr r.mu.corrupted = true pErr = roachpb.NewError(cErr) diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 8c1662b7d9e4..075b5bcbe56f 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -6059,7 +6059,7 @@ func TestReplicaCorruption(t *testing.T) { r := tc.store.LookupReplica(rkey, rkey) r.mu.Lock() defer r.mu.Unlock() - if r.mu.destroyed.Error() != pErr.GetDetail().Error() { + if r.mu.destroyed.destroyedErr.Error() != pErr.GetDetail().Error() { t.Fatalf("expected r.mu.destroyed == pErr.GetDetail(), instead %q != %q", r.mu.destroyed, pErr.GetDetail()) } @@ -6068,7 +6068,7 @@ func TestReplicaCorruption(t *testing.T) { if err != nil { t.Fatal(err) } - if r.mu.destroyed.Error() != pErr.GetDetail().Error() { + if r.mu.destroyed.destroyedErr.Error() != pErr.GetDetail().Error() { t.Fatalf("expected r.mu.destroyed == pErr.GetDetail(), instead %q != %q", r.mu.destroyed, pErr.GetDetail()) } diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 0e3f9b7abf0c..3fc805efd7c7 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -312,7 +312,7 @@ func (rs *storeReplicaVisitor) Visit(visitor func(*Replica) bool) { destroyed := repl.mu.destroyed initialized := repl.isInitializedRLocked() repl.mu.RUnlock() - if initialized && destroyed == nil && !visitor(repl) { + if initialized && destroyed.destroyedErr == nil && !visitor(repl) { break } } @@ -2210,7 +2210,7 @@ func (s *Store) removeReplicaImpl( 
rep.mu.Lock() rep.cancelPendingCommandsLocked() rep.mu.internalRaftGroup = nil - rep.mu.destroyed = roachpb.NewRangeNotFoundError(rep.RangeID) + rep.mu.destroyed.destroyedErr = roachpb.NewRangeNotFoundError(rep.RangeID) rep.mu.Unlock() rep.readOnlyCmdMu.Unlock() @@ -3228,6 +3228,8 @@ func (s *Store) HandleRaftResponse(ctx context.Context, resp *RaftMessageRespons // replica GC to succeed. if tErr.ReplicaID == repl.mu.replicaID { repl.cancelPendingCommandsLocked() + repl.mu.destroyed.destroyedErr = roachpb.NewRangeNotFoundError(repl.RangeID) + repl.mu.destroyed.pending = true } repl.mu.Unlock() added, err := s.replicaGCQueue.Add( @@ -3831,10 +3833,10 @@ func (s *Store) tryGetOrCreateReplica( repl.mu.RLock() destroyed, corrupted := repl.mu.destroyed, repl.mu.corrupted repl.mu.RUnlock() - if destroyed != nil { + if destroyed.destroyedErr != nil { repl.raftMu.Unlock() if corrupted { - return nil, false, destroyed + return nil, false, destroyed.destroyedErr } return nil, false, errRetry } @@ -3898,7 +3900,7 @@ func (s *Store) tryGetOrCreateReplica( if err := repl.initRaftMuLockedReplicaMuLocked(desc, s.Clock(), replicaID); err != nil { // Mark the replica as destroyed and remove it from the replicas maps to // ensure nobody tries to use it - repl.mu.destroyed = errors.Wrapf(err, "%s: failed to initialize", repl) + repl.mu.destroyed.destroyedErr = errors.Wrapf(err, "%s: failed to initialize", repl) repl.mu.Unlock() s.mu.Lock() s.mu.replicas.Delete(int64(rangeID)) diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index d9094baf0656..c7b86d20a974 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -470,7 +470,7 @@ func TestStoreRemoveReplicaDestroy(t *testing.T) { } repl1.mu.Lock() - expErr := roachpb.NewError(repl1.mu.destroyed) + expErr := roachpb.NewError(repl1.mu.destroyed.destroyedErr) lease := *repl1.mu.state.Lease repl1.mu.Unlock()