Skip to content

Commit

Permalink
Fix pending block/blob zero peer edge case (#13625)
Browse files Browse the repository at this point in the history
* pending broadcast err if missing blobs and 0 peers

* compute request first for len check

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
  • Loading branch information
kasey and kasey authored Feb 17, 2024
1 parent 92303b6 commit 04dd60d
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 45 deletions.
17 changes: 13 additions & 4 deletions beacon-chain/sync/pending_blocks_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ func (s *Service) hasPeer() bool {
return len(s.cfg.p2p.Peers().Connected()) > 0
}

var errNoPeersForPending = errors.New("no suitable peers to process pending block queue, delaying")

// processAndBroadcastBlock validates, processes, and broadcasts a block.
// part of the function is to request missing blobs from peers if the block contains kzg commitments.
func (s *Service) processAndBroadcastBlock(ctx context.Context, b interfaces.ReadOnlySignedBeaconBlock, blkRoot [32]byte) error {
Expand All @@ -202,10 +204,17 @@ func (s *Service) processAndBroadcastBlock(ctx context.Context, b interfaces.Rea
}
}

peers := s.getBestPeers()
peerCount := len(peers)
if peerCount > 0 {
if err := s.requestPendingBlobs(ctx, b, blkRoot, peers[rand.NewGenerator().Int()%peerCount]); err != nil {
request, err := s.pendingBlobsRequestForBlock(blkRoot, b)
if err != nil {
return err
}
if len(request) > 0 {
peers := s.getBestPeers()
peerCount := len(peers)
if peerCount == 0 {
return errors.Wrapf(errNoPeersForPending, "block root=%#x", blkRoot)
}
if err := s.sendAndSaveBlobSidecars(ctx, request, peers[rand.NewGenerator().Int()%peerCount], b); err != nil {
return err
}
}
Expand Down
62 changes: 28 additions & 34 deletions beacon-chain/sync/rpc_beacon_blocks_by_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, requests *t
}
return nil
})

for _, blk := range blks {
// Skip blocks before deneb because they have no blob.
if blk.Version() < version.Deneb {
Expand All @@ -56,11 +55,17 @@ func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, requests *t
if err != nil {
return err
}
if err := s.requestPendingBlobs(ctx, blk, blkRoot, id); err != nil {
request, err := s.pendingBlobsRequestForBlock(blkRoot, blk)
if err != nil {
return err
}
if len(request) == 0 {
continue
}
if err := s.sendAndSaveBlobSidecars(ctx, request, id, blk); err != nil {
return err
}
}

return err
}

Expand Down Expand Up @@ -128,41 +133,13 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
return nil
}

// requestPendingBlobs handles the request for pending blobs based on the given beacon block.
func (s *Service) requestPendingBlobs(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, peerID peer.ID) error {
if block.Version() < version.Deneb {
return nil // Block before deneb has no blob.
}

commitments, err := block.Block().Body().BlobKzgCommitments()
if err != nil {
return err
}

if len(commitments) == 0 {
return nil // No operation if the block has no blob commitments.
}

contextByte, err := ContextByteVersionsForValRoot(s.cfg.chain.GenesisValidatorsRoot())
if err != nil {
return err
}

request, err := s.constructPendingBlobsRequest(ctx, blockRoot, len(commitments))
if err != nil {
return err
}

return s.sendAndSaveBlobSidecars(ctx, request, contextByte, peerID, block)
}

