Skip to content

Commit

Permalink
[WIP] storage: mark replicas added to replica GC queue as destroyed
Browse files Browse the repository at this point in the history
Before when adding replicas to the GC queue, they weren't fully considered
destroyed. This lead to redirectOnOrAcquireLease waiting for the
replica to be GCed before returning a NotLeaseHolderError.

Now redirectOnOrAcquireLease will respond faster and anything depending
on that such as an RPC will not hang.
  • Loading branch information
Masha Schneider committed Nov 1, 2017
1 parent aafdf68 commit 91b86db
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 51 deletions.
13 changes: 9 additions & 4 deletions pkg/storage/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ type queueConfig struct {
// want to try to replicate a range until we know which zone it is in and
// therefore how many replicas are required).
acceptsUnsplitRanges bool
// processDestroyedReplicas controls whether or not we want to process replicas
// that have been destroyed but not GCed.
processDestroyedReplicas bool
// processTimeout is the timeout for processing a replica.
processTimeout time.Duration
// successes is a counter of replicas processed successfully.
Expand Down Expand Up @@ -599,11 +602,13 @@ func (bq *baseQueue) processReplica(
return errors.New("cannot process uninitialized replica")
}

if err := repl.IsDestroyed(); err != nil {
if log.V(3) {
log.Infof(queueCtx, "replica destroyed (%s); skipping", err)
if reason, err := repl.IsDestroyed(); err != nil {
if !bq.queueConfig.processDestroyedReplicas || reason != destroyReasonRemovalPending {
if log.V(3) {
log.Infof(queueCtx, "replica destroyed (%s); skipping", err)
}
return nil
}
return nil
}

// If the queue requires a replica to have the range lease in
Expand Down
79 changes: 55 additions & 24 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,34 @@ func (d *atomicDescString) String() string {
return *(*string)(atomic.LoadPointer(&d.strPtr))
}

// DestroyReason indicates if a replica is alive, destroyed, corrupted or pending destruction.
type DestroyReason int

type destroyStatus struct {
reason DestroyReason
err error
}

func (s destroyStatus) IsAlive() bool {
return s.reason == destroyReasonAlive
}

func (s *destroyStatus) SetDestroyed(err error, reason DestroyReason) {
s.err = err
s.reason = reason
}

const (
// The Replica is alive.
destroyReasonAlive DestroyReason = iota
// Indicates whether the replica has been corrupted.
destroyReasonCorrupted
// The replica has been marked for GC, but hasn't been GCed yet.
destroyReasonRemovalPending
// The replica has been GCed.
destroyReasonRemoved
)

// A Replica is a contiguous keyspace with writes managed via an
// instance of the Raft consensus algorithm. Many ranges may exist
// in a store and they are unlikely to be contiguous. Ranges are
Expand Down Expand Up @@ -288,13 +316,9 @@ type Replica struct {
mu struct {
// Protects all fields in the mu struct.
syncutil.RWMutex
// Has the replica been destroyed.
destroyed error
// Corrupted persistently (across process restarts) indicates whether the
// replica has been corrupted.
//
// TODO(tschottdorf): remove/refactor this field.
corrupted bool
// The destroyed status of a replica indicating if it's alive, corrupt,
// scheduled for destruction or has been GCed.
destroyed destroyStatus
// Is the range quiescent? Quiescent ranges are not Tick()'d and unquiesce
// whenever a Raft operation is performed.
quiescent bool
Expand Down Expand Up @@ -484,7 +508,7 @@ var _ KeyRange = &Replica{}
func (r *Replica) withRaftGroupLocked(
shouldCampaignOnCreation bool, f func(r *raft.RawNode) (unquiesceAndWakeLeader bool, _ error),
) error {
if r.mu.destroyed != nil {
if r.mu.destroyed.reason == destroyReasonRemoved {
// Silently ignore all operations on destroyed replicas. We can't return an
// error here as all errors returned from this method are considered fatal.
return nil
Expand Down Expand Up @@ -688,8 +712,11 @@ func (r *Replica) initRaftMuLockedReplicaMuLocked(
if err != nil {
return err
}
r.mu.destroyed = pErr.GetDetail()
r.mu.corrupted = r.mu.destroyed != nil
if r.mu.destroyed.reason != destroyReasonRemovalPending {
if pErr.GetDetail() != nil {
r.mu.destroyed.SetDestroyed(pErr.GetDetail(), destroyReasonRemoved)
}
}

if replicaID == 0 {
repDesc, ok := desc.GetReplicaDescriptor(r.store.StoreID())
Expand Down Expand Up @@ -968,7 +995,7 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
// replica can get destroyed is an option, alternatively we can clear
// our leader status and close the proposalQuota whenever the replica is
// destroyed.
if r.mu.destroyed != nil {
if r.mu.destroyed.reason == destroyReasonRemoved {
if r.mu.proposalQuota != nil {
r.mu.proposalQuota.close()
}
Expand Down Expand Up @@ -1098,7 +1125,7 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
// getEstimatedBehindCountRLocked returns an estimate of how far this replica is
// behind. A return value of 0 indicates that the replica is up to date.
func (r *Replica) getEstimatedBehindCountRLocked(raftStatus *raft.Status) int64 {
if !r.isInitializedRLocked() || r.mu.replicaID == 0 || r.mu.destroyed != nil {
if !r.isInitializedRLocked() || r.mu.replicaID == 0 || !r.mu.destroyed.IsAlive() {
// The range is either not fully initialized or has been destroyed.
return 0
}
Expand Down Expand Up @@ -1151,11 +1178,13 @@ func (r *Replica) IsFirstRange() bool {
return r.RangeID == 1
}

// IsDestroyed returns a non-nil error if the replica has been destroyed.
func (r *Replica) IsDestroyed() error {
// IsDestroyed returns a non-nil error if the replica has been destroyed
// and the value of pending. If pending is true the replica has been added
// to the GC queue but hasn't been removed.
func (r *Replica) IsDestroyed() (DestroyReason, error) {
r.mu.RLock()
defer r.mu.RUnlock()
return r.mu.destroyed
return r.mu.destroyed.reason, r.mu.destroyed.err
}

// GetLease returns the lease and, if available, the proposed next lease.
Expand Down Expand Up @@ -1769,7 +1798,6 @@ func (r *Replica) Send(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
var br *roachpb.BatchResponse

if r.leaseholderStats != nil && ba.Header.GatewayNodeID != 0 {
r.leaseholderStats.record(ba.Header.GatewayNodeID)
}
Expand Down Expand Up @@ -2501,7 +2529,7 @@ func (r *Replica) executeReadOnlyBatch(
endCmds.done(br, pErr, proposalNoRetry)
}()

if err := r.IsDestroyed(); err != nil {
if _, err := r.IsDestroyed(); err != nil {
return nil, roachpb.NewError(err)
}

Expand Down Expand Up @@ -2937,9 +2965,13 @@ func (r *Replica) propose(
spans *SpanSet,
) (chan proposalResult, func() bool, func(), *roachpb.Error) {
noop := func() {}
if err := r.IsDestroyed(); err != nil {
return nil, nil, noop, roachpb.NewError(err)

r.mu.Lock()
if !r.mu.destroyed.IsAlive() {
r.mu.Unlock()
return nil, nil, noop, roachpb.NewError(r.mu.destroyed.err)
}
r.mu.Unlock()

rSpan, err := keys.Range(ba)
if err != nil {
Expand Down Expand Up @@ -3038,8 +3070,8 @@ func (r *Replica) propose(
// been destroyed between the initial check at the beginning of this method
// and the acquisition of Replica.mu. Failure to do so will leave pending
// proposals that never get cleared.
if err := r.mu.destroyed; err != nil {
return nil, nil, undoQuotaAcquisition, roachpb.NewError(err)
if r.mu.destroyed.reason == destroyReasonRemoved {
return nil, nil, undoQuotaAcquisition, roachpb.NewError(r.mu.destroyed.err)
}

repDesc, err := r.getReplicaDescriptorRLocked()
Expand Down Expand Up @@ -4693,7 +4725,7 @@ func (r *Replica) acquireSplitLock(
// then presumably it was alive for some reason other than a concurrent
// split and shouldn't be destroyed.
rightRng.mu.Lock()
rightRng.mu.destroyed = errors.Errorf("%s: failed to initialize", rightRng)
rightRng.mu.destroyed.SetDestroyed(errors.Errorf("%s: failed to initialize", rightRng), destroyReasonRemoved)
rightRng.mu.Unlock()
r.store.mu.Lock()
r.store.mu.replicas.Delete(int64(rightRng.RangeID))
Expand Down Expand Up @@ -5547,8 +5579,7 @@ func (r *Replica) maybeSetCorrupt(ctx context.Context, pErr *roachpb.Error) *roa

log.Errorf(ctx, "stalling replica due to: %s", cErr.ErrorMsg)
cErr.Processed = true
r.mu.destroyed = cErr
r.mu.corrupted = true
r.mu.destroyed.SetDestroyed(cErr, destroyReasonCorrupted)
pErr = roachpb.NewError(cErr)

// Try to persist the destroyed error message. If the underlying store is
Expand Down
17 changes: 9 additions & 8 deletions pkg/storage/replica_gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,15 @@ func newReplicaGCQueue(store *Store, db *client.DB, gossip *gossip.Gossip) *repl
rgcq.baseQueue = newBaseQueue(
"replicaGC", rgcq, store, gossip,
queueConfig{
maxSize: defaultQueueMaxSize,
needsLease: false,
needsSystemConfig: false,
acceptsUnsplitRanges: true,
successes: store.metrics.ReplicaGCQueueSuccesses,
failures: store.metrics.ReplicaGCQueueFailures,
pending: store.metrics.ReplicaGCQueuePending,
processingNanos: store.metrics.ReplicaGCQueueProcessingNanos,
maxSize: defaultQueueMaxSize,
needsLease: false,
needsSystemConfig: false,
acceptsUnsplitRanges: true,
processDestroyedReplicas: true,
successes: store.metrics.ReplicaGCQueueSuccesses,
failures: store.metrics.ReplicaGCQueueFailures,
pending: store.metrics.ReplicaGCQueuePending,
processingNanos: store.metrics.ReplicaGCQueueProcessingNanos,
},
)
return rgcq
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6102,7 +6102,7 @@ func TestReplicaCorruption(t *testing.T) {
r := tc.store.LookupReplica(rkey, rkey)
r.mu.Lock()
defer r.mu.Unlock()
if r.mu.destroyed.Error() != pErr.GetDetail().Error() {
if r.mu.destroyed.err.Error() != pErr.GetDetail().Error() {
t.Fatalf("expected r.mu.destroyed == pErr.GetDetail(), instead %q != %q", r.mu.destroyed, pErr.GetDetail())
}

Expand All @@ -6111,7 +6111,7 @@ func TestReplicaCorruption(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if r.mu.destroyed.Error() != pErr.GetDetail().Error() {
if r.mu.destroyed.err.Error() != pErr.GetDetail().Error() {
t.Fatalf("expected r.mu.destroyed == pErr.GetDetail(), instead %q != %q", r.mu.destroyed, pErr.GetDetail())
}

Expand Down
28 changes: 16 additions & 12 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func (rs *storeReplicaVisitor) Visit(visitor func(*Replica) bool) {
destroyed := repl.mu.destroyed
initialized := repl.isInitializedRLocked()
repl.mu.RUnlock()
if initialized && destroyed == nil && !visitor(repl) {
if initialized && (destroyed.IsAlive() || destroyed.reason == destroyReasonRemovalPending) && !visitor(repl) {
break
}
}
Expand Down Expand Up @@ -2155,7 +2155,6 @@ func (s *Store) RemoveReplica(
<-s.snapshotApplySem
}()
}

rep.raftMu.Lock()
defer rep.raftMu.Unlock()
return s.removeReplicaImpl(ctx, rep, consistentDesc, destroy)
Expand Down Expand Up @@ -2215,7 +2214,7 @@ func (s *Store) removeReplicaImpl(
rep.mu.Lock()
rep.cancelPendingCommandsLocked()
rep.mu.internalRaftGroup = nil
rep.mu.destroyed = roachpb.NewRangeNotFoundError(rep.RangeID)
rep.mu.destroyed.SetDestroyed(roachpb.NewRangeNotFoundError(rep.RangeID), destroyReasonRemoved)
rep.mu.Unlock()
rep.readOnlyCmdMu.Unlock()

Expand All @@ -2239,6 +2238,7 @@ func (s *Store) removeReplicaImpl(
// TODO(peter): Could release s.mu.Lock() here.
s.maybeGossipOnCapacityChange(ctx, rangeChangeEvent)
s.scanner.RemoveReplica(rep)

return nil
}

Expand Down Expand Up @@ -2390,7 +2390,7 @@ func (s *Store) deadReplicas() roachpb.StoreDeadReplicas {
s.mu.replicas.Range(func(k int64, v unsafe.Pointer) bool {
r := (*Replica)(v)
r.mu.RLock()
corrupted := r.mu.corrupted
corrupted := r.mu.destroyed.reason == destroyReasonCorrupted
desc := r.mu.state.Desc
r.mu.RUnlock()
replicaDesc, ok := desc.GetReplicaDescriptor(s.Ident.StoreID)
Expand Down Expand Up @@ -2576,7 +2576,6 @@ func (s *Store) Send(
if br, pErr = s.maybeWaitInPushTxnQueue(ctx, &ba, repl); br != nil || pErr != nil {
return br, pErr
}

br, pErr = repl.Send(ctx, ba)
if pErr == nil {
return br, nil
Expand Down Expand Up @@ -2834,7 +2833,6 @@ func (s *Store) HandleSnapshot(
if header.RaftMessageRequest.ToReplica.ReplicaID == 0 {
inSnap.snapType = snapTypePreemptive
}

if err := s.processRaftSnapshotRequest(ctx, &header.RaftMessageRequest, inSnap); err != nil {
return sendSnapError(errors.Wrap(err.GoError(), "failed to apply snapshot"))
}
Expand Down Expand Up @@ -3288,6 +3286,11 @@ func (s *Store) HandleRaftResponse(ctx context.Context, resp *RaftMessageRespons
if err != nil {
log.Errorf(ctx, "unable to add to replica GC queue: %s", err)
} else if added {
repl.mu.Lock()
if repl.mu.destroyed.IsAlive() {
repl.mu.destroyed.SetDestroyed(roachpb.NewRangeNotFoundError(repl.RangeID), destroyReasonRemovalPending)
}
repl.mu.Unlock()
log.Infof(ctx, "added to replica GC queue (peer suggestion)")
}
case *roachpb.StoreNotFoundError:
Expand Down Expand Up @@ -3914,15 +3917,16 @@ func (s *Store) tryGetOrCreateReplica(

repl.raftMu.Lock()
repl.mu.RLock()
destroyed, corrupted := repl.mu.destroyed, repl.mu.corrupted
destroyed := repl.mu.destroyed
repl.mu.RUnlock()
if destroyed != nil {
if destroyed.reason == destroyReasonRemoved {
repl.raftMu.Unlock()
if corrupted {
return nil, false, destroyed
}
return nil, false, errRetry
}
if destroyed.reason == destroyReasonCorrupted {
repl.raftMu.Unlock()
return nil, false, destroyed.err
}
repl.mu.Lock()
if err := repl.setReplicaIDRaftMuLockedMuLocked(replicaID); err != nil {
repl.mu.Unlock()
Expand Down Expand Up @@ -3983,7 +3987,7 @@ func (s *Store) tryGetOrCreateReplica(
if err := repl.initRaftMuLockedReplicaMuLocked(desc, s.Clock(), replicaID); err != nil {
// Mark the replica as destroyed and remove it from the replicas maps to
// ensure nobody tries to use it
repl.mu.destroyed = errors.Wrapf(err, "%s: failed to initialize", repl)
repl.mu.destroyed.SetDestroyed(errors.Wrapf(err, "%s: failed to initialize", repl), destroyReasonRemoved)
repl.mu.Unlock()
s.mu.Lock()
s.mu.replicas.Delete(int64(rangeID))
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func TestStoreRemoveReplicaDestroy(t *testing.T) {
}

repl1.mu.Lock()
expErr := roachpb.NewError(repl1.mu.destroyed)
expErr := roachpb.NewError(repl1.mu.destroyed.err)
lease := *repl1.mu.state.Lease
repl1.mu.Unlock()

Expand Down

0 comments on commit 91b86db

Please sign in to comment.