Skip to content

Commit

Permalink
Merge #39158
Browse files Browse the repository at this point in the history
39158: storage: clean up below-Raft Split and Merge locking r=bdarnell a=nvanbenschoten

This PR contains three related cleanups that I stumbled into while cleaning up raft entry application in preparation for #38954.

The first cleanup is that we avoid improperly handling errors that may be returned when acquiring the split/merge locks. If this operation fails then we can't simply return an error and not apply the command. We need to be deterministic at this point. The only option we have is to fatal. See #19448 (comment).

The second cleanup is that we then remove stale code that was attempting to recover from failed split and merge application. This error handling only made sense in a pre-proposer evaluated KV world.

The third commit addresses a TODO to assert that the RHS range during a split is not initialized. The TODO looks like it went in before proposer evaluated KV, which protects us against both replays and reproposals.

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
  • Loading branch information
craig[bot] and nvanbenschoten committed Jul 30, 2019
2 parents 61376d9 + 82370ca commit 95a3d42
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 80 deletions.
2 changes: 1 addition & 1 deletion pkg/storage/cmd_app_ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type cmdAppCtx struct {
proposalRetry proposalReevaluationReason
mutationCount int // number of mutations in the WriteBatch, for writeStats
// splitMergeUnlock is acquired for splits and merges.
splitMergeUnlock func(*storagepb.ReplicatedEvalResult)
splitMergeUnlock func()

// The below fields are set after the data has been written to the storage
// engine in prepareLocalResult.
Expand Down
52 changes: 18 additions & 34 deletions pkg/storage/replica_application.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,38 +286,6 @@ func (r *Replica) stageRaftCommand(
// thrown.
cmd.forcedErr = roachpb.NewError(r.requestCanProceed(roachpb.RSpan{}, ts))
}

// applyRaftCommandToBatch will return "expected" errors, but may also indicate
// replica corruption (as of now, signaled by a replicaCorruptionError).
// We feed its return through maybeSetCorrupt to act when that happens.
if cmd.forcedErr != nil {
log.VEventf(ctx, 1, "applying command with forced error: %s", cmd.forcedErr)
} else {
log.Event(ctx, "applying command")

if splitMergeUnlock, err := r.maybeAcquireSplitMergeLock(ctx, cmd.raftCmd); err != nil {
log.Eventf(ctx, "unable to acquire split lock: %s", err)
// Send a crash report because a former bug in the error handling might have
// been the root cause of #19172.
_ = r.store.stopper.RunAsyncTask(ctx, "crash report", func(ctx context.Context) {
log.SendCrashReport(
ctx,
&r.store.cfg.Settings.SV,
0, // depth
"while acquiring split lock: %s",
[]interface{}{err},
log.ReportTypeError,
)
})

cmd.forcedErr = roachpb.NewError(err)
} else if splitMergeUnlock != nil {
// Set the splitMergeUnlock on the cmdAppCtx to be called after the batch
// has been applied (see applyBatch).
cmd.splitMergeUnlock = splitMergeUnlock
}
}

if filter := r.store.cfg.TestingKnobs.TestingApplyFilter; cmd.forcedErr == nil && filter != nil {
var newPropRetry int
newPropRetry, cmd.forcedErr = filter(storagebase.ApplyFilterArgs{
Expand All @@ -330,12 +298,25 @@ func (r *Replica) stageRaftCommand(
cmd.proposalRetry = proposalReevaluationReason(newPropRetry)
}
}

if cmd.forcedErr != nil {
// Apply an empty entry.
*cmd.replicatedResult() = storagepb.ReplicatedEvalResult{}
cmd.raftCmd.WriteBatch = nil
cmd.raftCmd.LogicalOpLog = nil
log.VEventf(ctx, 1, "applying command with forced error: %s", cmd.forcedErr)
} else {
log.Event(ctx, "applying command")
}

// Acquire the split or merge lock, if necessary. If a split or merge
// command was rejected with a below-Raft forced error then its replicated
// result was just cleared and this will be a no-op.
if splitMergeUnlock, err := r.maybeAcquireSplitMergeLock(ctx, cmd.raftCmd); err != nil {
log.Fatalf(ctx, "unable to acquire split lock: %s", err)
} else if splitMergeUnlock != nil {
// Set the splitMergeUnlock on the cmdAppCtx to be called after the batch
// has been applied (see applyBatch).
cmd.splitMergeUnlock = splitMergeUnlock
}

// Update the node clock with the serviced request. This maintains
Expand All @@ -357,6 +338,9 @@ func (r *Replica) stageRaftCommand(

// Apply the Raft command to the batch's accumulated state. This may also
// have the effect of mutating cmd.replicatedResult().
// applyRaftCommandToBatch will return "expected" errors, but may also indicate
// replica corruption (as of now, signaled by a replicaCorruptionError).
// We feed its return through maybeSetCorrupt to act when that happens.
err := r.applyRaftCommandToBatch(cmd.ctx, cmd, replicaState, batch, writeAppliedState)
if err != nil {
// applyRaftCommandToBatch returned an error, which usually indicates
Expand Down Expand Up @@ -802,7 +786,7 @@ func (r *Replica) applyCmdAppBatch(
batchIsNonTrivial = true
// Deal with locking sometimes associated with complex commands.
if unlock := cmd.splitMergeUnlock; unlock != nil {
defer unlock(cmd.replicatedResult())
defer unlock()
cmd.splitMergeUnlock = nil
}
if cmd.replicatedResult().BlockReads {
Expand Down
58 changes: 13 additions & 45 deletions pkg/storage/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1453,13 +1453,12 @@ func (r *Replica) maybeAcquireSnapshotMergeLock(
}

// maybeAcquireSplitMergeLock examines the given raftCmd (which need
// not be evaluated yet) and acquires the split or merge lock if
// not be applied yet) and acquires the split or merge lock if
// necessary (in addition to other preparation). It returns a function
// which will release any lock acquired (or nil) and use the result of
// applying the command to perform any necessary cleanup.
// which will release any lock acquired (or nil).
func (r *Replica) maybeAcquireSplitMergeLock(
ctx context.Context, raftCmd storagepb.RaftCommand,
) (func(*storagepb.ReplicatedEvalResult), error) {
) (func(), error) {
if split := raftCmd.ReplicatedEvalResult.Split; split != nil {
return r.acquireSplitLock(ctx, &split.SplitTrigger)
} else if merge := raftCmd.ReplicatedEvalResult.Merge; merge != nil {
Expand All @@ -1470,50 +1469,21 @@ func (r *Replica) maybeAcquireSplitMergeLock(

func (r *Replica) acquireSplitLock(
ctx context.Context, split *roachpb.SplitTrigger,
) (func(*storagepb.ReplicatedEvalResult), error) {
rightRng, created, err := r.store.getOrCreateReplica(ctx, split.RightDesc.RangeID, 0, nil)
) (func(), error) {
rightRng, _, err := r.store.getOrCreateReplica(ctx, split.RightDesc.RangeID, 0, nil)
if err != nil {
return nil, err
}

// It would be nice to assert that rightRng is not initialized
// here. Unfortunately, due to reproposals and retries we might be executing
// a reproposal for a split trigger that was already executed via a
// retry. The reproposed command will not succeed (the transaction has
// already committed).
//
// TODO(peter): It might be okay to return an error here, but it is more
// conservative to hit the exact same error paths that we would hit for other
// commands that have reproposals interacting with retries (i.e. we don't
// treat splits differently).

return func(rResult *storagepb.ReplicatedEvalResult) {
if rResult.Split == nil && created && !rightRng.IsInitialized() {
// An error occurred during processing of the split and the RHS is still
// uninitialized. Mark the RHS destroyed and remove it from the replica's
// map as it is likely detritus. One reason this can occur is when
// concurrent splits on the same key are executed. Only one of the splits
// will succeed while the other will allocate a range ID, but fail to
// commit.
//
// We condition this removal on whether the RHS was newly created in
// order to be conservative. If a Raft message had created the Replica
// then presumably it was alive for some reason other than a concurrent
// split and shouldn't be destroyed.
rightRng.mu.Lock()
rightRng.mu.destroyStatus.Set(errors.Errorf("%s: failed to initialize", rightRng), destroyReasonRemoved)
rightRng.mu.Unlock()
r.store.mu.Lock()
r.store.unlinkReplicaByRangeIDLocked(rightRng.RangeID)
r.store.mu.Unlock()
}
rightRng.raftMu.Unlock()
}, nil
if rightRng.IsInitialized() {
return nil, errors.Errorf("RHS of split %s / %s already initialized before split application",
&split.LeftDesc, &split.RightDesc)
}
return rightRng.raftMu.Unlock, nil
}

func (r *Replica) acquireMergeLock(
ctx context.Context, merge *roachpb.MergeTrigger,
) (func(*storagepb.ReplicatedEvalResult), error) {
) (func(), error) {
// The merge lock is the right-hand replica's raftMu. The right-hand replica
// is required to exist on this store. Otherwise, an incoming snapshot could
// create the right-hand replica before the merge trigger has a chance to
Expand All @@ -1528,12 +1498,10 @@ func (r *Replica) acquireMergeLock(
}
rightDesc := rightRepl.Desc()
if !rightDesc.StartKey.Equal(merge.RightDesc.StartKey) || !rightDesc.EndKey.Equal(merge.RightDesc.EndKey) {
log.Fatalf(ctx, "RHS of merge %s <- %s not present on store; found %s in place of the RHS",
return nil, errors.Errorf("RHS of merge %s <- %s not present on store; found %s in place of the RHS",
&merge.LeftDesc, &merge.RightDesc, rightDesc)
}
return func(*storagepb.ReplicatedEvalResult) {
rightRepl.raftMu.Unlock()
}, nil
return rightRepl.raftMu.Unlock, nil
}

// handleTruncatedStateBelowRaft is called when a Raft command updates the truncated
Expand Down

0 comments on commit 95a3d42

Please sign in to comment.