Skip to content

Commit

Permalink
[WIP] storage: mark replicas added to replica GC queue as destroyed
Browse files Browse the repository at this point in the history
Previously, replicas added to the GC queue weren't fully considered
destroyed. This led to redirectOnOrAcquireLease waiting for the
replica to be GCed before returning a NotLeaseHolderError.

Now redirectOnOrAcquireLease responds faster, and anything depending
on it, such as an RPC, will not hang.
  • Loading branch information
Masha Schneider committed Oct 19, 2017
1 parent 95713d3 commit b1f626a
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 21 deletions.
2 changes: 1 addition & 1 deletion pkg/storage/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ func (bq *baseQueue) processReplica(
return errors.New("cannot process uninitialized replica")
}

if err := repl.IsDestroyed(); err != nil {
if err, pending := repl.IsDestroyed(); err != nil && pending == false {
if log.V(3) {
log.Infof(queueCtx, "replica destroyed (%s); skipping", err)
}
Expand Down
27 changes: 15 additions & 12 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,10 @@ type Replica struct {
// Protects all fields in the mu struct.
syncutil.RWMutex
// Has the replica been destroyed.
destroyed error
destroyed struct {
destroyedErr error
pending bool
}
// Corrupted persistently (across process restarts) indicates whether the
// replica has been corrupted.
//
Expand Down Expand Up @@ -478,7 +481,7 @@ var _ KeyRange = &Replica{}
func (r *Replica) withRaftGroupLocked(
shouldCampaignOnCreation bool, f func(r *raft.RawNode) (unquiesceAndWakeLeader bool, _ error),
) error {
if r.mu.destroyed != nil {
if r.mu.destroyed.destroyedErr != nil {
// Silently ignore all operations on destroyed replicas. We can't return an
// error here as all errors returned from this method are considered fatal.
return nil
Expand Down Expand Up @@ -682,8 +685,8 @@ func (r *Replica) initRaftMuLockedReplicaMuLocked(
if err != nil {
return err
}
r.mu.destroyed = pErr.GetDetail()
r.mu.corrupted = r.mu.destroyed != nil
r.mu.destroyed.destroyedErr = pErr.GetDetail()
r.mu.corrupted = r.mu.destroyed.destroyedErr != nil

if replicaID == 0 {
repDesc, ok := desc.GetReplicaDescriptor(r.store.StoreID())
Expand Down Expand Up @@ -962,7 +965,7 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
// replica can get destroyed is an option, alternatively we can clear
// our leader status and close the proposalQuota whenever the replica is
// destroyed.
if r.mu.destroyed != nil {
if r.mu.destroyed.destroyedErr != nil {
if r.mu.proposalQuota != nil {
r.mu.proposalQuota.close()
}
Expand Down Expand Up @@ -1142,10 +1145,10 @@ func (r *Replica) IsFirstRange() bool {
}

// IsDestroyed returns a non-nil error if the replica has been destroyed.
func (r *Replica) IsDestroyed() error {
func (r *Replica) IsDestroyed() (error, bool) {
r.mu.RLock()
defer r.mu.RUnlock()
return r.mu.destroyed
return r.mu.destroyed.destroyedErr, r.mu.destroyed.pending
}

// getLease returns the current lease, and the tentative next one, if a lease
Expand Down Expand Up @@ -2435,7 +2438,7 @@ func (r *Replica) executeReadOnlyBatch(
endCmds.done(br, pErr, proposalNoRetry)
}()

if err := r.IsDestroyed(); err != nil {
if err, _ := r.IsDestroyed(); err != nil {
return nil, roachpb.NewError(err)
}

Expand Down Expand Up @@ -2871,7 +2874,7 @@ func (r *Replica) propose(
spans *SpanSet,
) (chan proposalResult, func() bool, func(), *roachpb.Error) {
noop := func() {}
if err := r.IsDestroyed(); err != nil {
if err, _ := r.IsDestroyed(); err != nil {
return nil, nil, noop, roachpb.NewError(err)
}

Expand Down Expand Up @@ -2964,7 +2967,7 @@ func (r *Replica) propose(
// been destroyed between the initial check at the beginning of this method
// and the acquisition of Replica.mu. Failure to do so will leave pending
// proposals that never get cleared.
if err := r.mu.destroyed; err != nil {
if err := r.mu.destroyed.destroyedErr; err != nil {
return nil, nil, undoQuotaAcquisition, roachpb.NewError(err)
}

Expand Down Expand Up @@ -4527,7 +4530,7 @@ func (r *Replica) acquireSplitLock(
// then presumably it was alive for some reason other than a concurrent
// split and shouldn't be destroyed.
rightRng.mu.Lock()
rightRng.mu.destroyed = errors.Errorf("%s: failed to initialize", rightRng)
rightRng.mu.destroyed.destroyedErr = errors.Errorf("%s: failed to initialize", rightRng)
rightRng.mu.Unlock()
r.store.mu.Lock()
r.store.mu.replicas.Delete(int64(rightRng.RangeID))
Expand Down Expand Up @@ -5376,7 +5379,7 @@ func (r *Replica) maybeSetCorrupt(ctx context.Context, pErr *roachpb.Error) *roa

log.Errorf(ctx, "stalling replica due to: %s", cErr.ErrorMsg)
cErr.Processed = true
r.mu.destroyed = cErr
r.mu.destroyed.destroyedErr = cErr
r.mu.corrupted = true
pErr = roachpb.NewError(cErr)

Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6059,7 +6059,7 @@ func TestReplicaCorruption(t *testing.T) {
r := tc.store.LookupReplica(rkey, rkey)
r.mu.Lock()
defer r.mu.Unlock()
if r.mu.destroyed.Error() != pErr.GetDetail().Error() {
if r.mu.destroyed.destroyedErr.Error() != pErr.GetDetail().Error() {
t.Fatalf("expected r.mu.destroyed == pErr.GetDetail(), instead %q != %q", r.mu.destroyed, pErr.GetDetail())
}

Expand All @@ -6068,7 +6068,7 @@ func TestReplicaCorruption(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if r.mu.destroyed.Error() != pErr.GetDetail().Error() {
if r.mu.destroyed.destroyedErr.Error() != pErr.GetDetail().Error() {
t.Fatalf("expected r.mu.destroyed == pErr.GetDetail(), instead %q != %q", r.mu.destroyed, pErr.GetDetail())
}

Expand Down
12 changes: 7 additions & 5 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (rs *storeReplicaVisitor) Visit(visitor func(*Replica) bool) {
destroyed := repl.mu.destroyed
initialized := repl.isInitializedRLocked()
repl.mu.RUnlock()
if initialized && destroyed == nil && !visitor(repl) {
if initialized && destroyed.destroyedErr == nil && !visitor(repl) {
break
}
}
Expand Down Expand Up @@ -2210,7 +2210,7 @@ func (s *Store) removeReplicaImpl(
rep.mu.Lock()
rep.cancelPendingCommandsLocked()
rep.mu.internalRaftGroup = nil
rep.mu.destroyed = roachpb.NewRangeNotFoundError(rep.RangeID)
rep.mu.destroyed.destroyedErr = roachpb.NewRangeNotFoundError(rep.RangeID)
rep.mu.Unlock()
rep.readOnlyCmdMu.Unlock()

Expand Down Expand Up @@ -3228,6 +3228,8 @@ func (s *Store) HandleRaftResponse(ctx context.Context, resp *RaftMessageRespons
// replica GC to succeed.
if tErr.ReplicaID == repl.mu.replicaID {
repl.cancelPendingCommandsLocked()
repl.mu.destroyed.destroyedErr = roachpb.NewRangeNotFoundError(repl.RangeID)
repl.mu.destroyed.pending = true
}
repl.mu.Unlock()
added, err := s.replicaGCQueue.Add(
Expand Down Expand Up @@ -3831,10 +3833,10 @@ func (s *Store) tryGetOrCreateReplica(
repl.mu.RLock()
destroyed, corrupted := repl.mu.destroyed, repl.mu.corrupted
repl.mu.RUnlock()
if destroyed != nil {
if destroyed.destroyedErr != nil {
repl.raftMu.Unlock()
if corrupted {
return nil, false, destroyed
return nil, false, destroyed.destroyedErr
}
return nil, false, errRetry
}
Expand Down Expand Up @@ -3898,7 +3900,7 @@ func (s *Store) tryGetOrCreateReplica(
if err := repl.initRaftMuLockedReplicaMuLocked(desc, s.Clock(), replicaID); err != nil {
// Mark the replica as destroyed and remove it from the replicas maps to
// ensure nobody tries to use it
repl.mu.destroyed = errors.Wrapf(err, "%s: failed to initialize", repl)
repl.mu.destroyed.destroyedErr = errors.Wrapf(err, "%s: failed to initialize", repl)
repl.mu.Unlock()
s.mu.Lock()
s.mu.replicas.Delete(int64(rangeID))
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ func TestStoreRemoveReplicaDestroy(t *testing.T) {
}

repl1.mu.Lock()
expErr := roachpb.NewError(repl1.mu.destroyed)
expErr := roachpb.NewError(repl1.mu.destroyed.destroyedErr)
lease := *repl1.mu.state.Lease
repl1.mu.Unlock()

Expand Down

0 comments on commit b1f626a

Please sign in to comment.