Skip to content

Commit

Permalink
Add Support For Discovery Of Column Subnets (#13883)
Browse files Browse the repository at this point in the history
* Add Support For Discovery Of Column Subnets

* Lint for SubnetsPerNode

* Manu's Review

* Change to a better name
  • Loading branch information
nisdas authored and nalepae committed Oct 7, 2024
1 parent c9b67c9 commit d095739
Show file tree
Hide file tree
Showing 12 changed files with 154 additions and 16 deletions.
1 change: 1 addition & 0 deletions beacon-chain/cache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
"attestation_data.go",
"balance_cache_key.go",
"checkpoint_state.go",
"column_subnet_ids.go",
"committee.go",
"committee_disabled.go", # keep
"committees.go",
Expand Down
65 changes: 65 additions & 0 deletions beacon-chain/cache/column_subnet_ids.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package cache

import (
"sync"
"time"

"github.com/patrickmn/go-cache"
"github.com/prysmaticlabs/prysm/v5/config/params"
)

type columnSubnetIDs struct {
colSubCache *cache.Cache
colSubLock sync.RWMutex
}

// ColumnSubnetIDs for column subnet participants
var ColumnSubnetIDs = newColumnSubnetIDs()

const columnKey = "columns"

func newColumnSubnetIDs() *columnSubnetIDs {
epochDuration := time.Duration(params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot))
// Set the default duration of a column subnet subscription as the column expiry period.
subLength := epochDuration * time.Duration(params.BeaconConfig().MinEpochsForDataColumnSidecarsRequest)
persistentCache := cache.New(subLength*time.Second, epochDuration*time.Second)
return &columnSubnetIDs{colSubCache: persistentCache}
}

// GetColumnSubnets retrieves the data column subnets.
func (s *columnSubnetIDs) GetColumnSubnets() ([]uint64, bool, time.Time) {
s.colSubLock.RLock()
defer s.colSubLock.RUnlock()

id, duration, ok := s.colSubCache.GetWithExpiration(columnKey)
if !ok {
return nil, false, time.Time{}
}
// Retrieve indices from the cache.
idxs, ok := id.([]uint64)
if !ok {
return nil, false, time.Time{}
}

return idxs, ok, duration
}

// AddColumnSubnets adds the relevant data column subnets.
func (s *columnSubnetIDs) AddColumnSubnets(colIdx []uint64) {
s.colSubLock.Lock()
defer s.colSubLock.Unlock()

s.colSubCache.Set(columnKey, colIdx, 0)
}

