Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PeerDAS: Implement reconstruction. #14036

Merged
merged 9 commits into from
May 29, 2024
Merged
2 changes: 1 addition & 1 deletion beacon-chain/core/peerdas/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func RecoverMatrix(cellFromCoordinate map[cellCoordinate]cKzg4844.Cell, blobCoun

// DataColumnSidecars computes the data column sidecars from the signed block and blobs.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/_features/eip7594/das-core.md#recover_matrix
func DataColumnSidecars(signedBlock interfaces.SignedBeaconBlock, blobs []cKzg4844.Blob) ([]*ethpb.DataColumnSidecar, error) {
func DataColumnSidecars(signedBlock interfaces.ReadOnlySignedBeaconBlock, blobs []cKzg4844.Blob) ([]*ethpb.DataColumnSidecar, error) {
blobsCount := len(blobs)
if blobsCount == 0 {
return nil, nil
Expand Down
50 changes: 36 additions & 14 deletions beacon-chain/db/filesystem/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,12 @@ func (bs *BlobStorage) SaveDataColumn(column blocks.VerifiedRODataColumn) error
if err != nil {
return err
}

if exists {
log.Debug("Ignoring a duplicate data column sidecar save attempt")
log.Trace("Ignoring a duplicate data column sidecar save attempt")
return nil
}

if bs.pruner != nil {
hRoot, err := column.SignedBlockHeader.Header.HashTreeRoot()
if err != nil {
Expand Down Expand Up @@ -399,38 +401,58 @@ func (bs *BlobStorage) Indices(root [32]byte) ([fieldparams.MaxBlobsPerBlock]boo
}

// ColumnIndices retrieve the stored column indexes from our filesystem.
func (bs *BlobStorage) ColumnIndices(root [32]byte) ([fieldparams.NumberOfColumns]bool, error) {
var mask [fieldparams.NumberOfColumns]bool
func (bs *BlobStorage) ColumnIndices(root [32]byte) (map[uint64]bool, error) {
custody := make(map[uint64]bool, fieldparams.NumberOfColumns)

// Get all the files in the directory.
rootDir := blobNamer{root: root}.dir()
entries, err := afero.ReadDir(bs.fs, rootDir)
if err != nil {
// If the directory does not exist, we do not custody any columns.
if os.IsNotExist(err) {
return mask, nil
return nil, nil
}
return mask, err

return nil, errors.Wrap(err, "read directory")
}
for i := range entries {
if entries[i].IsDir() {

// Iterate over all the entries in the directory.
for _, entry := range entries {
// If the entry is a directory, skip it.
if entry.IsDir() {
continue
}
name := entries[i].Name()

// If the entry does not have the correct extension, skip it.
name := entry.Name()
if !strings.HasSuffix(name, sszExt) {
continue
}

// The file should be in the `<index>.<extension>` format.
// Skip the file if it does not match the format.
parts := strings.Split(name, ".")
if len(parts) != 2 {
continue
}
u, err := strconv.ParseUint(parts[0], 10, 64)

// Get the column index from the file name.
columnIndexStr := parts[0]
columnIndex, err := strconv.ParseUint(columnIndexStr, 10, 64)
if err != nil {
return mask, errors.Wrapf(err, "unexpected directory entry breaks listing, %s", parts[0])
return nil, errors.Wrapf(err, "unexpected directory entry breaks listing, %s", parts[0])
}
if u >= fieldparams.NumberOfColumns {
return mask, errors.Wrapf(errIndexOutOfBounds, "invalid index %d", u)

// If the column index is out of bounds, return an error.
if columnIndex >= fieldparams.NumberOfColumns {
return nil, errors.Wrapf(errIndexOutOfBounds, "invalid index %d", columnIndex)
}
mask[u] = true

// Mark the column index as in custody.
custody[columnIndex] = true
}
return mask, nil

return custody, nil
}

// Clear deletes all files on the filesystem.
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/db/kv/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import (
"go.opencensus.io/trace"
)

// used to represent errors for inconsistent slot ranges.
// Used to represent errors for inconsistent slot ranges.
var errInvalidSlotRange = errors.New("invalid end slot and start slot provided")

// Block retrieval by root.
// Block retrieval by root. Return nil if block is not found.
func (s *Store) Block(ctx context.Context, blockRoot [32]byte) (interfaces.ReadOnlySignedBeaconBlock, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.Block")
defer span.End()
Expand Down
53 changes: 32 additions & 21 deletions beacon-chain/p2p/custody.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/sirupsen/logrus"
)

func (s *Service) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
Expand All @@ -20,17 +21,15 @@ func (s *Service) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
}
var validPeers []peer.ID
for _, pid := range peers {
remoteCount, err := s.CustodyCountFromRemotePeer(pid)
if err != nil {
return nil, err
}
remoteCount := s.CustodyCountFromRemotePeer(pid)

nodeId, err := ConvertPeerIDToNodeID(pid)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "convert peer ID to node ID")
}
remoteCustodiedColumns, err := peerdas.CustodyColumns(nodeId, remoteCount)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "custody columns")
}
invalidPeer := false
for c := range custodiedColumns {
Expand All @@ -49,24 +48,36 @@ func (s *Service) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
return validPeers, nil
}

func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) (uint64, error) {
func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) uint64 {
// By default, we assume the peer custodies the minimum number of subnets.
peerCustodyCountCount := params.BeaconConfig().CustodyRequirement

// Retrieve the ENR of the peer.
peerRecord, err := s.peers.ENR(pid)
if err != nil {
return 0, errors.Wrap(err, "ENR")
log.WithError(err).WithField("peerID", pid).Error("Failed to retrieve ENR for peer")
return peerCustodyCountCount
}
peerCustodiedSubnetCount := params.BeaconConfig().CustodyRequirement
if peerRecord != nil {
// Load the `custody_subnet_count`
custodyBytes := make([]byte, 8)
custodyObj := CustodySubnetCount(custodyBytes)
if err := peerRecord.Load(&custodyObj); err != nil {
return 0, errors.Wrap(err, "load custody_subnet_count")
}
actualCustodyCount := ssz.UnmarshallUint64(custodyObj)
if actualCustodyCount > peerCustodiedSubnetCount {
peerCustodiedSubnetCount = actualCustodyCount
}

if peerRecord == nil {
// This is the case for inbound peers. So we don't log an error for this.
log.WithField("peerID", pid).Debug("No ENR found for peer")
return peerCustodyCountCount
}

// Load the `custody_subnet_count`
custodyObj := CustodySubnetCount(make([]byte, 8))
if err := peerRecord.Load(&custodyObj); err != nil {
log.WithField("peerID", pid).Error("Cannot load the custody_subnet_count from peer")
return peerCustodyCountCount
}
return peerCustodiedSubnetCount, nil

// Unmarshal the custody count from the peer's ENR.
peerCustodyCountFromRecord := ssz.UnmarshallUint64(custodyObj)
log.WithFields(logrus.Fields{
"peerID": pid,
"custodyCount": peerCustodyCountFromRecord,
}).Debug("Custody count read from peer's ENR")

return peerCustodyCountFromRecord
}
2 changes: 1 addition & 1 deletion beacon-chain/p2p/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,6 @@ type MetadataProvider interface {
}

type CustodyHandler interface {
CustodyCountFromRemotePeer(peer.ID) (uint64, error)
CustodyCountFromRemotePeer(peer.ID) uint64
GetValidCustodyPeers([]peer.ID) ([]peer.ID, error)
}
4 changes: 2 additions & 2 deletions beacon-chain/p2p/testing/fuzz_p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ func (_ *FakeP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.Di
return true, 0
}

func (_ *FakeP2P) CustodyCountFromRemotePeer(peer.ID) (uint64, error) {
return 0, nil
func (_ *FakeP2P) CustodyCountFromRemotePeer(peer.ID) uint64 {
return 0
}

func (_ *FakeP2P) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/p2p/testing/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,8 @@ func (_ *TestP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.Di
return true, 0
}

func (_ *TestP2P) CustodyCountFromRemotePeer(peer.ID) (uint64, error) {
return 0, nil
func (_ *TestP2P) CustodyCountFromRemotePeer(peer.ID) uint64 {
return 0
}

func (_ *TestP2P) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
return nil, status.Errorf(codes.Internal, "%s: %v", "handle blinded block", err)
}
} else {
blobSidecars, dataColumnSideCars, err = handleUnblidedBlock(block, req, isPeerDASEnabled)
blobSidecars, dataColumnSideCars, err = handleUnblindedBlock(block, req, isPeerDASEnabled)
if err != nil {
return nil, status.Errorf(codes.Internal, "%s: %v", "handle unblided block", err)
}
Expand Down Expand Up @@ -373,7 +373,7 @@ func (vs *Server) handleBlindedBlock(ctx context.Context, block interfaces.Signe
}

// handleUnblindedBlock processes unblinded beacon blocks.
func handleUnblidedBlock(block interfaces.SignedBeaconBlock, req *ethpb.GenericSignedBeaconBlock, isPeerDASEnabled bool) ([]*ethpb.BlobSidecar, []*ethpb.DataColumnSidecar, error) {
func handleUnblindedBlock(block interfaces.SignedBeaconBlock, req *ethpb.GenericSignedBeaconBlock, isPeerDASEnabled bool) ([]*ethpb.BlobSidecar, []*ethpb.DataColumnSidecar, error) {
dbBlockContents := req.GetDeneb()

if dbBlockContents == nil {
Expand Down
4 changes: 3 additions & 1 deletion beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ go_library(
"block_batcher.go",
"broadcast_bls_changes.go",
"context.go",
"data_columns_reconstruct.go",
"data_columns_sampling.go",
"deadlines.go",
"decode_pubsub.go",
"doc.go",
Expand All @@ -32,7 +34,6 @@ go_library(
"rpc_ping.go",
"rpc_send_request.go",
"rpc_status.go",
"sampling_data_columns.go",
"service.go",
"subscriber.go",
"subscriber_beacon_aggregate_proof.go",
Expand Down Expand Up @@ -126,6 +127,7 @@ go_library(
"//time:go_default_library",
"//time/slots:go_default_library",
"@com_github_btcsuite_btcd_btcec_v2//:go_default_library",
"@com_github_ethereum_c_kzg_4844//bindings/go:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_ethereum_go_ethereum//common/math:go_default_library",
"@com_github_ethereum_go_ethereum//crypto:go_default_library",
Expand Down
Loading
Loading