[WIP] storage: mark replicas added to replica GC queue as destroyed
Previously, when replicas were added to the replica GC queue they were not
considered fully destroyed. This led to redirectOnOrAcquireLease waiting for
the replica to actually be GCed before returning a NotLeaseHolderError.

Now redirectOnOrAcquireLease responds faster, and anything depending on it,
such as an RPC, will not hang.
Masha Schneider committed Oct 25, 2017
1 parent a7a09db commit dc61bd4
Showing 6 changed files with 87 additions and 48 deletions.
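For orientation before the per-file diffs: the change replaces Replica's single `destroyed error` field (and its companion `corrupted bool`) with a small status struct that also records why a replica is considered destroyed. Below is a minimal, self-contained Go sketch of that model, condensed from the replica.go hunks further down; it simplifies the real code and is illustrative only.

package main

import (
	"errors"
	"fmt"
)

// destroyReason mirrors the enum added in pkg/storage/replica.go.
type destroyReason int

const (
	destroyReasonAlive destroyReason = iota
	destroyReasonCorrupted
	destroyReasonRemovalPending // queued for replica GC, not GCed yet
	destroyReasonRemoved
)

// destroyStatus bundles the destruction reason with the error surfaced to callers.
type destroyStatus struct {
	reason destroyReason
	err    error
}

func (s destroyStatus) IsAlive() bool { return s.reason == destroyReasonAlive }

func main() {
	var st destroyStatus
	fmt.Println(st.IsAlive()) // true: the zero value means alive

	// A replica that was just added to the replica GC queue is marked
	// removal-pending: destroyed enough that lease acquisition fails fast,
	// but still visible to queues that opt in via processDestroyedReplicas.
	st = destroyStatus{
		reason: destroyReasonRemovalPending,
		err:    errors.New("range not found"), // stand-in for roachpb.NewRangeNotFoundError
	}
	fmt.Println(st.IsAlive(), st.reason == destroyReasonRemovalPending) // false true
}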
12 changes: 8 additions & 4 deletions pkg/storage/queue.go
@@ -192,6 +192,8 @@ type queueConfig struct {
processingNanos *metric.Counter
// purgatory is a gauge measuring current replica count in purgatory.
purgatory *metric.Gauge
// processDestroyedReplicas controls whether this queue processes replicas
// that have been marked destroyed but have not yet been GCed.
processDestroyedReplicas bool
}

// baseQueue is the base implementation of the replicaQueue interface.
@@ -599,11 +601,13 @@ func (bq *baseQueue) processReplica(
return errors.New("cannot process uninitialized replica")
}

if err := repl.IsDestroyed(); err != nil {
if log.V(3) {
log.Infof(queueCtx, "replica destroyed (%s); skipping", err)
if err, reason := repl.IsDestroyed(); err != nil {
if !bq.queueConfig.processDestroyedReplicas || reason != destroyReasonRemovalPending {
if log.V(3) {
log.Infof(queueCtx, "replica destroyed (%s); skipping", err)
}
return nil
}
return nil
}

// If the queue requires a replica to have the range lease in
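Read together, the two queue.go hunks above mean a queue now skips a destroyed replica unless it opted in via processDestroyedReplicas and the replica is merely pending removal. A small helper capturing that rule as a sketch (the helper name is hypothetical; it reuses the destroyReason values from the sketch above):

// shouldSkipDestroyed reports whether processReplica should skip a replica
// with the given destroy reason. Only queues that set processDestroyedReplicas
// (the replica GC queue below) still process removal-pending replicas.
func shouldSkipDestroyed(processDestroyedReplicas bool, reason destroyReason) bool {
	if reason == destroyReasonAlive {
		return false // not destroyed, nothing to skip
	}
	return !processDestroyedReplicas || reason != destroyReasonRemovalPending
}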
69 changes: 48 additions & 21 deletions pkg/storage/replica.go
@@ -216,6 +216,26 @@ func (d *atomicDescString) String() string {
return *(*string)(atomic.LoadPointer(&d.strPtr))
}

type destroyReason int

type destroyStatus struct {
reason destroyReason
err error
}

func (s destroyStatus) IsAlive() bool {
return s.reason == destroyReasonAlive
}

const (
// destroyReasonAlive means the replica is not destroyed; `err` must be nil.
destroyReasonAlive destroyReason = iota
// destroyReasonCorrupted means the replica has been persistently (across
// process restarts) marked as corrupted.
destroyReasonCorrupted
// destroyReasonRemovalPending means the replica has been added to the
// replica GC queue but has not been GCed yet (formerly `pending==true`).
destroyReasonRemovalPending
// destroyReasonRemoved means the replica has been fully removed
// (formerly `pending==false`).
destroyReasonRemoved
)

// A Replica is a contiguous keyspace with writes managed via an
// instance of the Raft consensus algorithm. Many ranges may exist
// in a store and they are unlikely to be contiguous. Ranges are
@@ -288,13 +308,9 @@ type Replica struct {
mu struct {
// Protects all fields in the mu struct.
syncutil.RWMutex
// Has the replica been destroyed.
destroyed error
// Corrupted persistently (across process restarts) indicates whether the
// replica has been corrupted.
//
// TODO(tschottdorf): remove/refactor this field.
corrupted bool
// destroyed records whether, and why, the replica has been destroyed. The
// reason is destroyReasonRemovalPending when the replica has been added to
// the replica GC queue but hasn't been GCed yet.
destroyed destroyStatus
// Is the range quiescent? Quiescent ranges are not Tick()'d and unquiesce
// whenever a Raft operation is performed.
quiescent bool
@@ -482,7 +498,7 @@ var _ KeyRange = &Replica{}
func (r *Replica) withRaftGroupLocked(
shouldCampaignOnCreation bool, f func(r *raft.RawNode) (unquiesceAndWakeLeader bool, _ error),
) error {
if r.mu.destroyed != nil {
if r.mu.destroyed.reason == destroyReasonRemoved {
// Silently ignore all operations on destroyed replicas. We can't return an
// error here as all errors returned from this method are considered fatal.
return nil
@@ -686,8 +702,13 @@ func (r *Replica) initRaftMuLockedReplicaMuLocked(
if err != nil {
return err
}
r.mu.destroyed = pErr.GetDetail()
r.mu.corrupted = r.mu.destroyed != nil

if r.mu.destroyed.reason != destroyReasonRemovalPending {
r.mu.destroyed.err = pErr.GetDetail()
if r.mu.destroyed.err != nil {
r.mu.destroyed.reason = destroyReasonRemoved
}
}

if replicaID == 0 {
repDesc, ok := desc.GetReplicaDescriptor(r.store.StoreID())
@@ -966,7 +987,7 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
// replica can get destroyed is an option, alternatively we can clear
// our leader status and close the proposalQuota whenever the replica is
// destroyed.
if r.mu.destroyed != nil {
if r.mu.destroyed.reason == destroyReasonRemoved {
if r.mu.proposalQuota != nil {
r.mu.proposalQuota.close()
}
@@ -1145,11 +1166,13 @@ func (r *Replica) IsFirstRange() bool {
return r.RangeID == 1
}

// IsDestroyed returns a non-nil error if the replica has been destroyed.
func (r *Replica) IsDestroyed() error {
// IsDestroyed returns a non-nil error if the replica has been destroyed,
// along with the reason. A reason of destroyReasonRemovalPending means the
// replica has been added to the replica GC queue but hasn't been GCed yet.
func (r *Replica) IsDestroyed() (error, destroyReason) {
r.mu.RLock()
defer r.mu.RUnlock()
return r.mu.destroyed
return r.mu.destroyed.err, r.mu.destroyed.reason
}

// getLease returns the current lease, and the tentative next one, if a lease
@@ -1764,7 +1787,6 @@ func (r *Replica) Send(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
var br *roachpb.BatchResponse

if r.leaseholderStats != nil && ba.Header.GatewayNodeID != 0 {
r.leaseholderStats.record(ba.Header.GatewayNodeID)
}
@@ -2497,7 +2519,7 @@ func (r *Replica) executeReadOnlyBatch(
endCmds.done(br, pErr, proposalNoRetry)
}()

if err := r.IsDestroyed(); err != nil {
if err, _ := r.IsDestroyed(); err != nil {
return nil, roachpb.NewError(err)
}

@@ -2933,9 +2955,13 @@ func (r *Replica) propose(
spans *SpanSet,
) (chan proposalResult, func() bool, func(), *roachpb.Error) {
noop := func() {}
if err := r.IsDestroyed(); err != nil {

r.mu.Lock()
if err := r.mu.destroyed.err; err != nil && !r.mu.destroyed.IsAlive() {
r.mu.Unlock()
return nil, nil, noop, roachpb.NewError(err)
}
r.mu.Unlock()

rSpan, err := keys.Range(ba)
if err != nil {
@@ -3034,7 +3060,7 @@
// been destroyed between the initial check at the beginning of this method
// and the acquisition of Replica.mu. Failure to do so will leave pending
// proposals that never get cleared.
if err := r.mu.destroyed; err != nil {
if err := r.mu.destroyed.err; err != nil && r.mu.destroyed.reason == destroyReasonRemoved {
return nil, nil, undoQuotaAcquisition, roachpb.NewError(err)
}

@@ -4655,7 +4681,8 @@ func (r *Replica) acquireSplitLock(
// then presumably it was alive for some reason other than a concurrent
// split and shouldn't be destroyed.
rightRng.mu.Lock()
rightRng.mu.destroyed = errors.Errorf("%s: failed to initialize", rightRng)
rightRng.mu.destroyed.err = errors.Errorf("%s: failed to initialize", rightRng)
rightRng.mu.destroyed.reason = destroyReasonRemoved
rightRng.mu.Unlock()
r.store.mu.Lock()
r.store.mu.replicas.Delete(int64(rightRng.RangeID))
@@ -5509,8 +5536,8 @@ func (r *Replica) maybeSetCorrupt(ctx context.Context, pErr *roachpb.Error) *roa

log.Errorf(ctx, "stalling replica due to: %s", cErr.ErrorMsg)
cErr.Processed = true
r.mu.destroyed = cErr
r.mu.corrupted = true
r.mu.destroyed.err = cErr
r.mu.destroyed.reason = destroyReasonCorrupted
pErr = roachpb.NewError(cErr)

// Try to persist the destroyed error message. If the underlying store is
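Since IsDestroyed now returns the reason alongside the error, callers can tell a replica that is merely queued for GC apart from one that is fully gone. A hypothetical caller-side sketch, not part of the diff, illustrating the fail-fast behavior described in the commit message:

// failFastIfDestroyed shows how a request path might use the new return
// values: a removal-pending replica surfaces its error immediately instead
// of blocking until the replica GC queue finishes.
func failFastIfDestroyed(repl *Replica) error {
	err, reason := repl.IsDestroyed()
	if err == nil {
		return nil // alive, proceed with the request
	}
	if reason == destroyReasonRemovalPending {
		// Already queued for GC: don't wait for GC to complete, just fail.
		return err
	}
	// destroyReasonRemoved or destroyReasonCorrupted: the replica is gone.
	return err
}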
17 changes: 9 additions & 8 deletions pkg/storage/replica_gc_queue.go
@@ -92,14 +92,15 @@ func newReplicaGCQueue(store *Store, db *client.DB, gossip *gossip.Gossip) *repl
rgcq.baseQueue = newBaseQueue(
"replicaGC", rgcq, store, gossip,
queueConfig{
maxSize: defaultQueueMaxSize,
needsLease: false,
needsSystemConfig: false,
acceptsUnsplitRanges: true,
successes: store.metrics.ReplicaGCQueueSuccesses,
failures: store.metrics.ReplicaGCQueueFailures,
pending: store.metrics.ReplicaGCQueuePending,
processingNanos: store.metrics.ReplicaGCQueueProcessingNanos,
maxSize: defaultQueueMaxSize,
needsLease: false,
needsSystemConfig: false,
acceptsUnsplitRanges: true,
successes: store.metrics.ReplicaGCQueueSuccesses,
failures: store.metrics.ReplicaGCQueueFailures,
pending: store.metrics.ReplicaGCQueuePending,
processingNanos: store.metrics.ReplicaGCQueueProcessingNanos,
processDestroyedReplicas: true,
},
)
return rgcq
4 changes: 2 additions & 2 deletions pkg/storage/replica_test.go
@@ -6102,7 +6102,7 @@ func TestReplicaCorruption(t *testing.T) {
r := tc.store.LookupReplica(rkey, rkey)
r.mu.Lock()
defer r.mu.Unlock()
if r.mu.destroyed.Error() != pErr.GetDetail().Error() {
if r.mu.destroyed.err.Error() != pErr.GetDetail().Error() {
t.Fatalf("expected r.mu.destroyed == pErr.GetDetail(), instead %q != %q", r.mu.destroyed, pErr.GetDetail())
}

Expand All @@ -6111,7 +6111,7 @@ func TestReplicaCorruption(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if r.mu.destroyed.Error() != pErr.GetDetail().Error() {
if r.mu.destroyed.err.Error() != pErr.GetDetail().Error() {
t.Fatalf("expected r.mu.destroyed == pErr.GetDetail(), instead %q != %q", r.mu.destroyed, pErr.GetDetail())
}

31 changes: 19 additions & 12 deletions pkg/storage/store.go
@@ -314,7 +314,7 @@ func (rs *storeReplicaVisitor) Visit(visitor func(*Replica) bool) {
destroyed := repl.mu.destroyed
initialized := repl.isInitializedRLocked()
repl.mu.RUnlock()
if initialized && destroyed == nil && !visitor(repl) {
if initialized && (destroyed.IsAlive() || destroyed.reason == destroyReasonRemovalPending) && !visitor(repl) {
break
}
}
@@ -2155,7 +2155,6 @@ func (s *Store) RemoveReplica(
<-s.snapshotApplySem
}()
}

rep.raftMu.Lock()
defer rep.raftMu.Unlock()
return s.removeReplicaImpl(ctx, rep, consistentDesc, destroy)
@@ -2215,7 +2214,8 @@ func (s *Store) removeReplicaImpl(
rep.mu.Lock()
rep.cancelPendingCommandsLocked()
rep.mu.internalRaftGroup = nil
rep.mu.destroyed = roachpb.NewRangeNotFoundError(rep.RangeID)
rep.mu.destroyed.err = roachpb.NewRangeNotFoundError(rep.RangeID)
rep.mu.destroyed.reason = destroyReasonRemoved
rep.mu.Unlock()
rep.readOnlyCmdMu.Unlock()

@@ -2239,6 +2239,7 @@
// TODO(peter): Could release s.mu.Lock() here.
s.maybeGossipOnCapacityChange(ctx, rangeChangeEvent)
s.scanner.RemoveReplica(rep)

return nil
}

@@ -2390,7 +2391,7 @@ func (s *Store) deadReplicas() roachpb.StoreDeadReplicas {
s.mu.replicas.Range(func(k int64, v unsafe.Pointer) bool {
r := (*Replica)(v)
r.mu.RLock()
corrupted := r.mu.corrupted
corrupted := r.mu.destroyed.reason == destroyReasonCorrupted
desc := r.mu.state.Desc
r.mu.RUnlock()
replicaDesc, ok := desc.GetReplicaDescriptor(s.Ident.StoreID)
@@ -2576,7 +2577,6 @@ func (s *Store) Send(
if br, pErr = s.maybeWaitInPushTxnQueue(ctx, &ba, repl); br != nil || pErr != nil {
return br, pErr
}

br, pErr = repl.Send(ctx, ba)
if pErr == nil {
return br, nil
@@ -2834,7 +2834,6 @@ func (s *Store) HandleSnapshot(
if header.RaftMessageRequest.ToReplica.ReplicaID == 0 {
inSnap.snapType = snapTypePreemptive
}

if err := s.processRaftSnapshotRequest(ctx, &header.RaftMessageRequest, inSnap); err != nil {
return sendSnapError(errors.Wrap(err.GoError(), "failed to apply snapshot"))
}
@@ -3288,6 +3287,12 @@ func (s *Store) HandleRaftResponse(ctx context.Context, resp *RaftMessageRespons
if err != nil {
log.Errorf(ctx, "unable to add to replica GC queue: %s", err)
} else if added {
repl.mu.Lock()
if repl.mu.destroyed.IsAlive() {
repl.mu.destroyed.err = roachpb.NewRangeNotFoundError(repl.RangeID)
repl.mu.destroyed.reason = destroyReasonRemovalPending
}
repl.mu.Unlock()
log.Infof(ctx, "added to replica GC queue (peer suggestion)")
}
case *roachpb.StoreNotFoundError:
@@ -3914,15 +3919,16 @@ func (s *Store) tryGetOrCreateReplica(

repl.raftMu.Lock()
repl.mu.RLock()
destroyed, corrupted := repl.mu.destroyed, repl.mu.corrupted
destroyed := repl.mu.destroyed
repl.mu.RUnlock()
if destroyed != nil {
if destroyed.reason == destroyReasonRemoved {
repl.raftMu.Unlock()
if corrupted {
return nil, false, destroyed
}
return nil, false, errRetry
}
if destroyed.reason == destroyReasonCorrupted {
repl.raftMu.Unlock()
return nil, false, destroyed.err
}
repl.mu.Lock()
if err := repl.setReplicaIDRaftMuLockedMuLocked(replicaID); err != nil {
repl.mu.Unlock()
@@ -3983,7 +3989,8 @@ if err := repl.initRaftMuLockedReplicaMuLocked(desc, s.Clock(), replicaID); err != nil {
if err := repl.initRaftMuLockedReplicaMuLocked(desc, s.Clock(), replicaID); err != nil {
// Mark the replica as destroyed and remove it from the replicas maps to
// ensure nobody tries to use it
repl.mu.destroyed = errors.Wrapf(err, "%s: failed to initialize", repl)
repl.mu.destroyed.err = errors.Wrapf(err, "%s: failed to initialize", repl)
repl.mu.destroyed.reason = destroyReasonRemoved
repl.mu.Unlock()
s.mu.Lock()
s.mu.replicas.Delete(int64(rangeID))
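The HandleRaftResponse hunk above carries the core of the fix: once a peer's suggestion actually lands the replica in the GC queue, a still-alive replica is flipped to removal-pending so later requests fail fast. The same step condensed into a standalone sketch (the helper name and the addedToGCQueue parameter are illustrative):

// markRemovalPending marks a still-alive replica as pending removal after it
// has been added to the replica GC queue, without overwriting an existing
// corrupted or removed status.
func markRemovalPending(repl *Replica, addedToGCQueue bool) {
	if !addedToGCQueue {
		return
	}
	repl.mu.Lock()
	defer repl.mu.Unlock()
	if repl.mu.destroyed.IsAlive() {
		repl.mu.destroyed.err = roachpb.NewRangeNotFoundError(repl.RangeID)
		repl.mu.destroyed.reason = destroyReasonRemovalPending
	}
}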
2 changes: 1 addition & 1 deletion pkg/storage/store_test.go
@@ -471,7 +471,7 @@ func TestStoreRemoveReplicaDestroy(t *testing.T) {
}

repl1.mu.Lock()
expErr := roachpb.NewError(repl1.mu.destroyed)
expErr := roachpb.NewError(repl1.mu.destroyed.err)
lease := *repl1.mu.state.Lease
repl1.mu.Unlock()