// EmptyAllCaches empties out all the related caches and flushes any stored
// entries on them. This should only ever be used for testing, in normal
// production, handling of the relevant subnets for each role is done
// separately.
func (s *columnSubnetIDs) EmptyAllCaches() {
// Clear the cache.
s.colSubLock.Lock()
defer s.colSubLock.Unlock()

s.colSubCache.Flush()
}
1 change: 1 addition & 0 deletions beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ go_library(
"//consensus-types/primitives:go_default_library",
"//consensus-types/wrapper:go_default_library",
"//container/leaky-bucket:go_default_library",
"//container/slice:go_default_library",
"//crypto/ecdsa:go_default_library",
"//crypto/hash:go_default_library",
"//encoding/bytesutil:go_default_library",
Expand Down
14 changes: 9 additions & 5 deletions beacon-chain/p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ type quicProtocol uint16
// quicProtocol is the "quic" key, which holds the QUIC port of the node.
func (quicProtocol) ENRKey() string { return "quic" }

// RefreshENR uses an epoch to refresh the enr entry for our node
// with the tracked committee ids for the epoch, allowing our node
// to be dynamically discoverable by others given our tracked committee ids.
func (s *Service) RefreshENR() {
// return early if discv5 isn't running
// RefreshPersistentSubnets checks that we are tracking our local persistent subnets for a variety of gossip topics.
// This routine checks for our attestation, sync committee and data column subnets and updates them if they have
// been rotated.
func (s *Service) RefreshPersistentSubnets() {
// return early if discv5 isnt running
if s.dv5Listener == nil || !s.isInitialized() {
return
}
Expand All @@ -60,6 +60,10 @@ func (s *Service) RefreshENR() {
log.WithError(err).Error("Could not initialize persistent subnets")
return
}
if err := initializePersistentColumnSubnets(s.dv5Listener.LocalNode().ID()); err != nil {
log.WithError(err).Error("Could not initialize persistent column subnets")
return
}

bitV := bitfield.NewBitvector64()
committees := cache.SubnetIDs.GetAllSubnets()
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/p2p/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ func addPeer(t *testing.T, p *peers.Status, state peerdata.PeerConnectionState)
return id
}

func TestRefreshENR_ForkBoundaries(t *testing.T) {
func TestRefreshPersistentSubnets_ForkBoundaries(t *testing.T) {
params.SetupTestConfigCleanup(t)
// Clean up caches after usage.
defer cache.SubnetIDs.EmptyAllCaches()
Expand Down Expand Up @@ -601,7 +601,7 @@ func TestRefreshENR_ForkBoundaries(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := tt.svcBuilder(t)
s.RefreshENR()
s.RefreshPersistentSubnets()
tt.postValidation(t, s)
s.dv5Listener.Close()
cache.SubnetIDs.EmptyAllCaches()
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type PeerManager interface {
Host() host.Host
ENR() *enr.Record
DiscoveryAddresses() ([]multiaddr.Multiaddr, error)
RefreshENR()
RefreshPersistentSubnets()
FindPeersWithSubnet(ctx context.Context, topic string, subIndex uint64, threshold int) (bool, error)
AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error)
}
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/p2p/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,15 +226,15 @@ func (s *Service) Start() {
}
// Initialize metadata according to the
// current epoch.
s.RefreshENR()
s.RefreshPersistentSubnets()

// Periodic functions.
async.RunEvery(s.ctx, params.BeaconConfig().TtfbTimeoutDuration(), func() {
ensurePeerConnections(s.ctx, s.host, s.peers, relayNodes...)
})
async.RunEvery(s.ctx, 30*time.Minute, s.Peers().Prune)
async.RunEvery(s.ctx, time.Duration(params.BeaconConfig().RespTimeout)*time.Second, s.updateMetrics)
async.RunEvery(s.ctx, refreshRate, s.RefreshENR)
async.RunEvery(s.ctx, refreshRate, s.RefreshPersistentSubnets)
async.RunEvery(s.ctx, 1*time.Minute, func() {
inboundQUICCount := len(s.peers.InboundConnectedWithProtocol(peers.QUIC))
inboundTCPCount := len(s.peers.InboundConnectedWithProtocol(peers.TCP))
Expand Down
64 changes: 64 additions & 0 deletions beacon-chain/p2p/subnets.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/consensus-types/wrapper"
"github.com/prysmaticlabs/prysm/v5/container/slice"
"github.com/prysmaticlabs/prysm/v5/crypto/hash"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
mathutil "github.com/prysmaticlabs/prysm/v5/math"
Expand Down Expand Up @@ -206,6 +207,19 @@ func initializePersistentSubnets(id enode.ID, epoch primitives.Epoch) error {
return nil
}

func initializePersistentColumnSubnets(id enode.ID) error {
_, ok, expTime := cache.ColumnSubnetIDs.GetColumnSubnets()
if ok && expTime.After(time.Now()) {
return nil
}
subs, err := computeSubscribedColumnSubnets(id)
if err != nil {
return err
}
cache.ColumnSubnetIDs.AddColumnSubnets(subs)
return nil
}

// Spec pseudocode definition:
//
// def compute_subscribed_subnets(node_id: NodeID, epoch: Epoch) -> Sequence[SubnetID]:
Expand All @@ -225,6 +239,46 @@ func computeSubscribedSubnets(nodeID enode.ID, epoch primitives.Epoch) ([]uint64
return subs, nil
}

func computeCustodyColumns(nodeID enode.ID) ([]uint64, error) {
subs, err := computeSubscribedColumnSubnets(nodeID)
if err != nil {
return nil, err
}
colsPerSub := params.BeaconConfig().NumberOfColumns / params.BeaconConfig().DataColumnSidecarSubnetCount
colIdxs := []uint64{}
for _, sub := range subs {
for i := uint64(0); i < colsPerSub; i++ {
colId := params.BeaconConfig().DataColumnSidecarSubnetCount*i + sub
colIdxs = append(colIdxs, colId)
}
}
return colIdxs, nil
}

func computeSubscribedColumnSubnets(nodeID enode.ID) ([]uint64, error) {
subnetsPerNode := params.BeaconConfig().CustodyRequirement
subs := make([]uint64, 0, subnetsPerNode)

for i := uint64(0); i < subnetsPerNode; i++ {
sub, err := computeSubscribedColumnSubnet(nodeID, i)
if err != nil {
return nil, err
}
if slice.IsInUint64(sub, subs) {
continue
}
subs = append(subs, sub)
}
isubnetsPerNode, err := mathutil.Int(subnetsPerNode)
if err != nil {
return nil, err
}
if len(subs) != isubnetsPerNode {
return nil, errors.Errorf("inconsistent subnet assignment: %d vs %d", len(subs), isubnetsPerNode)
}
return subs, nil
}

// Spec pseudocode definition:
//
// def compute_subscribed_subnet(node_id: NodeID, epoch: Epoch, index: int) -> SubnetID:
Expand All @@ -250,6 +304,16 @@ func computeSubscribedSubnet(nodeID enode.ID, epoch primitives.Epoch, index uint
return subnet, nil
}

func computeSubscribedColumnSubnet(nodeID enode.ID, index uint64) (uint64, error) {
num := uint256.NewInt(0).SetBytes(nodeID.Bytes())
num = num.Add(num, uint256.NewInt(index))
num64bit := num.Uint64()
byteNum := bytesutil.Uint64ToBytesLittleEndian(num64bit)
hashedObj := hash.Hash(byteNum)
subnetID := bytesutil.FromBytes8(hashedObj[:8]) % params.BeaconConfig().DataColumnSidecarSubnetCount
return subnetID, nil
}

func computeSubscriptionExpirationTime(nodeID enode.ID, epoch primitives.Epoch) time.Duration {
nodeOffset, _ := computeOffsetAndPrefix(nodeID)
pastEpochs := (nodeOffset + uint64(epoch)) % params.BeaconConfig().EpochsPerSubnetSubscription
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/p2p/testing/fuzz_p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func (_ *FakeP2P) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ i
return false, nil
}

// RefreshENR mocks the p2p func.
func (_ *FakeP2P) RefreshENR() {}
// RefreshPersistentSubnets mocks the p2p func.
func (_ *FakeP2P) RefreshPersistentSubnets() {}

// LeaveTopic -- fake.
func (_ *FakeP2P) LeaveTopic(_ string) error {
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/p2p/testing/mock_peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ func (m MockPeerManager) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) {
return m.DiscoveryAddr, nil
}

// RefreshENR .
func (_ MockPeerManager) RefreshENR() {}
// RefreshPersistentSubnets .
func (_ MockPeerManager) RefreshPersistentSubnets() {}

// FindPeersWithSubnet .
func (_ MockPeerManager) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) {
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/p2p/testing/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,8 @@ func (_ *TestP2P) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ i
return false, nil
}

// RefreshENR mocks the p2p func.
func (_ *TestP2P) RefreshENR() {}
// RefreshPersistentSubnets mocks the p2p func.
func (_ *TestP2P) RefreshPersistentSubnets() {}

// ForkDigest mocks the p2p func.
func (p *TestP2P) ForkDigest() ([4]byte, error) {
Expand Down
3 changes: 3 additions & 0 deletions beacon-chain/sync/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
params.BeaconConfig().BlobsidecarSubnetCount,
)
}
if features.Get().EnablePeerDAS {
// TODO: Subscribe to persistent column subnets here
}
}

// subscribe to a given topic with a given validator and subscription handler.
Expand Down

0 comments on commit d095739

Please sign in to comment.