kv: protect Replica's lastToReplica and lastFromReplica fields with raftMu

This commit moves the Replica's lastToReplica and lastFromReplica from
under the `Replica.mu` mutex to the `Replica.raftMu` mutex. These are
strictly Raft-specific pieces of state, so we don't need fine-grained
locking around them. As a reward, we don't need to grab `Replica.mu`
exclusively (or at all) when setting the fields in
`Store.withReplicaForRequest`.
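
As a rough illustration of the before/after locking pattern, here is a minimal, self-contained Go sketch. The types below (`replica`, `replicaDescriptor`, `raftMessageRequest`) are simplified stand-ins for the kvserver types, not the actual definitions:

```go
// Sketch of the locking change: the last-seen descriptors move from a
// read-write mutex shared with other hot state (where every incoming Raft
// message required an exclusive lock) to raftMu, which the Raft handling
// goroutine already holds, so the update becomes a plain assignment.
package main

import (
	"fmt"
	"sync"
)

// replicaDescriptor stands in for roachpb.ReplicaDescriptor.
type replicaDescriptor struct {
	NodeID, StoreID, ReplicaID int
}

// raftMessageRequest stands in for kvserver.RaftMessageRequest.
type raftMessageRequest struct {
	FromReplica, ToReplica replicaDescriptor
}

type replica struct {
	// mu protects state that is read from many goroutines. Before the
	// change, lastToReplica/lastFromReplica lived under this mutex.
	mu struct {
		sync.RWMutex
	}
	// raftMu is held for the duration of Raft request processing, so fields
	// under it need no additional locking on that path.
	raftMu struct {
		sync.Mutex
		lastToReplica, lastFromReplica replicaDescriptor
	}
}

// setLastReplicaDescriptorsRaftMuLocked mirrors the post-change pattern:
// the caller already holds raftMu, so this is a plain field assignment.
func (r *replica) setLastReplicaDescriptorsRaftMuLocked(req *raftMessageRequest) {
	r.raftMu.lastFromReplica = req.FromReplica
	r.raftMu.lastToReplica = req.ToReplica
}

func main() {
	r := &replica{}
	req := &raftMessageRequest{
		FromReplica: replicaDescriptor{NodeID: 1, StoreID: 1, ReplicaID: 1},
		ToReplica:   replicaDescriptor{NodeID: 2, StoreID: 2, ReplicaID: 2},
	}
	// Mimic Store.withReplicaForRequest: raftMu is acquired once per request,
	// and the descriptor update rides along without touching r.mu at all.
	r.raftMu.Lock()
	defer r.raftMu.Unlock()
	r.setLastReplicaDescriptorsRaftMuLocked(req)
	fmt.Println(r.raftMu.lastFromReplica, r.raftMu.lastToReplica)
}
```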

The locking in `setLastReplicaDescriptors` showed up in a mutex profile
under a write-heavy workload. It was responsible for **3.44%** of mutex
wait time. Grabbing the mutex was probably also slowing down request
processing, as the exclusive lock acquisition had to wait for read locks
to be dropped.
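
For context, the kind of mutex profile cited above can be gathered from any Go process with the standard runtime/pprof machinery; the snippet below is a generic Go sketch of enabling it, not CockroachDB-specific tooling:

```go
// Generic sketch: enable mutex contention profiling and expose it over pprof.
package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers the /debug/pprof/* handlers
	"runtime"
)

func main() {
	// Sample roughly one of every five lock contention events.
	runtime.SetMutexProfileFraction(5)

	// Fetch and inspect the profile with:
	//   go tool pprof http://localhost:6060/debug/pprof/mutex
	log.Fatal(http.ListenAndServe("localhost:6060", nil))
}
```
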
nvanbenschoten committed Jan 10, 2022
1 parent ad27839 commit 410ef29
Showing 5 changed files with 66 additions and 68 deletions.
94 changes: 46 additions & 48 deletions pkg/kv/kvserver/replica.go
@@ -279,6 +279,47 @@ type Replica struct {
stateMachine replicaStateMachine
// decoder is used to decode committed raft entries.
decoder replicaDecoder

// The last seen replica descriptors from incoming Raft messages. These are
// stored so that the replica still knows the replica descriptors for itself
// and for its message recipients in the circumstances when its RangeDescriptor
// is out of date.
//
// Normally, a replica knows about the other replica descriptors for a
// range via the RangeDescriptor stored in Replica.mu.state.Desc. But that
// descriptor is only updated during a Split or ChangeReplicas operation.
// There are periods during a Replica's lifetime when that information is
// out of date:
//
// 1. When a replica is being newly created as the result of an incoming
// Raft message for it. This is the common case for ChangeReplicas and an
// uncommon case for Splits. The leader will be sending the replica
// messages and the replica needs to be able to respond before it can
// receive an updated range descriptor (via a snapshot,
// changeReplicasTrigger, or splitTrigger).
//
// 2. If the node containing a replica is partitioned or down while the
// replicas for the range are updated. When the node comes back up, other
// replicas may begin communicating with it and it needs to be able to
// respond. Unlike 1 where there is no range descriptor, in this situation
// the replica has a range descriptor but it is out of date. Note that a
// replica being removed from a node and then quickly re-added before the
// replica has been GC'd will also use the last seen descriptors. In
// effect, this is another path for which the replica's local range
// descriptor is out of date.
//
// The last seen replica descriptors are updated on receipt of every raft
// message via Replica.setLastReplicaDescriptors (see
// Store.HandleRaftRequest). These last seen descriptors are used when
// the replica's RangeDescriptor contains missing or out of date descriptors
// for a replica (see Replica.sendRaftMessageRaftMuLocked).
//
// Removing a replica from Store.mu.replicas is not a problem because
// when a replica is completely removed, it won't be recreated until
// there is another event that will repopulate the replicas map in the
// range descriptor. When it is temporarily dropped and recreated, the
// newly recreated replica will have a complete range descriptor.
lastToReplica, lastFromReplica roachpb.ReplicaDescriptor
}

// Contains the lease history when enabled.
@@ -499,47 +540,6 @@ type Replica struct {
// live node will not lose leaseholdership.
lastUpdateTimes lastUpdateTimesMap

// The last seen replica descriptors from incoming Raft messages. These are
// stored so that the replica still knows the replica descriptors for itself
// and for its message recipients in the circumstances when its RangeDescriptor
// is out of date.
//
// Normally, a replica knows about the other replica descriptors for a
// range via the RangeDescriptor stored in Replica.mu.state.Desc. But that
// descriptor is only updated during a Split or ChangeReplicas operation.
// There are periods during a Replica's lifetime when that information is
// out of date:
//
// 1. When a replica is being newly created as the result of an incoming
// Raft message for it. This is the common case for ChangeReplicas and an
// uncommon case for Splits. The leader will be sending the replica
// messages and the replica needs to be able to respond before it can
// receive an updated range descriptor (via a snapshot,
// changeReplicasTrigger, or splitTrigger).
//
// 2. If the node containing a replica is partitioned or down while the
// replicas for the range are updated. When the node comes back up, other
// replicas may begin communicating with it and it needs to be able to
// respond. Unlike 1 where there is no range descriptor, in this situation
// the replica has a range descriptor but it is out of date. Note that a
// replica being removed from a node and then quickly re-added before the
// replica has been GC'd will also use the last seen descriptors. In
// effect, this is another path for which the replica's local range
// descriptor is out of date.
//
// The last seen replica descriptors are updated on receipt of every raft
// message via Replica.setLastReplicaDescriptors (see
// Store.HandleRaftRequest). These last seen descriptors are used when
// the replica's RangeDescriptor contains missing or out of date descriptors
// for a replica (see Replica.sendRaftMessage).
//
// Removing a replica from Store.mu.replicas is not a problem because
// when a replica is completely removed, it won't be recreated until
// there is another event that will repopulate the replicas map in the
// range descriptor. When it is temporarily dropped and recreated, the
// newly recreated replica will have a complete range descriptor.
lastToReplica, lastFromReplica roachpb.ReplicaDescriptor

// Computed checksum at a snapshot UUID.
checksums map[uuid.UUID]ReplicaChecksum

@@ -1063,13 +1063,11 @@ func (r *Replica) mergeInProgressRLocked() bool {
}

// setLastReplicaDescriptors sets the most recently seen replica
// descriptors to those contained in the *RaftMessageRequest, acquiring r.mu
// to do so.
func (r *Replica) setLastReplicaDescriptors(req *RaftMessageRequest) {
r.mu.Lock()
r.mu.lastFromReplica = req.FromReplica
r.mu.lastToReplica = req.ToReplica
r.mu.Unlock()
// descriptors to those contained in the *RaftMessageRequest.
func (r *Replica) setLastReplicaDescriptorsRaftMuLocked(req *RaftMessageRequest) {
r.raftMu.AssertHeld()
r.raftMu.lastFromReplica = req.FromReplica
r.raftMu.lastToReplica = req.ToReplica
}

// GetMVCCStats returns a copy of the MVCC stats object for this range.
20 changes: 10 additions & 10 deletions pkg/kv/kvserver/replica_raft.go
@@ -752,7 +752,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(

msgApps, otherMsgs := splitMsgApps(rd.Messages)
r.traceMessageSends(msgApps, "sending msgApp")
r.sendRaftMessages(ctx, msgApps)
r.sendRaftMessagesRaftMuLocked(ctx, msgApps)

// Use a more efficient write-only batch because we don't need to do any
// reads from the batch. Any reads are performed on the underlying DB.
@@ -862,7 +862,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
// Update raft log entry cache. We clear any older, uncommitted log entries
// and cache the latest ones.
r.store.raftEntryCache.Add(r.RangeID, rd.Entries, true /* truncate */)
r.sendRaftMessages(ctx, otherMsgs)
r.sendRaftMessagesRaftMuLocked(ctx, otherMsgs)
r.traceEntries(rd.CommittedEntries, "committed, before applying any entries")

applicationStart := timeutil.Now()
@@ -1010,7 +1010,7 @@ func (r *Replica) tick(ctx context.Context, livenessMap liveness.IsLiveMap) (boo
}

now := r.store.Clock().NowAsClockTimestamp()
if r.maybeQuiesceLocked(ctx, now, livenessMap) {
if r.maybeQuiesceRaftMuLockedReplicaMuLocked(ctx, now, livenessMap) {
return false, nil
}

@@ -1207,7 +1207,7 @@ func (r *Replica) maybeCoalesceHeartbeat(
return true
}

func (r *Replica) sendRaftMessages(ctx context.Context, messages []raftpb.Message) {
func (r *Replica) sendRaftMessagesRaftMuLocked(ctx context.Context, messages []raftpb.Message) {
var lastAppResp raftpb.Message
for _, message := range messages {
drop := false
@@ -1275,19 +1275,19 @@ func (r *Replica) sendRaftMessages(ctx context.Context, messages []raftpb.Messag
}

if !drop {
r.sendRaftMessage(ctx, message)
r.sendRaftMessageRaftMuLocked(ctx, message)
}
}
if lastAppResp.Index > 0 {
r.sendRaftMessage(ctx, lastAppResp)
r.sendRaftMessageRaftMuLocked(ctx, lastAppResp)
}
}

// sendRaftMessage sends a Raft message.
func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) {
// sendRaftMessageRaftMuLocked sends a Raft message.
func (r *Replica) sendRaftMessageRaftMuLocked(ctx context.Context, msg raftpb.Message) {
r.mu.RLock()
fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.From), r.mu.lastToReplica)
toReplica, toErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.To), r.mu.lastFromReplica)
fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.From), r.raftMu.lastToReplica)
toReplica, toErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.To), r.raftMu.lastFromReplica)
var startKey roachpb.RKey
if msg.Type == raftpb.MsgApp && r.mu.internalRaftGroup != nil {
// When the follower is potentially an uninitialized replica waiting for
16 changes: 8 additions & 8 deletions pkg/kv/kvserver/replica_raft_quiesce.go
@@ -121,9 +121,9 @@ func (r *Replica) canUnquiesceRLocked() bool {
r.mu.internalRaftGroup != nil
}

// maybeQuiesceLocked checks to see if the replica is quiescable and initiates
// quiescence if it is. Returns true if the replica has been quiesced and false
// otherwise.
// maybeQuiesceRaftMuLockedReplicaMuLocked checks to see if the replica is
// quiescable and initiates quiescence if it is. Returns true if the replica has
// been quiesced and false otherwise.
//
// A quiesced range is not ticked and thus doesn't create MsgHeartbeat requests
// or cause elections. The Raft leader for a range checks various
@@ -178,14 +178,14 @@ func (r *Replica) canUnquiesceRLocked() bool {
// would quiesce. The fallout from this situation are undesirable raft
// elections which will cause throughput hiccups to the range, but not
// correctness issues.
func (r *Replica) maybeQuiesceLocked(
func (r *Replica) maybeQuiesceRaftMuLockedReplicaMuLocked(
ctx context.Context, now hlc.ClockTimestamp, livenessMap liveness.IsLiveMap,
) bool {
status, lagging, ok := shouldReplicaQuiesce(ctx, r, now, livenessMap)
if !ok {
return false
}
return r.quiesceAndNotifyLocked(ctx, status, lagging)
return r.quiesceAndNotifyRaftMuLockedReplicaMuLocked(ctx, status, lagging)
}

type quiescer interface {
@@ -398,10 +398,10 @@ func shouldReplicaQuiesce(
return status, lagging, true
}

func (r *Replica) quiesceAndNotifyLocked(
func (r *Replica) quiesceAndNotifyRaftMuLockedReplicaMuLocked(
ctx context.Context, status *raft.Status, lagging laggingReplicaSet,
) bool {
fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(r.mu.replicaID, r.mu.lastToReplica)
fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(r.mu.replicaID, r.raftMu.lastToReplica)
if fromErr != nil {
if log.V(4) {
log.Infof(ctx, "not quiescing: cannot find from replica (%d)", r.mu.replicaID)
@@ -416,7 +416,7 @@ func (r *Replica) quiesceAndNotifyLocked(
continue
}
toReplica, toErr := r.getReplicaDescriptorByIDRLocked(
roachpb.ReplicaID(id), r.mu.lastFromReplica)
roachpb.ReplicaID(id), r.raftMu.lastFromReplica)
if toErr != nil {
if log.V(4) {
log.Infof(ctx, "failed to quiesce: cannot find to replica (%d)", id)
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store.go
@@ -516,7 +516,7 @@ Store.HandleRaftRequest (which is part of the RaftMessageHandler interface),
ultimately resulting in a call to Replica.handleRaftReadyRaftMuLocked, which
houses the integration with the etcd/raft library (raft.RawNode). This may
generate Raft messages to be sent to other Stores; these are handed to
Replica.sendRaftMessages which ultimately hands them to the Store's
Replica.sendRaftMessagesRaftMuLocked which ultimately hands them to the Store's
RaftTransport.SendAsync method. Raft uses message passing (not
request-response), and outgoing messages will use a gRPC stream that differs
from that used for incoming messages (which makes asymmetric partitions more
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_raft.go
@@ -213,7 +213,7 @@ func (s *Store) withReplicaForRequest(
return roachpb.NewError(err)
}
defer r.raftMu.Unlock()
r.setLastReplicaDescriptors(req)
r.setLastReplicaDescriptorsRaftMuLocked(req)
return f(ctx, r)
}

