Skip to content

Commit

Permalink
moving some blob rpc validation closer to peer read
Browse files Browse the repository at this point in the history
  • Loading branch information
kasey committed Jan 23, 2024
1 parent c9fe53b commit 3050e60
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 7 deletions.
81 changes: 74 additions & 7 deletions beacon-chain/sync/rpc_send_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,21 @@ import (
"github.com/sirupsen/logrus"
)

// ErrInvalidFetchedData is used to signal that an error occurred which should result in peer downscoring.
var ErrInvalidFetchedData = errors.New("invalid data returned from peer")

var errMaxRequestBlobSidecarsExceeded = errors.Wrap(ErrInvalidFetchedData, "peer exceeded req blob chunk tx limit")
var errBlobChunkedReadFailure = errors.New("failed to read stream of chunk-encoded blobs")
var errBlobUnmarshal = errors.New("Could not unmarshal chunk-encoded blob")
var errUnrequested = errors.New("Received BlobSidecar in response that was not requested")
var errBlobResponseOutOfBounds = errors.New("received BlobSidecar with slot outside BlobSidecarsByRangeRequest bounds")

// Any error from the following declaration block should result in peer downscoring.
var (
// ErrInvalidFetchedData is used to signal that an error occurred which should result in peer downscoring.
ErrInvalidFetchedData = errors.New("invalid data returned from peer")
ErrBlobIndexOutOfBounds = errors.Wrap(ErrInvalidFetchedData, "blob index out of range")
errMaxRequestBlobSidecarsExceeded = errors.Wrap(ErrInvalidFetchedData, "peer exceeded req blob chunk tx limit")
errChunkResponseSlotNotAsc = errors.Wrap(ErrInvalidFetchedData, "blob slot not higher than previous block root")
errChunkResponseIndexNotAsc = errors.Wrap(ErrInvalidFetchedData, "blob indices for a block must start at 0 and increase by 1")
errUnrequested = errors.Wrap(ErrInvalidFetchedData, "received BlobSidecar in response that was not requested")
errBlobResponseOutOfBounds = errors.Wrap(ErrInvalidFetchedData, "received BlobSidecar with slot outside BlobSidecarsByRangeRequest bounds")
errChunkResponseBlockMismatch = errors.Wrap(ErrInvalidFetchedData, "blob block details do not match")
)

// BeaconBlockProcessor defines a block processing function, which allows to start utilizing
// blocks even before all blocks are ready.
Expand Down Expand Up @@ -167,7 +174,8 @@ func SendBlobsByRangeRequest(ctx context.Context, tor blockchain.TemporalOracle,
if max > req.Count*fieldparams.MaxBlobsPerBlock {
max = req.Count * fieldparams.MaxBlobsPerBlock
}
return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, blobValidatorFromRangeReq(req), max)
blobVal := composeBlobValidations(blobValidatorFromRangeReq(req), newSequentialBlobValidator())
return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, blobVal, max)
}

