From 282ac9d7190685579ef477a560f4277f875f93e7 Mon Sep 17 00:00:00 2001
From: Manu NALEPA
Date: Wed, 29 May 2024 10:03:21 +0200
Subject: [PATCH] PeerDAS: Implement reconstruction. (#14036)

* Wrap errors, add logs.

* `missingColumnRequest`: Fix blobs <-> data columns mix.

* `ColumnIndices`: Return `map[uint64]bool` instead of `[fieldparams.NumberOfColumns]bool`.

* `DataColumnSidecars`: `interfaces.SignedBeaconBlock` ==> `interfaces.ReadOnlySignedBeaconBlock`.
  We don't need any of the non-read-only methods.

* Fix comments.

* `handleUnblidedBlock` ==> `handleUnblindedBlock`.

* `SaveDataColumn`: Move log from debug to trace.
  Previously, attempting to save an already existing data column sidecar printed a debug log. This case can be quite common now that data column reconstruction is enabled.

* `sampling_data_columns.go` --> `data_columns_sampling.go`.

* Reconstruct data columns.
---
 beacon-chain/core/peerdas/helpers.go          |   2 +-
 beacon-chain/db/filesystem/blob.go            |  50 ++-
 beacon-chain/db/kv/blocks.go                  |   4 +-
 beacon-chain/p2p/custody.go                   |  53 +--
 beacon-chain/p2p/interfaces.go                |   2 +-
 beacon-chain/p2p/testing/fuzz_p2p.go          |   4 +-
 beacon-chain/p2p/testing/p2p.go               |   4 +-
 .../rpc/prysm/v1alpha1/validator/proposer.go  |   4 +-
 beacon-chain/sync/BUILD.bazel                 |   4 +-
 beacon-chain/sync/data_columns_reconstruct.go | 187 +++++++++++
 beacon-chain/sync/data_columns_sampling.go    | 303 ++++++++++++++++++
 beacon-chain/sync/initial-sync/BUILD.bazel    |   1 +
 beacon-chain/sync/initial-sync/service.go     |  58 +++-
 .../sync/rpc_beacon_blocks_by_root.go         |  32 +-
 .../sync/rpc_data_column_sidecars_by_root.go  |  61 +++-
 beacon-chain/sync/sampling_data_columns.go    | 219 -------------
 beacon-chain/sync/service.go                  |   1 +
 .../sync/subscriber_data_column_sidecar.go    |   6 +
 go.mod                                        |   2 +-
 runtime/interop/genesis.go                    |   1 -
 20 files changed, 689 insertions(+), 309 deletions(-)
 create mode 100644 beacon-chain/sync/data_columns_reconstruct.go
 create mode 100644 beacon-chain/sync/data_columns_sampling.go
 delete mode 100644 beacon-chain/sync/sampling_data_columns.go

diff --git a/beacon-chain/core/peerdas/helpers.go b/beacon-chain/core/peerdas/helpers.go
index a5907064780d..db2852072468 100644
--- a/beacon-chain/core/peerdas/helpers.go
+++ b/beacon-chain/core/peerdas/helpers.go
@@ -170,7 +170,7 @@ func RecoverMatrix(cellFromCoordinate map[cellCoordinate]cKzg4844.Cell, blobCoun
 // DataColumnSidecars computes the data column sidecars from the signed block and blobs.
 // https://github.com/ethereum/consensus-specs/blob/dev/specs/_features/eip7594/das-core.md#recover_matrix
-func DataColumnSidecars(signedBlock interfaces.SignedBeaconBlock, blobs []cKzg4844.Blob) ([]*ethpb.DataColumnSidecar, error) {
+func DataColumnSidecars(signedBlock interfaces.ReadOnlySignedBeaconBlock, blobs []cKzg4844.Blob) ([]*ethpb.DataColumnSidecar, error) {
 	blobsCount := len(blobs)
 	if blobsCount == 0 {
 		return nil, nil
diff --git a/beacon-chain/db/filesystem/blob.go b/beacon-chain/db/filesystem/blob.go
index f35fe42e558b..6b6e0664d3da 100644
--- a/beacon-chain/db/filesystem/blob.go
+++ b/beacon-chain/db/filesystem/blob.go
@@ -230,10 +230,12 @@ func (bs *BlobStorage) SaveDataColumn(column blocks.VerifiedRODataColumn) error
 	if err != nil {
 		return err
 	}
+
 	if exists {
-		log.Debug("Ignoring a duplicate data column sidecar save attempt")
+		log.Trace("Ignoring a duplicate data column sidecar save attempt")
 		return nil
 	}
+
 	if bs.pruner != nil {
 		hRoot, err := column.SignedBlockHeader.Header.HashTreeRoot()
 		if err != nil {
@@ -399,38 +401,58 @@ func (bs *BlobStorage) Indices(root [32]byte) ([fieldparams.MaxBlobsPerBlock]boo
 }
 
 // ColumnIndices retrieve the stored column indexes from our filesystem.
-func (bs *BlobStorage) ColumnIndices(root [32]byte) ([fieldparams.NumberOfColumns]bool, error) {
-	var mask [fieldparams.NumberOfColumns]bool
+func (bs *BlobStorage) ColumnIndices(root [32]byte) (map[uint64]bool, error) {
+	custody := make(map[uint64]bool, fieldparams.NumberOfColumns)
+
+	// Get all the files in the directory.
 	rootDir := blobNamer{root: root}.dir()
 	entries, err := afero.ReadDir(bs.fs, rootDir)
 	if err != nil {
+		// If the directory does not exist, we do not custody any columns.
 		if os.IsNotExist(err) {
-			return mask, nil
+			return nil, nil
 		}
-		return mask, err
+
+		return nil, errors.Wrap(err, "read directory")
 	}
-	for i := range entries {
-		if entries[i].IsDir() {
+
+	// Iterate over all the entries in the directory.
+	for _, entry := range entries {
+		// If the entry is a directory, skip it.
+		if entry.IsDir() {
 			continue
 		}
-		name := entries[i].Name()
+
+		// If the entry does not have the correct extension, skip it.
+		name := entry.Name()
 		if !strings.HasSuffix(name, sszExt) {
 			continue
 		}
+
+		// The file should be in the `<index>.<extension>` format.
+		// Skip the file if it does not match the format.
 		parts := strings.Split(name, ".")
 		if len(parts) != 2 {
 			continue
 		}
-		u, err := strconv.ParseUint(parts[0], 10, 64)
+
+		// Get the column index from the file name.
+		columnIndexStr := parts[0]
+		columnIndex, err := strconv.ParseUint(columnIndexStr, 10, 64)
 		if err != nil {
-			return mask, errors.Wrapf(err, "unexpected directory entry breaks listing, %s", parts[0])
+			return nil, errors.Wrapf(err, "unexpected directory entry breaks listing, %s", parts[0])
 		}
-		if u >= fieldparams.NumberOfColumns {
-			return mask, errors.Wrapf(errIndexOutOfBounds, "invalid index %d", u)
+
+		// If the column index is out of bounds, return an error.
+		if columnIndex >= fieldparams.NumberOfColumns {
+			return nil, errors.Wrapf(errIndexOutOfBounds, "invalid index %d", columnIndex)
 		}
-		mask[u] = true
+
+		// Mark the column index as in custody.
+		custody[columnIndex] = true
 	}
-	return mask, nil
+
+	return custody, nil
 }
 
 // Clear deletes all files on the filesystem.
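A minimal sketch of the intended call pattern for the new map-based `ColumnIndices`: absent keys mean "not stored", so `len` gives the stored-column count directly. `canReconstruct` is a hypothetical helper, not part of this patch; `filesystem` and `fieldparams` are the packages shown above.

	// canReconstruct reports whether enough columns of a block are on disk to
	// attempt reconstruction: half of the extended columns is sufficient.
	func canReconstruct(bs *filesystem.BlobStorage, root [32]byte) (bool, error) {
		stored, err := bs.ColumnIndices(root)
		if err != nil {
			return false, err
		}
		return len(stored) >= fieldparams.NumberOfColumns/2, nil
	}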
diff --git a/beacon-chain/db/kv/blocks.go b/beacon-chain/db/kv/blocks.go
index 466afb3f6789..cc1a5fa71d46 100644
--- a/beacon-chain/db/kv/blocks.go
+++ b/beacon-chain/db/kv/blocks.go
@@ -23,10 +23,10 @@ import (
 	"go.opencensus.io/trace"
 )
 
-// used to represent errors for inconsistent slot ranges.
+// Used to represent errors for inconsistent slot ranges.
 var errInvalidSlotRange = errors.New("invalid end slot and start slot provided")
 
-// Block retrieval by root.
+// Block retrieval by root. Return nil if block is not found.
 func (s *Store) Block(ctx context.Context, blockRoot [32]byte) (interfaces.ReadOnlySignedBeaconBlock, error) {
 	ctx, span := trace.StartSpan(ctx, "BeaconDB.Block")
 	defer span.End()
diff --git a/beacon-chain/p2p/custody.go b/beacon-chain/p2p/custody.go
index 960770d5633a..12971f563bbb 100644
--- a/beacon-chain/p2p/custody.go
+++ b/beacon-chain/p2p/custody.go
@@ -7,6 +7,7 @@ import (
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
 	"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
 	"github.com/prysmaticlabs/prysm/v5/config/params"
+	"github.com/sirupsen/logrus"
 )
 
 func (s *Service) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
@@ -20,17 +21,15 @@ func (s *Service) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
 	}
 	var validPeers []peer.ID
 	for _, pid := range peers {
-		remoteCount, err := s.CustodyCountFromRemotePeer(pid)
-		if err != nil {
-			return nil, err
-		}
+		remoteCount := s.CustodyCountFromRemotePeer(pid)
+
 		nodeId, err := ConvertPeerIDToNodeID(pid)
 		if err != nil {
-			return nil, err
+			return nil, errors.Wrap(err, "convert peer ID to node ID")
 		}
 		remoteCustodiedColumns, err := peerdas.CustodyColumns(nodeId, remoteCount)
 		if err != nil {
-			return nil, err
+			return nil, errors.Wrap(err, "custody columns")
 		}
 		invalidPeer := false
 		for c := range custodiedColumns {
@@ -49,24 +48,36 @@ func (s *Service) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
 	return validPeers, nil
 }
 
-func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) (uint64, error) {
+func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) uint64 {
+	// By default, we assume the peer custodies the minimum number of subnets.
+	peerCustodyCount := params.BeaconConfig().CustodyRequirement
+
+	// Retrieve the ENR of the peer.
 	peerRecord, err := s.peers.ENR(pid)
 	if err != nil {
-		return 0, errors.Wrap(err, "ENR")
+		log.WithError(err).WithField("peerID", pid).Error("Failed to retrieve ENR for peer")
+		return peerCustodyCount
 	}
-	peerCustodiedSubnetCount := params.BeaconConfig().CustodyRequirement
-	if peerRecord != nil {
-		// Load the `custody_subnet_count`
-		custodyBytes := make([]byte, 8)
-		custodyObj := CustodySubnetCount(custodyBytes)
-		if err := peerRecord.Load(&custodyObj); err != nil {
-			return 0, errors.Wrap(err, "load custody_subnet_count")
-		}
-		actualCustodyCount := ssz.UnmarshallUint64(custodyObj)
-		if actualCustodyCount > peerCustodiedSubnetCount {
-			peerCustodiedSubnetCount = actualCustodyCount
-		}
+
+	if peerRecord == nil {
+		// This is expected for inbound peers, so we don't log an error in this case.
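+		// (Inbound peers dialed us, so we may never have obtained their ENR
+		// through discovery; assuming the spec-minimum custody count is the
+		// safe fallback in that case.)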
+ log.WithField("peerID", pid).Debug("No ENR found for peer") + return peerCustodyCountCount + } + + // Load the `custody_subnet_count` + custodyObj := CustodySubnetCount(make([]byte, 8)) + if err := peerRecord.Load(&custodyObj); err != nil { + log.WithField("peerID", pid).Error("Cannot load the custody_subnet_count from peer") + return peerCustodyCountCount } - return peerCustodiedSubnetCount, nil + + // Unmarshal the custody count from the peer's ENR. + peerCustodyCountFromRecord := ssz.UnmarshallUint64(custodyObj) + log.WithFields(logrus.Fields{ + "peerID": pid, + "custodyCount": peerCustodyCountFromRecord, + }).Debug("Custody count read from peer's ENR") + + return peerCustodyCountFromRecord } diff --git a/beacon-chain/p2p/interfaces.go b/beacon-chain/p2p/interfaces.go index a6c0b25fccba..458f6ef29229 100644 --- a/beacon-chain/p2p/interfaces.go +++ b/beacon-chain/p2p/interfaces.go @@ -113,6 +113,6 @@ type MetadataProvider interface { } type CustodyHandler interface { - CustodyCountFromRemotePeer(peer.ID) (uint64, error) + CustodyCountFromRemotePeer(peer.ID) uint64 GetValidCustodyPeers([]peer.ID) ([]peer.ID, error) } diff --git a/beacon-chain/p2p/testing/fuzz_p2p.go b/beacon-chain/p2p/testing/fuzz_p2p.go index 4594efce455f..24089c1e972a 100644 --- a/beacon-chain/p2p/testing/fuzz_p2p.go +++ b/beacon-chain/p2p/testing/fuzz_p2p.go @@ -184,8 +184,8 @@ func (_ *FakeP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.Di return true, 0 } -func (_ *FakeP2P) CustodyCountFromRemotePeer(peer.ID) (uint64, error) { - return 0, nil +func (_ *FakeP2P) CustodyCountFromRemotePeer(peer.ID) uint64 { + return 0 } func (_ *FakeP2P) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) { diff --git a/beacon-chain/p2p/testing/p2p.go b/beacon-chain/p2p/testing/p2p.go index 1df9b8b35fd6..d595da7619ed 100644 --- a/beacon-chain/p2p/testing/p2p.go +++ b/beacon-chain/p2p/testing/p2p.go @@ -420,8 +420,8 @@ func (_ *TestP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.Di return true, 0 } -func (_ *TestP2P) CustodyCountFromRemotePeer(peer.ID) (uint64, error) { - return 0, nil +func (_ *TestP2P) CustodyCountFromRemotePeer(peer.ID) uint64 { + return 0 } func (_ *TestP2P) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) { diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go index 27b16e2ba5a0..fb2441fa45c7 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go @@ -292,7 +292,7 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign return nil, status.Errorf(codes.Internal, "%s: %v", "handle blinded block", err) } } else { - blobSidecars, dataColumnSideCars, err = handleUnblidedBlock(block, req, isPeerDASEnabled) + blobSidecars, dataColumnSideCars, err = handleUnblindedBlock(block, req, isPeerDASEnabled) if err != nil { return nil, status.Errorf(codes.Internal, "%s: %v", "handle unblided block", err) } @@ -376,7 +376,7 @@ func (vs *Server) handleBlindedBlock(ctx context.Context, block interfaces.Signe } // handleUnblindedBlock processes unblinded beacon blocks. 
-func handleUnblidedBlock(block interfaces.SignedBeaconBlock, req *ethpb.GenericSignedBeaconBlock, isPeerDASEnabled bool) ([]*ethpb.BlobSidecar, []*ethpb.DataColumnSidecar, error) { +func handleUnblindedBlock(block interfaces.SignedBeaconBlock, req *ethpb.GenericSignedBeaconBlock, isPeerDASEnabled bool) ([]*ethpb.BlobSidecar, []*ethpb.DataColumnSidecar, error) { dbBlockContents := req.GetDeneb() if dbBlockContents == nil { diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 4123d0885eb4..63d2555f89cd 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -7,6 +7,8 @@ go_library( "block_batcher.go", "broadcast_bls_changes.go", "context.go", + "data_columns_reconstruct.go", + "data_columns_sampling.go", "deadlines.go", "decode_pubsub.go", "doc.go", @@ -32,7 +34,6 @@ go_library( "rpc_ping.go", "rpc_send_request.go", "rpc_status.go", - "sampling_data_columns.go", "service.go", "subscriber.go", "subscriber_beacon_aggregate_proof.go", @@ -128,6 +129,7 @@ go_library( "//time:go_default_library", "//time/slots:go_default_library", "@com_github_btcsuite_btcd_btcec_v2//:go_default_library", + "@com_github_ethereum_c_kzg_4844//bindings/go:go_default_library", "@com_github_ethereum_go_ethereum//common/hexutil:go_default_library", "@com_github_ethereum_go_ethereum//common/math:go_default_library", "@com_github_ethereum_go_ethereum//crypto:go_default_library", diff --git a/beacon-chain/sync/data_columns_reconstruct.go b/beacon-chain/sync/data_columns_reconstruct.go new file mode 100644 index 000000000000..70b90ab2f5bc --- /dev/null +++ b/beacon-chain/sync/data_columns_reconstruct.go @@ -0,0 +1,187 @@ +package sync + +import ( + "context" + "fmt" + "time" + + cKzg4844 "github.com/ethereum/c-kzg-4844/bindings/go" + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed" + statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas" + fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams" + "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" + "github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces" + ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" + "github.com/sirupsen/logrus" +) + +// recoverBlobs recovers the blobs from the data column sidecars. +func recoverBlobs( + dataColumnSideCars []*ethpb.DataColumnSidecar, + columnsCount int, + blockRoot [fieldparams.RootLength]byte, +) ([]cKzg4844.Blob, error) { + recoveredBlobs := make([]cKzg4844.Blob, 0, fieldparams.MaxBlobsPerBlock) + + for blobIndex := 0; blobIndex < fieldparams.MaxBlobsPerBlock; blobIndex++ { + start := time.Now() + + cellsId := make([]uint64, 0, columnsCount) + cKzgCells := make([]cKzg4844.Cell, 0, columnsCount) + + for _, sidecar := range dataColumnSideCars { + // Build the cell ids. + cellsId = append(cellsId, sidecar.ColumnIndex) + + // Get the cell. + column := sidecar.DataColumn + cell := column[blobIndex] + + // Transform the cell as a cKzg cell. + var ckzgCell cKzg4844.Cell + for i := 0; i < cKzg4844.FieldElementsPerCell; i++ { + copy(ckzgCell[i][:], cell[32*i:32*(i+1)]) + } + + cKzgCells = append(cKzgCells, ckzgCell) + } + + // Recover the blob. 
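+		// Note: RecoverAllCells interpolates the full extended cell set for
+		// this blob row; the c-kzg-4844 bindings need at least half of the
+		// extension cells to be present, which the storedColumnsCount check in
+		// reconstructDataColumns guarantees before we get here.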
+		recoveredCells, err := cKzg4844.RecoverAllCells(cellsId, cKzgCells)
+		if err != nil {
+			return nil, errors.Wrapf(err, "recover all cells for blob %d", blobIndex)
+		}
+
+		recoveredBlob, err := cKzg4844.CellsToBlob(recoveredCells)
+		if err != nil {
+			return nil, errors.Wrapf(err, "cells to blob for blob %d", blobIndex)
+		}
+
+		recoveredBlobs = append(recoveredBlobs, recoveredBlob)
+		log.WithFields(logrus.Fields{
+			"elapsed": time.Since(start),
+			"index":   blobIndex,
+			"root":    fmt.Sprintf("%x", blockRoot),
+		}).Debug("Recovered blob")
+	}
+
+	return recoveredBlobs, nil
+}
+
+// getSignedBlock retrieves the signed block corresponding to the given root.
+// If the block is not available, it waits for it.
+func (s *Service) getSignedBlock(
+	ctx context.Context,
+	blockRoot [fieldparams.RootLength]byte,
+) (interfaces.ReadOnlySignedBeaconBlock, error) {
+	blocksChannel := make(chan *feed.Event, 1)
+	blockSub := s.cfg.blockNotifier.BlockFeed().Subscribe(blocksChannel)
+	defer blockSub.Unsubscribe()
+
+	// Get the signedBlock corresponding to this root.
+	signedBlock, err := s.cfg.beaconDB.Block(ctx, blockRoot)
+	if err != nil {
+		return nil, errors.Wrap(err, "block")
+	}
+
+	// If the block is here, return it.
+	if signedBlock != nil {
+		return signedBlock, nil
+	}
+
+	// Wait for the block to be available.
+	for {
+		select {
+		case blockEvent := <-blocksChannel:
+			// Check the type of the event.
+			data, ok := blockEvent.Data.(*statefeed.BlockProcessedData)
+			if !ok || data == nil {
+				continue
+			}
+
+			// Check if the block is the one we are looking for.
+			if data.BlockRoot != blockRoot {
+				continue
+			}
+
+			// This is the block we are looking for.
+			return data.SignedBlock, nil
+		case err := <-blockSub.Err():
+			return nil, errors.Wrap(err, "block subscriber error")
+		case <-ctx.Done():
+			return nil, errors.New("context canceled")
+		}
+	}
+}
+
+func (s *Service) reconstructDataColumns(ctx context.Context, verifiedRODataColumn blocks.VerifiedRODataColumn) error {
+	// Lock to prevent concurrent reconstruction.
+	s.dataColumnsReconstructionLock.Lock()
+	defer s.dataColumnsReconstructionLock.Unlock()
+
+	// Get the block root.
+	blockRoot := verifiedRODataColumn.BlockRoot()
+
+	// Get the columns we store.
+	storedColumnsIndices, err := s.cfg.blobStorage.ColumnIndices(blockRoot)
+	if err != nil {
+		return errors.Wrap(err, "columns indices")
+	}
+
+	storedColumnsCount := len(storedColumnsIndices)
+	numberOfColumns := fieldparams.NumberOfColumns
+
+	// If less than half of the columns are stored, reconstruction is not possible.
+	// If all columns are stored, no need to reconstruct.
+	if storedColumnsCount < numberOfColumns/2 || storedColumnsCount == numberOfColumns {
+		return nil
+	}
+
+	// Load the data columns sidecars.
+	dataColumnSideCars := make([]*ethpb.DataColumnSidecar, 0, storedColumnsCount)
+	for index := range storedColumnsIndices {
+		dataColumnSidecar, err := s.cfg.blobStorage.GetColumn(blockRoot, index)
+		if err != nil {
+			return errors.Wrap(err, "get column")
+		}
+
+		dataColumnSideCars = append(dataColumnSideCars, dataColumnSidecar)
+	}
+
+	// Recover blobs.
+	recoveredBlobs, err := recoverBlobs(dataColumnSideCars, storedColumnsCount, blockRoot)
+	if err != nil {
+		return errors.Wrap(err, "recover blobs")
+	}
+
+	// Get the signed block.
+	signedBlock, err := s.getSignedBlock(ctx, blockRoot)
+	if err != nil {
+		return errors.Wrap(err, "get signed block")
+	}
+
+	// Reconstruct the data columns sidecars.
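+	// DataColumnSidecars re-extends the recovered blobs into cells and proofs,
+	// rebuilding every sidecar, including the columns we never received over
+	// gossip, so the save loop below can persist the full set.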
+	dataColumnSidecars, err := peerdas.DataColumnSidecars(signedBlock, recoveredBlobs)
+	if err != nil {
+		return errors.Wrap(err, "data column sidecars")
+	}
+
+	// Save the data columns sidecars in the database.
+	for _, dataColumnSidecar := range dataColumnSidecars {
+		roDataColumn, err := blocks.NewRODataColumnWithRoot(dataColumnSidecar, blockRoot)
+		if err != nil {
+			return errors.Wrap(err, "new read-only data column with root")
+		}
+
+		verifiedRoDataColumn := blocks.NewVerifiedRODataColumn(roDataColumn)
+		if err := s.cfg.blobStorage.SaveDataColumn(verifiedRoDataColumn); err != nil {
+			return errors.Wrap(err, "save column")
+		}
+	}
+
+	log.WithField("root", fmt.Sprintf("%x", blockRoot)).Debug("Data columns reconstructed successfully")
+
+	return nil
+}
diff --git a/beacon-chain/sync/data_columns_sampling.go b/beacon-chain/sync/data_columns_sampling.go
new file mode 100644
index 000000000000..6cd0a2ca4245
--- /dev/null
+++ b/beacon-chain/sync/data_columns_sampling.go
@@ -0,0 +1,303 @@
+package sync
+
+import (
+	"context"
+	"fmt"
+	"sort"
+
+	"github.com/btcsuite/btcd/btcec/v2"
+	"github.com/ethereum/go-ethereum/common/math"
+	"github.com/ethereum/go-ethereum/crypto"
+	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/pkg/errors"
+	"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed"
+	statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
+	"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
+	"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
+	fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
+	"github.com/prysmaticlabs/prysm/v5/config/params"
+	"github.com/prysmaticlabs/prysm/v5/crypto/rand"
+	eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
+	"github.com/prysmaticlabs/prysm/v5/runtime/version"
+	"github.com/sirupsen/logrus"
+)
+
+// randomIntegers returns a map of `count` random integers in the range [0, max).
+func randomIntegers(count uint64, max uint64) map[uint64]bool {
+	result := make(map[uint64]bool, count)
+	randGenerator := rand.NewGenerator()
+
+	for uint64(len(result)) < count {
+		n := randGenerator.Uint64() % max
+		result[n] = true
+	}
+
+	return result
+}
+
+// sortedListFromMap returns a sorted list of keys from a map.
+func sortedListFromMap(m map[uint64]bool) []uint64 {
+	result := make([]uint64, 0, len(m))
+	for k := range m {
+		result = append(result, k)
+	}
+
+	sort.Slice(result, func(i, j int) bool {
+		return result[i] < result[j]
+	})
+
+	return result
+}
+
+// extractNodeID extracts the node ID from a peer ID.
+func extractNodeID(pid peer.ID) ([32]byte, error) {
+	var nodeID [32]byte
+
+	// Retrieve the public key object of the peer under "crypto" form.
+	pubkeyObjCrypto, err := pid.ExtractPublicKey()
+	if err != nil {
+		return nodeID, errors.Wrap(err, "extract public key")
+	}
+
+	// Extract the bytes representation of the public key.
+	compressedPubKeyBytes, err := pubkeyObjCrypto.Raw()
+	if err != nil {
+		return nodeID, errors.Wrap(err, "public key raw")
+	}
+
+	// Retrieve the public key object of the peer under "SECP256K1" form.
+	pubKeyObjSecp256k1, err := btcec.ParsePubKey(compressedPubKeyBytes)
+	if err != nil {
+		return nodeID, errors.Wrap(err, "parse public key")
+	}
+
+	// Concatenate the X and Y coordinates represented in bytes.
+	buf := make([]byte, 64)
+	math.ReadBits(pubKeyObjSecp256k1.X(), buf[:32])
+	math.ReadBits(pubKeyObjSecp256k1.Y(), buf[32:])
+
+	// Get the node ID by hashing the concatenated X and Y coordinates.
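+	// This mirrors the devp2p/discv5 node ID derivation: keccak256 over the
+	// uncompressed 64-byte secp256k1 public key (X || Y).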
+	nodeIDBytes := crypto.Keccak256(buf)
+	copy(nodeID[:], nodeIDBytes)
+
+	return nodeID, nil
+}
+
+// sampleDataColumnFromPeer samples data columns from a peer.
+// It returns the missing columns after sampling.
+func (s *Service) sampleDataColumnFromPeer(
+	pid peer.ID,
+	columnsToSample map[uint64]bool,
+	requestedRoot [fieldparams.RootLength]byte,
+) (map[uint64]bool, error) {
+	// Define missing columns.
+	missingColumns := make(map[uint64]bool, len(columnsToSample))
+	for index := range columnsToSample {
+		missingColumns[index] = true
+	}
+
+	// Retrieve the custody count of the peer.
+	peerCustodiedSubnetCount := s.cfg.p2p.CustodyCountFromRemotePeer(pid)
+
+	// Extract the node ID from the peer ID.
+	nodeID, err := extractNodeID(pid)
+	if err != nil {
+		return nil, errors.Wrap(err, "extract node ID")
+	}
+
+	// Determine which columns the peer should custody.
+	peerCustodiedColumns, err := peerdas.CustodyColumns(nodeID, peerCustodiedSubnetCount)
+	if err != nil {
+		return nil, errors.Wrap(err, "custody columns")
+	}
+
+	peerCustodiedColumnsList := sortedListFromMap(peerCustodiedColumns)
+
+	// Compute the intersection of the columns to sample and the columns the peer should custody.
+	peerRequestedColumns := make(map[uint64]bool, len(columnsToSample))
+	for column := range columnsToSample {
+		if peerCustodiedColumns[column] {
+			peerRequestedColumns[column] = true
+		}
+	}
+
+	peerRequestedColumnsList := sortedListFromMap(peerRequestedColumns)
+
+	// Get the data column identifiers to sample from this peer.
+	dataColumnIdentifiers := make(types.BlobSidecarsByRootReq, 0, len(peerRequestedColumns))
+	for index := range peerRequestedColumns {
+		dataColumnIdentifiers = append(dataColumnIdentifiers, &eth.BlobIdentifier{
+			BlockRoot: requestedRoot[:],
+			Index:     index,
+		})
+	}
+
+	// Return early if there are no data columns to sample.
+	if len(dataColumnIdentifiers) == 0 {
+		log.WithFields(logrus.Fields{
+			"peerID":           pid,
+			"custodiedColumns": peerCustodiedColumnsList,
+			"requestedColumns": peerRequestedColumnsList,
+		}).Debug("Peer does not custody any of the requested columns")
+		return columnsToSample, nil
+	}
+
+	// Sample data columns.
+	roDataColumns, err := SendDataColumnSidecarByRoot(s.ctx, s.cfg.clock, s.cfg.p2p, pid, s.ctxMap, &dataColumnIdentifiers)
+	if err != nil {
+		return nil, errors.Wrap(err, "send data column sidecar by root")
+	}
+
+	peerRetrievedColumns := make(map[uint64]bool, len(roDataColumns))
+
+	// Mark the retrieved columns and remove them from the missing set.
+	for _, roDataColumn := range roDataColumns {
+		retrievedColumn := roDataColumn.ColumnIndex
+
+		actualRoot := roDataColumn.BlockRoot()
+		if actualRoot != requestedRoot {
+			// TODO: Should we decrease the peer score here?
+			log.WithFields(logrus.Fields{
+				"peerID":        pid,
+				"requestedRoot": fmt.Sprintf("%#x", requestedRoot),
+				"actualRoot":    fmt.Sprintf("%#x", actualRoot),
+			}).Warning("Actual root does not match requested root")
+
+			continue
+		}
+
+		peerRetrievedColumns[retrievedColumn] = true
+
+		if !columnsToSample[retrievedColumn] {
+			// TODO: Should we decrease the peer score here?
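+			// An unsolicited column is suspicious but harmless here: it was
+			// never in the missing set, so the delete below is a no-op for it.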
+			log.WithFields(logrus.Fields{
+				"peerID":           pid,
+				"retrievedColumn":  retrievedColumn,
+				"requestedColumns": peerRequestedColumnsList,
+			}).Warning("Retrieved column was not requested")
+		}
+
+		delete(missingColumns, retrievedColumn)
+	}
+
+	peerRetrievedColumnsList := sortedListFromMap(peerRetrievedColumns)
+	remainingMissingColumnsList := sortedListFromMap(missingColumns)
+
+	log.WithFields(logrus.Fields{
+		"peerID":                  pid,
+		"custodiedColumns":        peerCustodiedColumnsList,
+		"requestedColumns":        peerRequestedColumnsList,
+		"retrievedColumns":        peerRetrievedColumnsList,
+		"remainingMissingColumns": remainingMissingColumnsList,
+	}).Debug("Peer data column sampling summary")
+
+	return missingColumns, nil
+}
+
+// sampleDataColumns samples data columns from active peers.
+func (s *Service) sampleDataColumns(requestedRoot [fieldparams.RootLength]byte, samplesCount uint64) error {
+	// Determine `samplesCount` random column indexes.
+	requestedColumns := randomIntegers(samplesCount, params.BeaconConfig().NumberOfColumns)
+
+	missingColumns := make(map[uint64]bool, len(requestedColumns))
+	for index := range requestedColumns {
+		missingColumns[index] = true
+	}
+
+	// Get the active peers from the p2p service.
+	activePeers := s.cfg.p2p.Peers().Active()
+
+	var err error
+
+	// Sampling is done sequentially peer by peer.
+	// TODO: Add parallelism if (probably) needed.
+	for _, pid := range activePeers {
+		// Early exit if all needed columns are already sampled. (This is the happy path.)
+		if len(missingColumns) == 0 {
+			break
+		}
+
+		// Sample data columns from the peer.
+		missingColumns, err = s.sampleDataColumnFromPeer(pid, missingColumns, requestedRoot)
+		if err != nil {
+			return errors.Wrap(err, "sample data column from peer")
+		}
+	}
+
+	requestedColumnsList := sortedListFromMap(requestedColumns)
+
+	if len(missingColumns) == 0 {
+		log.WithField("requestedColumns", requestedColumnsList).Debug("Successfully sampled all requested columns")
+		return nil
+	}
+
+	missingColumnsList := sortedListFromMap(missingColumns)
+	log.WithFields(logrus.Fields{
+		"requestedColumns": requestedColumnsList,
+		"missingColumns":   missingColumnsList,
+	}).Warning("Failed to sample some requested columns")
+
+	return nil
+}
+
+func (s *Service) dataColumnSampling(ctx context.Context) {
+	// Create a subscription to the state feed.
+	stateChannel := make(chan *feed.Event, 1)
+	stateSub := s.cfg.stateNotifier.StateFeed().Subscribe(stateChannel)
+
+	// Unsubscribe from the state feed when the function returns.
+	defer stateSub.Unsubscribe()
+
+	for {
+		select {
+		case e := <-stateChannel:
+			if e.Type != statefeed.BlockProcessed {
+				continue
+			}
+
+			data, ok := e.Data.(*statefeed.BlockProcessedData)
+			if !ok {
+				log.Error("Event feed data is not of type *statefeed.BlockProcessedData")
+				continue
+			}
+
+			if !data.Verified {
+				// We only process blocks that have been verified.
+				log.Error("Data is not verified")
+				continue
+			}
+
+			if data.SignedBlock.Version() < version.Deneb {
+				log.Debug("Pre Deneb block, skipping data column sampling")
+				continue
+			}
+
+			// Get the commitments for this block.
+			commitments, err := data.SignedBlock.Block().Body().BlobKzgCommitments()
+			if err != nil {
+				log.WithError(err).Error("Failed to get blob KZG commitments")
+				continue
+			}
+
+			// Skip if there are no commitments.
+			if len(commitments) == 0 {
+				log.Debug("No commitments in block, skipping data column sampling")
+				continue
+			}
+
+			dataColumnSamplingCount := params.BeaconConfig().SamplesPerSlot
+
+			// Sample data columns.
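+			// With SamplesPerSlot uniformly random columns, a block withholding
+			// more than half of its columns is detected with probability at
+			// least 1 - 2^-SamplesPerSlot (the PeerDAS security argument).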
+ if err := s.sampleDataColumns(data.BlockRoot, dataColumnSamplingCount); err != nil { + log.WithError(err).Error("Failed to sample data columns") + } + + case <-s.ctx.Done(): + log.Debug("Context closed, exiting goroutine") + return + + case err := <-stateSub.Err(): + log.WithError(err).Error("Subscription to state feed failed") + } + } +} diff --git a/beacon-chain/sync/initial-sync/BUILD.bazel b/beacon-chain/sync/initial-sync/BUILD.bazel index 68fc892aadcd..3b7056f4313a 100644 --- a/beacon-chain/sync/initial-sync/BUILD.bazel +++ b/beacon-chain/sync/initial-sync/BUILD.bazel @@ -34,6 +34,7 @@ go_library( "//beacon-chain/verification:go_default_library", "//cmd/beacon-chain/flags:go_default_library", "//config/features:go_default_library", + "//config/fieldparams:go_default_library", "//config/params:go_default_library", "//consensus-types/blocks:go_default_library", "//consensus-types/interfaces:go_default_library", diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index 306d4f0902a6..eab1b1a79aca 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -15,6 +15,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain" blockfeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/block" statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas" "github.com/prysmaticlabs/prysm/v5/beacon-chain/das" "github.com/prysmaticlabs/prysm/v5/beacon-chain/db" "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem" @@ -25,6 +26,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/verification" "github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags" "github.com/prysmaticlabs/prysm/v5/config/features" + fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams" "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v5/crypto/rand" @@ -314,30 +316,56 @@ func missingBlobRequest(blk blocks.ROBlock, store *filesystem.BlobStorage) (p2pt return req, nil } -func missingColumnRequest(blk blocks.ROBlock, store *filesystem.BlobStorage) (p2ptypes.BlobSidecarsByRootReq, error) { - r := blk.Root() - if blk.Version() < version.Deneb { +func (s *Service) missingColumnRequest(roBlock blocks.ROBlock, store *filesystem.BlobStorage) (p2ptypes.BlobSidecarsByRootReq, error) { + // No columns for pre-Deneb blocks. + if roBlock.Version() < version.Deneb { return nil, nil } - cmts, err := blk.Block().Body().BlobKzgCommitments() + + // Get the block root. + blockRoot := roBlock.Root() + + // Get the commitments from the block. + commitments, err := roBlock.Block().Body().BlobKzgCommitments() if err != nil { - log.WithField("root", r).Error("Error reading commitments from checkpoint sync origin block") - return nil, err + return nil, errors.Wrap(err, "failed to get blob KZG commitments") } - if len(cmts) == 0 { + + // Return early if there are no commitments. + if len(commitments) == 0 { return nil, nil } - onDisk, err := store.ColumnIndices(r) + + // Check which columns are already on disk. 
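+	// ColumnIndices returns a map keyed by column index; an absent key simply
+	// means the column is not on disk, so the lookups below default to false.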
+	storedColumns, err := store.ColumnIndices(blockRoot)
 	if err != nil {
-		return nil, errors.Wrapf(err, "error checking existing blobs for checkpoint sync block root %#x", r)
+		return nil, errors.Wrapf(err, "error checking existing columns for checkpoint sync block root %#x", blockRoot)
 	}
-	req := make(p2ptypes.BlobSidecarsByRootReq, 0, len(cmts))
-	for i := range cmts {
-		if onDisk[i] {
-			continue
+
+	// Get the number of columns we should custody.
+	custodyRequirement := params.BeaconConfig().CustodyRequirement
+	if features.Get().EnablePeerDAS {
+		custodyRequirement = fieldparams.NumberOfColumns
+	}
+
+	// Get our node ID.
+	nodeID := s.cfg.P2P.NodeID()
+
+	// Get the custodied columns.
+	custodiedColumns, err := peerdas.CustodyColumns(nodeID, custodyRequirement)
+	if err != nil {
+		return nil, errors.Wrap(err, "custody columns")
+	}
+
+	// Build blob sidecars by root requests based on missing columns.
+	req := make(p2ptypes.BlobSidecarsByRootReq, 0, len(commitments))
+	for columnIndex := range custodiedColumns {
+		isColumnAvailable := storedColumns[columnIndex]
+		if !isColumnAvailable {
+			req = append(req, &eth.BlobIdentifier{BlockRoot: blockRoot[:], Index: columnIndex})
 		}
-		req = append(req, &eth.BlobIdentifier{BlockRoot: r[:], Index: uint64(i)})
 	}
+
 	return req, nil
 }
 
@@ -408,7 +436,7 @@ func (s *Service) fetchOriginColumns(pids []peer.ID) error {
 		if err != nil {
 			return err
 		}
-		req, err := missingColumnRequest(rob, s.cfg.BlobStorage)
+		req, err := s.missingColumnRequest(rob, s.cfg.BlobStorage)
 		if err != nil {
 			return err
 		}
diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_root.go b/beacon-chain/sync/rpc_beacon_blocks_by_root.go
index f608669a9416..50209b4b0bc2 100644
--- a/beacon-chain/sync/rpc_beacon_blocks_by_root.go
+++ b/beacon-chain/sync/rpc_beacon_blocks_by_root.go
@@ -264,20 +264,34 @@ func (s *Service) constructPendingBlobsRequest(root [32]byte, commitments int) (
 }
 
 func (s *Service) constructPendingColumnRequest(root [32]byte) (types.BlobSidecarsByRootReq, error) {
-	stored, err := s.cfg.blobStorage.ColumnIndices(root)
+	// Retrieve the stored columns for the current root.
+	storedColumns, err := s.cfg.blobStorage.ColumnIndices(root)
 	if err != nil {
-		return nil, err
+		return nil, errors.Wrap(err, "column indices")
 	}
+
+	// Compute how many subnets we should custody.
 	custodiedSubnetCount := params.BeaconConfig().CustodyRequirement
 	if flags.Get().SubscribeToAllSubnets {
 		custodiedSubnetCount = params.BeaconConfig().DataColumnSidecarSubnetCount
 	}
+
+	// Retrieve the columns we should custody.
 	custodiedColumns, err := peerdas.CustodyColumns(s.cfg.p2p.NodeID(), custodiedSubnetCount)
 	if err != nil {
-		return nil, err
+		return nil, errors.Wrap(err, "custody columns")
 	}
 
-	return requestsForMissingColumnIndices(stored, custodiedColumns, root), nil
+	// Build the request for the missing columns.
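+	// Only columns in our own custody set are requested; map iteration order
+	// is random, but the serving side sorts identifiers before processing.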
+ req := make(types.BlobSidecarsByRootReq, 0, len(custodiedColumns)) + for column := range custodiedColumns { + isColumnStored := storedColumns[column] + if !isColumnStored { + req = append(req, ð.BlobIdentifier{Index: column, BlockRoot: root[:]}) + } + } + + return req, nil } // requestsForMissingIndices constructs a slice of BlobIdentifiers that are missing from @@ -292,13 +306,3 @@ func requestsForMissingIndices(storedIndices [fieldparams.MaxBlobsPerBlock]bool, } return ids } - -func requestsForMissingColumnIndices(storedIndices [fieldparams.NumberOfColumns]bool, wantedIndices map[uint64]bool, root [32]byte) []*eth.BlobIdentifier { - var ids []*eth.BlobIdentifier - for i := range wantedIndices { - if !storedIndices[i] { - ids = append(ids, ð.BlobIdentifier{Index: i, BlockRoot: root[:]}) - } - } - return ids -} diff --git a/beacon-chain/sync/rpc_data_column_sidecars_by_root.go b/beacon-chain/sync/rpc_data_column_sidecars_by_root.go index 2a7a74c1625f..c149a629556d 100644 --- a/beacon-chain/sync/rpc_data_column_sidecars_by_root.go +++ b/beacon-chain/sync/rpc_data_column_sidecars_by_root.go @@ -28,10 +28,13 @@ import ( func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { ctx, span := trace.StartSpan(ctx, "sync.dataColumnSidecarByRootRPCHandler") defer span.End() + ctx, cancel := context.WithTimeout(ctx, ttfbTimeout) defer cancel() + SetRPCStreamDeadlines(stream) log := log.WithField("handler", p2p.DataColumnSidecarsByRootName[1:]) // slice the leading slash off the name var + // We use the same type as for blobs as they are the same data structure. // TODO: Make the type naming more generic to be extensible to data columns ref, ok := msg.(*types.BlobSidecarsByRootReq) @@ -39,19 +42,25 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int return errors.New("message is not type BlobSidecarsByRootReq") } - columnIdents := *ref - if err := validateDataColummnsByRootRequest(columnIdents); err != nil { + requestedColumnIdents := *ref + if err := validateDataColummnsByRootRequest(requestedColumnIdents); err != nil { s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer()) s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream) - return err + return errors.Wrap(err, "validate data columns by root request") } + // Sort the identifiers so that requests for the same blob root will be adjacent, minimizing db lookups. 
-	sort.Sort(columnIdents)
+	sort.Sort(requestedColumnIdents)
+
+	requestedColumnsList := make([]uint64, 0, len(requestedColumnIdents))
+	for _, ident := range requestedColumnIdents {
+		requestedColumnsList = append(requestedColumnsList, ident.Index)
+	}
 
 	// TODO: Customize data column batches too
 	batchSize := flags.Get().BlobBatchLimit
 	var ticker *time.Ticker
-	if len(columnIdents) > batchSize {
+	if len(requestedColumnIdents) > batchSize {
 		ticker = time.NewTicker(time.Second)
 	}
 
@@ -69,25 +78,50 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
 	}
 
 	custodiedColumns, err := peerdas.CustodyColumns(s.cfg.p2p.NodeID(), custodiedSubnets)
-
 	if err != nil {
 		log.WithError(err).Errorf("unexpected error retrieving the node id")
 		s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
-		return err
+		return errors.Wrap(err, "custody columns")
 	}
 
+	custodiedColumnsList := make([]uint64, 0, len(custodiedColumns))
+	for column := range custodiedColumns {
+		custodiedColumnsList = append(custodiedColumnsList, column)
+	}
+
+	// Sort the custodied columns by index.
+	sort.Slice(custodiedColumnsList, func(i, j int) bool {
+		return custodiedColumnsList[i] < custodiedColumnsList[j]
+	})
+
+	log.WithFields(logrus.Fields{
+		"custodied":      custodiedColumnsList,
+		"requested":      requestedColumnsList,
+		"custodiedCount": len(custodiedColumnsList),
+		"requestedCount": len(requestedColumnsList),
+	}).Debug("Received data column sidecar by root request")
+
-	for i := range columnIdents {
+	for i := range requestedColumnIdents {
 		if err := ctx.Err(); err != nil {
 			closeStream(stream, log)
-			return err
+			return errors.Wrap(err, "context error")
 		}
 
 		// Throttle request processing to no more than batchSize/sec.
 		if ticker != nil && i != 0 && i%batchSize == 0 {
-			<-ticker.C
+			log.Debug("Throttling data column sidecar request")
+			select {
+			case <-ticker.C:
+			case <-ctx.Done():
+				log.Debug("Context closed, exiting routine")
+				return nil
+			}
 		}
+
 		s.rateLimiter.add(stream, 1)
-		root, idx := bytesutil.ToBytes32(columnIdents[i].BlockRoot), columnIdents[i].Index
+		root, idx := bytesutil.ToBytes32(requestedColumnIdents[i].BlockRoot), requestedColumnIdents[i].Index
 
 		isCustodied := custodiedColumns[idx]
 		if !isCustodied {
@@ -124,7 +158,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
 
 			log.WithError(err).Errorf("unexpected db error retrieving data column, root=%x, index=%d", root, idx)
 			s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
-			return err
+			return errors.Wrap(err, "get column")
 		}
 
 		break
@@ -137,7 +171,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
 		if sc.SignedBlockHeader.Header.Slot < minReqSlot {
 			s.writeErrorResponseToStream(responseCodeResourceUnavailable, types.ErrDataColumnLTMinRequest.Error(), stream)
 			log.WithError(types.ErrDataColumnLTMinRequest).
- Debugf("requested data column for block %#x before minimum_request_epoch", columnIdents[i].BlockRoot) + Debugf("requested data column for block %#x before minimum_request_epoch", requestedColumnIdents[i].BlockRoot) return types.ErrDataColumnLTMinRequest } @@ -149,6 +183,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int return chunkErr } } + closeStream(stream, log) return nil } diff --git a/beacon-chain/sync/sampling_data_columns.go b/beacon-chain/sync/sampling_data_columns.go deleted file mode 100644 index 00c3f61f4ead..000000000000 --- a/beacon-chain/sync/sampling_data_columns.go +++ /dev/null @@ -1,219 +0,0 @@ -package sync - -import ( - "context" - "sort" - - "github.com/btcsuite/btcd/btcec/v2" - "github.com/ethereum/go-ethereum/common/math" - "github.com/ethereum/go-ethereum/crypto" - "github.com/pkg/errors" - "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed" - statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state" - "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas" - "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types" - fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams" - "github.com/prysmaticlabs/prysm/v5/config/params" - "github.com/prysmaticlabs/prysm/v5/crypto/rand" - eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" - "github.com/prysmaticlabs/prysm/v5/runtime/version" - "github.com/sirupsen/logrus" -) - -// reandomIntegers returns a map of `count` random integers in the range [0, max[. -func randomIntegers(count uint64, max uint64) map[uint64]bool { - result := make(map[uint64]bool, count) - randGenerator := rand.NewGenerator() - - for uint64(len(result)) < count { - n := randGenerator.Uint64() % max - result[n] = true - } - - return result -} - -func (s *Service) sampleDataColumns(requestedRoot [fieldparams.RootLength]byte, samplesCount uint64) (map[uint64]bool, error) { - // Determine `samplesCount` random column indexes. - missingIndices := randomIntegers(samplesCount, params.BeaconConfig().NumberOfColumns) - - // Get the active peers from the p2p service. - activePeers := s.cfg.p2p.Peers().Active() - - // Sampling is done sequentially peer by peer. - // TODO: Add parallelism if (probably) needed. - for _, pid := range activePeers { - // Early exit if all needed columns are already sampled. - // This is the happy path. - if len(missingIndices) == 0 { - return nil, nil - } - peerCustodiedSubnetCount, err := s.cfg.p2p.CustodyCountFromRemotePeer(pid) - if err != nil { - return nil, err - } - - // Retrieve the public key object of the peer under "crypto" form. - pubkeyObjCrypto, err := pid.ExtractPublicKey() - if err != nil { - return nil, errors.Wrap(err, "extract public key") - } - - // Extract the bytes representation of the public key. - compressedPubKeyBytes, err := pubkeyObjCrypto.Raw() - if err != nil { - return nil, errors.Wrap(err, "public key raw") - } - - // Retrieve the public key object of the peer under "SECP256K1" form. - pubKeyObjSecp256k1, err := btcec.ParsePubKey(compressedPubKeyBytes) - if err != nil { - return nil, errors.Wrap(err, "parse public key") - } - - // Concatenate the X and Y coordinates represented in bytes. - buf := make([]byte, 64) - math.ReadBits(pubKeyObjSecp256k1.X(), buf[:32]) - math.ReadBits(pubKeyObjSecp256k1.Y(), buf[32:]) - - // Get the peer ID by hashing the concatenated X and Y coordinates. 
- peerIDBytes := crypto.Keccak256(buf) - - var peerID [32]byte - copy(peerID[:], peerIDBytes) - - // Determine which columns the peer should custody. - peerCustodiedColumns, err := peerdas.CustodyColumns(peerID, peerCustodiedSubnetCount) - if err != nil { - return nil, errors.Wrap(err, "custody columns") - } - - // Determine how many columns are yet missing. - missingColumnsCount := len(missingIndices) - - // Get the data column identifiers to sample from this particular peer. - dataColumnIdentifiers := make(types.BlobSidecarsByRootReq, 0, missingColumnsCount) - - for index := range missingIndices { - if peerCustodiedColumns[index] { - dataColumnIdentifiers = append(dataColumnIdentifiers, ð.BlobIdentifier{ - BlockRoot: requestedRoot[:], - Index: index, - }) - } - } - - // Skip the peer if there are no data columns to sample. - if len(dataColumnIdentifiers) == 0 { - continue - } - - // Sample data columns. - roDataColumns, err := SendDataColumnSidecarByRoot(s.ctx, s.cfg.clock, s.cfg.p2p, pid, s.ctxMap, &dataColumnIdentifiers) - if err != nil { - return nil, errors.Wrap(err, "send data column sidecar by root") - } - - // Remove retrieved items from rootsByDataColumnIndex. - for _, roDataColumn := range roDataColumns { - index := roDataColumn.ColumnIndex - - actualRoot := roDataColumn.BlockRoot() - if actualRoot != requestedRoot { - return nil, errors.Errorf("actual root (%#x) does not match requested root (%#x)", actualRoot, requestedRoot) - } - - delete(missingIndices, index) - } - } - - // We tried all our active peers and some columns are still missing. - // This is the unhappy path. - return missingIndices, nil -} - -func (s *Service) dataColumnSampling(ctx context.Context) { - // Create a subscription to the state feed. - stateChannel := make(chan *feed.Event, 1) - stateSub := s.cfg.stateNotifier.StateFeed().Subscribe(stateChannel) - - // Unsubscribe from the state feed when the function returns. - defer stateSub.Unsubscribe() - - for { - select { - case e := <-stateChannel: - if e.Type != statefeed.BlockProcessed { - continue - } - - data, ok := e.Data.(*statefeed.BlockProcessedData) - if !ok { - log.Error("Event feed data is not of type *statefeed.BlockProcessedData") - continue - } - - if !data.Verified { - // We only process blocks that have been verified - log.Error("Data is not verified") - continue - } - - if data.SignedBlock.Version() < version.Deneb { - log.Debug("Pre Deneb block, skipping data column sampling") - continue - } - - // Get the commitments for this block. - commitments, err := data.SignedBlock.Block().Body().BlobKzgCommitments() - if err != nil { - log.WithError(err).Error("Failed to get blob KZG commitments") - continue - } - - // Skip if there are no commitments. - if len(commitments) == 0 { - log.Debug("No commitments in block, skipping data column sampling") - continue - } - - dataColumnSamplingCount := params.BeaconConfig().SamplesPerSlot - - // Sample data columns. - missingColumns, err := s.sampleDataColumns(data.BlockRoot, dataColumnSamplingCount) - if err != nil { - log.WithError(err).Error("Failed to sample data columns") - continue - } - - missingColumnsCount := len(missingColumns) - - missingColumnsList := make([]uint64, 0, missingColumnsCount) - for column := range missingColumns { - missingColumnsList = append(missingColumnsList, column) - } - - // Sort the missing columns list. 
-			sort.Slice(missingColumnsList, func(i, j int) bool {
-				return missingColumnsList[i] < missingColumnsList[j]
-			})
-
-			if missingColumnsCount > 0 {
-				log.WithFields(logrus.Fields{
-					"missingColumns":      missingColumnsList,
-					"sampledColumnsCount": dataColumnSamplingCount,
-				}).Warning("Failed to sample some data columns")
-				continue
-			}
-
-			log.WithField("sampledColumnsCount", dataColumnSamplingCount).Info("Successfully sampled all data columns")
-
-		case <-s.ctx.Done():
-			log.Debug("Context closed, exiting goroutine")
-			return
-
-		case err := <-stateSub.Err():
-			log.WithError(err).Error("Subscription to state feed failed")
-		}
-	}
-}
diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go
index e4d81b22fdb2..1c1646b1b64b 100644
--- a/beacon-chain/sync/service.go
+++ b/beacon-chain/sync/service.go
@@ -164,6 +164,7 @@ type Service struct {
 	verifierWaiter                *verification.InitializerWaiter
 	newBlobVerifier               verification.NewBlobVerifier
 	availableBlocker              coverage.AvailableBlocker
+	dataColumnsReconstructionLock sync.Mutex
 	ctxMap                        ContextByteVersions
 }
diff --git a/beacon-chain/sync/subscriber_data_column_sidecar.go b/beacon-chain/sync/subscriber_data_column_sidecar.go
index f8c1011c1d9c..879e2daa997b 100644
--- a/beacon-chain/sync/subscriber_data_column_sidecar.go
+++ b/beacon-chain/sync/subscriber_data_column_sidecar.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 
+	"github.com/pkg/errors"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed"
 	opfeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation"
 	"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
@@ -29,5 +30,10 @@ func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) e
 		},
 	})
 
+	// Reconstruct the data columns if needed.
+	if err := s.reconstructDataColumns(ctx, dc); err != nil {
+		return errors.Wrap(err, "reconstruct data columns")
+	}
+
 	return nil
 }
diff --git a/go.mod b/go.mod
index 969205e37aef..1a8fbe4277c1 100644
--- a/go.mod
+++ b/go.mod
@@ -19,6 +19,7 @@ require (
 	github.com/emicklei/dot v0.11.0
 	github.com/ethereum/c-kzg-4844 v1.0.2-0.20240507203752-26d3b4156f7a
 	github.com/ethereum/go-ethereum v1.13.5
+	github.com/ferranbt/fastssz v0.0.0-20210120143747-11b9eff30ea9
 	github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5
 	github.com/fsnotify/fsnotify v1.6.0
 	github.com/ghodss/yaml v1.0.0
@@ -135,7 +136,6 @@ require (
 	github.com/docker/go-units v0.5.0 // indirect
 	github.com/dop251/goja v0.0.0-20230806174421-c933cf95e127 // indirect
 	github.com/elastic/gosigar v0.14.2 // indirect
-	github.com/ferranbt/fastssz v0.0.0-20210120143747-11b9eff30ea9 // indirect
 	github.com/flynn/noise v1.1.0 // indirect
 	github.com/francoispqt/gojay v1.2.13 // indirect
 	github.com/getsentry/sentry-go v0.25.0 // indirect
diff --git a/runtime/interop/genesis.go b/runtime/interop/genesis.go
index 6258b95902e4..952ffafb92ac 100644
--- a/runtime/interop/genesis.go
+++ b/runtime/interop/genesis.go
@@ -126,7 +126,6 @@ func GethPragueTime(genesisTime uint64, cfg *clparams.BeaconChainConfig) *uint64
 // like in an e2e test. The parameters are minimal but the full value is returned unmarshaled so that it can be
 // customized as desired.
 func GethTestnetGenesis(genesisTime uint64, cfg *clparams.BeaconChainConfig) *core.Genesis {
-	shanghaiTime := GethShanghaiTime(genesisTime, cfg)
 	cancunTime := GethCancunTime(genesisTime, cfg)
 	pragueTime := GethPragueTime(genesisTime, cfg)
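A sketch of the recovery core that the reconstruction path above relies on, using the same `RecoverAllCells`/`CellsToBlob` calls as `data_columns_reconstruct.go` (the c-kzg-4844 Go bindings pinned in go.mod); `recoverOneBlob` is a hypothetical name used for illustration only.

	// recoverOneBlob rebuilds a single blob from at least half of its extended
	// cells, identified by their cell indices.
	func recoverOneBlob(cellIds []uint64, cells []cKzg4844.Cell) (cKzg4844.Blob, error) {
		recoveredCells, err := cKzg4844.RecoverAllCells(cellIds, cells)
		if err != nil {
			return cKzg4844.Blob{}, err
		}
		return cKzg4844.CellsToBlob(recoveredCells)
	}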