Skip to content

Commit

Permalink
Add Request And Response RPC Methods For Data Columns (#13909)
Browse files Browse the repository at this point in the history
* Add RPC Handler

* Add Column Requests

* Update beacon-chain/db/filesystem/blob.go

Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>

* Update beacon-chain/p2p/rpc_topic_mappings.go

Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>

* Manu's Review

* Manu's Review

* Interface Fixes

* mock manager

---------

Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>
  • Loading branch information
nisdas and nalepae committed Nov 20, 2024
1 parent 40e554c commit e7282cb
Show file tree
Hide file tree
Showing 14 changed files with 263 additions and 6 deletions.
14 changes: 14 additions & 0 deletions beacon-chain/db/filesystem/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,20 @@ func (bs *BlobStorage) Get(root [32]byte, idx uint64) (blocks.VerifiedROBlob, er
return verification.BlobSidecarNoop(ro)
}

// GetColumn retrieves a single DataColumnSidecar by its root and index.
func (bs *BlobStorage) GetColumn(root [32]byte, idx uint64) (*ethpb.DataColumnSidecar, error) {
expected := blobNamer{root: root, index: idx}
encoded, err := afero.ReadFile(bs.fs, expected.path())
if err != nil {
return nil, err
}
s := &ethpb.DataColumnSidecar{}
if err := s.UnmarshalSSZ(encoded); err != nil {
return nil, err
}
return s, nil
}

// Remove removes all blobs for a given root.
func (bs *BlobStorage) Remove(root [32]byte) error {
rootDir := blobNamer{root: root}.dir()
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/p2p/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package p2p
import (
"context"

"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/connmgr"
Expand Down Expand Up @@ -81,6 +82,7 @@ type PeerManager interface {
PeerID() peer.ID
Host() host.Host
ENR() *enr.Record
NodeID() enode.ID
DiscoveryAddresses() ([]multiaddr.Multiaddr, error)
RefreshPersistentSubnets()
FindPeersWithSubnet(ctx context.Context, topic string, subIndex uint64, threshold int) (bool, error)
Expand Down
6 changes: 6 additions & 0 deletions beacon-chain/p2p/rpc_topic_mappings.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ const BlobSidecarsByRangeName = "/blob_sidecars_by_range"
// BlobSidecarsByRootName is the name for the BlobSidecarsByRoot v1 message topic.
const BlobSidecarsByRootName = "/blob_sidecars_by_root"

// DataColumnSidecarsByRootName is the name for the DataColumnSidecarsByRoot v1 message topic.
const DataColumnSidecarsByRootName = "/data_column_sidecars_by_root"

const (
// V1 RPC Topics
// RPCStatusTopicV1 defines the v1 topic for the status rpc method.
Expand All @@ -65,6 +68,9 @@ const (
// RPCBlobSidecarsByRootTopicV1 is a topic for requesting blob sidecars by their block root. New in deneb.
// /eth2/beacon_chain/req/blob_sidecars_by_root/1/
RPCBlobSidecarsByRootTopicV1 = protocolPrefix + BlobSidecarsByRootName + SchemaVersionV1
// RPCDataColumnSidecarsByRootTopicV1 is a topic for requesting data column sidecars by their block root. New in PeerDAS.
// /eth2/beacon_chain/req/data_column_sidecars_by_root/1
RPCDataColumnSidecarsByRootTopicV1 = protocolPrefix + DataColumnSidecarsByRootName + SchemaVersionV1

// V2 RPC Topics
// RPCBlocksByRangeTopicV2 defines v2 the topic for the blocks by range rpc method.
Expand Down
9 changes: 9 additions & 0 deletions beacon-chain/p2p/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,15 @@ func (s *Service) ENR() *enr.Record {
return s.dv5Listener.Self().Record()
}

// NodeID returns the local node's node ID
// for discovery.
func (s *Service) NodeID() enode.ID {
if s.dv5Listener == nil {
return [32]byte{}
}
return s.dv5Listener.Self().ID()
}

// DiscoveryAddresses represents our enr addresses as multiaddresses.
func (s *Service) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) {
if s.dv5Listener == nil {
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/subnets.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func computeSubscribedSubnets(nodeID enode.ID, epoch primitives.Epoch) ([]uint64
return subs, nil
}

func computeCustodyColumns(nodeID enode.ID) ([]uint64, error) {
func ComputeCustodyColumns(nodeID enode.ID) ([]uint64, error) {
subs, err := computeSubscribedColumnSubnets(nodeID)
if err != nil {
return nil, err
Expand Down
6 changes: 6 additions & 0 deletions beacon-chain/p2p/testing/fuzz_p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package testing
import (
"context"

"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/control"
Expand Down Expand Up @@ -55,6 +56,11 @@ func (_ *FakeP2P) ENR() *enr.Record {
return new(enr.Record)
}

// NodeID returns the node id of the local peer.
func (_ *FakeP2P) NodeID() enode.ID {
return [32]byte{}
}

// DiscoveryAddresses -- fake
func (_ *FakeP2P) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) {
return nil, nil
Expand Down
6 changes: 6 additions & 0 deletions beacon-chain/p2p/testing/mock_peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"

"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -39,6 +40,11 @@ func (m MockPeerManager) ENR() *enr.Record {
return m.Enr
}

// NodeID .
func (m MockPeerManager) NodeID() enode.ID {
return [32]byte{}
}

// DiscoveryAddresses .
func (m MockPeerManager) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) {
if m.FailDiscoveryAddr {
Expand Down
6 changes: 6 additions & 0 deletions beacon-chain/p2p/testing/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"
"time"

"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p"
pubsub "github.com/libp2p/go-libp2p-pubsub"
Expand Down Expand Up @@ -270,6 +271,11 @@ func (_ *TestP2P) ENR() *enr.Record {
return new(enr.Record)
}

// NodeID returns the node id of the local peer.
func (_ *TestP2P) NodeID() enode.ID {
return [32]byte{}
}

// DiscoveryAddresses --
func (_ *TestP2P) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) {
return nil, nil
Expand Down
15 changes: 10 additions & 5 deletions beacon-chain/p2p/types/rpc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@ var (
ErrInvalidSequenceNum = errors.New("invalid sequence number provided")
ErrGeneric = errors.New("internal service error")

ErrRateLimited = errors.New("rate limited")
ErrIODeadline = errors.New("i/o deadline exceeded")
ErrInvalidRequest = errors.New("invalid range, step or count")
ErrBlobLTMinRequest = errors.New("blob slot < minimum_request_epoch")
ErrMaxBlobReqExceeded = errors.New("requested more than MAX_REQUEST_BLOB_SIDECARS")
ErrRateLimited = errors.New("rate limited")
ErrIODeadline = errors.New("i/o deadline exceeded")
ErrInvalidRequest = errors.New("invalid range, step or count")
ErrBlobLTMinRequest = errors.New("blob epoch < minimum_request_epoch")

ErrDataColumnLTMinRequest = errors.New("data column epoch < minimum_request_epoch")
ErrMaxBlobReqExceeded = errors.New("requested more than MAX_REQUEST_BLOB_SIDECARS")
ErrMaxDataColumnReqExceeded = errors.New("requested more than MAX_REQUEST_DATA_COLUMN_SIDECARS")

ErrResourceUnavailable = errors.New("resource requested unavailable")
ErrInvalidColumnIndex = errors.New("invalid column index requested")
)
1 change: 1 addition & 0 deletions beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"rpc_blob_sidecars_by_range.go",
"rpc_blob_sidecars_by_root.go",
"rpc_chunked_response.go",
"rpc_data_column_sidecars_by_root.go",
"rpc_goodbye.go",
"rpc_metadata.go",
"rpc_ping.go",
Expand Down
8 changes: 8 additions & 0 deletions beacon-chain/sync/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
ssz "github.com/prysmaticlabs/fastssz"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
p2ptypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
Expand Down Expand Up @@ -99,6 +100,13 @@ func (s *Service) registerRPCHandlersAltair() {
}

func (s *Service) registerRPCHandlersDeneb() {
if features.Get().EnablePeerDAS {
s.registerRPC(
p2p.RPCDataColumnSidecarsByRootTopicV1,
s.dataColumnSidecarByRootRPCHandler,
)
return
}
s.registerRPC(
p2p.RPCBlobSidecarsByRangeTopicV1,
s.blobSidecarsByRangeRPCHandler,
Expand Down
20 changes: 20 additions & 0 deletions beacon-chain/sync/rpc_chunked_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v5/network/forks"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/prysmaticlabs/prysm/v5/time/slots"
)
Expand Down Expand Up @@ -155,3 +156,22 @@ func WriteBlobSidecarChunk(stream libp2pcore.Stream, tor blockchain.TemporalOrac
_, err = encoding.EncodeWithMaxLength(stream, sidecar)
return err
}

// WriteDataColumnSidecarChunk writes data column chunk object to stream.
// response_chunk ::= <result> | <context-bytes> | <encoding-dependent-header> | <encoded-payload>
func WriteDataColumnSidecarChunk(stream libp2pcore.Stream, tor blockchain.TemporalOracle, encoding encoder.NetworkEncoding, sidecar *ethpb.DataColumnSidecar) error {
if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil {
return err
}
valRoot := tor.GenesisValidatorsRoot()
ctxBytes, err := forks.ForkDigestFromEpoch(slots.ToEpoch(sidecar.SignedBlockHeader.Header.Slot), valRoot[:])
if err != nil {
return err
}

if err := writeContextToStream(ctxBytes[:], stream); err != nil {
return err
}
_, err = encoding.EncodeWithMaxLength(stream, sidecar)
return err
}
151 changes: 151 additions & 0 deletions beacon-chain/sync/rpc_data_column_sidecars_by_root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package sync

import (
"context"
"fmt"
"math"
"sort"
"time"

libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
)

func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {
ctx, span := trace.StartSpan(ctx, "sync.dataColumnSidecarByRootRPCHandler")
defer span.End()
ctx, cancel := context.WithTimeout(ctx, ttfbTimeout)
defer cancel()
SetRPCStreamDeadlines(stream)
log := log.WithField("handler", p2p.DataColumnSidecarsByRootName[1:]) // slice the leading slash off the name var
// We use the same type as for blobs as they are the same data structure.
// TODO: Make the type naming more generic to be extensible to data columns
ref, ok := msg.(*types.BlobSidecarsByRootReq)
if !ok {
return errors.New("message is not type BlobSidecarsByRootReq")
}

columnIdents := *ref
if err := validateDataColummnsByRootRequest(columnIdents); err != nil {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
return err
}
// Sort the identifiers so that requests for the same blob root will be adjacent, minimizing db lookups.
sort.Sort(columnIdents)

// TODO: Customize data column batches too
batchSize := flags.Get().BlobBatchLimit
var ticker *time.Ticker
if len(columnIdents) > batchSize {
ticker = time.NewTicker(time.Second)
}

// Compute the oldest slot we'll allow a peer to request, based on the current slot.
cs := s.cfg.clock.CurrentSlot()
minReqSlot, err := DataColumnsRPCMinValidSlot(cs)
if err != nil {
return errors.Wrapf(err, "unexpected error computing min valid blob request slot, current_slot=%d", cs)
}

for i := range columnIdents {
if err := ctx.Err(); err != nil {
closeStream(stream, log)
return err
}

// Throttle request processing to no more than batchSize/sec.
if ticker != nil && i != 0 && i%batchSize == 0 {
<-ticker.C
}
s.rateLimiter.add(stream, 1)
root, idx := bytesutil.ToBytes32(columnIdents[i].BlockRoot), columnIdents[i].Index
custodiedColumns, err := p2p.ComputeCustodyColumns(s.cfg.p2p.NodeID())
if err != nil {
log.WithError(err).Errorf("unexpected error retrieving the node id")
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
return err
}
isCustodied := false
for _, col := range custodiedColumns {
if col == idx {
isCustodied = true
break
}
}
if !isCustodied {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.writeErrorResponseToStream(responseCodeInvalidRequest, types.ErrInvalidColumnIndex.Error(), stream)
return types.ErrInvalidColumnIndex
}

// TODO: Differentiate between blobs and columns for our storage engine
sc, err := s.cfg.blobStorage.GetColumn(root, idx)
if err != nil {
if db.IsNotFound(err) {
log.WithError(err).WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", root),
"index": idx,
}).Debugf("Peer requested data column sidecar by root not found in db")
continue
}
log.WithError(err).Errorf("unexpected db error retrieving data column, root=%x, index=%d", root, idx)
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
return err
}

// If any root in the request content references a block earlier than minimum_request_epoch,
// peers MAY respond with error code 3: ResourceUnavailable or not include the data column in the response.
// note: we are deviating from the spec to allow requests for data column that are before minimum_request_epoch,
// up to the beginning of the retention period.
if sc.SignedBlockHeader.Header.Slot < minReqSlot {
s.writeErrorResponseToStream(responseCodeResourceUnavailable, types.ErrDataColumnLTMinRequest.Error(), stream)
log.WithError(types.ErrDataColumnLTMinRequest).
Debugf("requested data column for block %#x before minimum_request_epoch", columnIdents[i].BlockRoot)
return types.ErrDataColumnLTMinRequest
}

SetStreamWriteDeadline(stream, defaultWriteDuration)
if chunkErr := WriteDataColumnSidecarChunk(stream, s.cfg.chain, s.cfg.p2p.Encoding(), sc); chunkErr != nil {
log.WithError(chunkErr).Debug("Could not send a chunked response")
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
tracing.AnnotateError(span, chunkErr)
return chunkErr
}
}
closeStream(stream, log)
return nil
}

func validateDataColummnsByRootRequest(colIdents types.BlobSidecarsByRootReq) error {
if uint64(len(colIdents)) > params.BeaconConfig().MaxRequestDataColumnSidecars {
return types.ErrMaxDataColumnReqExceeded
}
return nil
}

func DataColumnsRPCMinValidSlot(current primitives.Slot) (primitives.Slot, error) {
// Avoid overflow if we're running on a config where deneb is set to far future epoch.
if params.BeaconConfig().DenebForkEpoch == math.MaxUint64 || !features.Get().EnablePeerDAS {
return primitives.Slot(math.MaxUint64), nil
}
minReqEpochs := params.BeaconConfig().MinEpochsForDataColumnSidecarsRequest
currEpoch := slots.ToEpoch(current)
minStart := params.BeaconConfig().DenebForkEpoch
if currEpoch > minReqEpochs && currEpoch-minReqEpochs > minStart {
minStart = currEpoch - minReqEpochs
}
return slots.EpochStart(minStart)
}
Loading

0 comments on commit e7282cb

Please sign in to comment.