diff --git a/beacon-chain/db/filesystem/blob.go b/beacon-chain/db/filesystem/blob.go index f7e518022d30..f5ed34626918 100644 --- a/beacon-chain/db/filesystem/blob.go +++ b/beacon-chain/db/filesystem/blob.go @@ -246,6 +246,20 @@ func (bs *BlobStorage) Get(root [32]byte, idx uint64) (blocks.VerifiedROBlob, er return verification.BlobSidecarNoop(ro) } +// GetColumn retrieves a single DataColumnSidecar by its root and index. +func (bs *BlobStorage) GetColumn(root [32]byte, idx uint64) (*ethpb.DataColumnSidecar, error) { + expected := blobNamer{root: root, index: idx} + encoded, err := afero.ReadFile(bs.fs, expected.path()) + if err != nil { + return nil, err + } + s := ðpb.DataColumnSidecar{} + if err := s.UnmarshalSSZ(encoded); err != nil { + return nil, err + } + return s, nil +} + // Remove removes all blobs for a given root. func (bs *BlobStorage) Remove(root [32]byte) error { rootDir := blobNamer{root: root}.dir() diff --git a/beacon-chain/p2p/interfaces.go b/beacon-chain/p2p/interfaces.go index 2f9a1d0ce2ad..0e7271244325 100644 --- a/beacon-chain/p2p/interfaces.go +++ b/beacon-chain/p2p/interfaces.go @@ -3,6 +3,7 @@ package p2p import ( "context" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/connmgr" @@ -81,6 +82,7 @@ type PeerManager interface { PeerID() peer.ID Host() host.Host ENR() *enr.Record + NodeID() enode.ID DiscoveryAddresses() ([]multiaddr.Multiaddr, error) RefreshPersistentSubnets() FindPeersWithSubnet(ctx context.Context, topic string, subIndex uint64, threshold int) (bool, error) diff --git a/beacon-chain/p2p/rpc_topic_mappings.go b/beacon-chain/p2p/rpc_topic_mappings.go index 7d9c5ebdff0a..a954cdba90e4 100644 --- a/beacon-chain/p2p/rpc_topic_mappings.go +++ b/beacon-chain/p2p/rpc_topic_mappings.go @@ -43,6 +43,9 @@ const BlobSidecarsByRangeName = "/blob_sidecars_by_range" // BlobSidecarsByRootName is the name for the BlobSidecarsByRoot v1 message topic. const BlobSidecarsByRootName = "/blob_sidecars_by_root" +// DataColumnSidecarsByRootName is the name for the DataColumnSidecarsByRoot v1 message topic. +const DataColumnSidecarsByRootName = "/data_column_sidecars_by_root" + const ( // V1 RPC Topics // RPCStatusTopicV1 defines the v1 topic for the status rpc method. @@ -65,6 +68,9 @@ const ( // RPCBlobSidecarsByRootTopicV1 is a topic for requesting blob sidecars by their block root. New in deneb. // /eth2/beacon_chain/req/blob_sidecars_by_root/1/ RPCBlobSidecarsByRootTopicV1 = protocolPrefix + BlobSidecarsByRootName + SchemaVersionV1 + // RPCDataColumnSidecarsByRootTopicV1 is a topic for requesting data column sidecars by their block root. New in PeerDAS. + // /eth2/beacon_chain/req/data_column_sidecars_by_root/1 + RPCDataColumnSidecarsByRootTopicV1 = protocolPrefix + DataColumnSidecarsByRootName + SchemaVersionV1 // V2 RPC Topics // RPCBlocksByRangeTopicV2 defines v2 the topic for the blocks by range rpc method. diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index 2ed5bc7af973..eab29cf6f5ef 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -358,6 +358,15 @@ func (s *Service) ENR() *enr.Record { return s.dv5Listener.Self().Record() } +// NodeID returns the local node's node ID +// for discovery. +func (s *Service) NodeID() enode.ID { + if s.dv5Listener == nil { + return [32]byte{} + } + return s.dv5Listener.Self().ID() +} + // DiscoveryAddresses represents our enr addresses as multiaddresses. func (s *Service) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { if s.dv5Listener == nil { diff --git a/beacon-chain/p2p/subnets.go b/beacon-chain/p2p/subnets.go index f3a009561e9b..0002b1376829 100644 --- a/beacon-chain/p2p/subnets.go +++ b/beacon-chain/p2p/subnets.go @@ -239,7 +239,7 @@ func computeSubscribedSubnets(nodeID enode.ID, epoch primitives.Epoch) ([]uint64 return subs, nil } -func computeCustodyColumns(nodeID enode.ID) ([]uint64, error) { +func ComputeCustodyColumns(nodeID enode.ID) ([]uint64, error) { subs, err := computeSubscribedColumnSubnets(nodeID) if err != nil { return nil, err diff --git a/beacon-chain/p2p/testing/fuzz_p2p.go b/beacon-chain/p2p/testing/fuzz_p2p.go index 8e9179f496e4..0180adbbdf9a 100644 --- a/beacon-chain/p2p/testing/fuzz_p2p.go +++ b/beacon-chain/p2p/testing/fuzz_p2p.go @@ -3,6 +3,7 @@ package testing import ( "context" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/control" @@ -55,6 +56,11 @@ func (_ *FakeP2P) ENR() *enr.Record { return new(enr.Record) } +// NodeID returns the node id of the local peer. +func (_ *FakeP2P) NodeID() enode.ID { + return [32]byte{} +} + // DiscoveryAddresses -- fake func (_ *FakeP2P) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { return nil, nil diff --git a/beacon-chain/p2p/testing/mock_peermanager.go b/beacon-chain/p2p/testing/mock_peermanager.go index 1c17af6642bc..549d9a26d20f 100644 --- a/beacon-chain/p2p/testing/mock_peermanager.go +++ b/beacon-chain/p2p/testing/mock_peermanager.go @@ -4,6 +4,7 @@ import ( "context" "errors" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" @@ -39,6 +40,11 @@ func (m MockPeerManager) ENR() *enr.Record { return m.Enr } +// NodeID . +func (m MockPeerManager) NodeID() enode.ID { + return [32]byte{} +} + // DiscoveryAddresses . func (m MockPeerManager) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { if m.FailDiscoveryAddr { diff --git a/beacon-chain/p2p/testing/p2p.go b/beacon-chain/p2p/testing/p2p.go index 57e73f8cb128..2acb8962c3a5 100644 --- a/beacon-chain/p2p/testing/p2p.go +++ b/beacon-chain/p2p/testing/p2p.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" pubsub "github.com/libp2p/go-libp2p-pubsub" core "github.com/libp2p/go-libp2p/core" @@ -263,6 +264,11 @@ func (_ *TestP2P) ENR() *enr.Record { return new(enr.Record) } +// NodeID returns the node id of the local peer. +func (_ *TestP2P) NodeID() enode.ID { + return [32]byte{} +} + // DiscoveryAddresses -- func (_ *TestP2P) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { return nil, nil diff --git a/beacon-chain/p2p/types/rpc_errors.go b/beacon-chain/p2p/types/rpc_errors.go index 46381876c118..2b88bef64c95 100644 --- a/beacon-chain/p2p/types/rpc_errors.go +++ b/beacon-chain/p2p/types/rpc_errors.go @@ -9,10 +9,15 @@ var ( ErrInvalidSequenceNum = errors.New("invalid sequence number provided") ErrGeneric = errors.New("internal service error") - ErrRateLimited = errors.New("rate limited") - ErrIODeadline = errors.New("i/o deadline exceeded") - ErrInvalidRequest = errors.New("invalid range, step or count") - ErrBlobLTMinRequest = errors.New("blob slot < minimum_request_epoch") - ErrMaxBlobReqExceeded = errors.New("requested more than MAX_REQUEST_BLOB_SIDECARS") + ErrRateLimited = errors.New("rate limited") + ErrIODeadline = errors.New("i/o deadline exceeded") + ErrInvalidRequest = errors.New("invalid range, step or count") + ErrBlobLTMinRequest = errors.New("blob epoch < minimum_request_epoch") + + ErrDataColumnLTMinRequest = errors.New("data column epoch < minimum_request_epoch") + ErrMaxBlobReqExceeded = errors.New("requested more than MAX_REQUEST_BLOB_SIDECARS") + ErrMaxDataColumnReqExceeded = errors.New("requested more than MAX_REQUEST_DATA_COLUMN_SIDECARS") + ErrResourceUnavailable = errors.New("resource requested unavailable") + ErrInvalidColumnIndex = errors.New("invalid column index requested") ) diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index ca8d049c2e48..ee9123729713 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -25,6 +25,7 @@ go_library( "rpc_blob_sidecars_by_range.go", "rpc_blob_sidecars_by_root.go", "rpc_chunked_response.go", + "rpc_data_column_sidecars_by_root.go", "rpc_goodbye.go", "rpc_metadata.go", "rpc_ping.go", diff --git a/beacon-chain/sync/rpc.go b/beacon-chain/sync/rpc.go index 7ada5cc609f3..0cc3677ab240 100644 --- a/beacon-chain/sync/rpc.go +++ b/beacon-chain/sync/rpc.go @@ -14,6 +14,7 @@ import ( ssz "github.com/prysmaticlabs/fastssz" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p" p2ptypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types" + "github.com/prysmaticlabs/prysm/v5/config/features" "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/monitoring/tracing" "github.com/prysmaticlabs/prysm/v5/time/slots" @@ -99,6 +100,13 @@ func (s *Service) registerRPCHandlersAltair() { } func (s *Service) registerRPCHandlersDeneb() { + if features.Get().EnablePeerDAS { + s.registerRPC( + p2p.RPCDataColumnSidecarsByRootTopicV1, + s.dataColumnSidecarByRootRPCHandler, + ) + return + } s.registerRPC( p2p.RPCBlobSidecarsByRangeTopicV1, s.blobSidecarsByRangeRPCHandler, diff --git a/beacon-chain/sync/rpc_chunked_response.go b/beacon-chain/sync/rpc_chunked_response.go index 6eac6fc8ff3d..0b6d9ce0eea6 100644 --- a/beacon-chain/sync/rpc_chunked_response.go +++ b/beacon-chain/sync/rpc_chunked_response.go @@ -11,6 +11,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces" "github.com/prysmaticlabs/prysm/v5/network/forks" + ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v5/runtime/version" "github.com/prysmaticlabs/prysm/v5/time/slots" ) @@ -155,3 +156,22 @@ func WriteBlobSidecarChunk(stream libp2pcore.Stream, tor blockchain.TemporalOrac _, err = encoding.EncodeWithMaxLength(stream, sidecar) return err } + +// WriteDataColumnSidecarChunk writes data column chunk object to stream. +// response_chunk ::= | | | +func WriteDataColumnSidecarChunk(stream libp2pcore.Stream, tor blockchain.TemporalOracle, encoding encoder.NetworkEncoding, sidecar *ethpb.DataColumnSidecar) error { + if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil { + return err + } + valRoot := tor.GenesisValidatorsRoot() + ctxBytes, err := forks.ForkDigestFromEpoch(slots.ToEpoch(sidecar.SignedBlockHeader.Header.Slot), valRoot[:]) + if err != nil { + return err + } + + if err := writeContextToStream(ctxBytes[:], stream); err != nil { + return err + } + _, err = encoding.EncodeWithMaxLength(stream, sidecar) + return err +} diff --git a/beacon-chain/sync/rpc_data_column_sidecars_by_root.go b/beacon-chain/sync/rpc_data_column_sidecars_by_root.go new file mode 100644 index 000000000000..9f4e6e0d6090 --- /dev/null +++ b/beacon-chain/sync/rpc_data_column_sidecars_by_root.go @@ -0,0 +1,151 @@ +package sync + +import ( + "context" + "fmt" + "math" + "sort" + "time" + + libp2pcore "github.com/libp2p/go-libp2p/core" + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/db" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types" + "github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags" + "github.com/prysmaticlabs/prysm/v5/config/features" + "github.com/prysmaticlabs/prysm/v5/config/params" + "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" + "github.com/prysmaticlabs/prysm/v5/monitoring/tracing" + "github.com/prysmaticlabs/prysm/v5/time/slots" + "github.com/sirupsen/logrus" + "go.opencensus.io/trace" +) + +func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { + ctx, span := trace.StartSpan(ctx, "sync.dataColumnSidecarByRootRPCHandler") + defer span.End() + ctx, cancel := context.WithTimeout(ctx, ttfbTimeout) + defer cancel() + SetRPCStreamDeadlines(stream) + log := log.WithField("handler", p2p.DataColumnSidecarsByRootName[1:]) // slice the leading slash off the name var + // We use the same type as for blobs as they are the same data structure. + // TODO: Make the type naming more generic to be extensible to data columns + ref, ok := msg.(*types.BlobSidecarsByRootReq) + if !ok { + return errors.New("message is not type BlobSidecarsByRootReq") + } + + columnIdents := *ref + if err := validateDataColummnsByRootRequest(columnIdents); err != nil { + s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer()) + s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream) + return err + } + // Sort the identifiers so that requests for the same blob root will be adjacent, minimizing db lookups. + sort.Sort(columnIdents) + + // TODO: Customize data column batches too + batchSize := flags.Get().BlobBatchLimit + var ticker *time.Ticker + if len(columnIdents) > batchSize { + ticker = time.NewTicker(time.Second) + } + + // Compute the oldest slot we'll allow a peer to request, based on the current slot. + cs := s.cfg.clock.CurrentSlot() + minReqSlot, err := DataColumnsRPCMinValidSlot(cs) + if err != nil { + return errors.Wrapf(err, "unexpected error computing min valid blob request slot, current_slot=%d", cs) + } + + for i := range columnIdents { + if err := ctx.Err(); err != nil { + closeStream(stream, log) + return err + } + + // Throttle request processing to no more than batchSize/sec. + if ticker != nil && i != 0 && i%batchSize == 0 { + <-ticker.C + } + s.rateLimiter.add(stream, 1) + root, idx := bytesutil.ToBytes32(columnIdents[i].BlockRoot), columnIdents[i].Index + custodiedColumns, err := p2p.ComputeCustodyColumns(s.cfg.p2p.NodeID()) + if err != nil { + log.WithError(err).Errorf("unexpected error retrieving the node id") + s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream) + return err + } + isCustodied := false + for _, col := range custodiedColumns { + if col == idx { + isCustodied = true + break + } + } + if !isCustodied { + s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer()) + s.writeErrorResponseToStream(responseCodeInvalidRequest, types.ErrInvalidColumnIndex.Error(), stream) + return types.ErrInvalidColumnIndex + } + + // TODO: Differentiate between blobs and columns for our storage engine + sc, err := s.cfg.blobStorage.GetColumn(root, idx) + if err != nil { + if db.IsNotFound(err) { + log.WithError(err).WithFields(logrus.Fields{ + "root": fmt.Sprintf("%#x", root), + "index": idx, + }).Debugf("Peer requested data column sidecar by root not found in db") + continue + } + log.WithError(err).Errorf("unexpected db error retrieving data column, root=%x, index=%d", root, idx) + s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream) + return err + } + + // If any root in the request content references a block earlier than minimum_request_epoch, + // peers MAY respond with error code 3: ResourceUnavailable or not include the data column in the response. + // note: we are deviating from the spec to allow requests for data column that are before minimum_request_epoch, + // up to the beginning of the retention period. + if sc.SignedBlockHeader.Header.Slot < minReqSlot { + s.writeErrorResponseToStream(responseCodeResourceUnavailable, types.ErrDataColumnLTMinRequest.Error(), stream) + log.WithError(types.ErrDataColumnLTMinRequest). + Debugf("requested data column for block %#x before minimum_request_epoch", columnIdents[i].BlockRoot) + return types.ErrDataColumnLTMinRequest + } + + SetStreamWriteDeadline(stream, defaultWriteDuration) + if chunkErr := WriteDataColumnSidecarChunk(stream, s.cfg.chain, s.cfg.p2p.Encoding(), sc); chunkErr != nil { + log.WithError(chunkErr).Debug("Could not send a chunked response") + s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream) + tracing.AnnotateError(span, chunkErr) + return chunkErr + } + } + closeStream(stream, log) + return nil +} + +func validateDataColummnsByRootRequest(colIdents types.BlobSidecarsByRootReq) error { + if uint64(len(colIdents)) > params.BeaconConfig().MaxRequestDataColumnSidecars { + return types.ErrMaxDataColumnReqExceeded + } + return nil +} + +func DataColumnsRPCMinValidSlot(current primitives.Slot) (primitives.Slot, error) { + // Avoid overflow if we're running on a config where deneb is set to far future epoch. + if params.BeaconConfig().DenebForkEpoch == math.MaxUint64 || !features.Get().EnablePeerDAS { + return primitives.Slot(math.MaxUint64), nil + } + minReqEpochs := params.BeaconConfig().MinEpochsForDataColumnSidecarsRequest + currEpoch := slots.ToEpoch(current) + minStart := params.BeaconConfig().DenebForkEpoch + if currEpoch > minReqEpochs && currEpoch-minReqEpochs > minStart { + minStart = currEpoch - minReqEpochs + } + return slots.EpochStart(minStart) +} diff --git a/beacon-chain/sync/rpc_send_request.go b/beacon-chain/sync/rpc_send_request.go index 30edbb09cf51..44014a82f185 100644 --- a/beacon-chain/sync/rpc_send_request.go +++ b/beacon-chain/sync/rpc_send_request.go @@ -208,6 +208,29 @@ func SendBlobSidecarByRoot( return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, blobValidatorFromRootReq(req), max) } +func SendDataColumnSidecarByRoot( + ctx context.Context, tor blockchain.TemporalOracle, p2pApi p2p.P2P, pid peer.ID, + ctxMap ContextByteVersions, req *p2ptypes.BlobSidecarsByRootReq, +) ([]blocks.ROBlob, error) { + if uint64(len(*req)) > params.BeaconConfig().MaxRequestDataColumnSidecars { + return nil, errors.Wrapf(p2ptypes.ErrMaxDataColumnReqExceeded, "length=%d", len(*req)) + } + + topic, err := p2p.TopicFromMessage(p2p.DataColumnSidecarsByRootName, slots.ToEpoch(tor.CurrentSlot())) + if err != nil { + return nil, err + } + log.WithField("topic", topic).Debug("Sending data column sidecar request") + stream, err := p2pApi.Send(ctx, req, topic, pid) + if err != nil { + return nil, err + } + defer closeStream(stream, log) + + maxCol := params.BeaconConfig().MaxRequestDataColumnSidecars + return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, blobValidatorFromRootReq(req), maxCol) +} + // BlobResponseValidation represents a function that can validate aspects of a single unmarshaled blob // that was received from a peer in response to an rpc request. type BlobResponseValidation func(blocks.ROBlob) error