Skip to content

Commit

Permalink
subscribeDynamicWithSyncSubnets: Use exactly the same subscription …
Browse files Browse the repository at this point in the history
…function initially and every slot.
  • Loading branch information
nalepae committed Sep 3, 2024
1 parent 7d02a3c commit eee87bb
Showing 1 changed file with 122 additions and 72 deletions.
194 changes: 122 additions & 72 deletions beacon-chain/sync/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/features"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/container/slice"
Expand Down Expand Up @@ -454,10 +455,8 @@ func (s *Service) subscribeDynamicWithSubnets(
return
}
wantedSubs := s.retrievePersistentSubs(currentSlot)
// Resize as appropriate.
s.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest)

// subscribe desired aggregator subnets.
for _, idx := range wantedSubs {
s.subscribeAggregatorSubnet(subscriptions, idx, digest, validate, handle)
}
Expand All @@ -471,9 +470,15 @@ func (s *Service) subscribeDynamicWithSubnets(
}()
}

// revalidate that our currently connected subnets are valid.
func (s *Service) reValidateSubscriptions(subscriptions map[uint64]*pubsub.Subscription,
wantedSubs []uint64, topicFormat string, digest [4]byte) {
// reValidateSubscriptions unsubscribe from topics we are currently subscribed to but that are
// not in the list of wanted subnets.
// TODO: Rename this functions as it does not only revalidate subscriptions.
func (s *Service) reValidateSubscriptions(
subscriptions map[uint64]*pubsub.Subscription,
wantedSubs []uint64,
topicFormat string,
digest [4]byte,
) {
for k, v := range subscriptions {
var wanted bool
for _, idx := range wantedSubs {
Expand All @@ -482,6 +487,7 @@ func (s *Service) reValidateSubscriptions(subscriptions map[uint64]*pubsub.Subsc
break
}
}

if !wanted && v != nil {
v.Cancel()
fullTopic := fmt.Sprintf(topicFormat, digest, k) + s.cfg.p2p.Encoding().ProtocolSuffix()
Expand Down Expand Up @@ -515,30 +521,6 @@ func (s *Service) subscribeAggregatorSubnet(
}
}

// subscribe missing subnets for our sync committee members.
func (s *Service) subscribeSyncSubnet(
subscriptions map[uint64]*pubsub.Subscription,
idx uint64,
digest [4]byte,
validate wrappedVal,
handle subHandler,
) {
// do not subscribe if we have no peers in the same
// subnet
topic := p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.SyncCommitteeMessage{})]
subnetTopic := fmt.Sprintf(topic, digest, idx)
// check if subscription exists and if not subscribe the relevant subnet.
if _, exists := subscriptions[idx]; !exists {
subscriptions[idx] = s.subscribeWithBase(subnetTopic, validate, handle)
}
if !s.validPeersExist(subnetTopic) {
_, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
}
}
}

// subscribe to a static subnet with the given topic and index. A given validator and subscription handler is
// used to handle messages from the subnet. The base protobuf message is used to initialize new messages for decoding.
func (s *Service) subscribeStaticWithSyncSubnets(topic string, validator wrappedVal, handle subHandler, digest [4]byte) {
Expand Down Expand Up @@ -602,59 +584,138 @@ func (s *Service) subscribeStaticWithSyncSubnets(topic string, validator wrapped
}()
}

// subscribe to a dynamically changing list of subnets. This method expects a fmt compatible
// string for the topic name and the list of subnets for subscribed topics that should be
// maintained.
// subscribeToSyncSubnets subscribes to needed sync subnets, unsubscribe from unneeded ones and search for more peers if needed.
// Returns `true` if the digest is valid (wrt. the current epoch), `false` otherwise.
func (s *Service) subscribeToSyncSubnets(
topicFormat string,
digest [4]byte,
genesisValidatorsRoot [fieldparams.RootLength]byte,
genesisTime time.Time,
subscriptions map[uint64]*pubsub.Subscription,
currentSlot primitives.Slot,
validate wrappedVal,
handle subHandler,
) bool {
// Get sync subnets topic.
topic := p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.SyncCommitteeMessage{})]

// Do not subscribe if not synced.
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
return true
}

// Do not subscribe is the digest is not valid.
valid, err := isDigestValid(digest, genesisTime, genesisValidatorsRoot)
if err != nil {
log.Error(err)
return true
}

// Unsubscribe from all subnets if the digest is not valid. It's likely to be the case after a hard fork.
if !valid {
log.WithField("digest", fmt.Sprintf("%#x", digest)).Warn("Sync subnets with this digest are no longer valid, unsubscribing from all of them.")
s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest)
return false
}

