Skip to content

Commit

Permalink
Implement peer DAS proposer RPC (#13922)
Browse files Browse the repository at this point in the history
* Remove capital letter from error messages.

* `[4]byte` => `[fieldparams.VersionLength]byte`.

* Prometheus: Remove extra `committee`.

They are probably due to a bad copy/paste.

Note: The name of the probe itself is remaining,
to ensure backward compatibility.

* Implement Proposer RPC for data columns.

* Fix TestProposer_ProposeBlock_OK test.

* Remove default peerDAS activation.

* `validateDataColumn`: Workaround to return a `VerifiedRODataColumn`
  • Loading branch information
nalepae committed Nov 22, 2024
1 parent 3afffb9 commit 059418b
Show file tree
Hide file tree
Showing 28 changed files with 479 additions and 77 deletions.
2 changes: 1 addition & 1 deletion beacon-chain/blockchain/receive_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type BlobReceiver interface {
// DataColumnReceiver interface defines the methods of chain service for receiving new
// data columns
type DataColumnReceiver interface {
ReceiveDataColumn(context.Context, *ethpb.DataColumnSidecar) error
ReceiveDataColumn(context.Context, blocks.VerifiedRODataColumn) error
}

// SlashingReceiver interface defines the methods of chain service for receiving validated slashing over the wire.
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/blockchain/receive_sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package blockchain
import (
"context"

ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
)

func (s *Service) ReceiveDataColumn(ctx context.Context, ds *ethpb.DataColumnSidecar) error {
func (s *Service) ReceiveDataColumn(ctx context.Context, verifiedRODataColumn blocks.VerifiedRODataColumn) error {
// TODO
return nil
}
5 changes: 5 additions & 0 deletions beacon-chain/blockchain/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ func (mb *mockBroadcaster) BroadcastBlob(_ context.Context, _ uint64, _ *ethpb.B
return nil
}

func (mb *mockBroadcaster) BroadcastDataColumn(_ context.Context, _ uint64, _ *ethpb.DataColumnSidecar) error {
mb.broadcastCalled = true
return nil
}

func (mb *mockBroadcaster) BroadcastBLSChanges(_ context.Context, _ []*ethpb.SignedBLSToExecutionChange) {
}

Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/blockchain/testing/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ func (c *ChainService) ReceiveBlob(_ context.Context, b blocks.VerifiedROBlob) e
}

// ReceiveDataColumn implements the same method in chain service
func (c *ChainService) ReceiveDataColumn(_ context.Context, _ *ethpb.DataColumnSidecar) error {
func (c *ChainService) ReceiveDataColumn(_ context.Context, _ blocks.VerifiedRODataColumn) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/core/feed/operation/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,5 @@ type AttesterSlashingReceivedData struct {
}

type DataColumnSidecarReceivedData struct {
DataColumn *ethpb.DataColumnSidecar
DataColumn *blocks.VerifiedRODataColumn
}
29 changes: 12 additions & 17 deletions beacon-chain/core/peerdas/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ import (
)

const (
// Number of field elements per extended blob
fieldElementsPerExtBlob = 2 * cKzg4844.FieldElementsPerBlob

// Bytes per cell
bytesPerCell = cKzg4844.FieldElementsPerCell * cKzg4844.BytesPerFieldElement

Expand All @@ -38,12 +35,8 @@ type (
)

var (
errCustodySubnetCountTooLarge = errors.New("custody subnet count larger than data column sidecar subnet count")
errCellNotFound = errors.New("cell not found (should never happen)")
errCurveOrder = errors.New("could not set bls curve order as big int")
errBlsFieldElementNil = errors.New("bls field element is nil")
errBlsFieldElementBiggerThanCurveOrder = errors.New("bls field element higher than curve order")
errBlsFieldElementDoesNotFit = errors.New("bls field element does not fit in BytesPerFieldElement")
errCustodySubnetCountTooLarge = errors.New("custody subnet count larger than data column sidecar subnet count")
errCellNotFound = errors.New("cell not found (should never happen)")
)

// https://github.com/ethereum/consensus-specs/blob/dev/specs/_features/eip7594/das-core.md#helper-functions
Expand Down Expand Up @@ -100,9 +93,9 @@ func CustodyColumnSubnets(nodeId enode.ID, custodySubnetCount uint64) (map[uint6
return subnetIds, nil
}

// computeExtendedMatrix computes the extended matrix from the blobs.
// ComputeExtendedMatrix computes the extended matrix from the blobs.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/_features/eip7594/das-core.md#compute_extended_matrix
func computeExtendedMatrix(blobs []cKzg4844.Blob) (extendedMatrix, error) {
func ComputeExtendedMatrix(blobs []cKzg4844.Blob) (extendedMatrix, error) {
matrix := make(extendedMatrix, 0, extendedMatrixSize)

for i := range blobs {
Expand All @@ -119,9 +112,9 @@ func computeExtendedMatrix(blobs []cKzg4844.Blob) (extendedMatrix, error) {
return matrix, nil
}

// recoverMatrix recovers the extended matrix from some cells.
// RecoverMatrix recovers the extended matrix from some cells.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/_features/eip7594/das-core.md#recover_matrix
func recoverMatrix(cellFromCoordinate map[cellCoordinate]cKzg4844.Cell, blobCount uint64) (extendedMatrix, error) {
func RecoverMatrix(cellFromCoordinate map[cellCoordinate]cKzg4844.Cell, blobCount uint64) (extendedMatrix, error) {
matrix := make(extendedMatrix, 0, extendedMatrixSize)

for blobIndex := uint64(0); blobIndex < blobCount; blobIndex++ {
Expand Down Expand Up @@ -160,7 +153,7 @@ func recoverMatrix(cellFromCoordinate map[cellCoordinate]cKzg4844.Cell, blobCoun
}

// https://github.com/ethereum/consensus-specs/blob/dev/specs/_features/eip7594/das-core.md#recover_matrix
func dataColumnSidecars(signedBlock interfaces.SignedBeaconBlock, blobs []cKzg4844.Blob) ([]ethpb.DataColumnSidecar, error) {
func DataColumnSidecars(signedBlock interfaces.SignedBeaconBlock, blobs []cKzg4844.Blob) ([]*ethpb.DataColumnSidecar, error) {
blobsCount := len(blobs)

// Get the signed block header.
Expand Down Expand Up @@ -201,7 +194,7 @@ func dataColumnSidecars(signedBlock interfaces.SignedBeaconBlock, blobs []cKzg48
}

// Get the column sidecars.
sidecars := make([]ethpb.DataColumnSidecar, cKzg4844.CellsPerExtBlob)
sidecars := make([]*ethpb.DataColumnSidecar, cKzg4844.CellsPerExtBlob)
for columnIndex := uint64(0); columnIndex < cKzg4844.CellsPerExtBlob; columnIndex++ {
column := make([]cKzg4844.Cell, 0, blobsCount)
kzgProofOfColumn := make([]cKzg4844.KZGProof, 0, blobsCount)
Expand Down Expand Up @@ -231,14 +224,16 @@ func dataColumnSidecars(signedBlock interfaces.SignedBeaconBlock, blobs []cKzg48
kzgProofOfColumnBytes = append(kzgProofOfColumnBytes, kzgProof[:])
}

sidecars = append(sidecars, ethpb.DataColumnSidecar{
sidecar := &ethpb.DataColumnSidecar{
ColumnIndex: columnIndex,
DataColumn: columnBytes,
KzgCommitments: blobKzgCommitments,
KzgProof: kzgProofOfColumnBytes,
SignedBlockHeader: signedBlockHeader,
KzgCommitmentsInclusionProof: kzgCommitmentsInclusionProof,
})
}

sidecars = append(sidecars, sidecar)
}

return sidecars, nil
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ go_library(
"//beacon-chain/startup:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/features:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
"//consensus-types/wrapper:go_default_library",
Expand Down
124 changes: 116 additions & 8 deletions beacon-chain/p2p/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
ssz "github.com/prysmaticlabs/fastssz"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/altair"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/crypto/hash"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing"
Expand Down Expand Up @@ -96,7 +97,12 @@ func (s *Service) BroadcastSyncCommitteeMessage(ctx context.Context, subnet uint
return nil
}

func (s *Service) internalBroadcastAttestation(ctx context.Context, subnet uint64, att ethpb.Att, forkDigest [4]byte) {
func (s *Service) internalBroadcastAttestation(
ctx context.Context,
subnet uint64,
att ethpb.Att,
forkDigest [fieldparams.VersionLength]byte,
) {
_, span := trace.StartSpan(ctx, "p2p.internalBroadcastAttestation")
defer span.End()
ctx = trace.NewContext(context.Background(), span) // clear parent context / deadline.
Expand Down Expand Up @@ -152,7 +158,7 @@ func (s *Service) internalBroadcastAttestation(ctx context.Context, subnet uint6
}
}

func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage, forkDigest [4]byte) {
func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage, forkDigest [fieldparams.VersionLength]byte) {
_, span := trace.StartSpan(ctx, "p2p.broadcastSyncCommittee")
defer span.End()
ctx = trace.NewContext(context.Background(), span) // clear parent context / deadline.
Expand Down Expand Up @@ -228,7 +234,12 @@ func (s *Service) BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.
return nil
}

func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blobSidecar *ethpb.BlobSidecar, forkDigest [4]byte) {
func (s *Service) internalBroadcastBlob(
ctx context.Context,
subnet uint64,
blobSidecar *ethpb.BlobSidecar,
forkDigest [fieldparams.VersionLength]byte,
) {
_, span := trace.StartSpan(ctx, "p2p.internalBroadcastBlob")
defer span.End()
ctx = trace.NewContext(context.Background(), span) // clear parent context / deadline.
Expand All @@ -243,7 +254,7 @@ func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blob
s.subnetLocker(wrappedSubIdx).RUnlock()

if !hasPeer {
blobSidecarCommitteeBroadcastAttempts.Inc()
blobSidecarBroadcastAttempts.Inc()
if err := func() error {
s.subnetLocker(wrappedSubIdx).Lock()
defer s.subnetLocker(wrappedSubIdx).Unlock()
Expand All @@ -252,7 +263,7 @@ func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blob
return err
}
if ok {
blobSidecarCommitteeBroadcasts.Inc()
blobSidecarBroadcasts.Inc()
return nil
}
return errors.New("failed to find peers for subnet")
Expand All @@ -268,6 +279,99 @@ func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blob
}
}

// BroadcastDataColumn broadcasts a data column to the p2p network, the message is assumed to be
// broadcasted to the current fork and to the input column subnet.
// TODO: Add tests
func (s *Service) BroadcastDataColumn(ctx context.Context, columnSubnet uint64, dataColumnSidecar *ethpb.DataColumnSidecar) error {
// Add tracing to the function.
ctx, span := trace.StartSpan(ctx, "p2p.BroadcastBlob")
defer span.End()

// Ensure the data column sidecar is not nil.
if dataColumnSidecar == nil {
return errors.New("attempted to broadcast nil data column sidecar")
}

// Retrieve the current fork digest.
forkDigest, err := s.currentForkDigest()
if err != nil {
err := errors.Wrap(err, "current fork digest")
tracing.AnnotateError(span, err)
return err
}

// Non-blocking broadcast, with attempts to discover a column subnet peer if none available.
go s.internalBroadcastDataColumn(ctx, columnSubnet, dataColumnSidecar, forkDigest)

return nil
}

func (s *Service) internalBroadcastDataColumn(
ctx context.Context,
columnSubnet uint64,
dataColumnSidecar *ethpb.DataColumnSidecar,
forkDigest [fieldparams.VersionLength]byte,
) {
// Add tracing to the function.
_, span := trace.StartSpan(ctx, "p2p.internalBroadcastDataColumn")
defer span.End()

// Increase the number of broadcast attempts.
dataColumnSidecarBroadcastAttempts.Inc()

// Clear parent context / deadline.
ctx = trace.NewContext(context.Background(), span)

// Define a one-slot length context timeout.
oneSlot := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
ctx, cancel := context.WithTimeout(ctx, oneSlot)
defer cancel()

// Build the topic corresponding to this column subnet and this fork digest.
topic := dataColumnSubnetToTopic(columnSubnet, forkDigest)

// Compute the wrapped subnet index.
wrappedSubIdx := columnSubnet + dataColumnSubnetVal

// Check if we have peers with this subnet.
hasPeer := func() bool {
s.subnetLocker(wrappedSubIdx).RLock()
defer s.subnetLocker(wrappedSubIdx).RUnlock()

return s.hasPeerWithSubnet(topic)
}()

// If no peers are found, attempt to find peers with this subnet.
if !hasPeer {
if err := func() error {
s.subnetLocker(wrappedSubIdx).Lock()
defer s.subnetLocker(wrappedSubIdx).Unlock()

ok, err := s.FindPeersWithSubnet(ctx, topic, columnSubnet, 1 /*threshold*/)
if err != nil {
return errors.Wrap(err, "find peers for subnet")
}

if ok {
return nil
}
return errors.New("failed to find peers for subnet")
}(); err != nil {
log.WithError(err).Error("Failed to find peers")
tracing.AnnotateError(span, err)
}
}

// Broadcast the data column sidecar to the network.
if err := s.broadcastObject(ctx, dataColumnSidecar, topic); err != nil {
log.WithError(err).Error("Failed to broadcast blob sidecar")
tracing.AnnotateError(span, err)
}

// Increase the number of successful broadcasts.
blobSidecarBroadcasts.Inc()
}

// method to broadcast messages to other peers in our gossip mesh.
func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic string) error {
ctx, span := trace.StartSpan(ctx, "p2p.broadcastObject")
Expand Down Expand Up @@ -297,14 +401,18 @@ func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic
return nil
}

func attestationToTopic(subnet uint64, forkDigest [4]byte) string {
func attestationToTopic(subnet uint64, forkDigest [fieldparams.VersionLength]byte) string {
return fmt.Sprintf(AttestationSubnetTopicFormat, forkDigest, subnet)
}

func syncCommitteeToTopic(subnet uint64, forkDigest [4]byte) string {
func syncCommitteeToTopic(subnet uint64, forkDigest [fieldparams.VersionLength]byte) string {
return fmt.Sprintf(SyncCommitteeSubnetTopicFormat, forkDigest, subnet)
}

func blobSubnetToTopic(subnet uint64, forkDigest [4]byte) string {
func blobSubnetToTopic(subnet uint64, forkDigest [fieldparams.VersionLength]byte) string {
return fmt.Sprintf(BlobSubnetTopicFormat, forkDigest, subnet)
}

func dataColumnSubnetToTopic(subnet uint64, forkDigest [fieldparams.VersionLength]byte) string {
return fmt.Sprintf(DataColumnSubnetTopicFormat, forkDigest, subnet)
}
1 change: 1 addition & 0 deletions beacon-chain/p2p/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Broadcaster interface {
BroadcastAttestation(ctx context.Context, subnet uint64, att ethpb.Att) error
BroadcastSyncCommitteeMessage(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage) error
BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar) error
BroadcastDataColumn(ctx context.Context, columnSubnet uint64, dataColumnSidecar *ethpb.DataColumnSidecar) error
}

// SetStreamHandler configures p2p to handle streams of a certain topic ID.
Expand Down
12 changes: 8 additions & 4 deletions beacon-chain/p2p/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,21 @@ var (
"the subnet. The beacon node increments this counter when the broadcast is blocked " +
"until a subnet peer can be found.",
})
blobSidecarCommitteeBroadcasts = promauto.NewCounter(prometheus.CounterOpts{
blobSidecarBroadcasts = promauto.NewCounter(prometheus.CounterOpts{
Name: "p2p_blob_sidecar_committee_broadcasts",
Help: "The number of blob sidecar committee messages that were broadcast with no peer on.",
Help: "The number of blob sidecar messages that were broadcast with no peer on.",
})
syncCommitteeBroadcastAttempts = promauto.NewCounter(prometheus.CounterOpts{
Name: "p2p_sync_committee_subnet_attempted_broadcasts",
Help: "The number of sync committee that were attempted to be broadcast.",
})
blobSidecarCommitteeBroadcastAttempts = promauto.NewCounter(prometheus.CounterOpts{
blobSidecarBroadcastAttempts = promauto.NewCounter(prometheus.CounterOpts{
Name: "p2p_blob_sidecar_committee_attempted_broadcasts",
Help: "The number of blob sidecar committee messages that were attempted to be broadcast.",
Help: "The number of blob sidecar messages that were attempted to be broadcast.",
})
dataColumnSidecarBroadcastAttempts = promauto.NewCounter(prometheus.CounterOpts{
Name: "p2p_data_column_sidecar_attempted_broadcasts",
Help: "The number of data column sidecar messages that were attempted to be broadcast.",
})

// Gossip Tracer Metrics
Expand Down
6 changes: 3 additions & 3 deletions beacon-chain/p2p/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,14 @@ func (s *Service) pubsubOptions() []pubsub.Option {
func parsePeersEnr(peers []string) ([]peer.AddrInfo, error) {
addrs, err := PeersFromStringAddrs(peers)
if err != nil {
return nil, fmt.Errorf("Cannot convert peers raw ENRs into multiaddresses: %w", err)
return nil, fmt.Errorf("cannot convert peers raw ENRs into multiaddresses: %w", err)
}
if len(addrs) == 0 {
return nil, fmt.Errorf("Converting peers raw ENRs into multiaddresses resulted in an empty list")
return nil, fmt.Errorf("converting peers raw ENRs into multiaddresses resulted in an empty list")
}
directAddrInfos, err := peer.AddrInfosFromP2pAddrs(addrs...)
if err != nil {
return nil, fmt.Errorf("Cannot convert peers multiaddresses into AddrInfos: %w", err)
return nil, fmt.Errorf("cannot convert peers multiaddresses into AddrInfos: %w", err)
}
return directAddrInfos, nil
}
Expand Down
Loading

0 comments on commit 059418b

Please sign in to comment.