Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nalepae committed Apr 26, 2024
1 parent 6c005c2 commit 2630855
Show file tree
Hide file tree
Showing 24 changed files with 448 additions and 73 deletions.
2 changes: 1 addition & 1 deletion beacon-chain/blockchain/receive_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type BlobReceiver interface {
// DataColumnReceiver interface defines the methods of chain service for receiving new
// data columns
type DataColumnReceiver interface {
ReceiveDataColumn(context.Context, *ethpb.DataColumnSidecar) error
ReceiveDataColumn(context.Context, blocks.VerifiedRODataColumn) error
}

// SlashingReceiver interface defines the methods of chain service for receiving validated slashing over the wire.
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/blockchain/receive_sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package blockchain
import (
"context"

ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
)

func (s *Service) ReceiveDataColumn(ctx context.Context, ds *ethpb.DataColumnSidecar) error {
func (s *Service) ReceiveDataColumn(ctx context.Context, verifiedRODataColumn blocks.VerifiedRODataColumn) error {
// TODO
return nil
}
5 changes: 5 additions & 0 deletions beacon-chain/blockchain/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ func (mb *mockBroadcaster) BroadcastBlob(_ context.Context, _ uint64, _ *ethpb.B
return nil
}

func (mb *mockBroadcaster) BroadcastDataColumn(_ context.Context, _ uint64, _ *ethpb.DataColumnSidecar) error {
mb.broadcastCalled = true
return nil
}

func (mb *mockBroadcaster) BroadcastBLSChanges(_ context.Context, _ []*ethpb.SignedBLSToExecutionChange) {
}

Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/blockchain/testing/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ func (c *ChainService) ReceiveBlob(_ context.Context, b blocks.VerifiedROBlob) e
}

// ReceiveDataColumn implements the same method in chain service
func (c *ChainService) ReceiveDataColumn(_ context.Context, _ *ethpb.DataColumnSidecar) error {
func (c *ChainService) ReceiveDataColumn(_ context.Context, _ blocks.VerifiedRODataColumn) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/core/feed/operation/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,5 @@ type AttesterSlashingReceivedData struct {
}

type DataColumnSidecarReceivedData struct {
DataColumn *ethpb.DataColumnSidecar
DataColumn *blocks.VerifiedRODataColumn
}
10 changes: 6 additions & 4 deletions beacon-chain/core/peerdas/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func recoverMatrix(cellFromCoordinate map[cellCoordinate]cKzg4844.Cell, blobCoun
}

// https://github.com/ethereum/consensus-specs/blob/dev/specs/_features/eip7594/das-core.md#recover_matrix
func dataColumnSidecars(signedBlock interfaces.SignedBeaconBlock, blobs []cKzg4844.Blob) ([]ethpb.DataColumnSidecar, error) {
func DataColumnSidecars(signedBlock interfaces.SignedBeaconBlock, blobs []cKzg4844.Blob) ([]*ethpb.DataColumnSidecar, error) {
blobsCount := len(blobs)

// Get the signed block header.
Expand Down Expand Up @@ -201,7 +201,7 @@ func dataColumnSidecars(signedBlock interfaces.SignedBeaconBlock, blobs []cKzg48
}

// Get the column sidecars.
sidecars := make([]ethpb.DataColumnSidecar, cKzg4844.CellsPerExtBlob)
sidecars := make([]*ethpb.DataColumnSidecar, cKzg4844.CellsPerExtBlob)
for columnIndex := uint64(0); columnIndex < cKzg4844.CellsPerExtBlob; columnIndex++ {
column := make([]cKzg4844.Cell, 0, blobsCount)
kzgProofOfColumn := make([]cKzg4844.KZGProof, 0, blobsCount)
Expand Down Expand Up @@ -231,14 +231,16 @@ func dataColumnSidecars(signedBlock interfaces.SignedBeaconBlock, blobs []cKzg48
kzgProofOfColumnBytes = append(kzgProofOfColumnBytes, kzgProof[:])
}

sidecars = append(sidecars, ethpb.DataColumnSidecar{
sidecar := &ethpb.DataColumnSidecar{
ColumnIndex: columnIndex,
DataColumn: columnBytes,
KzgCommitments: blobKzgCommitments,
KzgProof: kzgProofOfColumnBytes,
SignedBlockHeader: signedBlockHeader,
KzgCommitmentsInclusionProof: kzgCommitmentsInclusionProof,
})
}

sidecars = append(sidecars, sidecar)
}

return sidecars, nil
Expand Down
97 changes: 97 additions & 0 deletions beacon-chain/p2p/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,99 @@ func (s *Service) internalBroadcastBlob(
}
}

// BroadcastDataColumn broadcasts a data column to the p2p network, the message is assumed to be
// broadcasted to the current fork and to the input column subnet.
func (s *Service) BroadcastDataColumn(ctx context.Context, columnSubnet uint64, dataColumnSidecar *ethpb.DataColumnSidecar) error {
// Add tracing to the function.
ctx, span := trace.StartSpan(ctx, "p2p.BroadcastBlob")
defer span.End()

// Ensure the data column sidecar is not nil.
if dataColumnSidecar == nil {
return errors.New("attempted to broadcast nil data column sidecar")
}

// Retrieve the current fork digest.
forkDigest, err := s.currentForkDigest()
if err != nil {
err := errors.Wrap(err, "current fork digest")
tracing.AnnotateError(span, err)
return err
}

// Non-blocking broadcast, with attempts to discover a column subnet peer if none available.
go s.internalBroadcastDataColumn(ctx, columnSubnet, dataColumnSidecar, forkDigest)

return nil
}

func (s *Service) internalBroadcastDataColumn(
ctx context.Context,
columnSubnet uint64,
dataColumnSidecar *ethpb.DataColumnSidecar,
forkDigest [fieldparams.VersionLength]byte,
) {
// Add tracing to the function.
_, span := trace.StartSpan(ctx, "p2p.internalBroadcastDataColumn")
defer span.End()

// Increase the number of broadcast attempts.
dataColumnSidecarBroadcastAttempts.Inc()

// Clear parent context / deadline.
ctx = trace.NewContext(context.Background(), span)

// Define a one-slot lenght context timeout.
oneSlot := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
ctx, cancel := context.WithTimeout(ctx, oneSlot)
defer cancel()

// Build the topic corresponding to this column subnet and this fork digest.
topic := dataColumnSubnetToTopic(columnSubnet, forkDigest)

// Compute the wrapped subnet index.
wrappedSubIdx := columnSubnet + dataColumnSubnetVal

// Check if we have peers with this subnet.
hasPeer := func() bool {

s.subnetLocker(wrappedSubIdx).RLock()
defer s.subnetLocker(wrappedSubIdx).RUnlock()

return s.hasPeerWithSubnet(topic)
}()

// If no peers are found, attempt to find peers with this subnet.
if !hasPeer {
if err := func() error {
s.subnetLocker(wrappedSubIdx).Lock()
defer s.subnetLocker(wrappedSubIdx).Unlock()

ok, err := s.FindPeersWithSubnet(ctx, topic, columnSubnet, 1 /*threshold*/)
if err != nil {
return errors.Wrap(err, "find peers for subnet")
}

if ok {
return nil
}
return errors.New("failed to find peers for subnet")
}(); err != nil {
log.WithError(err).Error("Failed to find peers")
tracing.AnnotateError(span, err)
}
}

// Broadcast the data column sidecar to the network.
if err := s.broadcastObject(ctx, dataColumnSidecar, topic); err != nil {
log.WithError(err).Error("Failed to broadcast blob sidecar")
tracing.AnnotateError(span, err)
}

// Increase the number of successful broadcasts.
blobSidecarBroadcasts.Inc()
}

// method to broadcast messages to other peers in our gossip mesh.
func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic string) error {
ctx, span := trace.StartSpan(ctx, "p2p.broadcastObject")
Expand Down Expand Up @@ -319,3 +412,7 @@ func syncCommitteeToTopic(subnet uint64, forkDigest [fieldparams.VersionLength]b
func blobSubnetToTopic(subnet uint64, forkDigest [fieldparams.VersionLength]byte) string {
return fmt.Sprintf(BlobSubnetTopicFormat, forkDigest, subnet)
}

func dataColumnSubnetToTopic(subnet uint64, forkDigest [fieldparams.VersionLength]byte) string {
return fmt.Sprintf(DataColumnSubnetTopicFormat, forkDigest, subnet)
}
1 change: 1 addition & 0 deletions beacon-chain/p2p/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Broadcaster interface {
BroadcastAttestation(ctx context.Context, subnet uint64, att *ethpb.Attestation) error
BroadcastSyncCommitteeMessage(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage) error
BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar) error
BroadcastDataColumn(ctx context.Context, columnSubnet uint64, dataColumnSidecar *ethpb.DataColumnSidecar) error
}

// SetStreamHandler configures p2p to handle streams of a certain topic ID.
Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/p2p/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ var (
Name: "p2p_blob_sidecar_committee_attempted_broadcasts",
Help: "The number of blob sidecar messages that were attempted to be broadcast.",
})
dataColumnSidecarBroadcastAttempts = promauto.NewCounter(prometheus.CounterOpts{
Name: "p2p_data_column_sidecar_attempted_broadcasts",
Help: "The number of data column sidecar messages that were attempted to be broadcast.",
})

// Gossip Tracer Metrics
pubsubTopicsActive = promauto.NewGaugeVec(prometheus.GaugeOpts{
Expand Down
20 changes: 14 additions & 6 deletions beacon-chain/p2p/subnets.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ var syncCommsSubnetEnrKey = params.BeaconNetworkConfig().SyncCommsSubnetKey
// The value used with the subnet, inorder
// to create an appropriate key to retrieve
// the relevant lock. This is used to differentiate
// sync subnets from attestation subnets. This is deliberately
// chosen as more than 64(attestation subnet count).
// sync subnets from others. This is deliberately
// chosen as more than 64 (attestation subnet count).
const syncLockerVal = 100

// The value used with the blob sidecar subnet, in order
Expand All @@ -46,6 +46,13 @@ const syncLockerVal = 100
// chosen more than sync and attestation subnet combined.
const blobSubnetLockerVal = 110

// The value used with the data column sidecar subnet, in order
// to create an appropriate key to retrieve
// the relevant lock. This is used to differentiate
// data column subnets from others. This is deliberately
// chosen more than sync, attestation and blob subnet (6) combined.
const dataColumnSubnetVal = 150

// FindPeersWithSubnet performs a network search for peers
// subscribed to a particular subnet. Then it tries to connect
// with those peers. This method will block until either:
Expand Down Expand Up @@ -371,10 +378,11 @@ func syncBitvector(record *enr.Record) (bitfield.Bitvector4, error) {

// The subnet locker is a map which keeps track of all
// mutexes stored per subnet. This locker is re-used
// between both the attestation and sync subnets. In
// order to differentiate between attestation and sync
// subnets. Sync subnets are stored by (subnet+syncLockerVal). This
// is to prevent conflicts while allowing both subnets
// between both the attestation, sync and blob subnets.
// Sync subnets are stored by (subnet+syncLockerVal).
// Blob subnets are stored by (subnet+blobSubnetLockerVal).
// Data column subnets are stored by (subnet+dataColumnSubnetVal).
// This is to prevent conflicts while allowing subnets
// to use a single locker.
func (s *Service) subnetLocker(i uint64) *sync.RWMutex {
s.subnetsLockLock.Lock()
Expand Down
5 changes: 5 additions & 0 deletions beacon-chain/p2p/testing/fuzz_p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ func (_ *FakeP2P) BroadcastBlob(_ context.Context, _ uint64, _ *ethpb.BlobSideca
return nil
}

// BroadcastDataColumn -- fake.
func (_ *FakeP2P) BroadcastDataColumn(_ context.Context, _ uint64, _ *ethpb.DataColumnSidecar) error {
return nil
}

// InterceptPeerDial -- fake.
func (_ *FakeP2P) InterceptPeerDial(peer.ID) (allow bool) {
return true
Expand Down
6 changes: 6 additions & 0 deletions beacon-chain/p2p/testing/mock_broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ func (m *MockBroadcaster) BroadcastBlob(context.Context, uint64, *ethpb.BlobSide
return nil
}

// BroadcastDataColumn broadcasts a data column for mock.
func (m *MockBroadcaster) BroadcastDataColumn(context.Context, uint64, *ethpb.DataColumnSidecar) error {
m.BroadcastCalled.Store(true)
return nil
}

// NumMessages returns the number of messages broadcasted.
func (m *MockBroadcaster) NumMessages() int {
m.msgLock.Lock()
Expand Down
6 changes: 6 additions & 0 deletions beacon-chain/p2p/testing/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,12 @@ func (p *TestP2P) BroadcastBlob(context.Context, uint64, *ethpb.BlobSidecar) err
return nil
}

// BroadcastDataColumn broadcasts a data column for mock.
func (p *TestP2P) BroadcastDataColumn(context.Context, uint64, *ethpb.DataColumnSidecar) error {
p.BroadcastCalled.Store(true)
return nil
}

// SetStreamHandler for RPC.
func (p *TestP2P) SetStreamHandler(topic string, handler network.StreamHandler) {
p.BHost.SetStreamHandler(protocol.ID(topic), handler)
Expand Down
Loading

0 comments on commit 2630855

Please sign in to comment.