Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start validating blobs closer to RPC response reader #13511

Merged
merged 1 commit into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 80 additions & 7 deletions beacon-chain/sync/rpc_send_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,22 @@ import (
"github.com/sirupsen/logrus"
)

// ErrInvalidFetchedData is used to signal that an error occurred which should result in peer downscoring.
var ErrInvalidFetchedData = errors.New("invalid data returned from peer")

var errMaxRequestBlobSidecarsExceeded = errors.Wrap(ErrInvalidFetchedData, "peer exceeded req blob chunk tx limit")
var errBlobChunkedReadFailure = errors.New("failed to read stream of chunk-encoded blobs")
var errBlobUnmarshal = errors.New("Could not unmarshal chunk-encoded blob")
var errUnrequested = errors.New("Received BlobSidecar in response that was not requested")
var errBlobResponseOutOfBounds = errors.New("received BlobSidecar with slot outside BlobSidecarsByRangeRequest bounds")

// Any error from the following declaration block should result in peer downscoring.
var (
// ErrInvalidFetchedData is used to signal that an error occurred which should result in peer downscoring.
ErrInvalidFetchedData = errors.New("invalid data returned from peer")
errBlobIndexOutOfBounds = errors.Wrap(ErrInvalidFetchedData, "blob index out of range")
errMaxRequestBlobSidecarsExceeded = errors.Wrap(ErrInvalidFetchedData, "peer exceeded req blob chunk tx limit")
errChunkResponseSlotNotAsc = errors.Wrap(ErrInvalidFetchedData, "blob slot not higher than previous block root")
errChunkResponseIndexNotAsc = errors.Wrap(ErrInvalidFetchedData, "blob indices for a block must start at 0 and increase by 1")
errUnrequested = errors.Wrap(ErrInvalidFetchedData, "received BlobSidecar in response that was not requested")
errBlobResponseOutOfBounds = errors.Wrap(ErrInvalidFetchedData, "received BlobSidecar with slot outside BlobSidecarsByRangeRequest bounds")
errChunkResponseBlockMismatch = errors.Wrap(ErrInvalidFetchedData, "blob block details do not match")
errChunkResponseParentMismatch = errors.Wrap(ErrInvalidFetchedData, "parent root for response element doesn't match previous element root")
)

// BeaconBlockProcessor defines a block processing function, which allows to start utilizing
// blocks even before all blocks are ready.
Expand Down Expand Up @@ -167,7 +175,8 @@ func SendBlobsByRangeRequest(ctx context.Context, tor blockchain.TemporalOracle,
if max > req.Count*fieldparams.MaxBlobsPerBlock {
max = req.Count * fieldparams.MaxBlobsPerBlock
}
return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, blobValidatorFromRangeReq(req), max)
blobVal := composeBlobValidations(blobValidatorFromRangeReq(req), newSequentialBlobValidator())
return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, blobVal, max)
}

