From a1ca4e1db8652eb6532e5c898784427d412c2bac Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Thu, 22 Dec 2022 14:12:06 -0500 Subject: [PATCH] kv: remove lastToReplica and lastFromReplica from raftMu In 410ef29, we moved these fields from under the Replica.mu to under the Replica.raftMu. This was done to avoid lock contention. In this commit, we move these fields under their own mutex so that they can be accessed without holding the raftMu. This allows us to send Raft messages from other goroutines. The commit also switches from calling RawNode.ReportUnreachable directly in sendRaftMessage to using the more flexible unreachablesMu set, which defers the call to ReportUnreachable until the next Raft tick. As a result, the commit closes #84246. Release note: None Epic: None --- pkg/kv/kvserver/replica.go | 116 +++++++++++++----------- pkg/kv/kvserver/replica_raft.go | 34 +++---- pkg/kv/kvserver/replica_raft_quiesce.go | 5 +- pkg/kv/kvserver/store.go | 2 +- pkg/kv/kvserver/store_raft.go | 2 +- 5 files changed, 88 insertions(+), 71 deletions(-) diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 3dce83b117af..7bca93c7569e 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -258,47 +258,52 @@ type Replica struct { stateMachine replicaStateMachine // decoder is used to decode committed raft entries. decoder replicaDecoder + } - // The last seen replica descriptors from incoming Raft messages. These are - // stored so that the replica still knows the replica descriptors for itself - // and for its message recipients in the circumstances when its RangeDescriptor - // is out of date. - // - // Normally, a replica knows about the other replica descriptors for a - // range via the RangeDescriptor stored in Replica.mu.state.Desc. But that - // descriptor is only updated during a Split or ChangeReplicas operation. - // There are periods during a Replica's lifetime when that information is - // out of date: - // - // 1. When a replica is being newly created as the result of an incoming - // Raft message for it. This is the common case for ChangeReplicas and an - // uncommon case for Splits. The leader will be sending the replica - // messages and the replica needs to be able to respond before it can - // receive an updated range descriptor (via a snapshot, - // changeReplicasTrigger, or splitTrigger). - // - // 2. If the node containing a replica is partitioned or down while the - // replicas for the range are updated. When the node comes back up, other - // replicas may begin communicating with it and it needs to be able to - // respond. Unlike 1 where there is no range descriptor, in this situation - // the replica has a range descriptor but it is out of date. Note that a - // replica being removed from a node and then quickly re-added before the - // replica has been GC'd will also use the last seen descriptors. In - // effect, this is another path for which the replica's local range - // descriptor is out of date. - // - // The last seen replica descriptors are updated on receipt of every raft - // message via Replica.setLastReplicaDescriptors (see - // Store.HandleRaftRequest). These last seen descriptors are used when - // the replica's RangeDescriptor contains missing or out of date descriptors - // for a replica (see Replica.sendRaftMessageRaftMuLocked). - // - // Removing a replica from Store.mu.replicas is not a problem because - // when a replica is completely removed, it won't be recreated until - // there is another event that will repopulate the replicas map in the - // range descriptor. When it is temporarily dropped and recreated, the - // newly recreated replica will have a complete range descriptor. - lastToReplica, lastFromReplica roachpb.ReplicaDescriptor + // The last seen replica descriptors from incoming Raft messages. These are + // stored so that the replica still knows the replica descriptors for itself + // and for its message recipients in the circumstances when its RangeDescriptor + // is out of date. + // + // Normally, a replica knows about the other replica descriptors for a + // range via the RangeDescriptor stored in Replica.mu.state.Desc. But that + // descriptor is only updated during a Split or ChangeReplicas operation. + // There are periods during a Replica's lifetime when that information is + // out of date: + // + // 1. When a replica is being newly created as the result of an incoming + // Raft message for it. This is the common case for ChangeReplicas and an + // uncommon case for Splits. The leader will be sending the replica + // messages and the replica needs to be able to respond before it can + // receive an updated range descriptor (via a snapshot, + // changeReplicasTrigger, or splitTrigger). + // + // 2. If the node containing a replica is partitioned or down while the + // replicas for the range are updated. When the node comes back up, other + // replicas may begin communicating with it and it needs to be able to + // respond. Unlike 1 where there is no range descriptor, in this situation + // the replica has a range descriptor but it is out of date. Note that a + // replica being removed from a node and then quickly re-added before the + // replica has been GC'd will also use the last seen descriptors. In + // effect, this is another path for which the replica's local range + // descriptor is out of date. + // + // The last seen replica descriptors are updated on receipt of every raft + // message via Replica.setLastReplicaDescriptors (see + // Store.HandleRaftRequest). These last seen descriptors are used when + // the replica's RangeDescriptor contains missing or out of date descriptors + // for a replica (see Replica.sendRaftMessageRaftMuLocked). + // + // Removing a replica from Store.mu.replicas is not a problem because + // when a replica is completely removed, it won't be recreated until + // there is another event that will repopulate the replicas map in the + // range descriptor. When it is temporarily dropped and recreated, the + // newly recreated replica will have a complete range descriptor. + // + // Locking notes: Replica.raftMu < Replica.mu < Replica.lastSeenReplicas + lastSeenReplicas struct { + syncutil.Mutex + to, from roachpb.ReplicaDescriptor } // Contains the lease history when enabled. @@ -654,11 +659,8 @@ type Replica struct { // loadBasedSplitter keeps information about load-based splitting. loadBasedSplitter split.Decider - // TODO(tbg): this is effectively unused, we only use it to call ReportUnreachable - // when a heartbeat gets dropped but it's unclear whether a) that ever fires in - // practice b) if it provides any benefit. - // - // See: https://github.com/cockroachdb/cockroach/issues/84246 + // unreachablesMu contains a set of remote ReplicaIDs that are to be reported + // as unreachable on the next raft tick. unreachablesMu struct { syncutil.Mutex remotes map[roachpb.ReplicaID]struct{} @@ -1087,12 +1089,24 @@ func (r *Replica) mergeInProgressRLocked() bool { return r.mu.mergeComplete != nil } -// setLastReplicaDescriptors sets the most recently seen replica -// descriptors to those contained in the *RaftMessageRequest. -func (r *Replica) setLastReplicaDescriptorsRaftMuLocked(req *kvserverpb.RaftMessageRequest) { - r.raftMu.AssertHeld() - r.raftMu.lastFromReplica = req.FromReplica - r.raftMu.lastToReplica = req.ToReplica +// setLastReplicaDescriptors sets the most recently seen replica descriptors to +// those contained in the *RaftMessageRequest. +// See the comment on Replica.lastSeenReplicas. +func (r *Replica) setLastReplicaDescriptors(req *kvserverpb.RaftMessageRequest) { + lsr := &r.lastSeenReplicas + lsr.Lock() + defer lsr.Unlock() + lsr.to = req.ToReplica + lsr.from = req.FromReplica +} + +// getLastReplicaDescriptors gets the most recently seen replica descriptors. +// See the comment on Replica.lastSeenReplicas. +func (r *Replica) getLastReplicaDescriptors() (to, from roachpb.ReplicaDescriptor) { + lsr := &r.lastSeenReplicas + lsr.Lock() + defer lsr.Unlock() + return lsr.to, lsr.from } // GetMVCCStats returns a copy of the MVCC stats object for this range. diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index ea9ed9da0700..287425136e61 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -928,7 +928,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( msgApps, otherMsgs := splitMsgApps(rd.Messages) r.traceMessageSends(msgApps, "sending msgApp") - r.sendRaftMessagesRaftMuLocked(ctx, msgApps, pausedFollowers) + r.sendRaftMessages(ctx, msgApps, pausedFollowers) // TODO(pavelkalinnikov): find a way to move it to storeEntries. if !raft.IsEmptyHardState(rd.HardState) { @@ -977,7 +977,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( r.store.replicateQueue.MaybeAddAsync(ctx, r, r.store.Clock().NowAsClockTimestamp()) } - r.sendRaftMessagesRaftMuLocked(ctx, otherMsgs, nil /* blocked */) + r.sendRaftMessages(ctx, otherMsgs, nil /* blocked */) r.traceEntries(rd.CommittedEntries, "committed, before applying any entries") stats.tApplicationBegin = timeutil.Now() @@ -1414,7 +1414,7 @@ func (r *Replica) maybeCoalesceHeartbeat( return true } -func (r *Replica) sendRaftMessagesRaftMuLocked( +func (r *Replica) sendRaftMessages( ctx context.Context, messages []raftpb.Message, blocked map[roachpb.ReplicaID]struct{}, ) { var lastAppResp raftpb.Message @@ -1487,19 +1487,24 @@ func (r *Replica) sendRaftMessagesRaftMuLocked( } if !drop { - r.sendRaftMessageRaftMuLocked(ctx, message) + r.sendRaftMessage(ctx, message) } } if lastAppResp.Index > 0 { - r.sendRaftMessageRaftMuLocked(ctx, lastAppResp) + r.sendRaftMessage(ctx, lastAppResp) } } -// sendRaftMessageRaftMuLocked sends a Raft message. -func (r *Replica) sendRaftMessageRaftMuLocked(ctx context.Context, msg raftpb.Message) { +// sendRaftMessage sends a Raft message. +// +// When calling this method, the raftMu may be held, but it does not need to be. +// The Replica mu must not be held. +func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) { + lastToReplica, lastFromReplica := r.getLastReplicaDescriptors() + r.mu.RLock() - fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.From), r.raftMu.lastToReplica) - toReplica, toErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.To), r.raftMu.lastFromReplica) + fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.From), lastToReplica) + toReplica, toErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.To), lastFromReplica) var startKey roachpb.RKey if msg.Type == raftpb.MsgApp && r.mu.internalRaftGroup != nil { // When the follower is potentially an uninitialized replica waiting for @@ -1550,13 +1555,10 @@ func (r *Replica) sendRaftMessageRaftMuLocked(ctx context.Context, msg raftpb.Me RangeStartKey: startKey, // usually nil } if !r.sendRaftMessageRequest(ctx, req) { - if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { - r.mu.droppedMessages++ - raftGroup.ReportUnreachable(msg.To) - return true, nil - }); err != nil && !errors.Is(err, errRemoved) { - log.Fatalf(ctx, "%v", err) - } + r.mu.Lock() + r.mu.droppedMessages++ + r.mu.Unlock() + r.addUnreachableRemoteReplica(toReplica.ReplicaID) } } diff --git a/pkg/kv/kvserver/replica_raft_quiesce.go b/pkg/kv/kvserver/replica_raft_quiesce.go index 1fb80c595813..e2d2adccac56 100644 --- a/pkg/kv/kvserver/replica_raft_quiesce.go +++ b/pkg/kv/kvserver/replica_raft_quiesce.go @@ -421,7 +421,8 @@ func shouldReplicaQuiesce( func (r *Replica) quiesceAndNotifyRaftMuLockedReplicaMuLocked( ctx context.Context, status *raftSparseStatus, lagging laggingReplicaSet, ) bool { - fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(r.replicaID, r.raftMu.lastToReplica) + lastToReplica, lastFromReplica := r.getLastReplicaDescriptors() + fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(r.replicaID, lastToReplica) if fromErr != nil { if log.V(4) { log.Infof(ctx, "not quiescing: cannot find from replica (%d)", r.replicaID) @@ -436,7 +437,7 @@ func (r *Replica) quiesceAndNotifyRaftMuLockedReplicaMuLocked( continue } toReplica, toErr := r.getReplicaDescriptorByIDRLocked( - roachpb.ReplicaID(id), r.raftMu.lastFromReplica) + roachpb.ReplicaID(id), lastFromReplica) if toErr != nil { if log.V(4) { log.Infof(ctx, "failed to quiesce: cannot find to replica (%d)", id) diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 19a8cc4262c5..2db6a844cccf 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -538,7 +538,7 @@ Store.HandleRaftRequest (which is part of the RaftMessageHandler interface), ultimately resulting in a call to Replica.handleRaftReadyRaftMuLocked, which houses the integration with the etcd/raft library (raft.RawNode). This may generate Raft messages to be sent to other Stores; these are handed to -Replica.sendRaftMessagesRaftMuLocked which ultimately hands them to the Store's +Replica.sendRaftMessages which ultimately hands them to the Store's RaftTransport.SendAsync method. Raft uses message passing (not request-response), and outgoing messages will use a gRPC stream that differs from that used for incoming messages (which makes asymmetric partitions more diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 1615a2136ab5..60d543d71685 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -337,7 +337,7 @@ func (s *Store) withReplicaForRequest( return roachpb.NewError(err) } defer r.raftMu.Unlock() - r.setLastReplicaDescriptorsRaftMuLocked(req) + r.setLastReplicaDescriptors(req) return f(ctx, r) }