From 1b7d6edcf6df67945fbbfcaefc2e1db6257d1137 Mon Sep 17 00:00:00 2001
From: Nishant Das
Date: Fri, 17 May 2024 13:18:08 +0300
Subject: [PATCH] Request Data Columns When Fetching Pending Blocks (#14007)

* Support Data Columns For By Root Requests
* Revert Config Changes
* Fix Panic
* Fix Process Block
* Fix Flags
* Lint
* Support Checkpoint Sync
* Manu's Review
* Add Support For Columns in Remaining Methods
* Unmarshal Uncorrectly
---
 beacon-chain/blockchain/BUILD.bazel           |   1 +
 beacon-chain/blockchain/process_block.go      |   7 +-
 beacon-chain/p2p/BUILD.bazel                  |   2 +
 beacon-chain/p2p/custody.go                   |  73 ++++++++++++
 beacon-chain/p2p/interfaces.go                |   6 +
 beacon-chain/p2p/testing/fuzz_p2p.go          |   8 ++
 beacon-chain/p2p/testing/p2p.go               |   8 ++
 .../sync/initial-sync/blocks_fetcher_utils.go |  43 ++++---
 beacon-chain/sync/initial-sync/service.go     |  94 +++++++++++++++-
 beacon-chain/sync/pending_blocks_queue.go     |  43 +++++--
 .../sync/rpc_beacon_blocks_by_root.go         | 105 ++++++++++++++++--
 beacon-chain/sync/sampling_data_columns.go    |  30 +----
 beacon-chain/sync/validate_blob.go            |  10 ++
 13 files changed, 368 insertions(+), 62 deletions(-)
 create mode 100644 beacon-chain/p2p/custody.go

diff --git a/beacon-chain/blockchain/BUILD.bazel b/beacon-chain/blockchain/BUILD.bazel
index 2a0e8c3fa05d..a47f210d5f35 100644
--- a/beacon-chain/blockchain/BUILD.bazel
+++ b/beacon-chain/blockchain/BUILD.bazel
@@ -69,6 +69,7 @@ go_library(
         "//beacon-chain/startup:go_default_library",
         "//beacon-chain/state:go_default_library",
         "//beacon-chain/state/stategen:go_default_library",
+        "//cmd/beacon-chain/flags:go_default_library",
         "//config/features:go_default_library",
         "//config/fieldparams:go_default_library",
         "//config/params:go_default_library",
diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go
index ae30faf78056..de2c6f65b92b 100644
--- a/beacon-chain/blockchain/process_block.go
+++ b/beacon-chain/blockchain/process_block.go
@@ -7,6 +7,7 @@ import (
 
     "github.com/pkg/errors"
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
+    "github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
     "github.com/sirupsen/logrus"
     "go.opencensus.io/trace"
 
@@ -640,8 +641,12 @@ func (s *Service) isDataAvailableDataColumns(ctx context.Context, root [32]byte,
     if len(kzgCommitments) == 0 {
         return nil
     }
+    custodiedSubnetCount := params.BeaconConfig().CustodyRequirement
+    if flags.Get().SubscribeToAllSubnets {
+        custodiedSubnetCount = params.BeaconConfig().DataColumnSidecarSubnetCount
+    }
 
-    colMap, err := peerdas.CustodyColumns(s.cfg.P2P.NodeID(), params.BeaconConfig().CustodyRequirement)
+    colMap, err := peerdas.CustodyColumns(s.cfg.P2P.NodeID(), custodiedSubnetCount)
     if err != nil {
         return err
     }
diff --git a/beacon-chain/p2p/BUILD.bazel b/beacon-chain/p2p/BUILD.bazel
index 0ac323320851..4af322cc6c3b 100644
--- a/beacon-chain/p2p/BUILD.bazel
+++ b/beacon-chain/p2p/BUILD.bazel
@@ -7,6 +7,7 @@ go_library(
         "broadcaster.go",
         "config.go",
         "connection_gater.go",
+        "custody.go",
         "dial_relay_node.go",
         "discovery.go",
         "doc.go",
@@ -81,6 +82,7 @@ go_library(
         "@com_github_ethereum_go_ethereum//p2p/discover:go_default_library",
         "@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
         "@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
+        "@com_github_ferranbt_fastssz//:go_default_library",
         "@com_github_holiman_uint256//:go_default_library",
         "@com_github_kr_pretty//:go_default_library",
         "@com_github_libp2p_go_libp2p//:go_default_library",
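The process_block.go hunk above decides how many column subnets the node custodies before asking peerdas.CustodyColumns which column indices those map to: the spec minimum normally, or every subnet when --subscribe-all-subnets is set. A minimal standalone sketch of that selection follows; the constant values are illustrative assumptions, not the real params.BeaconConfig() numbers.

package main

import "fmt"

// Assumed example values standing in for params.BeaconConfig().
const (
    custodyRequirement           = 4  // assumed spec minimum subnet count
    dataColumnSidecarSubnetCount = 32 // assumed total number of column subnets
)

// custodiedSubnetCount mirrors the selection added to isDataAvailableDataColumns.
func custodiedSubnetCount(subscribeToAllSubnets bool) uint64 {
    if subscribeToAllSubnets {
        // A node subscribed to all subnets custodies every column subnet.
        return dataColumnSidecarSubnetCount
    }
    return custodyRequirement
}

func main() {
    fmt.Println(custodiedSubnetCount(false)) // 4
    fmt.Println(custodiedSubnetCount(true))  // 32
}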
diff --git a/beacon-chain/p2p/custody.go b/beacon-chain/p2p/custody.go
new file mode 100644
index 000000000000..38e7739e70c9
--- /dev/null
+++ b/beacon-chain/p2p/custody.go
@@ -0,0 +1,73 @@
+package p2p
+
+import (
+    ssz "github.com/ferranbt/fastssz"
+    "github.com/libp2p/go-libp2p/core/peer"
+    "github.com/pkg/errors"
+    "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
+    "github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
+    "github.com/prysmaticlabs/prysm/v5/config/params"
+)
+
+func (s *Service) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
+    custodiedSubnetCount := params.BeaconConfig().CustodyRequirement
+    if flags.Get().SubscribeToAllSubnets {
+        custodiedSubnetCount = params.BeaconConfig().DataColumnSidecarSubnetCount
+    }
+
+    custodiedColumns, err := peerdas.CustodyColumns(s.NodeID(), custodiedSubnetCount)
+    if err != nil {
+        return nil, err
+    }
+    var validPeers []peer.ID
+    for _, pid := range peers {
+        remoteCount, err := s.CustodyCountFromRemotePeer(pid)
+        if err != nil {
+            return nil, err
+        }
+        nodeId, err := ConvertPeerIDToNodeID(pid)
+        if err != nil {
+            return nil, err
+        }
+        remoteCustodiedColumns, err := peerdas.CustodyColumns(nodeId, remoteCount)
+        if err != nil {
+            return nil, err
+        }
+        invalidPeer := false
+        for c := range custodiedColumns {
+            if !remoteCustodiedColumns[c] {
+                invalidPeer = true
+                break
+            }
+        }
+        if invalidPeer {
+            continue
+        }
+        copiedId := pid
+        // Add valid peer to list
+        validPeers = append(validPeers, copiedId)
+    }
+    return validPeers, nil
+}
+
+func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) (uint64, error) {
+    // Retrieve the ENR of the peer.
+    peerRecord, err := s.peers.ENR(pid)
+    if err != nil {
+        return 0, errors.Wrap(err, "ENR")
+    }
+    peerCustodiedSubnetCount := params.BeaconConfig().CustodyRequirement
+    if peerRecord != nil {
+        // Load the `custody_subnet_count`
+        custodyBytes := make([]byte, 8)
+        custodyObj := CustodySubnetCount(custodyBytes)
+        if err := peerRecord.Load(&custodyObj); err != nil {
+            return 0, errors.Wrap(err, "load custody_subnet_count")
+        }
+        actualCustodyCount := ssz.UnmarshallUint64(custodyBytes)
+
+        if actualCustodyCount > peerCustodiedSubnetCount {
+            peerCustodiedSubnetCount = actualCustodyCount
+        }
+    }
+    return peerCustodiedSubnetCount, nil
+}
diff --git a/beacon-chain/p2p/interfaces.go b/beacon-chain/p2p/interfaces.go
index 901073c535b1..a6c0b25fccba 100644
--- a/beacon-chain/p2p/interfaces.go
+++ b/beacon-chain/p2p/interfaces.go
@@ -29,6 +29,7 @@ type P2P interface {
     ConnectionHandler
     PeersProvider
     MetadataProvider
+    CustodyHandler
 }
 
 type Acceser interface {
@@ -110,3 +111,8 @@ type MetadataProvider interface {
     Metadata() metadata.Metadata
     MetadataSeq() uint64
 }
+
+type CustodyHandler interface {
+    CustodyCountFromRemotePeer(peer.ID) (uint64, error)
+    GetValidCustodyPeers([]peer.ID) ([]peer.ID, error)
+}
diff --git a/beacon-chain/p2p/testing/fuzz_p2p.go b/beacon-chain/p2p/testing/fuzz_p2p.go
index a4ad1763f18a..bec36a02152b 100644
--- a/beacon-chain/p2p/testing/fuzz_p2p.go
+++ b/beacon-chain/p2p/testing/fuzz_p2p.go
@@ -183,3 +183,11 @@ func (_ *FakeP2P) InterceptSecured(network.Direction, peer.ID, network.ConnMulti
 func (_ *FakeP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.DisconnectReason) {
     return true, 0
 }
+
+func (_ *FakeP2P) CustodyCountFromRemotePeer(peer.ID) (uint64, error) {
+    return 0, nil
+}
+
+func (_ *FakeP2P) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
+    return peers, nil
+}
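GetValidCustodyPeers keeps a peer only if its advertised custody columns form a superset of the columns this node custodies. The core of that filter is a set-containment test over column indices; a standalone sketch (plain maps standing in for the peerdas.CustodyColumns result) follows.

package main

import "fmt"

// custodiesAll reports whether the remote peer custodies every column we
// custody ourselves, mirroring the superset check in GetValidCustodyPeers.
func custodiesAll(local, remote map[uint64]bool) bool {
    for c := range local {
        if !remote[c] {
            return false
        }
    }
    return true
}

func main() {
    local := map[uint64]bool{1: true, 5: true}
    remote := map[uint64]bool{1: true, 5: true, 9: true}
    fmt.Println(custodiesAll(local, remote)) // true: peer is kept
    fmt.Println(custodiesAll(remote, local)) // false: peer would be skipped
}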
diff --git a/beacon-chain/p2p/testing/p2p.go b/beacon-chain/p2p/testing/p2p.go
index 73167a6c3d98..7d6d602f3fd1 100644
--- a/beacon-chain/p2p/testing/p2p.go
+++ b/beacon-chain/p2p/testing/p2p.go
@@ -419,3 +419,11 @@ func (_ *TestP2P) InterceptSecured(network.Direction, peer.ID, network.ConnMulti
 func (_ *TestP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.DisconnectReason) {
     return true, 0
 }
+
+func (_ *TestP2P) CustodyCountFromRemotePeer(peer.ID) (uint64, error) {
+    return 0, nil
+}
+
+func (_ *TestP2P) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
+    return peers, nil
+}
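The FakeP2P and TestP2P stubs above implement the new CustodyHandler interface permissively, so existing sync tests keep working without custody-aware peers. A hedged sketch of how a consumer might depend on that narrow interface, with a stub like the test doubles; the consumer type here is hypothetical, only the interface shape comes from the patch.

package main

import (
    "fmt"

    "github.com/libp2p/go-libp2p/core/peer"
)

// CustodyHandler mirrors the interface added to p2p/interfaces.go.
type CustodyHandler interface {
    CustodyCountFromRemotePeer(peer.ID) (uint64, error)
    GetValidCustodyPeers([]peer.ID) ([]peer.ID, error)
}

// permissiveCustody mimics the FakeP2P/TestP2P stubs: every peer is valid
// and advertises a custody count of zero.
type permissiveCustody struct{}

func (permissiveCustody) CustodyCountFromRemotePeer(peer.ID) (uint64, error)  { return 0, nil }
func (permissiveCustody) GetValidCustodyPeers(p []peer.ID) ([]peer.ID, error) { return p, nil }

func main() {
    var h CustodyHandler = permissiveCustody{}
    peers, _ := h.GetValidCustodyPeers([]peer.ID{"peerA", "peerB"})
    fmt.Println(len(peers)) // 2: the stub filters nothing out
}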
diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go b/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go
index dcdaf161ab94..8b4f8df101b9 100644
--- a/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go
+++ b/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go
@@ -8,6 +8,7 @@ import (
     "github.com/pkg/errors"
     p2pTypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
     "github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
+    "github.com/prysmaticlabs/prysm/v5/config/features"
     "github.com/prysmaticlabs/prysm/v5/config/params"
     "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
     "github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
@@ -236,18 +237,18 @@ func (f *blocksFetcher) findForkWithPeer(ctx context.Context, pid peer.ID, slot
         Count: reqCount,
         Step:  1,
     }
-    blocks, err := f.requestBlocks(ctx, req, pid)
+    reqBlocks, err := f.requestBlocks(ctx, req, pid)
     if err != nil {
         return nil, fmt.Errorf("cannot fetch blocks: %w", err)
     }
-    if len(blocks) == 0 {
+    if len(reqBlocks) == 0 {
         return nil, errNoAlternateBlocks
     }
 
     // If the first block is not connected to the current canonical chain, we'll stop processing this batch.
     // Instead, we'll work backwards from the first block until we find a common ancestor,
     // and then begin processing from there.
-    first := blocks[0]
+    first := reqBlocks[0]
     if !f.chain.HasBlock(ctx, first.Block().ParentRoot()) {
         // Backtrack on a root, to find a common ancestor from which we can resume syncing.
         fork, err := f.findAncestor(ctx, pid, first)
@@ -260,8 +261,8 @@ func (f *blocksFetcher) findForkWithPeer(ctx context.Context, pid peer.ID, slot
     // Traverse blocks, and if we've got one that doesn't have parent in DB, backtrack on it.
     // Note that we start from the second element in the array, because we know that the first element is in the db,
     // otherwise we would have gone into the findAncestor early return path above.
-    for i := 1; i < len(blocks); i++ {
-        block := blocks[i]
+    for i := 1; i < len(reqBlocks); i++ {
+        block := reqBlocks[i]
         parentRoot := block.Block().ParentRoot()
         // Step through blocks until we find one that is not in the chain. The goal is to find the point where the
         // chain observed in the peer diverges from the locally known chain, and then collect up the remainder of the
@@ -274,16 +275,25 @@ func (f *blocksFetcher) findForkWithPeer(ctx context.Context, pid peer.ID, slot
                 "slot": block.Block().Slot(),
                 "root": fmt.Sprintf("%#x", parentRoot),
             }).Debug("Block with unknown parent root has been found")
-            altBlocks, err := sortedBlockWithVerifiedBlobSlice(blocks[i-1:])
+            altBlocks, err := sortedBlockWithVerifiedBlobSlice(reqBlocks[i-1:])
             if err != nil {
                 return nil, errors.Wrap(err, "invalid blocks received in findForkWithPeer")
             }
+            var bwb []blocks.BlockWithROBlobs
+            if features.Get().EnablePeerDAS {
+                bwb, err = f.fetchColumnsFromPeer(ctx, altBlocks, pid, []peer.ID{pid})
+                if err != nil {
+                    return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findForkWithPeer")
+                }
+            } else {
+                bwb, err = f.fetchBlobsFromPeer(ctx, altBlocks, pid, []peer.ID{pid})
+                if err != nil {
+                    return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findForkWithPeer")
+                }
+            }
             // We need to fetch the blobs for the given alt-chain if any exist, so that we can try to verify and import
             // the blocks.
-            bwb, err := f.fetchBlobsFromPeer(ctx, altBlocks, pid, []peer.ID{pid})
-            if err != nil {
-                return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findForkWithPeer")
-            }
+
             // The caller will use the BlocksWith VerifiedBlobs in bwb as the starting point for
             // round-robin syncing the alternate chain.
             return &forkData{peer: pid, bwb: bwb}, nil
@@ -302,9 +312,16 @@ func (f *blocksFetcher) findAncestor(ctx context.Context, pid peer.ID, b interfa
     if err != nil {
         return nil, errors.Wrap(err, "received invalid blocks in findAncestor")
     }
-    bwb, err = f.fetchBlobsFromPeer(ctx, bwb, pid, []peer.ID{pid})
-    if err != nil {
-        return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findAncestor")
+    if features.Get().EnablePeerDAS {
+        bwb, err = f.fetchColumnsFromPeer(ctx, bwb, pid, []peer.ID{pid})
+        if err != nil {
+            return nil, errors.Wrap(err, "unable to retrieve columns for blocks found in findAncestor")
+        }
+    } else {
+        bwb, err = f.fetchBlobsFromPeer(ctx, bwb, pid, []peer.ID{pid})
+        if err != nil {
+            return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findAncestor")
+        }
     }
     return &forkData{
         peer: pid,
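In findForkWithPeer and findAncestor the block batch is now completed either with column sidecars (PeerDAS enabled) or blob sidecars (legacy path) before being handed back for round-robin syncing. A standalone sketch of that selection pattern; the types and fetch functions here are stand-ins for the fetcher's internals, not the real signatures.

package main

import "fmt"

// batch stands in for []blocks.BlockWithROBlobs in this sketch.
type batch []string

// completeBatch mirrors the feature-flag branch added to findForkWithPeer and
// findAncestor: the same batch is completed via one of two sidecar paths.
func completeBatch(peerDASEnabled bool, b batch,
    fetchColumns, fetchBlobs func(batch) (batch, error)) (batch, error) {
    if peerDASEnabled {
        return fetchColumns(b)
    }
    return fetchBlobs(b)
}

func main() {
    cols := func(b batch) (batch, error) { return append(b, "columns"), nil }
    blobs := func(b batch) (batch, error) { return append(b, "blobs"), nil }
    out, _ := completeBatch(true, batch{"block"}, cols, blobs)
    fmt.Println(out) // [block columns]
}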
diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go
index e79b22a07720..306d4f0902a6 100644
--- a/beacon-chain/sync/initial-sync/service.go
+++ b/beacon-chain/sync/initial-sync/service.go
@@ -24,6 +24,7 @@ import (
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
     "github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
+    "github.com/prysmaticlabs/prysm/v5/config/features"
     "github.com/prysmaticlabs/prysm/v5/config/params"
     "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
     "github.com/prysmaticlabs/prysm/v5/crypto/rand"
@@ -184,9 +185,16 @@ func (s *Service) Start() {
         log.WithError(err).Error("Error waiting for minimum number of peers")
         return
     }
-    if err := s.fetchOriginBlobs(peers); err != nil {
-        log.WithError(err).Error("Failed to fetch missing blobs for checkpoint origin")
-        return
+    if features.Get().EnablePeerDAS {
+        if err := s.fetchOriginColumns(peers); err != nil {
+            log.WithError(err).Error("Failed to fetch missing columns for checkpoint origin")
+            return
+        }
+    } else {
+        if err := s.fetchOriginBlobs(peers); err != nil {
+            log.WithError(err).Error("Failed to fetch missing blobs for checkpoint origin")
+            return
+        }
     }
     if err := s.roundRobinSync(gt); err != nil {
         if errors.Is(s.ctx.Err(), context.Canceled) {
@@ -306,6 +314,33 @@ func missingBlobRequest(blk blocks.ROBlock, store *filesystem.BlobStorage) (p2pt
     return req, nil
 }
 
+func missingColumnRequest(blk blocks.ROBlock, store *filesystem.BlobStorage) (p2ptypes.BlobSidecarsByRootReq, error) {
+    r := blk.Root()
+    if blk.Version() < version.Deneb {
+        return nil, nil
+    }
+    cmts, err := blk.Block().Body().BlobKzgCommitments()
+    if err != nil {
+        log.WithField("root", r).Error("Error reading commitments from checkpoint sync origin block")
+        return nil, err
+    }
+    if len(cmts) == 0 {
+        return nil, nil
+    }
+    onDisk, err := store.ColumnIndices(r)
+    if err != nil {
+        return nil, errors.Wrapf(err, "error checking existing blobs for checkpoint sync block root %#x", r)
+    }
+    req := make(p2ptypes.BlobSidecarsByRootReq, 0, len(cmts))
+    for i := range cmts {
+        if onDisk[i] {
+            continue
+        }
+        req = append(req, &eth.BlobIdentifier{BlockRoot: r[:], Index: uint64(i)})
+    }
+    return req, nil
+}
+
 func (s *Service) fetchOriginBlobs(pids []peer.ID) error {
     r, err := s.cfg.DB.OriginCheckpointBlockRoot(s.ctx)
     if errors.Is(err, db.ErrNotFoundOriginBlockRoot) {
@@ -356,6 +391,59 @@ func (s *Service) fetchOriginBlobs(pids []peer.ID) error {
     return fmt.Errorf("no connected peer able to provide blobs for checkpoint sync block %#x", r)
 }
 
+func (s *Service) fetchOriginColumns(pids []peer.ID) error {
+    r, err := s.cfg.DB.OriginCheckpointBlockRoot(s.ctx)
+    if errors.Is(err, db.ErrNotFoundOriginBlockRoot) {
+        return nil
+    }
+    blk, err := s.cfg.DB.Block(s.ctx, r)
+    if err != nil {
+        log.WithField("root", fmt.Sprintf("%#x", r)).Error("Block for checkpoint sync origin root not found in db")
+        return err
+    }
+    if !params.WithinDAPeriod(slots.ToEpoch(blk.Block().Slot()), slots.ToEpoch(s.clock.CurrentSlot())) {
+        return nil
+    }
+    rob, err := blocks.NewROBlockWithRoot(blk, r)
+    if err != nil {
+        return err
+    }
+    req, err := missingColumnRequest(rob, s.cfg.BlobStorage)
+    if err != nil {
+        return err
+    }
+    if len(req) == 0 {
+        log.WithField("root", fmt.Sprintf("%#x", r)).Debug("All columns for checkpoint block are present")
+        return nil
+    }
+    shufflePeers(pids)
+    pids, err = s.cfg.P2P.GetValidCustodyPeers(pids)
+    if err != nil {
+        return err
+    }
+    for i := range pids {
+        sidecars, err := sync.SendDataColumnSidecarByRoot(s.ctx, s.clock, s.cfg.P2P, pids[i], s.ctxMap, &req)
+        if err != nil {
+            continue
+        }
+        if len(sidecars) != len(req) {
+            continue
+        }
+        avs := das.NewLazilyPersistentStoreColumn(s.cfg.BlobStorage, emptyVerifier{}, s.cfg.P2P.NodeID())
+        current := s.clock.CurrentSlot()
+        if err := avs.PersistColumns(current, sidecars...); err != nil {
+            return err
+        }
+        if err := avs.IsDataAvailable(s.ctx, current, rob); err != nil {
+            log.WithField("root", fmt.Sprintf("%#x", r)).WithField("peerID", pids[i]).Warn("Columns from peer for origin block were unusable")
+            continue
+        }
+        log.WithField("nColumns", len(sidecars)).WithField("root", fmt.Sprintf("%#x", r)).Info("Successfully downloaded columns for checkpoint sync block")
+        return nil
+    }
+    return fmt.Errorf("no connected peer able to provide columns for checkpoint sync block %#x", r)
+}
+
 func shufflePeers(pids []peer.ID) {
     rg := rand.NewGenerator()
     rg.Shuffle(len(pids), func(i, j int) {
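missingColumnRequest builds a by-root request containing only the column sidecars that are not already on disk for the checkpoint origin block; fetchOriginColumns then shuffles the custody-valid peers and tries them until one serves a complete, usable response. The request-building step reduces to the sketch below; the identifier type is a stand-in for eth.BlobIdentifier, and the indexing mirrors the patch's use of the stored-column bitmap.

package main

import "fmt"

// identifier stands in for eth.BlobIdentifier: a sidecar index for one block root.
type identifier struct {
    index uint64
}

// missingColumnIndices mirrors missingColumnRequest: anything already stored
// on disk is skipped, everything else is requested.
func missingColumnIndices(commitmentCount int, onDisk []bool) []identifier {
    req := make([]identifier, 0, commitmentCount)
    for i := 0; i < commitmentCount; i++ {
        if onDisk[i] {
            continue
        }
        req = append(req, identifier{index: uint64(i)})
    }
    return req
}

func main() {
    onDisk := []bool{true, false, true, false}
    fmt.Println(missingColumnIndices(4, onDisk)) // indices 1 and 3 still needed
}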
diff --git a/beacon-chain/sync/pending_blocks_queue.go b/beacon-chain/sync/pending_blocks_queue.go
index 698ee4572654..bd303480c4ec 100644
--- a/beacon-chain/sync/pending_blocks_queue.go
+++ b/beacon-chain/sync/pending_blocks_queue.go
@@ -13,6 +13,7 @@ import (
     "github.com/prysmaticlabs/prysm/v5/async"
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
     p2ptypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
+    "github.com/prysmaticlabs/prysm/v5/config/features"
     "github.com/prysmaticlabs/prysm/v5/config/params"
     "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
     "github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
@@ -203,20 +204,40 @@ func (s *Service) processAndBroadcastBlock(ctx context.Context, b interfaces.Rea
             return err
         }
     }
-
-    request, err := s.pendingBlobsRequestForBlock(blkRoot, b)
-    if err != nil {
-        return err
-    }
-    if len(request) > 0 {
-        peers := s.getBestPeers()
-        peerCount := len(peers)
-        if peerCount == 0 {
-            return errors.Wrapf(errNoPeersForPending, "block root=%#x", blkRoot)
+    if features.Get().EnablePeerDAS {
+        request, err := s.pendingDataColumnRequestForBlock(blkRoot, b)
+        if err != nil {
+            return err
+        }
+        if len(request) > 0 {
+            peers := s.getBestPeers()
+            peers, err = s.cfg.p2p.GetValidCustodyPeers(peers)
+            if err != nil {
+                return err
+            }
+            peerCount := len(peers)
+            if peerCount == 0 {
+                return errors.Wrapf(errNoPeersForPending, "block root=%#x", blkRoot)
+            }
+            if err := s.sendAndSaveDataColumnSidecars(ctx, request, peers[rand.NewGenerator().Int()%peerCount], b); err != nil {
+                return err
+            }
         }
-        if err := s.sendAndSaveBlobSidecars(ctx, request, peers[rand.NewGenerator().Int()%peerCount], b); err != nil {
+    } else {
+        request, err := s.pendingBlobsRequestForBlock(blkRoot, b)
+        if err != nil {
             return err
         }
+        if len(request) > 0 {
+            peers := s.getBestPeers()
+            peerCount := len(peers)
+            if peerCount == 0 {
+                return errors.Wrapf(errNoPeersForPending, "block root=%#x", blkRoot)
+            }
+            if err := s.sendAndSaveBlobSidecars(ctx, request, peers[rand.NewGenerator().Int()%peerCount], b); err != nil {
+                return err
+            }
+        }
     }
 
     if err := s.cfg.chain.ReceiveBlock(ctx, b, blkRoot, nil); err != nil {
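In the PeerDAS branch of processAndBroadcastBlock the best-peer list is first filtered through GetValidCustodyPeers and one surviving peer is picked at random to serve the missing columns. A standalone sketch of that selection; the filter function is a stand-in for GetValidCustodyPeers and math/rand replaces Prysm's rand generator.

package main

import (
    "fmt"
    "math/rand"
)

// pickCustodyPeer mirrors the selection in processAndBroadcastBlock: narrow
// the candidates to peers that custody our columns, then pick one at random.
func pickCustodyPeer(candidates []string, filter func([]string) []string) (string, error) {
    peers := filter(candidates)
    if len(peers) == 0 {
        return "", fmt.Errorf("no peers able to serve custody columns")
    }
    return peers[rand.Intn(len(peers))], nil
}

func main() {
    onlySupernodes := func(ps []string) []string {
        var out []string
        for _, p := range ps {
            if p != "lightPeer" { // illustrative filtering rule only
                out = append(out, p)
            }
        }
        return out
    }
    p, err := pickCustodyPeer([]string{"lightPeer", "superA", "superB"}, onlySupernodes)
    fmt.Println(p, err)
}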
diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_root.go b/beacon-chain/sync/rpc_beacon_blocks_by_root.go
index 6f7961968b52..f608669a9416 100644
--- a/beacon-chain/sync/rpc_beacon_blocks_by_root.go
+++ b/beacon-chain/sync/rpc_beacon_blocks_by_root.go
@@ -7,10 +7,13 @@ import (
     libp2pcore "github.com/libp2p/go-libp2p/core"
     "github.com/libp2p/go-libp2p/core/peer"
     "github.com/pkg/errors"
+    "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/execution"
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync/verify"
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
+    "github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
+    "github.com/prysmaticlabs/prysm/v5/config/features"
     fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
     "github.com/prysmaticlabs/prysm/v5/config/params"
     "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
@@ -55,15 +58,28 @@ func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, requests *t
         if err != nil {
             return err
         }
-        request, err := s.pendingBlobsRequestForBlock(blkRoot, blk)
-        if err != nil {
-            return errors.Wrap(err, "pending blobs request for block")
-        }
-        if len(request) == 0 {
-            continue
-        }
-        if err := s.sendAndSaveBlobSidecars(ctx, request, id, blk); err != nil {
-            return err
+        if features.Get().EnablePeerDAS {
+            request, err := s.pendingDataColumnRequestForBlock(blkRoot, blk)
+            if err != nil {
+                return errors.Wrap(err, "pending data column request for block")
+            }
+            if len(request) == 0 {
+                continue
+            }
+            if err := s.sendAndSaveDataColumnSidecars(ctx, request, id, blk); err != nil {
+                return errors.Wrap(err, "send and save data column sidecars")
+            }
+        } else {
+            request, err := s.pendingBlobsRequestForBlock(blkRoot, blk)
+            if err != nil {
+                return errors.Wrap(err, "pending blobs request for block")
+            }
+            if len(request) == 0 {
+                continue
+            }
+            if err := s.sendAndSaveBlobSidecars(ctx, request, id, blk); err != nil {
+                return errors.Wrap(err, "send and save blob sidecars")
+            }
         }
     }
     return err
@@ -170,6 +186,36 @@ func (s *Service) sendAndSaveBlobSidecars(ctx context.Context, request types.Blo
     return nil
 }
 
+func (s *Service) sendAndSaveDataColumnSidecars(ctx context.Context, request types.BlobSidecarsByRootReq, peerID peer.ID, block interfaces.ReadOnlySignedBeaconBlock) error {
+    if len(request) == 0 {
+        return nil
+    }
+
+    sidecars, err := SendDataColumnSidecarByRoot(ctx, s.cfg.clock, s.cfg.p2p, peerID, s.ctxMap, &request)
+    if err != nil {
+        return err
+    }
+
+    RoBlock, err := blocks.NewROBlock(block)
+    if err != nil {
+        return err
+    }
+    for _, sidecar := range sidecars {
+        if err := verify.ColumnAlignsWithBlock(sidecar, RoBlock); err != nil {
+            return err
+        }
+        log.WithFields(columnFields(sidecar)).Debug("Received data column sidecar RPC")
+    }
+
+    for i := range sidecars {
+        verifiedCol := blocks.NewVerifiedRODataColumn(sidecars[i])
+        if err := s.cfg.blobStorage.SaveDataColumn(verifiedCol); err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
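sendAndSaveDataColumnSidecars requests the columns by root, checks that every returned sidecar aligns with the block it was requested for, and only then persists them. The verify-then-save pipeline reduces to the sketch below; the sidecar struct and save callback are stand-ins for blocks.RODataColumn and blobStorage.SaveDataColumn.

package main

import (
    "errors"
    "fmt"
)

// sidecar stands in for blocks.RODataColumn in this sketch.
type sidecar struct {
    blockRoot string
    index     uint64
}

// saveVerified mirrors sendAndSaveDataColumnSidecars: every sidecar must be
// checked against the requested block before anything is written to storage.
func saveVerified(blockRoot string, sidecars []sidecar, save func(sidecar) error) error {
    for _, sc := range sidecars {
        if sc.blockRoot != blockRoot { // stand-in for verify.ColumnAlignsWithBlock
            return errors.New("column does not align with block")
        }
    }
    for _, sc := range sidecars {
        if err := save(sc); err != nil {
            return err
        }
    }
    return nil
}

func main() {
    store := func(sc sidecar) error { fmt.Println("saved column", sc.index); return nil }
    err := saveVerified("0xroot", []sidecar{{"0xroot", 3}, {"0xroot", 7}}, store)
    fmt.Println(err) // <nil>
}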
@@ -190,6 +236,20 @@ func (s *Service) pendingBlobsRequestForBlock(root [32]byte, b interfaces.ReadOn
     return blobIdentifiers, nil
 }
 
+func (s *Service) pendingDataColumnRequestForBlock(root [32]byte, b interfaces.ReadOnlySignedBeaconBlock) (types.BlobSidecarsByRootReq, error) {
+    if b.Version() < version.Deneb {
+        return nil, nil // Block before deneb has no blob.
+    }
+    cc, err := b.Block().Body().BlobKzgCommitments()
+    if err != nil {
+        return nil, err
+    }
+    if len(cc) == 0 {
+        return nil, nil
+    }
+    return s.constructPendingColumnRequest(root)
+}
+
 // constructPendingBlobsRequest creates a request for BlobSidecars by root, considering blobs already in DB.
 func (s *Service) constructPendingBlobsRequest(root [32]byte, commitments int) (types.BlobSidecarsByRootReq, error) {
     if commitments == 0 {
@@ -203,6 +263,23 @@ func (s *Service) constructPendingBlobsRequest(root [32]byte, commitments int) (
     return requestsForMissingIndices(stored, commitments, root), nil
 }
 
+func (s *Service) constructPendingColumnRequest(root [32]byte) (types.BlobSidecarsByRootReq, error) {
+    stored, err := s.cfg.blobStorage.ColumnIndices(root)
+    if err != nil {
+        return nil, err
+    }
+    custodiedSubnetCount := params.BeaconConfig().CustodyRequirement
+    if flags.Get().SubscribeToAllSubnets {
+        custodiedSubnetCount = params.BeaconConfig().DataColumnSidecarSubnetCount
+    }
+    custodiedColumns, err := peerdas.CustodyColumns(s.cfg.p2p.NodeID(), custodiedSubnetCount)
+    if err != nil {
+        return nil, err
+    }
+
+    return requestsForMissingColumnIndices(stored, custodiedColumns, root), nil
+}
+
 // requestsForMissingIndices constructs a slice of BlobIdentifiers that are missing from
 // local storage, based on a mapping that represents which indices are locally stored,
 // and the highest expected index.
@@ -215,3 +292,13 @@ func requestsForMissingIndices(storedIndices [fieldparams.MaxBlobsPerBlock]bool,
     }
     return ids
 }
+
+func requestsForMissingColumnIndices(storedIndices [fieldparams.NumberOfColumns]bool, wantedIndices map[uint64]bool, root [32]byte) []*eth.BlobIdentifier {
+    var ids []*eth.BlobIdentifier
+    for i := range wantedIndices {
+        if !storedIndices[i] {
+            ids = append(ids, &eth.BlobIdentifier{Index: i, BlockRoot: root[:]})
+        }
+    }
+    return ids
+}
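constructPendingColumnRequest asks only for the columns this node is supposed to custody and does not yet have on disk: requestsForMissingColumnIndices takes the wanted custody set and subtracts whatever is already stored. A standalone sketch of that set difference, with plain Go types in place of the identifier and bitmap types from the patch.

package main

import "fmt"

// missingCustodyColumns mirrors requestsForMissingColumnIndices: of the
// columns this node custodies (wanted), request those not yet in storage.
func missingCustodyColumns(stored []bool, wanted map[uint64]bool) []uint64 {
    var ids []uint64
    for i := range wanted {
        if !stored[i] {
            ids = append(ids, i)
        }
    }
    return ids
}

func main() {
    stored := make([]bool, 8)
    stored[2] = true
    wanted := map[uint64]bool{2: true, 5: true}
    fmt.Println(missingCustodyColumns(stored, wanted)) // [5]
}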
diff --git a/beacon-chain/sync/sampling_data_columns.go b/beacon-chain/sync/sampling_data_columns.go
index 15f86ebcd97f..00c3f61f4ead 100644
--- a/beacon-chain/sync/sampling_data_columns.go
+++ b/beacon-chain/sync/sampling_data_columns.go
@@ -8,11 +8,9 @@ import (
     "github.com/ethereum/go-ethereum/common/math"
     "github.com/ethereum/go-ethereum/crypto"
     "github.com/pkg/errors"
-    ssz "github.com/prysmaticlabs/fastssz"
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed"
     statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
-    "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
     fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
     "github.com/prysmaticlabs/prysm/v5/config/params"
@@ -44,37 +42,19 @@ func (s *Service) sampleDataColumns(requestedRoot [fieldparams.RootLength]byte,
 
     // Sampling is done sequentially peer by peer.
     // TODO: Add parallelism if (probably) needed.
-    for _, peer := range activePeers {
+    for _, pid := range activePeers {
         // Early exit if all needed columns are already sampled.
         // This is the happy path.
         if len(missingIndices) == 0 {
             return nil, nil
         }
-
-        // Retrieve the ENR of the peer.
-        peerRecord, err := s.cfg.p2p.Peers().ENR(peer)
+        peerCustodiedSubnetCount, err := s.cfg.p2p.CustodyCountFromRemotePeer(pid)
         if err != nil {
-            return nil, errors.Wrap(err, "ENR")
-        }
-
-        peerCustodiedSubnetCount := params.BeaconConfig().CustodyRequirement
-
-        if peerRecord != nil {
-            // Load the `custody_subnet_count`
-            // TODO: Do not harcode `custody_subnet_count`
-            custodyBytes := make([]byte, 8)
-            if err := peerRecord.Load(p2p.CustodySubnetCount(custodyBytes)); err != nil {
-                return nil, errors.Wrap(err, "load custody_subnet_count")
-            }
-            actualCustodyCount := ssz.UnmarshallUint64(custodyBytes)
-
-            if actualCustodyCount > peerCustodiedSubnetCount {
-                peerCustodiedSubnetCount = actualCustodyCount
-            }
+            return nil, err
         }
 
         // Retrieve the public key object of the peer under "crypto" form.
-        pubkeyObjCrypto, err := peer.ExtractPublicKey()
+        pubkeyObjCrypto, err := pid.ExtractPublicKey()
         if err != nil {
             return nil, errors.Wrap(err, "extract public key")
         }
@@ -129,7 +109,7 @@ func (s *Service) sampleDataColumns(requestedRoot [fieldparams.RootLength]byte,
     }
 
     // Sample data columns.
-    roDataColumns, err := SendDataColumnSidecarByRoot(s.ctx, s.cfg.clock, s.cfg.p2p, peer, s.ctxMap, &dataColumnIdentifiers)
+    roDataColumns, err := SendDataColumnSidecarByRoot(s.ctx, s.cfg.clock, s.cfg.p2p, pid, s.ctxMap, &dataColumnIdentifiers)
     if err != nil {
         return nil, errors.Wrap(err, "send data column sidecar by root")
     }
diff --git a/beacon-chain/sync/validate_blob.go b/beacon-chain/sync/validate_blob.go
index 275015d6543d..ff774396cadb 100644
--- a/beacon-chain/sync/validate_blob.go
+++ b/beacon-chain/sync/validate_blob.go
@@ -169,6 +169,16 @@ func blobFields(b blocks.ROBlob) logrus.Fields {
     }
 }
 
+func columnFields(b blocks.RODataColumn) logrus.Fields {
+    return logrus.Fields{
+        "slot":           b.Slot(),
+        "proposerIndex":  b.ProposerIndex(),
+        "blockRoot":      fmt.Sprintf("%#x", b.BlockRoot()),
+        "kzgCommitments": fmt.Sprintf("%#x", b.KzgCommitments),
+        "columnIndex":    b.ColumnIndex,
+    }
+}
+
 func computeSubnetForBlobSidecar(index uint64) uint64 {
     return index % params.BeaconConfig().BlobsidecarSubnetCount
 }
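With the custody-count lookup moved into CustodyCountFromRemotePeer, the sampling loop's remaining job is to ask each peer only for columns it actually custodies: the still-missing indices are intersected with the peer's custody set before a by-root request is built. A standalone sketch of that intersection; the real loop additionally derives the custody set from the peer's node ID and verifies the returned sidecars.

package main

import "fmt"

// sampleable mirrors the core step of sampleDataColumns: a peer can only be
// asked for columns it custodies, so intersect the missing indices with the
// peer's custody set before issuing a request.
func sampleable(missing, peerCustody map[uint64]bool) []uint64 {
    var ids []uint64
    for idx := range missing {
        if peerCustody[idx] {
            ids = append(ids, idx)
        }
    }
    return ids
}

func main() {
    missing := map[uint64]bool{3: true, 11: true}
    peerCustody := map[uint64]bool{3: true, 4: true}
    fmt.Println(sampleable(missing, peerCustody)) // [3]: only column 3 can be sampled from this peer
}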