diff --git a/beacon-chain/sync/rpc_send_request.go b/beacon-chain/sync/rpc_send_request.go index 8f3f6a5aea1..e48065acf87 100644 --- a/beacon-chain/sync/rpc_send_request.go +++ b/beacon-chain/sync/rpc_send_request.go @@ -208,26 +208,130 @@ func SendBlobSidecarByRoot( } func SendDataColumnSidecarByRoot( - ctx context.Context, tor blockchain.TemporalOracle, p2pApi p2p.P2P, pid peer.ID, - ctxMap ContextByteVersions, req *p2ptypes.BlobSidecarsByRootReq, -) ([]blocks.ROBlob, error) { - if uint64(len(*req)) > params.BeaconConfig().MaxRequestDataColumnSidecars { - return nil, errors.Wrapf(p2ptypes.ErrMaxDataColumnReqExceeded, "length=%d", len(*req)) + ctx context.Context, + tor blockchain.TemporalOracle, + p2pApi p2p.P2P, + pid peer.ID, + ctxMap ContextByteVersions, + req *p2ptypes.BlobSidecarsByRootReq, +) ([]blocks.RODataColumn, error) { + reqCount := uint64(len(*req)) + maxRequestDataColumnSideCar := params.BeaconConfig().MaxRequestDataColumnSidecars + + // Verify that the request count is within the maximum allowed. + if reqCount > maxRequestDataColumnSideCar { + return nil, errors.Wrapf(p2ptypes.ErrMaxDataColumnReqExceeded, "current: %d, max: %d", reqCount, maxRequestDataColumnSideCar) } + // Get the topic for the request. topic, err := p2p.TopicFromMessage(p2p.DataColumnSidecarsByRootName, slots.ToEpoch(tor.CurrentSlot())) if err != nil { - return nil, err + return nil, errors.Wrap(err, "topic from message") } + + // Send the request to the peer. log.WithField("topic", topic).Debug("Sending data column sidecar request") stream, err := p2pApi.Send(ctx, req, topic, pid) if err != nil { - return nil, err + return nil, errors.Wrap(err, "send") } + + // Close the stream when done. defer closeStream(stream, log) - maxCol := params.BeaconConfig().MaxRequestDataColumnSidecars - return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, blobValidatorFromRootReq(req), maxCol) + // Group data column sidecar validation by block root then by index. + requestedDataColumnSidecars := make(map[[fieldparams.RootLength]byte]map[uint64]bool) + for dataColumn := range requestedDataColumnSidecars { + requestedDataColumnSidecars[dataColumn] = make(map[uint64]bool) + } + + for _, dataColumnIdentifier := range *req { + blockRoot := bytesutil.ToBytes32(dataColumnIdentifier.BlockRoot) + requestedDataColumnSidecars[blockRoot][dataColumnIdentifier.Index] = true + } + + // Read the data column sidecars from the stream. + roDataColumns := make([]blocks.RODataColumn, 0, reqCount) + + for i := uint64(0); ; /* no stop condition */ i++ { + roDataColumn, err := readChunkedDataColumnSideCar(stream, p2pApi, ctxMap, requestedDataColumnSidecars) + + if errors.Is(err, io.EOF) { + // End of stream. + break + } + + if err != nil { + return nil, errors.Wrap(err, "read chunked data column sidecar") + } + + if i >= reqCount { + // The response MUST contain no more than `reqCount` blocks. + // (`reqCount` is already capped by `maxRequestDataColumnSideCar`.) + return nil, errors.Wrap(ErrInvalidFetchedData, "response contains more data column sidecars than requested") + } + + roDataColumns = append(roDataColumns, *roDataColumn) + } + + return roDataColumns, nil +} + +func readChunkedDataColumnSideCar( + stream network.Stream, + p2pApi p2p.P2P, + ctxMap ContextByteVersions, + requestedDataColumnSidecars map[[fieldparams.RootLength]byte]map[uint64]bool, +) (*blocks.RODataColumn, error) { + // Read the status code from the stream. + statusCode, errMessage, err := ReadStatusCode(stream, p2pApi.Encoding()) + if err != nil { + return nil, errors.Wrap(err, "read status code") + } + + if statusCode != 0 { + return nil, errors.Wrap(errBlobChunkedReadFailure, errMessage) + } + // Retrieve the fork digest. + ctxBytes, err := readContextFromStream(stream) + if err != nil { + return nil, errors.Wrap(err, "read context from stream") + } + + // Check if the fork digest is recognized. + v, ok := ctxMap[bytesutil.ToBytes4(ctxBytes)] + if !ok { + return nil, errors.Errorf("unrecognized fork digest %#x", ctxBytes) + } + + // Check if we are on debeb. + // Only deneb is supported at this time, because we lack a fork-spanning interface/union type for blobs. + if v != version.Deneb { + return nil, errors.Errorf("unexpected context bytes for deneb DataColumnSidecar, ctx=%#x, v=%v", ctxBytes, v) + } + + // Decode the data column sidecar from the stream. + dataColumnSidecar := new(ethpb.DataColumnSidecar) + if err := p2pApi.Encoding().DecodeWithMaxLength(stream, dataColumnSidecar); err != nil { + return nil, errors.Wrap(err, "failed to decode the protobuf-encoded BlobSidecar message from RPC chunk stream") + } + + // Create a read-only data column from the data column sidecar. + roDataColumn, err := blocks.NewRODataColumn(dataColumnSidecar) + if err != nil { + return nil, errors.Wrap(err, "new read only data column") + } + + // Verify that the data column sidecar is requested. + dataColumnIndex := roDataColumn.ColumnIndex + dataColumnBlockRoot := roDataColumn.BlockRoot() + + isRequested := requestedDataColumnSidecars[dataColumnBlockRoot][dataColumnIndex] + if !isRequested { + return nil, errors.Errorf("unrequested data column sidecar, blockRoot=%#x, index=%d", dataColumnBlockRoot, dataColumnIndex) + } + + return &roDataColumn, nil } // BlobResponseValidation represents a function that can validate aspects of a single unmarshaled blob