Skip to content

Commit

Permalink
PeerDAS: Implement reconstruction. (#14036)
Browse files Browse the repository at this point in the history
* Wrap errors, add logs.

* `missingColumnRequest`: Fix blobs <-> data columns mix.

* `ColumnIndices`: Return `map[uint64]bool` instead of `[fieldparams.NumberOfColumns]bool`.

* `DataColumnSidecars`: `interfaces.SignedBeaconBlock` ==> `interfaces.ReadOnlySignedBeaconBlock`.

We don't need any of the non read-only methods.

* Fix comments.

* `handleUnblidedBlock` ==> `handleUnblindedBlock`.

* `SaveDataColumn`: Move log from debug to trace.

If we attempt to save an already existing data column sidecar,
a debug log was printed.

This case could be quite common now with the data column reconstruction enabled.

* `sampling_data_columns.go` --> `data_columns_sampling.go`.

* Reconstruct data columns.
  • Loading branch information
nalepae committed Nov 25, 2024
1 parent 71e185a commit 306a1bd
Show file tree
Hide file tree
Showing 18 changed files with 685 additions and 305 deletions.
2 changes: 1 addition & 1 deletion beacon-chain/core/peerdas/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func RecoverMatrix(cellFromCoordinate map[cellCoordinate]cKzg4844.Cell, blobCoun

// DataColumnSidecars computes the data column sidecars from the signed block and blobs.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/_features/eip7594/das-core.md#recover_matrix
func DataColumnSidecars(signedBlock interfaces.SignedBeaconBlock, blobs []cKzg4844.Blob) ([]*ethpb.DataColumnSidecar, error) {
func DataColumnSidecars(signedBlock interfaces.ReadOnlySignedBeaconBlock, blobs []cKzg4844.Blob) ([]*ethpb.DataColumnSidecar, error) {
blobsCount := len(blobs)
if blobsCount == 0 {
return nil, nil
Expand Down
50 changes: 36 additions & 14 deletions beacon-chain/db/filesystem/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,12 @@ func (bs *BlobStorage) SaveDataColumn(column blocks.VerifiedRODataColumn) error
if err != nil {
return err
}

if exists {
log.Debug("Ignoring a duplicate data column sidecar save attempt")
log.Trace("Ignoring a duplicate data column sidecar save attempt")
return nil
}

if bs.pruner != nil {
hRoot, err := column.SignedBlockHeader.Header.HashTreeRoot()
if err != nil {
Expand Down Expand Up @@ -399,38 +401,58 @@ func (bs *BlobStorage) Indices(root [32]byte) ([fieldparams.MaxBlobsPerBlock]boo
}

// ColumnIndices retrieve the stored column indexes from our filesystem.
func (bs *BlobStorage) ColumnIndices(root [32]byte) ([fieldparams.NumberOfColumns]bool, error) {
var mask [fieldparams.NumberOfColumns]bool
func (bs *BlobStorage) ColumnIndices(root [32]byte) (map[uint64]bool, error) {
custody := make(map[uint64]bool, fieldparams.NumberOfColumns)

// Get all the files in the directory.
rootDir := blobNamer{root: root}.dir()
entries, err := afero.ReadDir(bs.fs, rootDir)
if err != nil {
// If the directory does not exist, we do not custody any columns.
if os.IsNotExist(err) {
return mask, nil
return nil, nil
}
return mask, err

return nil, errors.Wrap(err, "read directory")
}
for i := range entries {
if entries[i].IsDir() {

// Iterate over all the entries in the directory.
for _, entry := range entries {
// If the entry is a directory, skip it.
if entry.IsDir() {
continue
}
name := entries[i].Name()

// If the entry does not have the correct extension, skip it.
name := entry.Name()
if !strings.HasSuffix(name, sszExt) {
continue
}

// The file should be in the `<index>.<extension>` format.
// Skip the file if it does not match the format.
parts := strings.Split(name, ".")
if len(parts) != 2 {
continue
}
u, err := strconv.ParseUint(parts[0], 10, 64)

// Get the column index from the file name.
columnIndexStr := parts[0]
columnIndex, err := strconv.ParseUint(columnIndexStr, 10, 64)
if err != nil {
return mask, errors.Wrapf(err, "unexpected directory entry breaks listing, %s", parts[0])
return nil, errors.Wrapf(err, "unexpected directory entry breaks listing, %s", parts[0])
}
if u >= fieldparams.NumberOfColumns {
return mask, errors.Wrapf(errIndexOutOfBounds, "invalid index %d", u)

// If the column index is out of bounds, return an error.
if columnIndex >= fieldparams.NumberOfColumns {
return nil, errors.Wrapf(errIndexOutOfBounds, "invalid index %d", columnIndex)
}
mask[u] = true

// Mark the column index as in custody.
custody[columnIndex] = true
}
return mask, nil

return custody, nil
}

// Clear deletes all files on the filesystem.
Expand Down
53 changes: 32 additions & 21 deletions beacon-chain/p2p/custody.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/sirupsen/logrus"
)

func (s *Service) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
Expand All @@ -20,17 +21,15 @@ func (s *Service) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
}
var validPeers []peer.ID
for _, pid := range peers {
remoteCount, err := s.CustodyCountFromRemotePeer(pid)
if err != nil {
return nil, err
}
remoteCount := s.CustodyCountFromRemotePeer(pid)

nodeId, err := ConvertPeerIDToNodeID(pid)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "convert peer ID to node ID")
}
remoteCustodiedColumns, err := peerdas.CustodyColumns(nodeId, remoteCount)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "custody columns")
}
invalidPeer := false
for c := range custodiedColumns {
Expand All @@ -49,24 +48,36 @@ func (s *Service) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
return validPeers, nil
}

func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) (uint64, error) {
func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) uint64 {
// By default, we assume the peer custodies the minimum number of subnets.
peerCustodyCountCount := params.BeaconConfig().CustodyRequirement

// Retrieve the ENR of the peer.
peerRecord, err := s.peers.ENR(pid)
if err != nil {
return 0, errors.Wrap(err, "ENR")
log.WithError(err).WithField("peerID", pid).Error("Failed to retrieve ENR for peer")
return peerCustodyCountCount
}
peerCustodiedSubnetCount := params.BeaconConfig().CustodyRequirement
if peerRecord != nil {
// Load the `custody_subnet_count`
custodyBytes := make([]byte, 8)
custodyObj := CustodySubnetCount(custodyBytes)
if err := peerRecord.Load(&custodyObj); err != nil {
return 0, errors.Wrap(err, "load custody_subnet_count")
}
actualCustodyCount := ssz.UnmarshallUint64(custodyObj)
if actualCustodyCount > peerCustodiedSubnetCount {
peerCustodiedSubnetCount = actualCustodyCount
}

if peerRecord == nil {
// This is the case for inbound peers. So we don't log an error for this.
log.WithField("peerID", pid).Debug("No ENR found for peer")
return peerCustodyCountCount
}

// Load the `custody_subnet_count`
custodyObj := CustodySubnetCount(make([]byte, 8))
if err := peerRecord.Load(&custodyObj); err != nil {
log.WithField("peerID", pid).Error("Cannot load the custody_subnet_count from peer")
return peerCustodyCountCount
}
return peerCustodiedSubnetCount, nil

// Unmarshal the custody count from the peer's ENR.
peerCustodyCountFromRecord := ssz.UnmarshallUint64(custodyObj)
log.WithFields(logrus.Fields{
"peerID": pid,
"custodyCount": peerCustodyCountFromRecord,
}).Debug("Custody count read from peer's ENR")

return peerCustodyCountFromRecord
}
2 changes: 1 addition & 1 deletion beacon-chain/p2p/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,6 @@ type MetadataProvider interface {
}

type CustodyHandler interface {
CustodyCountFromRemotePeer(peer.ID) (uint64, error)
CustodyCountFromRemotePeer(peer.ID) uint64
GetValidCustodyPeers([]peer.ID) ([]peer.ID, error)
}
4 changes: 2 additions & 2 deletions beacon-chain/p2p/testing/fuzz_p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ func (*FakeP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.Disc
return true, 0
}

func (_ *FakeP2P) CustodyCountFromRemotePeer(peer.ID) (uint64, error) {
return 0, nil
func (_ *FakeP2P) CustodyCountFromRemotePeer(peer.ID) uint64 {
return 0
}

func (_ *FakeP2P) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/p2p/testing/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,8 +427,8 @@ func (_ *TestP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.Di
return true, 0
}

func (_ *TestP2P) CustodyCountFromRemotePeer(peer.ID) (uint64, error) {
return 0, nil
func (_ *TestP2P) CustodyCountFromRemotePeer(peer.ID) uint64 {
return 0
}

func (_ *TestP2P) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
Expand Down
4 changes: 3 additions & 1 deletion beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ go_library(
"block_batcher.go",
"broadcast_bls_changes.go",
"context.go",
"data_columns_reconstruct.go",
"data_columns_sampling.go",
"deadlines.go",
"decode_pubsub.go",
"doc.go",
Expand All @@ -32,7 +34,6 @@ go_library(
"rpc_ping.go",
"rpc_send_request.go",
"rpc_status.go",
"sampling_data_columns.go",
"service.go",
"subscriber.go",
"subscriber_beacon_aggregate_proof.go",
Expand Down Expand Up @@ -129,6 +130,7 @@ go_library(
"//time:go_default_library",
"//time/slots:go_default_library",
"@com_github_btcsuite_btcd_btcec_v2//:go_default_library",
"@com_github_ethereum_c_kzg_4844//bindings/go:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_ethereum_go_ethereum//common/math:go_default_library",
"@com_github_ethereum_go_ethereum//crypto:go_default_library",
Expand Down
Loading

0 comments on commit 306a1bd

Please sign in to comment.