func SendBlobSidecarByRoot(
Expand Down Expand Up @@ -198,6 +206,65 @@ func SendBlobSidecarByRoot(

type blobResponseValidation func(blocks.ROBlob) error

func composeBlobValidations(vf ...blobResponseValidation) blobResponseValidation {
return func(blob blocks.ROBlob) error {
for i := range vf {
if err := vf[i](blob); err != nil {
return err
}
}
return nil
}
}

type seqBlobValid struct {
prev *blocks.ROBlob
}

func (sbv *seqBlobValid) nextValid(blob blocks.ROBlob) error {
if blob.Index >= fieldparams.MaxBlobsPerBlock {
return ErrBlobIndexOutOfBounds
}
if sbv.prev == nil {
// The first blob we see for a block must have index 0.
if blob.Index != 0 {
return errChunkResponseIndexNotAsc
}
sbv.prev = &blob
return nil
}
if sbv.prev.Slot() == blob.Slot() {
if sbv.prev.BlockRoot() != blob.BlockRoot() {
return errors.Wrap(errChunkResponseBlockMismatch, "block roots do not match")
}
if sbv.prev.ParentRoot() != blob.ParentRoot() {
return errors.Wrap(errChunkResponseBlockMismatch, "block parent roots do not match")
}
// Blob indices in responses should be strictly monotonically incrementing.
if blob.Index != sbv.prev.Index+1 {
return errChunkResponseIndexNotAsc
}
} else {
// The first blob we see for a block must have index 0.
if blob.Index != 0 {
return errChunkResponseIndexNotAsc
}
// Blocks must be in ascending slot order.
if sbv.prev.Slot() >= blob.Slot() {
return errChunkResponseSlotNotAsc
}
}
sbv.prev = &blob
return nil
}

func newSequentialBlobValidator() blobResponseValidation {
sbv := &seqBlobValid{}
return func(blob blocks.ROBlob) error {
return sbv.nextValid(blob)
}
}

func blobValidatorFromRootReq(req *p2ptypes.BlobSidecarsByRootReq) blobResponseValidation {
blobIds := make(map[[32]byte]map[uint64]bool)
for _, sc := range *req {
Expand Down
78 changes: 78 additions & 0 deletions beacon-chain/sync/rpc_send_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
p2ptest "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing"
p2pTypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
Expand Down Expand Up @@ -601,3 +602,80 @@ func TestBlobValidatorFromRangeReq(t *testing.T) {
})
}
}

func TestSeqBlobValid(t *testing.T) {
one, oneBlobs := generateTestBlockWithSidecars(t, [32]byte{}, 0, 3)
r1, err := one.HashTreeRoot()
require.NoError(t, err)
_, twoBlobs := generateTestBlockWithSidecars(t, r1, 1, 3)
_, oops := generateTestBlockWithSidecars(t, r1, 1000, 4)
oops[1].SignedBlockHeader.Header.ParentRoot = bytesutil.PadTo([]byte("derp"), 32)
wrongRoot, err := blocks.NewROBlobWithRoot(oops[2].BlobSidecar, bytesutil.ToBytes32([]byte("parentderp")))
require.NoError(t, err)
oob := oops[3]
oob.Index = fieldparams.MaxBlobsPerBlock

cases := []struct {
name string
seq []blocks.ROBlob
err error
errAt int
}{
{
name: "all valid",
seq: append(append([]blocks.ROBlob{}, oneBlobs...), twoBlobs...),
},
{
name: "idx out of bounds",
seq: []blocks.ROBlob{oob},
err: ErrBlobIndexOutOfBounds,
},
{
name: "first index is not zero",
seq: []blocks.ROBlob{oneBlobs[1]},
err: errChunkResponseIndexNotAsc,
},
{
name: "index out of order, same block",
seq: []blocks.ROBlob{oneBlobs[1], oneBlobs[0]},
err: errChunkResponseIndexNotAsc,
},
{
name: "second block starts at idx 1",
seq: []blocks.ROBlob{oneBlobs[0], twoBlobs[1]},
err: errChunkResponseIndexNotAsc,
errAt: 1,
},
{
name: "slots not ascending",
seq: append(append([]blocks.ROBlob{}, twoBlobs...), oneBlobs...),
err: errChunkResponseSlotNotAsc,
errAt: len(twoBlobs),
},
{
name: "same slot, different root",
seq: []blocks.ROBlob{oops[0], wrongRoot},
err: errChunkResponseBlockMismatch,
errAt: 1,
},
{
name: "same slot, different parent root",
seq: []blocks.ROBlob{oops[0], oops[1]},
err: errChunkResponseBlockMismatch,
errAt: 1,
},
}
for _, c := range cases {
sbv := newSequentialBlobValidator()
t.Run(c.name, func(t *testing.T) {
for i := range c.seq {
err := sbv(c.seq[i])
if c.err != nil && i == c.errAt {
require.ErrorIs(t, err, c.err)
return
}
require.NoError(t, err)
}
})
}
}

0 comments on commit 3050e60

Please sign in to comment.