diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index b163fd210aa4..cfa152da63a8 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -2872,8 +2872,24 @@ func (r *Replica) sendSnapshotUsingDelegate( retErr = timeutil.RunWithTimeout( ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error { // Sending snapshot - _, err := r.store.cfg.Transport.DelegateSnapshot(ctx, delegateRequest) - return err + resp, err := r.store.cfg.Transport.DelegateSnapshot(ctx, delegateRequest) + if err != nil { + return err + } + if resp.MsgAppResp != nil { + const mayCampaignOnWake = false + _ = r.withRaftGroup(mayCampaignOnWake, func(rn *raft.RawNode) (unquiesceAndWakeLeader bool, _ error) { + msg := *resp.MsgAppResp + // With a delegated snapshot, the recipient received the snapshot + // from another replica and will thus respond to it instead. But the + // message is valid for the actual originator of the send as well. + msg.To = rn.BasicStatus().ID + // We do want to unquiesce here - we wouldn't ever want state transitions + // on a quiesced replica. + return true, rn.Step(*resp.MsgAppResp) + }) + } + return nil }, ) if !selfDelegate { diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index c8ee59bb8885..86972baddb95 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -930,6 +930,18 @@ func (r *Replica) handleRaftReadyRaftMuLocked( if err := r.applySnapshot(ctx, inSnap, snap, hs, subsumedRepls); err != nil { return stats, errors.Wrap(err, "while applying snapshot") } + for _, msg := range msgStorageAppend.Responses { + // The caller would like to see the MsgAppResp that usually results from + // applying the snapshot synchronously, so fish it out. + if msg.To == uint64(inSnap.FromReplica.ReplicaID) && + msg.Type == raftpb.MsgAppResp && + !msg.Reject && + msg.Index == snap.Metadata.Index { + + inSnap.msgAppRespCh <- msg + break + } + } stats.tSnapEnd = timeutil.Now() stats.snap.applied = true diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index bf7d374a2c9a..4865725f64ea 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -313,7 +313,8 @@ type IncomingSnapshot struct { DataSize int64 snapType kvserverpb.SnapshotRequest_Type placeholder *ReplicaPlaceholder - raftAppliedIndex kvpb.RaftIndex // logging only + raftAppliedIndex kvpb.RaftIndex // logging only + msgAppRespCh chan raftpb.Message // receives MsgAppResp if/when snap is applied } func (s IncomingSnapshot) String() string { diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 2deb6426ca0e..0c347ba7e020 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -499,6 +499,18 @@ func (s *Store) processRaftSnapshotRequest( log.Infof(ctx, "ignored stale snapshot at index %d", snapHeader.RaftMessageRequest.Message.Snapshot.Metadata.Index) s.metrics.RangeSnapshotRecvUnusable.Inc(1) } + // If the snapshot was applied and acked with an MsgAppResp, return that + // message up the stack. We're using msgAppRespCh as a shortcut to avoid + // plumbing return parameters through an additional few layers of raft + // handling. + // + // NB: in practice there's always an MsgAppResp here, but it is better not + // to rely on what is essentially discretionary raft behavior. + select { + case msg := <-inSnap.msgAppRespCh: + msgAppResp = &msg + default: + } return nil }) if pErr != nil { diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 5ed684fe3ff4..2c4f268cd04a 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -504,6 +504,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( DataSize: dataSize, snapType: header.Type, raftAppliedIndex: header.State.RaftAppliedIndex, + msgAppRespCh: make(chan raftpb.Message, 1), } timingTag.stop("totalTime")