Skip to content

Commit

Permalink
Request Data Columns When Fetching Pending Blocks (#14007)
Browse files Browse the repository at this point in the history
* Support Data Columns For By Root Requests

* Revert Config Changes

* Fix Panic

* Fix Process Block

* Fix Flags

* Lint

* Support Checkpoint Sync

* Manu's Review

* Add Support For Columns in Remaining Methods

* Unmarshal Uncorrectly
  • Loading branch information
nisdas authored and nalepae committed May 29, 2024
1 parent 303db09 commit 1b7d6ed
Show file tree
Hide file tree
Showing 13 changed files with 368 additions and 62 deletions.
1 change: 1 addition & 0 deletions beacon-chain/blockchain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ go_library(
"//beacon-chain/startup:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/features:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
Expand Down
7 changes: 6 additions & 1 deletion beacon-chain/blockchain/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"

Expand Down Expand Up @@ -640,8 +641,12 @@ func (s *Service) isDataAvailableDataColumns(ctx context.Context, root [32]byte,
if len(kzgCommitments) == 0 {
return nil
}
custodiedSubnetCount := params.BeaconConfig().CustodyRequirement
if flags.Get().SubscribeToAllSubnets {
custodiedSubnetCount = params.BeaconConfig().DataColumnSidecarSubnetCount
}

colMap, err := peerdas.CustodyColumns(s.cfg.P2P.NodeID(), params.BeaconConfig().CustodyRequirement)
colMap, err := peerdas.CustodyColumns(s.cfg.P2P.NodeID(), custodiedSubnetCount)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
"broadcaster.go",
"config.go",
"connection_gater.go",
"custody.go",
"dial_relay_node.go",
"discovery.go",
"doc.go",
Expand Down Expand Up @@ -81,6 +82,7 @@ go_library(
"@com_github_ethereum_go_ethereum//p2p/discover:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_ferranbt_fastssz//:go_default_library",
"@com_github_holiman_uint256//:go_default_library",
"@com_github_kr_pretty//:go_default_library",
"@com_github_libp2p_go_libp2p//:go_default_library",
Expand Down
73 changes: 73 additions & 0 deletions beacon-chain/p2p/custody.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package p2p

import (
ssz "github.com/ferranbt/fastssz"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/params"
)

// GetValidCustodyPeers returns the subset of peers, in their original order,
// whose custodied columns include every column custodied by this node.
//
// The local custody subnet count is CustodyRequirement, or
// DataColumnSidecarSubnetCount when the SubscribeToAllSubnets flag is set.
// For each peer, the custody count comes from CustodyCountFromRemotePeer and
// the custodied columns are derived from the peer's node ID.
//
// Any error encountered while evaluating a peer aborts the whole scan and is
// returned with a nil slice; no partial result is produced.
func (s *Service) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
	localSubnetCount := params.BeaconConfig().CustodyRequirement
	if flags.Get().SubscribeToAllSubnets {
		localSubnetCount = params.BeaconConfig().DataColumnSidecarSubnetCount
	}

	localColumns, err := peerdas.CustodyColumns(s.NodeID(), localSubnetCount)
	if err != nil {
		return nil, err
	}

	var validPeers []peer.ID

peerLoop:
	for _, pid := range peers {
		remoteSubnetCount, err := s.CustodyCountFromRemotePeer(pid)
		if err != nil {
			return nil, err
		}

		remoteNodeID, err := ConvertPeerIDToNodeID(pid)
		if err != nil {
			return nil, err
		}

		remoteColumns, err := peerdas.CustodyColumns(remoteNodeID, remoteSubnetCount)
		if err != nil {
			return nil, err
		}

		// A peer is only useful if it custodies every column we custody.
		for column := range localColumns {
			if !remoteColumns[column] {
				continue peerLoop
			}
		}

		validPeers = append(validPeers, pid)
	}

	return validPeers, nil
}

// CustodyCountFromRemotePeer returns the custody subnet count to assume for
// the given peer.
//
// The value is read from the `custody_subnet_count` entry of the peer's ENR.
// The configured CustodyRequirement acts as a floor: it is returned when no
// record is known for the peer, or when the advertised value is not greater
// than it.
//
// An error is returned when the ENR lookup fails, when the entry cannot be
// loaded from the record, or when the loaded entry is not exactly 8 bytes
// long (the size of an encoded uint64).
func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) (uint64, error) {
	// Number of bytes of an encoded uint64.
	const custodyCountSize = 8

	// Retrieve the ENR of the peer.
	peerRecord, err := s.peers.ENR(pid)
	if err != nil {
		return 0, errors.Wrap(err, "ENR")
	}

	peerCustodiedSubnetCount := params.BeaconConfig().CustodyRequirement
	if peerRecord == nil {
		return peerCustodiedSubnetCount, nil
	}

	// Load the `custody_subnet_count`.
	custodyObj := CustodySubnetCount(make([]byte, custodyCountSize))
	if err := peerRecord.Load(&custodyObj); err != nil {
		return 0, errors.Wrap(err, "load custody_subnet_count")
	}

	// The entry is loaded through a pointer, so the decoder is free to make
	// custodyObj refer to a different slice than the one allocated above.
	// Always read the result back through custodyObj, and validate its length
	// before decoding so a short entry cannot cause an out-of-range read.
	custodyBytes := []byte(custodyObj)
	if len(custodyBytes) != custodyCountSize {
		return 0, errors.Errorf("invalid custody_subnet_count length: got %d bytes, want %d", len(custodyBytes), custodyCountSize)
	}
	actualCustodyCount := ssz.UnmarshallUint64(custodyBytes)

	if actualCustodyCount > peerCustodiedSubnetCount {
		peerCustodiedSubnetCount = actualCustodyCount
	}
	return peerCustodiedSubnetCount, nil
}
6 changes: 6 additions & 0 deletions beacon-chain/p2p/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type P2P interface {
ConnectionHandler
PeersProvider
MetadataProvider
CustodyHandler
}

type Acceser interface {
Expand Down Expand Up @@ -110,3 +111,8 @@ type MetadataProvider interface {
Metadata() metadata.Metadata
MetadataSeq() uint64
}

// CustodyHandler groups the custody-related queries exposed by the p2p layer.
type CustodyHandler interface {
	// CustodyCountFromRemotePeer returns a custody count for the given peer.
	CustodyCountFromRemotePeer(pid peer.ID) (uint64, error)
	// GetValidCustodyPeers returns the peers, taken from the given list, that
	// the implementation considers valid custody peers.
	GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error)
}
8 changes: 8 additions & 0 deletions beacon-chain/p2p/testing/fuzz_p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,11 @@ func (_ *FakeP2P) InterceptSecured(network.Direction, peer.ID, network.ConnMulti
func (_ *FakeP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.DisconnectReason) {
return true, 0
}

// CustodyCountFromRemotePeer -- fake. The peer is ignored; the reported count
// is always zero and the error is always nil.
func (_ *FakeP2P) CustodyCountFromRemotePeer(_ peer.ID) (uint64, error) {
	return 0, nil
}

// GetValidCustodyPeers -- fake. No filtering is applied: the given peers are
// returned unchanged with a nil error.
func (_ *FakeP2P) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
	return peers, nil
}
8 changes: 8 additions & 0 deletions beacon-chain/p2p/testing/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,3 +419,11 @@ func (_ *TestP2P) InterceptSecured(network.Direction, peer.ID, network.ConnMulti
func (_ *TestP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.DisconnectReason) {
return true, 0
}

// CustodyCountFromRemotePeer is a test stub. The peer is ignored; the reported
// count is always zero and the error is always nil.
func (_ *TestP2P) CustodyCountFromRemotePeer(_ peer.ID) (uint64, error) {
	return 0, nil
}

// GetValidCustodyPeers is a test stub. No filtering is applied: the given
// peers are returned unchanged with a nil error.
func (_ *TestP2P) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
	return peers, nil
}
43 changes: 30 additions & 13 deletions beacon-chain/sync/initial-sync/blocks_fetcher_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/pkg/errors"
p2pTypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
Expand Down Expand Up @@ -236,18 +237,18 @@ func (f *blocksFetcher) findForkWithPeer(ctx context.Context, pid peer.ID, slot
Count: reqCount,
Step: 1,
}
blocks, err := f.requestBlocks(ctx, req, pid)
reqBlocks, err := f.requestBlocks(ctx, req, pid)
if err != nil {
return nil, fmt.Errorf("cannot fetch blocks: %w", err)
}
if len(blocks) == 0 {
if len(reqBlocks) == 0 {
return nil, errNoAlternateBlocks
}

// If the first block is not connected to the current canonical chain, we'll stop processing this batch.
// Instead, we'll work backwards from the first block until we find a common ancestor,
// and then begin processing from there.
first := blocks[0]
first := reqBlocks[0]
if !f.chain.HasBlock(ctx, first.Block().ParentRoot()) {
// Backtrack on a root, to find a common ancestor from which we can resume syncing.
fork, err := f.findAncestor(ctx, pid, first)
Expand All @@ -260,8 +261,8 @@ func (f *blocksFetcher) findForkWithPeer(ctx context.Context, pid peer.ID, slot
// Traverse blocks, and if we've got one that doesn't have parent in DB, backtrack on it.
// Note that we start from the second element in the array, because we know that the first element is in the db,
// otherwise we would have gone into the findAncestor early return path above.
for i := 1; i < len(blocks); i++ {
block := blocks[i]
for i := 1; i < len(reqBlocks); i++ {
block := reqBlocks[i]
parentRoot := block.Block().ParentRoot()
// Step through blocks until we find one that is not in the chain. The goal is to find the point where the
// chain observed in the peer diverges from the locally known chain, and then collect up the remainder of the
Expand All @@ -274,16 +275,25 @@ func (f *blocksFetcher) findForkWithPeer(ctx context.Context, pid peer.ID, slot
"slot": block.Block().Slot(),
"root": fmt.Sprintf("%#x", parentRoot),
}).Debug("Block with unknown parent root has been found")
altBlocks, err := sortedBlockWithVerifiedBlobSlice(blocks[i-1:])
altBlocks, err := sortedBlockWithVerifiedBlobSlice(reqBlocks[i-1:])
if err != nil {
return nil, errors.Wrap(err, "invalid blocks received in findForkWithPeer")
}
var bwb []blocks.BlockWithROBlobs
if features.Get().EnablePeerDAS {
bwb, err = f.fetchColumnsFromPeer(ctx, altBlocks, pid, []peer.ID{pid})
if err != nil {
return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findForkWithPeer")
}
} else {
bwb, err = f.fetchBlobsFromPeer(ctx, altBlocks, pid, []peer.ID{pid})
if err != nil {
return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findForkWithPeer")
}
}
// We need to fetch the blobs for the given alt-chain if any exist, so that we can try to verify and import
// the blocks.
bwb, err := f.fetchBlobsFromPeer(ctx, altBlocks, pid, []peer.ID{pid})
if err != nil {
return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findForkWithPeer")
}

// The caller will use the BlocksWith VerifiedBlobs in bwb as the starting point for
// round-robin syncing the alternate chain.
return &forkData{peer: pid, bwb: bwb}, nil
Expand All @@ -302,9 +312,16 @@ func (f *blocksFetcher) findAncestor(ctx context.Context, pid peer.ID, b interfa
if err != nil {
return nil, errors.Wrap(err, "received invalid blocks in findAncestor")
}
bwb, err = f.fetchBlobsFromPeer(ctx, bwb, pid, []peer.ID{pid})
if err != nil {
return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findAncestor")
if features.Get().EnablePeerDAS {
bwb, err = f.fetchColumnsFromPeer(ctx, bwb, pid, []peer.ID{pid})
if err != nil {
return nil, errors.Wrap(err, "unable to retrieve columns for blocks found in findAncestor")
}
} else {
bwb, err = f.fetchBlobsFromPeer(ctx, bwb, pid, []peer.ID{pid})
if err != nil {
return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findAncestor")
}
}
return &forkData{
peer: pid,
Expand Down
94 changes: 91 additions & 3 deletions beacon-chain/sync/initial-sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/crypto/rand"
Expand Down Expand Up @@ -184,9 +185,16 @@ func (s *Service) Start() {
log.WithError(err).Error("Error waiting for minimum number of peers")
return
}
if err := s.fetchOriginBlobs(peers); err != nil {
log.WithError(err).Error("Failed to fetch missing blobs for checkpoint origin")
return
if features.Get().EnablePeerDAS {
if err := s.fetchOriginColumns(peers); err != nil {
log.WithError(err).Error("Failed to fetch missing columns for checkpoint origin")
return
}
} else {
if err := s.fetchOriginBlobs(peers); err != nil {
log.WithError(err).Error("Failed to fetch missing blobs for checkpoint origin")
return
}
}
if err := s.roundRobinSync(gt); err != nil {
if errors.Is(s.ctx.Err(), context.Canceled) {
Expand Down Expand Up @@ -306,6 +314,33 @@ func missingBlobRequest(blk blocks.ROBlock, store *filesystem.BlobStorage) (p2pt
return req, nil
}

// missingColumnRequest builds a by-root request listing the sidecar indices of
// blk that the store does not report as present.
//
// It returns a nil request and a nil error when the block version is older
// than Deneb or when the block carries no KZG commitments. Errors from reading
// the commitments or from querying the store are returned to the caller.
//
// NOTE(review): the loop below is bounded by the number of KZG commitments,
// while the lookup table comes from store.ColumnIndices. Confirm that indexing
// stored columns by commitment position is intended.
func missingColumnRequest(blk blocks.ROBlock, store *filesystem.BlobStorage) (p2ptypes.BlobSidecarsByRootReq, error) {
	root := blk.Root()

	// Nothing to request for blocks older than Deneb.
	if blk.Version() < version.Deneb {
		return nil, nil
	}

	commitments, err := blk.Block().Body().BlobKzgCommitments()
	if err != nil {
		log.WithField("root", root).Error("Error reading commitments from checkpoint sync origin block")
		return nil, err
	}
	if len(commitments) == 0 {
		return nil, nil
	}

	stored, err := store.ColumnIndices(root)
	if err != nil {
		return nil, errors.Wrapf(err, "error checking existing blobs for checkpoint sync block root %#x", root)
	}

	// Request every index that is not already stored.
	req := make(p2ptypes.BlobSidecarsByRootReq, 0, len(commitments))
	for idx := 0; idx < len(commitments); idx++ {
		if !stored[idx] {
			req = append(req, &eth.BlobIdentifier{BlockRoot: root[:], Index: uint64(idx)})
		}
	}
	return req, nil
}

func (s *Service) fetchOriginBlobs(pids []peer.ID) error {
r, err := s.cfg.DB.OriginCheckpointBlockRoot(s.ctx)
if errors.Is(err, db.ErrNotFoundOriginBlockRoot) {
Expand Down Expand Up @@ -356,6 +391,59 @@ func (s *Service) fetchOriginBlobs(pids []peer.ID) error {
return fmt.Errorf("no connected peer able to provide blobs for checkpoint sync block %#x", r)
}

// fetchOriginColumns downloads the data column sidecars that are still missing
// for the checkpoint sync origin block and persists them.
//
// It returns nil without doing any network work when no origin block root is
// recorded, when the origin block is outside the data availability period, or
// when missingColumnRequest reports nothing to fetch.
//
// Otherwise the given peers are shuffled via shufflePeers, filtered with
// GetValidCustodyPeers, and tried one by one. A peer is skipped when the
// request fails, when it returns a different number of sidecars than
// requested, or when the availability check fails after persisting its
// sidecars. The first peer that passes ends the search.
//
// An error is returned when the database or the custody peer filtering fails,
// when persisting sidecars fails, or when no peer could provide the columns.
func (s *Service) fetchOriginColumns(pids []peer.ID) error {
	r, err := s.cfg.DB.OriginCheckpointBlockRoot(s.ctx)
	if errors.Is(err, db.ErrNotFoundOriginBlockRoot) {
		// No origin block root is recorded: nothing to fetch.
		return nil
	}
	if err != nil {
		// Any other failure leaves r unusable; do not continue with it.
		return fmt.Errorf("could not retrieve checkpoint sync origin block root: %w", err)
	}

	blk, err := s.cfg.DB.Block(s.ctx, r)
	if err != nil {
		log.WithField("root", fmt.Sprintf("%#x", r)).Error("Block for checkpoint sync origin root not found in db")
		return err
	}
	if !params.WithinDAPeriod(slots.ToEpoch(blk.Block().Slot()), slots.ToEpoch(s.clock.CurrentSlot())) {
		return nil
	}

	rob, err := blocks.NewROBlockWithRoot(blk, r)
	if err != nil {
		return err
	}
	req, err := missingColumnRequest(rob, s.cfg.BlobStorage)
	if err != nil {
		return err
	}
	if len(req) == 0 {
		log.WithField("root", fmt.Sprintf("%#x", r)).Debug("All columns for checkpoint block are present")
		return nil
	}

	// Randomize the order in which peers are tried, then keep only those
	// accepted by the custody filter.
	shufflePeers(pids)
	pids, err = s.cfg.P2P.GetValidCustodyPeers(pids)
	if err != nil {
		return err
	}

	for i := range pids {
		sidecars, err := sync.SendDataColumnSidecarByRoot(s.ctx, s.clock, s.cfg.P2P, pids[i], s.ctxMap, &req)
		if err != nil {
			// This peer could not serve the request; try the next one.
			continue
		}
		if len(sidecars) != len(req) {
			// Only a complete answer is accepted.
			continue
		}

		avs := das.NewLazilyPersistentStoreColumn(s.cfg.BlobStorage, emptyVerifier{}, s.cfg.P2P.NodeID())
		current := s.clock.CurrentSlot()
		if err := avs.PersistColumns(current, sidecars...); err != nil {
			return err
		}
		if err := avs.IsDataAvailable(s.ctx, current, rob); err != nil {
			log.WithField("root", fmt.Sprintf("%#x", r)).WithField("peerID", pids[i]).Warn("Columns from peer for origin block were unusable")
			continue
		}

		log.WithField("nColumns", len(sidecars)).WithField("root", fmt.Sprintf("%#x", r)).Info("Successfully downloaded columns for checkpoint sync block")
		return nil
	}

	return fmt.Errorf("no connected peer able to provide columns for checkpoint sync block %#x", r)
}

func shufflePeers(pids []peer.ID) {
rg := rand.NewGenerator()
rg.Shuffle(len(pids), func(i, j int) {
Expand Down
Loading

0 comments on commit 1b7d6ed

Please sign in to comment.