Skip to content

Commit

Permalink
kvserver: incorporate remote tracing spans from snapshots
Browse files Browse the repository at this point in the history
This adds collected tracing spans into a `SnapshotResponse` object in
order to incorporate remote traces from the receiver side of a snapshot
into the client's (i.e. the sender's) context.

Release justification: Low-risk observability change.
Release note: None
  • Loading branch information
AlexTalks committed Aug 24, 2022
1 parent 2cdb9e2 commit 09a1d46
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 17 deletions.
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/kvserverpb/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ message SnapshotResponse {
Status status = 1;
string message = 2;
reserved 3;

// Traces from snapshot processing, returned on status APPLIED or ERROR.
repeated util.tracing.tracingpb.RecordedSpan collected_spans = 4 [(gogoproto.nullable) = false];
}

// DelegateSnapshotRequest is the request used to delegate send snapshot requests.
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/storage_services.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ import "kv/kvserver/api.proto";

service MultiRaft {
rpc RaftMessageBatch (stream cockroach.kv.kvserver.kvserverpb.RaftMessageRequestBatch) returns (stream cockroach.kv.kvserver.kvserverpb.RaftMessageResponse) {}
// RaftSnapshot asks the server to accept and apply a range snapshot.
// The client is expected to initially send a message consisting solely of
// a Header, upon which the server will respond with a message with status
// ACCEPTED, or ERROR if it cannot accept the snapshot. Once accepted, the
// client will send multiple messages with KVBatch data followed by a
// terminal message with the final flag set to true. Once finalized,
// the server will ultimately send a message back with status APPLIED, or
// ERROR, including any collected traces from processing.
rpc RaftSnapshot (stream cockroach.kv.kvserver.kvserverpb.SnapshotRequest) returns (stream cockroach.kv.kvserver.kvserverpb.SnapshotResponse) {}
// DelegateRaftSnapshot asks the server to send a range snapshot to a target
// (so the client delegates the sending of the snapshot to the server). The
Expand Down
45 changes: 28 additions & 17 deletions pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
Expand Down Expand Up @@ -906,6 +907,8 @@ func (s *Store) checkSnapshotOverlapLocked(
func (s *Store) receiveSnapshot(
ctx context.Context, header *kvserverpb.SnapshotRequest_Header, stream incomingSnapshotStream,
) error {
sp := tracing.SpanFromContext(ctx)

// Draining nodes will generally not be rebalanced to (see the filtering that
// happens in getStoreListFromIDsLocked()), but in case they are, they should
// reject the incoming rebalancing snapshots.
Expand Down Expand Up @@ -1028,29 +1031,43 @@ func (s *Store) receiveSnapshot(
s.metrics.RangeSnapshotUnknownRcvdBytes.Inc(inc)
}
}
ctx, sp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "receive snapshot data")
ctx, rSp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "receive snapshot data")
defer rSp.Finish() // Ensure that the tracing span is closed, even if ss.Receive errors
inSnap, err := ss.Receive(ctx, stream, *header, recordBytesReceived)
sp.Finish() // Ensure that the tracing span is closed, even if ss.Receive errors
if err != nil {
return err
}
inSnap.placeholder = placeholder

rec := sp.GetConfiguredRecording()

// Use a background context for applying the snapshot, as handleRaftReady is
// not prepared to deal with arbitrary context cancellation. Also, we've
// already received the entire snapshot here, so there's no point in
// abandoning application half-way through if the caller goes away.
applyCtx := s.AnnotateCtx(context.Background())
if err := s.processRaftSnapshotRequest(applyCtx, header, inSnap); err != nil {
return sendSnapshotError(stream, errors.Wrap(err.GoError(), "failed to apply snapshot"))
return sendSnapshotErrorWithTrace(stream,
errors.Wrap(err.GoError(), "failed to apply snapshot"), rec,
)
}
return stream.Send(&kvserverpb.SnapshotResponse{Status: kvserverpb.SnapshotResponse_APPLIED})
return stream.Send(&kvserverpb.SnapshotResponse{
Status: kvserverpb.SnapshotResponse_APPLIED,
CollectedSpans: rec,
})
}

func sendSnapshotError(stream incomingSnapshotStream, err error) error {
return sendSnapshotErrorWithTrace(stream, err, nil /* trace */)
}

func sendSnapshotErrorWithTrace(
stream incomingSnapshotStream, err error, trace tracingpb.Recording,
) error {
return stream.Send(&kvserverpb.SnapshotResponse{
Status: kvserverpb.SnapshotResponse_ERROR,
Message: err.Error(),
Status: kvserverpb.SnapshotResponse_ERROR,
Message: err.Error(),
CollectedSpans: trace,
})
}

Expand Down Expand Up @@ -1449,6 +1466,7 @@ func sendSnapshot(
}
switch resp.Status {
case kvserverpb.SnapshotResponse_ERROR:
sp.ImportRemoteRecording(resp.CollectedSpans)
storePool.Throttle(storepool.ThrottleFailed, resp.Message, to.StoreID)
return errors.Errorf("%s: remote couldn't accept %s with error: %s",
to, snap, resp.Message)
Expand Down Expand Up @@ -1526,6 +1544,7 @@ func sendSnapshot(
if err != nil {
return errors.Wrapf(err, "%s: remote failed to apply snapshot", to)
}
sp.ImportRemoteRecording(resp.CollectedSpans)
// NB: wait for EOF which ensures that all processing on the server side has
// completed (such as defers that might be run after the previous message was
// received).
Expand Down Expand Up @@ -1601,17 +1620,9 @@ func delegateSnapshot(
unexpectedResp,
)
}
// Import the remotely collected spans, if any.
if len(resp.CollectedSpans) != 0 {
span := tracing.SpanFromContext(ctx)
if span == nil {
log.Warningf(
ctx,
"trying to ingest remote spans but there is no recording span set up",
)
} else {
span.ImportRemoteRecording(resp.CollectedSpans)
}
sp := tracing.SpanFromContext(ctx)
if sp != nil {
sp.ImportRemoteRecording(resp.CollectedSpans)
}
switch resp.SnapResponse.Status {
case kvserverpb.SnapshotResponse_ERROR:
Expand Down

0 comments on commit 09a1d46

Please sign in to comment.