// sendAndSaveBlobSidecars sends the blob request and saves received sidecars.
func (s *Service) sendAndSaveBlobSidecars(ctx context.Context, request types.BlobSidecarsByRootReq, contextByte ContextByteVersions, peerID peer.ID, block interfaces.ReadOnlySignedBeaconBlock) error {
func (s *Service) sendAndSaveBlobSidecars(ctx context.Context, request types.BlobSidecarsByRootReq, peerID peer.ID, block interfaces.ReadOnlySignedBeaconBlock) error {
if len(request) == 0 {
return nil
}

sidecars, err := SendBlobSidecarByRoot(ctx, s.cfg.clock, s.cfg.p2p, peerID, contextByte, &request)
sidecars, err := SendBlobSidecarByRoot(ctx, s.cfg.clock, s.cfg.p2p, peerID, s.ctxMap, &request)
if err != nil {
return err
}
Expand Down Expand Up @@ -193,8 +170,25 @@ func (s *Service) sendAndSaveBlobSidecars(ctx context.Context, request types.Blo
return nil
}

func (s *Service) pendingBlobsRequestForBlock(root [32]byte, b interfaces.ReadOnlySignedBeaconBlock) (types.BlobSidecarsByRootReq, error) {
if b.Version() < version.Deneb {
return nil, nil // Block before deneb has no blob.
}
cc, err := b.Block().Body().BlobKzgCommitments()
if err != nil {
return nil, err
}
if len(cc) == 0 {
return nil, nil
}
return s.constructPendingBlobsRequest(root, len(cc))
}

// constructPendingBlobsRequest creates a request for BlobSidecars by root, considering blobs already in DB.
func (s *Service) constructPendingBlobsRequest(ctx context.Context, root [32]byte, commitments int) (types.BlobSidecarsByRootReq, error) {
func (s *Service) constructPendingBlobsRequest(root [32]byte, commitments int) (types.BlobSidecarsByRootReq, error) {
if commitments == 0 {
return nil, nil
}
stored, err := s.cfg.blobStorage.Indices(root)
if err != nil {
return nil, err
Expand Down
19 changes: 12 additions & 7 deletions beacon-chain/sync/rpc_beacon_blocks_by_root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
leakybucket "github.com/prysmaticlabs/prysm/v5/container/leaky-bucket"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
"github.com/prysmaticlabs/prysm/v5/proto/eth/v2"
eth "github.com/prysmaticlabs/prysm/v5/proto/eth/v2"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/testing/assert"
"github.com/prysmaticlabs/prysm/v5/testing/require"
Expand Down Expand Up @@ -370,12 +370,16 @@ func TestRequestPendingBlobs(t *testing.T) {
t.Run("old block should not fail", func(t *testing.T) {
b, err := blocks.NewSignedBeaconBlock(util.NewBeaconBlock())
require.NoError(t, err)
require.NoError(t, s.requestPendingBlobs(context.Background(), b, [32]byte{}, "test"))
request, err := s.pendingBlobsRequestForBlock([32]byte{}, b)
require.NoError(t, err)
require.NoError(t, s.sendAndSaveBlobSidecars(context.Background(), request, "test", b))
})
t.Run("empty commitment block should not fail", func(t *testing.T) {
b, err := blocks.NewSignedBeaconBlock(util.NewBeaconBlock())
require.NoError(t, err)
require.NoError(t, s.requestPendingBlobs(context.Background(), b, [32]byte{}, "test"))
request, err := s.pendingBlobsRequestForBlock([32]byte{}, b)
require.NoError(t, err)
require.NoError(t, s.sendAndSaveBlobSidecars(context.Background(), request, "test", b))
})
t.Run("unsupported protocol", func(t *testing.T) {
p1 := p2ptest.NewTestP2P(t)
Expand Down Expand Up @@ -406,20 +410,21 @@ func TestRequestPendingBlobs(t *testing.T) {
b.Block.Body.BlobKzgCommitments = make([][]byte, 1)
b1, err := blocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
require.ErrorContains(t, "protocols not supported", s.requestPendingBlobs(context.Background(), b1, [32]byte{}, p2.PeerID()))
request, err := s.pendingBlobsRequestForBlock([32]byte{}, b1)
require.NoError(t, err)
require.ErrorContains(t, "protocols not supported", s.sendAndSaveBlobSidecars(context.Background(), request, p2.PeerID(), b1))
})
}

func TestConstructPendingBlobsRequest(t *testing.T) {
d := db.SetupDB(t)
bs := filesystem.NewEphemeralBlobStorage(t)
s := &Service{cfg: &config{beaconDB: d, blobStorage: bs}}
ctx := context.Background()

// No unknown indices.
root := [32]byte{1}
count := 3
actual, err := s.constructPendingBlobsRequest(ctx, root, count)
actual, err := s.constructPendingBlobsRequest(root, count)
require.NoError(t, err)
require.Equal(t, 3, len(actual))
for i, id := range actual {
Expand Down Expand Up @@ -449,7 +454,7 @@ func TestConstructPendingBlobsRequest(t *testing.T) {
expected := []*eth.BlobIdentifier{
{Index: 1, BlockRoot: root[:]},
}
actual, err = s.constructPendingBlobsRequest(ctx, root, count)
actual, err = s.constructPendingBlobsRequest(root, count)
require.NoError(t, err)
require.Equal(t, expected[0].Index, actual[0].Index)
require.DeepEqual(t, expected[0].BlockRoot, actual[0].BlockRoot)
Expand Down
10 changes: 10 additions & 0 deletions beacon-chain/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ type Service struct {
verifierWaiter *verification.InitializerWaiter
newBlobVerifier verification.NewBlobVerifier
availableBlocker coverage.AvailableBlocker
ctxMap ContextByteVersions
}

// NewService initializes new regular sync service.
Expand Down Expand Up @@ -295,6 +296,15 @@ func (s *Service) waitForChainStart() {
s.cfg.clock = clock
startTime := clock.GenesisTime()
log.WithField("starttime", startTime).Debug("Received state initialized event")

ctxMap, err := ContextByteVersionsForValRoot(clock.GenesisValidatorsRoot())
if err != nil {
log.WithError(err).WithField("genesis_validator_root", clock.GenesisValidatorsRoot()).
Error("sync service failed to initialize context version map")
return
}
s.ctxMap = ctxMap

// Register respective rpc handlers at state initialized event.
s.registerRPCHandlers()
// Wait for chainstart in separate routine.
Expand Down

0 comments on commit 04dd60d

Please sign in to comment.