Peerdas: Full subnet sampling and sendBatchRootRequest fix. (#14452)
* `sendBatchRootRequest`: Refactor and add comments.

* `sendBatchRootRequest`: Only send requests to peers that custody a superset of our columns.

Before this commit, we sent "data columns by root" requests to peers for data columns they do not custody.

* Data columns: Use subnet sampling only.

(Instead of peer sampling.)


* `areDataColumnsAvailable`: Improve logs.

* `GetBeaconBlock`: Improve logs.

Rationale: A `begin` log should always be followed by a `success` log or a `failure` log.
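
As a rough illustration of that logging rule (a standalone sketch with a hypothetical buildBlock helper, not the actual GetBeaconBlock code):

package main

import (
	"errors"

	"github.com/sirupsen/logrus"
)

// buildBlock is a hypothetical stand-in for a block-building routine.
func buildBlock(slot uint64) error {
	if slot%2 == 0 {
		return nil
	}
	return errors.New("node is syncing")
}

func main() {
	log := logrus.WithField("slot", uint64(42))
	log.Info("Begin building block")

	// Every exit path emits exactly one terminal log: success or failure.
	if err := buildBlock(42); err != nil {
		log.WithError(err).Error("Fail to build block")
		return
	}
	log.Info("Finished building block")
}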
nalepae committed Oct 24, 2024
1 parent 4bf2c27 commit 6b1b04e
Showing 7 changed files with 170 additions and 69 deletions.
57 changes: 38 additions & 19 deletions beacon-chain/blockchain/process_block.go
@@ -550,8 +550,9 @@ func missingDataColumns(bs *filesystem.BlobStorage, root [32]byte, expected map[
// closed, the context hits cancellation/timeout, or notifications have been received for all the missing sidecars.
func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed interfaces.ReadOnlySignedBeaconBlock) error {
if coreTime.PeerDASIsActive(signed.Block().Slot()) {
return s.isDataColumnsAvailable(ctx, root, signed)
return s.areDataColumnsAvailable(ctx, root, signed)
}

if signed.Version() < version.Deneb {
return nil
}
@@ -628,7 +629,17 @@ func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed int
}
}

func (s *Service) isDataColumnsAvailable(ctx context.Context, root [32]byte, signed interfaces.ReadOnlySignedBeaconBlock) error {
// uint64MapToSortedSlice produces a sorted uint64 slice from a map.
func uint64MapToSortedSlice(input map[uint64]bool) []uint64 {
output := make([]uint64, 0, len(input))
for idx := range input {
output = append(output, idx)
}
slices.Sort[[]uint64](output)
return output
}

func (s *Service) areDataColumnsAvailable(ctx context.Context, root [32]byte, signed interfaces.ReadOnlySignedBeaconBlock) error {
if signed.Version() < version.Deneb {
return nil
}
@@ -657,7 +668,12 @@ func (s *Service) isDataColumnsAvailable(ctx context.Context, root [32]byte, sig
return nil
}

colMap, err := peerdas.CustodyColumns(s.cfg.P2P.NodeID(), peerdas.CustodySubnetCount())
// All columns to sample need to be available for the block to be considered available.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/_features/eip7594/das-core.md#subnet-sampling
nodeID := s.cfg.P2P.NodeID()
subnetSamplingSize := peerdas.SubnetSamplingSize()

colMap, err := peerdas.CustodyColumns(nodeID, subnetSamplingSize)
if err != nil {
return errors.Wrap(err, "custody columns")
}
@@ -702,34 +718,37 @@ func (s *Service) isDataColumnsAvailable(ctx context.Context, root [32]byte, sig
nextSlot := slots.BeginsAt(signed.Block().Slot()+1, s.genesisTime)
// Avoid logging if DA check is called after next slot start.
if nextSlot.After(time.Now()) {
// Compute sorted slice of expected columns.
expected := make([]uint64, 0, len(colMap))
for col := range colMap {
expected = append(expected, col)
}
nst := time.AfterFunc(time.Until(nextSlot), func() {
missingMapCount := uint64(len(missingMap))

if missingMapCount == 0 {
return
}

slices.Sort[[]uint64](expected)
var (
expected interface{} = "all"
missing interface{} = "all"
)

// Compute sorted slice of missing columns.
missing := make([]uint64, 0, len(missingMap))
for col := range missingMap {
missing = append(missing, col)
}
numberOfColumns := params.BeaconConfig().NumberOfColumns
colMapCount := uint64(len(colMap))

slices.Sort[[]uint64](missing)
if colMapCount < numberOfColumns {
expected = uint64MapToSortedSlice(colMap)
}

nst := time.AfterFunc(time.Until(nextSlot), func() {
if len(missingMap) == 0 {
return
if missingMapCount < numberOfColumns {
missing = uint64MapToSortedSlice(missingMap)
}

log.WithFields(logrus.Fields{
"slot": signed.Block().Slot(),
"root": fmt.Sprintf("%#x", root),
"columnsExpected": expected,
"columnsWaiting": missing,
}).Error("Still waiting for data columns DA check at slot end.")
}).Error("Some data columns are still unavailable at slot end.")
})

defer nst.Stop()
}

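The slot-end log above collapses the column list to "all" when every column is expected or missing. A minimal standalone sketch of that shorthand, assuming a 128-column configuration (NUMBER_OF_COLUMNS = 128 is an assumed value here, not read from Prysm's config):

package main

import (
	"fmt"
	"slices"
)

// uint64MapToSortedSlice mirrors the helper added in process_block.go.
func uint64MapToSortedSlice(input map[uint64]bool) []uint64 {
	output := make([]uint64, 0, len(input))
	for idx := range input {
		output = append(output, idx)
	}
	slices.Sort(output)
	return output
}

func main() {
	const numberOfColumns = 128 // assumed NUMBER_OF_COLUMNS

	// A node sampling only a subset of columns logs the explicit sorted indices;
	// a supernode expecting all 128 columns would keep the "all" shorthand instead.
	colMap := map[uint64]bool{7: true, 3: true, 101: true}

	var expected interface{} = "all"
	if uint64(len(colMap)) < numberOfColumns {
		expected = uint64MapToSortedSlice(colMap)
	}

	fmt.Println(expected) // [3 7 101]
}
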
15 changes: 12 additions & 3 deletions beacon-chain/core/peerdas/helpers.go
@@ -431,11 +431,20 @@ func VerifyDataColumnSidecarKZGProofs(sc blocks.RODataColumn) (bool, error) {

// CustodySubnetCount returns the number of subnets the node should participate in for custody.
func CustodySubnetCount() uint64 {
count := params.BeaconConfig().CustodyRequirement
if flags.Get().SubscribeToAllSubnets {
count = params.BeaconConfig().DataColumnSidecarSubnetCount
return params.BeaconConfig().DataColumnSidecarSubnetCount
}
return count

return params.BeaconConfig().CustodyRequirement
}

// SubnetSamplingSize returns the number of subnets the node should sample from.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/_features/eip7594/das-core.md#subnet-sampling
func SubnetSamplingSize() uint64 {
samplesPerSlot := params.BeaconConfig().SamplesPerSlot
custodySubnetCount := CustodySubnetCount()

return max(samplesPerSlot, custodySubnetCount)
}

// CustodyColumnCount returns the number of columns the node should custody.
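
A minimal sketch of the sampling-size rule introduced above, using illustrative mainnet-style values (SAMPLES_PER_SLOT = 8, CUSTODY_REQUIREMENT = 4, DATA_COLUMN_SIDECAR_SUBNET_COUNT = 128 are assumptions here, not values read from the Prysm config):

package main

import "fmt"

// subnetSamplingSize mirrors peerdas.SubnetSamplingSize: sample at least
// SAMPLES_PER_SLOT subnets, and never fewer than the custody subnet count.
func subnetSamplingSize(samplesPerSlot, custodySubnetCount uint64) uint64 {
	return max(samplesPerSlot, custodySubnetCount)
}

func main() {
	const (
		samplesPerSlot               = 8   // assumed SAMPLES_PER_SLOT
		custodyRequirement           = 4   // assumed CUSTODY_REQUIREMENT
		dataColumnSidecarSubnetCount = 128 // assumed DATA_COLUMN_SIDECAR_SUBNET_COUNT
	)

	// Default node: the sampling floor dominates the custody requirement.
	fmt.Println(subnetSamplingSize(samplesPerSlot, custodyRequirement)) // 8

	// Node run with --subscribe-all-subnets: custody already covers every subnet.
	fmt.Println(subnetSamplingSize(samplesPerSlot, dataColumnSidecarSubnetCount)) // 128
}
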
3 changes: 2 additions & 1 deletion beacon-chain/p2p/discovery.go
@@ -498,7 +498,8 @@ func (s *Service) createLocalNode(
}

if params.PeerDASEnabled() {
localNode.Set(peerdas.Csc(peerdas.CustodySubnetCount()))
custodySubnetCount := peerdas.CustodySubnetCount()
localNode.Set(peerdas.Csc(custodySubnetCount))
}

localNode.SetFallbackIP(ipAddr)
19 changes: 13 additions & 6 deletions beacon-chain/p2p/subnets.go
@@ -377,22 +377,29 @@ func initializePersistentSubnets(id enode.ID, epoch primitives.Epoch) error {
return nil
}

// initializePersistentColumnSubnets initializes persistent column subnets.
func initializePersistentColumnSubnets(id enode.ID) error {
// Check if the column subnets are already cached.
_, ok, expTime := cache.ColumnSubnetIDs.GetColumnSubnets()
if ok && expTime.After(time.Now()) {
return nil
}
subsMap, err := peerdas.CustodyColumnSubnets(id, peerdas.CustodySubnetCount())

// Retrieve the subnets we should be subscribed to.
subnetSamplingSize := peerdas.SubnetSamplingSize()
subnetsMap, err := peerdas.CustodyColumnSubnets(id, subnetSamplingSize)
if err != nil {
return err
return errors.Wrap(err, "custody column subnets")
}

subs := make([]uint64, 0, len(subsMap))
for sub := range subsMap {
subs = append(subs, sub)
subnets := make([]uint64, 0, len(subnetsMap))
for subnet := range subnetsMap {
subnets = append(subnets, subnet)
}

cache.ColumnSubnetIDs.AddColumnSubnets(subs)
// Add the subnets to the cache.
cache.ColumnSubnetIDs.AddColumnSubnets(subnets)

return nil
}

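The flow above is the usual cache-first pattern: return early while the cached column subnets are still fresh, otherwise recompute and store them. A simplified standalone sketch under an assumed cache type (not Prysm's cache.ColumnSubnetIDs):

package main

import (
	"fmt"
	"time"
)

// subnetCache is a hypothetical stand-in for cache.ColumnSubnetIDs.
type subnetCache struct {
	subnets []uint64
	expiry  time.Time
}

func (c *subnetCache) get() ([]uint64, bool, time.Time) {
	return c.subnets, c.subnets != nil, c.expiry
}

func (c *subnetCache) add(subnets []uint64) {
	c.subnets = subnets
	c.expiry = time.Now().Add(time.Hour)
}

// initializeColumnSubnets recomputes the column subnets only when the cache is stale.
func initializeColumnSubnets(c *subnetCache, compute func() map[uint64]bool) {
	if _, ok, expTime := c.get(); ok && expTime.After(time.Now()) {
		return
	}

	subnetsMap := compute()
	subnets := make([]uint64, 0, len(subnetsMap))
	for subnet := range subnetsMap {
		subnets = append(subnets, subnet)
	}

	c.add(subnets)
}

func main() {
	c := &subnetCache{}
	initializeColumnSubnets(c, func() map[uint64]bool {
		return map[uint64]bool{1: true, 5: true}
	})
	fmt.Println(len(c.subnets)) // 2
}
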
23 changes: 15 additions & 8 deletions beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go
@@ -61,28 +61,31 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
if err != nil {
log.WithError(err).Error("Could not convert slot to time")
}
log.WithFields(logrus.Fields{
"slot": req.Slot,
"sinceSlotStartTime": time.Since(t),
}).Info("Begin building block")

log := log.WithField("slot", req.Slot)
log.WithField("sinceSlotStartTime", time.Since(t)).Info("Begin building block")

// A syncing validator should not produce a block.
if vs.SyncChecker.Syncing() {
log.Error("Fail to build block: node is syncing")
return nil, status.Error(codes.Unavailable, "Syncing to latest head, not ready to respond")
}
// An optimistic validator MUST NOT produce a block (i.e., sign across the DOMAIN_BEACON_PROPOSER domain).
if slots.ToEpoch(req.Slot) >= params.BeaconConfig().BellatrixForkEpoch {
if err := vs.optimisticStatus(ctx); err != nil {
log.WithError(err).Error("Fail to build block: node is optimistic")
return nil, status.Errorf(codes.Unavailable, "Validator is not ready to propose: %v", err)
}
}

head, parentRoot, err := vs.getParentState(ctx, req.Slot)
if err != nil {
log.WithError(err).Error("Fail to build block: could not get parent state")
return nil, err
}
sBlk, err := getEmptyBlock(req.Slot)
if err != nil {
log.WithError(err).Error("Fail to build block: could not get empty block")
return nil, status.Errorf(codes.Internal, "Could not prepare block: %v", err)
}
// Set slot, graffiti, randao reveal, and parent root.
@@ -94,6 +97,7 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
// Set proposer index.
idx, err := helpers.BeaconProposerIndex(ctx, head)
if err != nil {
log.WithError(err).Error("Fail to build block: could not calculate proposer index")
return nil, fmt.Errorf("could not calculate proposer index %w", err)
}
sBlk.SetProposerIndex(idx)
@@ -104,14 +108,17 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
}

resp, err := vs.BuildBlockParallel(ctx, sBlk, head, req.SkipMevBoost, builderBoostFactor)
if err != nil {
log.WithError(err).Error("Fail to build block: could not build block in parallel")
return nil, errors.Wrap(err, "could not build block in parallel")
}

log.WithFields(logrus.Fields{
"slot": req.Slot,
"sinceSlotStartTime": time.Since(t),
"validator": sBlk.Block().ProposerIndex(),
"parentRoot": fmt.Sprintf("%#x", parentRoot),
}).Info("Finished building block")
if err != nil {
return nil, errors.Wrap(err, "could not build block in parallel")
}

return resp, nil
}

115 changes: 90 additions & 25 deletions beacon-chain/sync/pending_blocks_queue.go
Expand Up @@ -306,55 +306,120 @@ func (s *Service) sendBatchRootRequest(ctx context.Context, roots [][32]byte, ra
ctx, span := prysmTrace.StartSpan(ctx, "sendBatchRootRequest")
defer span.End()

// Exit early if there are no roots to request.
if len(roots) == 0 {
return nil
}

// Remove duplicates (if any) from the list of roots.
roots = dedupRoots(roots)
s.pendingQueueLock.RLock()
for i := len(roots) - 1; i >= 0; i-- {
r := roots[i]
if s.seenPendingBlocks[r] || s.cfg.chain.BlockBeingSynced(r) {
roots = append(roots[:i], roots[i+1:]...)
} else {

// Iterate in reverse through the list of roots to request, and filter out roots that are already
// seen in pending blocks or being synced.
func() {
s.pendingQueueLock.RLock()
defer s.pendingQueueLock.RUnlock()

for i := len(roots) - 1; i >= 0; i-- {
r := roots[i]
if s.seenPendingBlocks[r] || s.cfg.chain.BlockBeingSynced(r) {
roots = append(roots[:i], roots[i+1:]...)
continue
}

log.WithField("blockRoot", fmt.Sprintf("%#x", r)).Debug("Requesting block by root")
}
}
s.pendingQueueLock.RUnlock()
}()

// Nothing to do, exit early.
if len(roots) == 0 {
return nil
}

// Fetch best peers to request blocks from.
bestPeers := s.getBestPeers()

// Filter out peers that do not custody a superset of our columns.
// (In practice, this likely keeps only supernode peers.)
// TODO: Change this to be able to fetch from all peers.
headSlot := s.cfg.chain.HeadSlot()
peerDASIsActive := coreTime.PeerDASIsActive(headSlot)

if peerDASIsActive {
var err error
bestPeers, err = s.cfg.p2p.GetValidCustodyPeers(bestPeers)
if err != nil {
return errors.Wrap(err, "get valid custody peers")
}
}

// No suitable peer, exit early.
if len(bestPeers) == 0 {
log.WithField("roots", fmt.Sprintf("%#x", roots)).Debug("Send batch root request: No suited peers")
return nil
}
// Randomly choose a peer to query from our best peers. If that peer cannot return
// all the requested blocks, we randomly select another peer.
pid := bestPeers[randGen.Int()%len(bestPeers)]
for i := 0; i < numOfTries; i++ {

// Randomly choose a peer to query from our best peers.
// If that peer cannot return all the requested blocks,
// we randomly select another peer.
randomIndex := randGen.Int() % len(bestPeers)
pid := bestPeers[randomIndex]

for range numOfTries {
req := p2ptypes.BeaconBlockByRootsReq(roots)
currentEpoch := slots.ToEpoch(s.cfg.clock.CurrentSlot())

// Get the current epoch.
currentSlot := s.cfg.clock.CurrentSlot()
currentEpoch := slots.ToEpoch(currentSlot)

// Trim the request to the maximum number of blocks we can request if needed.
maxReqBlock := params.MaxRequestBlock(currentEpoch)
if uint64(len(roots)) > maxReqBlock {
rootCount := uint64(len(roots))
if rootCount > maxReqBlock {
req = roots[:maxReqBlock]
}

// Send the request to the peer.
if err := s.sendRecentBeaconBlocksRequest(ctx, &req, pid); err != nil {
tracing.AnnotateError(span, err)
log.WithError(err).Debug("Could not send recent block request")
}
newRoots := make([][32]byte, 0, len(roots))
s.pendingQueueLock.RLock()
for _, rt := range roots {
if !s.seenPendingBlocks[rt] {
newRoots = append(newRoots, rt)

// Filter out roots that are already seen in pending blocks.
newRoots := make([][32]byte, 0, rootCount)
func() {
s.pendingQueueLock.RLock()
defer s.pendingQueueLock.RUnlock()

for _, rt := range roots {
if !s.seenPendingBlocks[rt] {
newRoots = append(newRoots, rt)
}
}
}
s.pendingQueueLock.RUnlock()
}()

// Exit early if all roots have been seen.
// This is the happy path.
if len(newRoots) == 0 {
break
return nil
}
// Choosing a new peer with the leftover set of
// roots to request.

// There are still some roots that have not been seen.
// Choose a new peer with the leftover set of roots to request.
roots = newRoots
pid = bestPeers[randGen.Int()%len(bestPeers)]

// Choose a new peer to query.
randomIndex = randGen.Int() % len(bestPeers)
pid = bestPeers[randomIndex]
}

// Some roots are still missing after all allowed tries.
// This is the unhappy path.
log.WithFields(logrus.Fields{
"roots": fmt.Sprintf("%#x", roots),
"tries": numOfTries,
}).Debug("Send batch root request: Some roots are still missing after all allowed tries")

return nil
}

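The peer filter introduced above delegates to GetValidCustodyPeers, whose body is not part of this diff. A minimal sketch of the underlying idea (keep only peers whose custody column set is a superset of the columns we need), under assumed types; neither the helper name nor the sample data reflects Prysm's actual API:

package main

import "fmt"

// custodyColumns maps a peer ID to the set of data columns it custodies.
type custodyColumns map[string]map[uint64]bool

// validCustodyPeers keeps only peers whose custody set covers every column we need.
func validCustodyPeers(peers []string, ourColumns map[uint64]bool, custody custodyColumns) []string {
	valid := make([]string, 0, len(peers))

	for _, pid := range peers {
		theirs := custody[pid]
		covered := true
		for col := range ourColumns {
			if !theirs[col] {
				covered = false
				break
			}
		}
		if covered {
			valid = append(valid, pid)
		}
	}

	return valid
}

func main() {
	ourColumns := map[uint64]bool{3: true, 7: true}
	custody := custodyColumns{
		"supernode": {3: true, 7: true, 9: true}, // superset of our columns: kept
		"default":   {3: true},                   // missing column 7: dropped
	}

	fmt.Println(validCustodyPeers([]string{"supernode", "default"}, ourColumns, custody))
	// Output: [supernode]
}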