// Get the current epoch.
currentEpoch := slots.ToEpoch(currentSlot)

// Retrieve the subnets we want to subscribe to.
wantedSubnetsIndex := s.retrieveActiveSyncSubnets(currentEpoch)

// Remove subscriptions that are no longer wanted.
s.reValidateSubscriptions(subscriptions, wantedSubnetsIndex, topicFormat, digest)

// Subscribe to wanted subnets.
for _, subnetIndex := range wantedSubnetsIndex {
subnetTopic := fmt.Sprintf(topic, digest, subnetIndex)

// Check if subscription exists.
if _, exists := subscriptions[subnetIndex]; exists {
continue
}

// We need to subscribe to the subnet.
subscription := s.subscribeWithBase(subnetTopic, validate, handle)
subscriptions[subnetIndex] = subscription
}

// Find new peers for wanted subnets if needed.
for _, subnetIndex := range wantedSubnetsIndex {
subnetTopic := fmt.Sprintf(topic, digest, subnetIndex)

// Check if we have enough peers in the subnet. Skip if we do.
if s.validPeersExist(subnetTopic) {
continue
}

// Not enough peers in the subnet, we need to search for more.
_, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, subnetIndex, flags.Get().MinimumPeersPerSubnet)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
}
}

return true
}

// subscribeDynamicWithSyncSubnets subscribes to a dynamically changing list of subnets.
func (s *Service) subscribeDynamicWithSyncSubnets(
topicFormat string,
validate wrappedVal,
handle subHandler,
digest [4]byte,
) {
genRoot := s.cfg.clock.GenesisValidatorsRoot()
_, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:])
// Retrieve the number of committee subnets we need to subscribe to.
syncCommiteeSubnetsCount := params.BeaconConfig().SyncCommitteeSubnetCount

// Initialize the subscriptions map.
subscriptions := make(map[uint64]*pubsub.Subscription, syncCommiteeSubnetsCount)

// Retrieve the fenesis validators root.
genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()

// Retrieve the epoch of the fork corresponding to the digest.
_, e, err := forks.RetrieveForkDataFromDigest(digest, genesisValidatorsRoot[:])
if err != nil {
panic(err)
}

// Retrieve the base protobuf message.
base := p2p.GossipTopicMappings(topicFormat, e)
if base == nil {
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat))
}
subscriptions := make(map[uint64]*pubsub.Subscription, params.BeaconConfig().SyncCommitteeSubnetCount)
genesis := s.cfg.clock.GenesisTime()
ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot)

// Retrieve the genesis time.
genesisTime := s.cfg.clock.GenesisTime()

// Define a ticker ticking every slot.
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
ticker := slots.NewSlotTicker(genesisTime, secondsPerSlot)

// Retrieve the current slot.
currentSlot := s.cfg.clock.CurrentSlot()

// Subscribe to the sync subnets.
s.subscribeToSyncSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle)

go func() {
for {
select {
case <-s.ctx.Done():
ticker.Done()
return
case currentSlot := <-ticker.C():
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
continue
}
valid, err := isDigestValid(digest, genesis, genRoot)
if err != nil {
log.Error(err)
continue
}
if !valid {
log.Warnf("Sync subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest)
// Unsubscribes from all our current subnets.
s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest)
isDigestValid := s.subscribeToSyncSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle)

// Stop the ticker if the digest is not valid. Likely to happen after a hard fork.
if !isDigestValid {
ticker.Done()
return
}

wantedSubs := s.retrieveActiveSyncSubnets(slots.ToEpoch(currentSlot))
// Resize as appropriate.
s.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest)

// subscribe desired aggregator subnets.
for _, idx := range wantedSubs {
s.subscribeSyncSubnet(subscriptions, idx, digest, validate, handle)
}
case <-s.ctx.Done():
ticker.Done()
return
}
}
}()
Expand Down Expand Up @@ -782,18 +843,7 @@ func (s *Service) validPeersExist(subnetTopic string) bool {
peersWithSubnet := s.cfg.p2p.PubSub().ListPeers(topic)
peersWithSubnetCount := len(peersWithSubnet)

enoughPeers := peersWithSubnetCount >= threshold

if !enoughPeers {
log.WithFields(logrus.Fields{
"topic": topic,
"peersCount": peersWithSubnetCount,
"threshold": threshold,
"ctxError": s.ctx.Err(),
}).Debug("Not enough valid peers, starting network search")
}

return enoughPeers
return peersWithSubnetCount >= threshold
}

func (s *Service) retrievePersistentSubs(currSlot primitives.Slot) []uint64 {
Expand Down

0 comments on commit eee87bb

Please sign in to comment.