Skip to content

Commit

Permalink
kvserver: plumb MsgAppResp back to snapshot sender
Browse files Browse the repository at this point in the history
We want the initiator of a (potentially delegated) snapshot to be able to see
the MsgAppResp that is generated on the recipient of the snapshot as a result of
application. This commit does the plumbing, but the `*MsgAppResp` is always
`nil`, i.e. no actual logic was added yet.
  • Loading branch information
tbg committed Jul 20, 2023
1 parent 9d75c37 commit 378c91a
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 43 deletions.
26 changes: 15 additions & 11 deletions pkg/kv/kvserver/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,10 @@ func (t *RaftTransport) dropFlowTokensForDisconnectedNodes() {

// SendSnapshot streams the given outgoing snapshot. The caller is responsible
// for closing the OutgoingSnapshot.
//
// The optional (but usually present) returned message is an MsgAppResp that
// results from the follower applying the snapshot, acking the log at the index
// of the snapshot.
func (t *RaftTransport) SendSnapshot(
ctx context.Context,
storePool *storepool.StorePool,
Expand All @@ -1107,17 +1111,17 @@ func (t *RaftTransport) SendSnapshot(
newWriteBatch func() storage.WriteBatch,
sent func(),
recordBytesSent snapshotRecordMetrics,
) error {
) (*kvserverpb.SnapshotResponse, error) {
nodeID := header.RaftMessageRequest.ToReplica.NodeID

conn, err := t.dialer.Dial(ctx, nodeID, rpc.DefaultClass)
if err != nil {
return err
return nil, err
}
client := NewMultiRaftClient(conn)
stream, err := client.RaftSnapshot(ctx)
if err != nil {
return err
return nil, err
}

defer func() {
Expand All @@ -1132,18 +1136,18 @@ func (t *RaftTransport) SendSnapshot(
// and determines if it encountered any errors when sending the snapshot.
func (t *RaftTransport) DelegateSnapshot(
ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest,
) error {
) (*kvserverpb.DelegateSnapshotResponse, error) {
nodeID := req.DelegatedSender.NodeID
conn, err := t.dialer.Dial(ctx, nodeID, rpc.DefaultClass)
if err != nil {
return errors.Mark(err, errMarkSnapshotError)
return nil, errors.Mark(err, errMarkSnapshotError)
}
client := NewMultiRaftClient(conn)

// Creates a rpc stream between the leaseholder and sender.
stream, err := client.DelegateRaftSnapshot(ctx)
if err != nil {
return errors.Mark(err, errMarkSnapshotError)
return nil, errors.Mark(err, errMarkSnapshotError)
}
defer func() {
if err := stream.CloseSend(); err != nil {
Expand All @@ -1154,12 +1158,12 @@ func (t *RaftTransport) DelegateSnapshot(
// Send the request.
wrappedRequest := &kvserverpb.DelegateSnapshotRequest{Value: &kvserverpb.DelegateSnapshotRequest_Send{Send: req}}
if err := stream.Send(wrappedRequest); err != nil {
return errors.Mark(err, errMarkSnapshotError)
return nil, errors.Mark(err, errMarkSnapshotError)
}
// Wait for response to see if the receiver successfully applied the snapshot.
resp, err := stream.Recv()
if err != nil {
return errors.Mark(
return nil, errors.Mark(
errors.Wrapf(err, "%v: remote failed to send snapshot", req), errMarkSnapshotError,
)
}
Expand All @@ -1175,14 +1179,14 @@ func (t *RaftTransport) DelegateSnapshot(

switch resp.Status {
case kvserverpb.DelegateSnapshotResponse_ERROR:
return errors.Mark(
return nil, errors.Mark(
errors.Wrapf(resp.Error(), "error sending couldn't accept %v", req), errMarkSnapshotError)
case kvserverpb.DelegateSnapshotResponse_APPLIED:
// This is the response we're expecting. Snapshot successfully applied.
log.VEventf(ctx, 3, "%s: delegated snapshot was successfully applied", resp)
return nil
return resp, nil
default:
return err
return nil, err
}
}

Expand Down
28 changes: 19 additions & 9 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2872,7 +2872,8 @@ func (r *Replica) sendSnapshotUsingDelegate(
retErr = timeutil.RunWithTimeout(
ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error {
// Sending snapshot
return r.store.cfg.Transport.DelegateSnapshot(ctx, delegateRequest)
_, err := r.store.cfg.Transport.DelegateSnapshot(ctx, delegateRequest)
return err
},
)
if !selfDelegate {
Expand Down Expand Up @@ -3053,7 +3054,7 @@ func (r *Replica) followerSendSnapshot(
ctx context.Context,
recipient roachpb.ReplicaDescriptor,
req *kvserverpb.DelegateSendSnapshotRequest,
) error {
) (*raftpb.Message, error) {
ctx = r.AnnotateCtx(ctx)
sendThreshold := traceSnapshotThreshold.Get(&r.ClusterSettings().SV)
if sendThreshold > 0 {
Expand Down Expand Up @@ -3082,28 +3083,28 @@ func (r *Replica) followerSendSnapshot(
// expensive to send.
err := r.validateSnapshotDelegationRequest(ctx, req)
if err != nil {
return err
return nil, err
}

// Throttle snapshot sending. Obtain the send semaphore and determine the rate limit.
rangeSize := r.GetMVCCStats().Total()
cleanup, err := r.store.reserveSendSnapshot(ctx, req, rangeSize)
if err != nil {
return errors.Wrap(err, "Unable to reserve space for sending this snapshot")
return nil, errors.Wrap(err, "Unable to reserve space for sending this snapshot")
}
defer cleanup()

// Check validity again, it is possible that the pending request should not be
// sent after we are doing waiting.
err = r.validateSnapshotDelegationRequest(ctx, req)
if err != nil {
return err
return nil, err
}

snapType := req.Type
snap, err := r.GetSnapshot(ctx, snapType, req.SnapId)
if err != nil {
return errors.Wrapf(err, "%s: failed to generate %s snapshot", r, snapType)
return nil, errors.Wrapf(err, "%s: failed to generate %s snapshot", r, snapType)
}
defer snap.Close()
log.Event(ctx, "generated snapshot")
Expand Down Expand Up @@ -3174,9 +3175,10 @@ func (r *Replica) followerSendSnapshot(
}
}

return timeutil.RunWithTimeout(
var msgAppResp *raftpb.Message
if err := timeutil.RunWithTimeout(
ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error {
return r.store.cfg.Transport.SendSnapshot(
resp, err := r.store.cfg.Transport.SendSnapshot(
ctx,
r.store.cfg.StorePool,
header,
Expand All @@ -3185,8 +3187,16 @@ func (r *Replica) followerSendSnapshot(
sent,
recordBytesSent,
)
if err != nil {
return err
}
msgAppResp = resp.MsgAppResp
return nil
},
)
); err != nil {
return nil, err
}
return msgAppResp, nil
}

// replicasCollocated is used in AdminMerge to ensure that the ranges are
Expand Down
13 changes: 10 additions & 3 deletions pkg/kv/kvserver/store_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ func (s *Store) HandleDelegatedSnapshot(
}

// Pass the request to the sender replica.
if err := sender.followerSendSnapshot(ctx, req.RecipientReplica, req); err != nil {
msgAppResp, err := sender.followerSendSnapshot(ctx, req.RecipientReplica, req)
if err != nil {
// If an error occurred during snapshot sending, send an error response.
return &kvserverpb.DelegateSnapshotResponse{
Status: kvserverpb.DelegateSnapshotResponse_ERROR,
Expand All @@ -195,6 +196,7 @@ func (s *Store) HandleDelegatedSnapshot(
return &kvserverpb.DelegateSnapshotResponse{
Status: kvserverpb.DelegateSnapshotResponse_APPLIED,
CollectedSpans: sp.GetConfiguredRecording(),
MsgAppResp: msgAppResp,
}
}

Expand Down Expand Up @@ -426,8 +428,9 @@ func (s *Store) processRaftRequestWithReplica(
// will have been removed.
func (s *Store) processRaftSnapshotRequest(
ctx context.Context, snapHeader *kvserverpb.SnapshotRequest_Header, inSnap IncomingSnapshot,
) *kvpb.Error {
return s.withReplicaForRequest(ctx, &snapHeader.RaftMessageRequest, func(
) (*raftpb.Message, *kvpb.Error) {
var msgAppResp *raftpb.Message
pErr := s.withReplicaForRequest(ctx, &snapHeader.RaftMessageRequest, func(
ctx context.Context, r *Replica,
) (pErr *kvpb.Error) {
ctx = r.AnnotateCtx(ctx)
Expand Down Expand Up @@ -498,6 +501,10 @@ func (s *Store) processRaftSnapshotRequest(
}
return nil
})
if pErr != nil {
return nil, pErr
}
return msgAppResp, nil
}

// HandleRaftResponse implements the IncomingRaftMessageHandler interface. Per
Expand Down
37 changes: 21 additions & 16 deletions pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -1140,7 +1140,8 @@ func (s *Store) receiveSnapshot(
// already received the entire snapshot here, so there's no point in
// abandoning application half-way through if the caller goes away.
applyCtx := s.AnnotateCtx(context.Background())
if pErr := s.processRaftSnapshotRequest(applyCtx, header, inSnap); pErr != nil {
msgAppResp, pErr := s.processRaftSnapshotRequest(applyCtx, header, inSnap)
if pErr != nil {
err := pErr.GoError()
// We mark this error as a snapshot error which will be interpreted by the
// sender as this being a retriable error, see isSnapshotError().
Expand All @@ -1151,6 +1152,7 @@ func (s *Store) receiveSnapshot(
return stream.Send(&kvserverpb.SnapshotResponse{
Status: kvserverpb.SnapshotResponse_APPLIED,
CollectedSpans: tracing.SpanFromContext(ctx).GetConfiguredRecording(),
MsgAppResp: msgAppResp,
})
}

Expand Down Expand Up @@ -1482,7 +1484,7 @@ func SendEmptySnapshot(
}
}()

return sendSnapshot(
if _, err := sendSnapshot(
ctx,
st,
tracer,
Expand All @@ -1493,7 +1495,10 @@ func SendEmptySnapshot(
eng.NewWriteBatch,
func() {},
nil, /* recordBytesSent */
)
); err != nil {
return err
}
return nil
}

// noopStorePool is a hollowed out StorePool that does not throttle. It's used in recovery scenarios.
Expand All @@ -1513,7 +1518,7 @@ func sendSnapshot(
newWriteBatch func() storage.WriteBatch,
sent func(),
recordBytesSent snapshotRecordMetrics,
) error {
) (*kvserverpb.SnapshotResponse, error) {
if recordBytesSent == nil {
// NB: Some tests and an offline tool (ResetQuorum) call into `sendSnapshotUsingDelegate`
// with a nil metrics tracking function. We pass in a fake metrics tracking function here that isn't
Expand All @@ -1526,7 +1531,7 @@ func sendSnapshot(
start := timeutil.Now()
to := header.RaftMessageRequest.ToReplica
if err := stream.Send(&kvserverpb.SnapshotRequest{Header: &header}); err != nil {
return err
return nil, err
}
log.Event(ctx, "sent SNAPSHOT_REQUEST message to server")
// Wait until we get a response from the server. The recipient may queue us
Expand All @@ -1536,21 +1541,21 @@ func sendSnapshot(
resp, err := stream.Recv()
if err != nil {
storePool.Throttle(storepool.ThrottleFailed, err.Error(), to.StoreID)
return err
return nil, err
}
switch resp.Status {
case kvserverpb.SnapshotResponse_ERROR:
sp.ImportRemoteRecording(resp.CollectedSpans)
storePool.Throttle(storepool.ThrottleFailed, resp.DeprecatedMessage, to.StoreID)
return errors.Wrapf(maybeHandleDeprecatedSnapErr(resp.Error()), "%s: remote couldn't accept %s", to, snap)
return nil, errors.Wrapf(maybeHandleDeprecatedSnapErr(resp.Error()), "%s: remote couldn't accept %s", to, snap)
case kvserverpb.SnapshotResponse_ACCEPTED:
// This is the response we're expecting. Continue with snapshot sending.
log.Event(ctx, "received SnapshotResponse_ACCEPTED message from server")
default:
err := errors.Errorf("%s: server sent an invalid status while negotiating %s: %s",
to, snap, resp.Status)
storePool.Throttle(storepool.ThrottleFailed, err.Error(), to.StoreID)
return err
return nil, err
}

durQueued := timeutil.Since(start)
Expand Down Expand Up @@ -1586,7 +1591,7 @@ func sendSnapshot(
// Record timings for snapshot send if kv.trace.snapshot.enable_threshold is enabled
numBytesSent, err := ss.Send(ctx, stream, header, snap, recordBytesSent)
if err != nil {
return err
return nil, err
}
durSent := timeutil.Since(start)

Expand All @@ -1595,7 +1600,7 @@ func sendSnapshot(
// applied.
sent()
if err := stream.Send(&kvserverpb.SnapshotRequest{Final: true}); err != nil {
return err
return nil, err
}
log.KvDistribution.Infof(
ctx,
Expand All @@ -1612,27 +1617,27 @@ func sendSnapshot(

resp, err = stream.Recv()
if err != nil {
return errors.Wrapf(err, "%s: remote failed to apply snapshot", to)
return nil, errors.Wrapf(err, "%s: remote failed to apply snapshot", to)
}
sp.ImportRemoteRecording(resp.CollectedSpans)
// NB: wait for EOF which ensures that all processing on the server side has
// completed (such as defers that might be run after the previous message was
// received).
if unexpectedResp, err := stream.Recv(); err != io.EOF {
if err != nil {
return errors.Wrapf(err, "%s: expected EOF, got resp=%v with error", to, unexpectedResp)
return nil, errors.Wrapf(err, "%s: expected EOF, got resp=%v with error", to, unexpectedResp)
}
return errors.Newf("%s: expected EOF, got resp=%v", to, unexpectedResp)
return nil, errors.Newf("%s: expected EOF, got resp=%v", to, unexpectedResp)
}
switch resp.Status {
case kvserverpb.SnapshotResponse_ERROR:
return errors.Wrapf(
return nil, errors.Wrapf(
maybeHandleDeprecatedSnapErr(resp.Error()), "%s: remote failed to apply snapshot", to,
)
case kvserverpb.SnapshotResponse_APPLIED:
return nil
return resp, nil
default:
return errors.Errorf("%s: server sent an invalid status during finalization: %s",
return nil, errors.Errorf("%s: server sent an invalid status during finalization: %s",
to, resp.Status,
)
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3048,13 +3048,14 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) {
require.NoError(t, err)
}

require.NoError(t, s.processRaftSnapshotRequest(ctx, req,
_, pErr := s.processRaftSnapshotRequest(ctx, req,
IncomingSnapshot{
SnapUUID: uuid.MakeV4(),
Desc: desc,
placeholder: placeholder,
},
).GoError())
)
require.NoError(t, pErr.GoError())

testutils.SucceedsSoon(t, func() error {
s.mu.Lock()
Expand Down Expand Up @@ -3127,7 +3128,7 @@ func TestSendSnapshotThrottling(t *testing.T) {
sp := &fakeStorePool{}
expectedErr := errors.New("")
c := fakeSnapshotStream{nil, expectedErr}
err := sendSnapshot(
_, err := sendSnapshot(
ctx, st, tr, c, sp, header, nil /* snap */, newBatch, nil /* sent */, nil, /* recordBytesSent */
)
if sp.failedThrottles != 1 {
Expand All @@ -3146,7 +3147,7 @@ func TestSendSnapshotThrottling(t *testing.T) {
EncodedError: errors.EncodeError(ctx, errors.New("boom")),
}
c := fakeSnapshotStream{resp, nil}
err := sendSnapshot(
_, err := sendSnapshot(
ctx, st, tr, c, sp, header, nil /* snap */, newBatch, nil /* sent */, nil, /* recordBytesSent */
)
if sp.failedThrottles != 1 {
Expand Down

0 comments on commit 378c91a

Please sign in to comment.