func SendBlobSidecarByRoot(
Expand Down Expand Up @@ -198,6 +207,70 @@ func SendBlobSidecarByRoot(

type blobResponseValidation func(blocks.ROBlob) error

func composeBlobValidations(vf ...blobResponseValidation) blobResponseValidation {
return func(blob blocks.ROBlob) error {
for i := range vf {
if err := vf[i](blob); err != nil {
return err
}
Comment on lines +213 to +215
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe over-engineering here, but could these be processed in parallel?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discussed offline, these checks are too trivial to justify parallelization

}
return nil
}
}

type seqBlobValid struct {
prev *blocks.ROBlob
}

func (sbv *seqBlobValid) nextValid(blob blocks.ROBlob) error {
if blob.Index >= fieldparams.MaxBlobsPerBlock {
return errBlobIndexOutOfBounds
}
if sbv.prev == nil {
// The first blob we see for a block must have index 0.
if blob.Index != 0 {
return errChunkResponseIndexNotAsc
}
sbv.prev = &blob
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there isn't a previous blob, we need to check that the blob index is zero.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return nil
}
if sbv.prev.Slot() == blob.Slot() {
if sbv.prev.BlockRoot() != blob.BlockRoot() {
return errors.Wrap(errChunkResponseBlockMismatch, "block roots do not match")
}
if sbv.prev.ParentRoot() != blob.ParentRoot() {
return errors.Wrap(errChunkResponseBlockMismatch, "block parent roots do not match")
}
// Blob indices in responses should be strictly monotonically incrementing.
if blob.Index != sbv.prev.Index+1 {
return errChunkResponseIndexNotAsc
}
} else {
// If the slot is adjacent we know there are no intervening blocks with missing blobs, so we can
// check that the new blob descends from the last seen.
if blob.Slot() == sbv.prev.Slot()+1 && blob.ParentRoot() != sbv.prev.BlockRoot() {
return errChunkResponseParentMismatch
}
// The first blob we see for a block must have index 0.
if blob.Index != 0 {
return errChunkResponseIndexNotAsc
}
// Blocks must be in ascending slot order.
if sbv.prev.Slot() >= blob.Slot() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to check that blob index is zero here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return errChunkResponseSlotNotAsc
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before this, we should check that the prev.BlockRoot() matches blob.ParentBlockRoot(). It's a cheap check that ensures it's a connected chain 😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
}
sbv.prev = &blob
return nil
}

func newSequentialBlobValidator() blobResponseValidation {
sbv := &seqBlobValid{}
return func(blob blocks.ROBlob) error {
return sbv.nextValid(blob)
}
}

func blobValidatorFromRootReq(req *p2ptypes.BlobSidecarsByRootReq) blobResponseValidation {
blobIds := make(map[[32]byte]map[uint64]bool)
for _, sc := range *req {
Expand Down
86 changes: 86 additions & 0 deletions beacon-chain/sync/rpc_send_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
p2ptest "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing"
p2pTypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
Expand Down Expand Up @@ -601,3 +602,88 @@ func TestBlobValidatorFromRangeReq(t *testing.T) {
})
}
}

func TestSeqBlobValid(t *testing.T) {
one, oneBlobs := generateTestBlockWithSidecars(t, [32]byte{}, 0, 3)
r1, err := one.Block.HashTreeRoot()
require.NoError(t, err)
two, twoBlobs := generateTestBlockWithSidecars(t, r1, 1, 3)
r2, err := two.Block.HashTreeRoot()
require.NoError(t, err)
_, oops := generateTestBlockWithSidecars(t, r2, 0, 4)
oops[1].SignedBlockHeader.Header.ParentRoot = bytesutil.PadTo([]byte("derp"), 32)
wrongRoot, err := blocks.NewROBlobWithRoot(oops[2].BlobSidecar, bytesutil.ToBytes32([]byte("parentderp")))
require.NoError(t, err)
oob := oops[3]
oob.Index = fieldparams.MaxBlobsPerBlock

cases := []struct {
name string
seq []blocks.ROBlob
err error
errAt int
}{
{
name: "all valid",
seq: append(append([]blocks.ROBlob{}, oneBlobs...), twoBlobs...),
},
{
name: "idx out of bounds",
seq: []blocks.ROBlob{oob},
err: errBlobIndexOutOfBounds,
},
{
name: "first index is not zero",
seq: []blocks.ROBlob{oneBlobs[1]},
err: errChunkResponseIndexNotAsc,
},
{
name: "index out of order, same block",
seq: []blocks.ROBlob{oneBlobs[1], oneBlobs[0]},
err: errChunkResponseIndexNotAsc,
},
{
name: "second block starts at idx 1",
seq: []blocks.ROBlob{oneBlobs[0], twoBlobs[1]},
err: errChunkResponseIndexNotAsc,
errAt: 1,
},
{
name: "slots not ascending",
seq: append(append([]blocks.ROBlob{}, twoBlobs...), oops...),
err: errChunkResponseSlotNotAsc,
errAt: len(twoBlobs),
},
{
name: "same slot, different root",
seq: []blocks.ROBlob{oops[0], wrongRoot},
err: errChunkResponseBlockMismatch,
errAt: 1,
},
{
name: "same slot, different parent root",
seq: []blocks.ROBlob{oops[0], oops[1]},
err: errChunkResponseBlockMismatch,
errAt: 1,
},
{
name: "next slot, different parent root",
seq: []blocks.ROBlob{oops[0], twoBlobs[0]},
err: errChunkResponseParentMismatch,
errAt: 1,
},
}
for _, c := range cases {
sbv := newSequentialBlobValidator()
t.Run(c.name, func(t *testing.T) {
for i := range c.seq {
err := sbv(c.seq[i])
if c.err != nil && i == c.errAt {
require.ErrorIs(t, err, c.err)
return
}
require.NoError(t, err)
}
})
}
}
Loading