diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go
index d9ce96375f1c..2e199d03bc55 100644
--- a/beacon-chain/blockchain/process_block.go
+++ b/beacon-chain/blockchain/process_block.go
@@ -546,8 +546,9 @@ func missingDataColumns(bs *filesystem.BlobStorage, root [32]byte, expected map[
 // closed, the context hits cancellation/timeout, or notifications have been received for all the missing sidecars.
 func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed interfaces.ReadOnlySignedBeaconBlock) error {
 	if coreTime.PeerDASIsActive(signed.Block().Slot()) {
-		return s.isDataColumnsAvailable(ctx, root, signed)
+		return s.areDataColumnsAvailable(ctx, root, signed)
 	}
+
 	if signed.Version() < version.Deneb {
 		return nil
 	}
@@ -624,7 +625,17 @@ func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed int
 	}
 }
 
-func (s *Service) isDataColumnsAvailable(ctx context.Context, root [32]byte, signed interfaces.ReadOnlySignedBeaconBlock) error {
+// uint64MapToSortedSlice produces a sorted uint64 slice from a map.
+func uint64MapToSortedSlice(input map[uint64]bool) []uint64 {
+	output := make([]uint64, 0, len(input))
+	for idx := range input {
+		output = append(output, idx)
+	}
+	slices.Sort[[]uint64](output)
+	return output
+}
+
+func (s *Service) areDataColumnsAvailable(ctx context.Context, root [32]byte, signed interfaces.ReadOnlySignedBeaconBlock) error {
 	if signed.Version() < version.Deneb {
 		return nil
 	}
@@ -653,7 +664,12 @@ func (s *Service) isDataColumnsAvailable(ctx context.Context, root [32]byte, sig
 		return nil
 	}
 
-	colMap, err := peerdas.CustodyColumns(s.cfg.P2P.NodeID(), peerdas.CustodySubnetCount())
+	// All columns to sample need to be available for the block to be considered available.
+	// https://github.com/ethereum/consensus-specs/blob/dev/specs/_features/eip7594/das-core.md#subnet-sampling
+	nodeID := s.cfg.P2P.NodeID()
+	subnetSamplingSize := peerdas.SubnetSamplingSize()
+
+	colMap, err := peerdas.CustodyColumns(nodeID, subnetSamplingSize)
 	if err != nil {
 		return errors.Wrap(err, "custody columns")
 	}
@@ -698,25 +714,27 @@ func (s *Service) isDataColumnsAvailable(ctx context.Context, root [32]byte, sig
 	nextSlot := slots.BeginsAt(signed.Block().Slot()+1, s.genesisTime)
 	// Avoid logging if DA check is called after next slot start.
 	if nextSlot.After(time.Now()) {
-		// Compute sorted slice of expected columns.
-		expected := make([]uint64, 0, len(colMap))
-		for col := range colMap {
-			expected = append(expected, col)
-		}
+		nst := time.AfterFunc(time.Until(nextSlot), func() {
+			missingMapCount := uint64(len(missingMap))
+
+			if missingMapCount == 0 {
+				return
+			}
 
-		slices.Sort[[]uint64](expected)
+			var (
+				expected interface{} = "all"
+				missing  interface{} = "all"
+			)
 
-		// Compute sorted slice of missing columns.
-		missing := make([]uint64, 0, len(missingMap))
-		for col := range missingMap {
-			missing = append(missing, col)
-		}
+			numberOfColumns := params.BeaconConfig().NumberOfColumns
+			colMapCount := uint64(len(colMap))
 
-		slices.Sort[[]uint64](missing)
+			if colMapCount < numberOfColumns {
+				expected = uint64MapToSortedSlice(colMap)
+			}
 
-		nst := time.AfterFunc(time.Until(nextSlot), func() {
-			if len(missingMap) == 0 {
-				return
+			if missingMapCount < numberOfColumns {
+				missing = uint64MapToSortedSlice(missingMap)
 			}
 
 			log.WithFields(logrus.Fields{
@@ -724,8 +742,9 @@ func (s *Service) isDataColumnsAvailable(ctx context.Context, root [32]byte, sig
 				"root":            fmt.Sprintf("%#x", root),
 				"columnsExpected": expected,
 				"columnsWaiting":  missing,
-			}).Error("Still waiting for data columns DA check at slot end.")
+			}).Error("Some data columns are still unavailable at slot end.")
 		})
+		defer nst.Stop()
 	}
 
diff --git a/beacon-chain/core/peerdas/helpers.go b/beacon-chain/core/peerdas/helpers.go
index 91c128101d0d..87ed83fb77ff 100644
--- a/beacon-chain/core/peerdas/helpers.go
+++ b/beacon-chain/core/peerdas/helpers.go
@@ -431,11 +431,20 @@ func VerifyDataColumnSidecarKZGProofs(sc blocks.RODataColumn) (bool, error) {
 
 // CustodySubnetCount returns the number of subnets the node should participate in for custody.
 func CustodySubnetCount() uint64 {
-	count := params.BeaconConfig().CustodyRequirement
 	if flags.Get().SubscribeToAllSubnets {
-		count = params.BeaconConfig().DataColumnSidecarSubnetCount
+		return params.BeaconConfig().DataColumnSidecarSubnetCount
 	}
-	return count
+
+	return params.BeaconConfig().CustodyRequirement
+}
+
+// SubnetSamplingSize returns the number of subnets the node should sample from.
+// https://github.com/ethereum/consensus-specs/blob/dev/specs/_features/eip7594/das-core.md#subnet-sampling
+func SubnetSamplingSize() uint64 {
+	samplesPerSlot := params.BeaconConfig().SamplesPerSlot
+	custodySubnetCount := CustodySubnetCount()
+
+	return max(samplesPerSlot, custodySubnetCount)
 }
 
 // CustodyColumnCount returns the number of columns the node should custody.
diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go
index b9b533b1ce0b..0cd75c0466f2 100644
--- a/beacon-chain/p2p/discovery.go
+++ b/beacon-chain/p2p/discovery.go
@@ -380,7 +380,8 @@ func (s *Service) createLocalNode(
 	}
 
 	if params.PeerDASEnabled() {
-		localNode.Set(peerdas.Csc(peerdas.CustodySubnetCount()))
+		custodySubnetCount := peerdas.CustodySubnetCount()
+		localNode.Set(peerdas.Csc(custodySubnetCount))
 	}
 
 	localNode.SetFallbackIP(ipAddr)
diff --git a/beacon-chain/p2p/subnets.go b/beacon-chain/p2p/subnets.go
index 803e370b8f0b..a90bc8217070 100644
--- a/beacon-chain/p2p/subnets.go
+++ b/beacon-chain/p2p/subnets.go
@@ -377,22 +377,29 @@ func initializePersistentSubnets(id enode.ID, epoch primitives.Epoch) error {
 	return nil
 }
 
+// initializePersistentColumnSubnets initializes persistent column subnets.
 func initializePersistentColumnSubnets(id enode.ID) error {
+	// Check if the column subnets are already cached.
 	_, ok, expTime := cache.ColumnSubnetIDs.GetColumnSubnets()
 	if ok && expTime.After(time.Now()) {
 		return nil
 	}
-	subsMap, err := peerdas.CustodyColumnSubnets(id, peerdas.CustodySubnetCount())
+
+	// Retrieve the subnets we should be subscribed to.
+	subnetSamplingSize := peerdas.SubnetSamplingSize()
+	subnetsMap, err := peerdas.CustodyColumnSubnets(id, subnetSamplingSize)
 	if err != nil {
-		return err
+		return errors.Wrap(err, "custody column subnets")
 	}
 
-	subs := make([]uint64, 0, len(subsMap))
-	for sub := range subsMap {
-		subs = append(subs, sub)
+	subnets := make([]uint64, 0, len(subnetsMap))
+	for subnet := range subnetsMap {
+		subnets = append(subnets, subnet)
 	}
 
-	cache.ColumnSubnetIDs.AddColumnSubnets(subs)
+	// Add the subnets to the cache.
+	cache.ColumnSubnetIDs.AddColumnSubnets(subnets)
+
 	return nil
 }
 
diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go
index 43555d401a00..278a781655d2 100644
--- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go
+++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go
@@ -61,28 +61,31 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
 	if err != nil {
 		log.WithError(err).Error("Could not convert slot to time")
 	}
-	log.WithFields(logrus.Fields{
-		"slot":               req.Slot,
-		"sinceSlotStartTime": time.Since(t),
-	}).Info("Begin building block")
+
+	log := log.WithField("slot", req.Slot)
+	log.WithField("sinceSlotStartTime", time.Since(t)).Info("Begin building block")
 
 	// A syncing validator should not produce a block.
 	if vs.SyncChecker.Syncing() {
+		log.Error("Failed to build block: node is syncing")
 		return nil, status.Error(codes.Unavailable, "Syncing to latest head, not ready to respond")
 	}
 
 	// An optimistic validator MUST NOT produce a block (i.e., sign across the DOMAIN_BEACON_PROPOSER domain).
 	if slots.ToEpoch(req.Slot) >= params.BeaconConfig().BellatrixForkEpoch {
 		if err := vs.optimisticStatus(ctx); err != nil {
+			log.WithError(err).Error("Failed to build block: node is optimistic")
 			return nil, status.Errorf(codes.Unavailable, "Validator is not ready to propose: %v", err)
 		}
 	}
 	head, parentRoot, err := vs.getParentState(ctx, req.Slot)
 	if err != nil {
+		log.WithError(err).Error("Failed to build block: could not get parent state")
 		return nil, err
 	}
 	sBlk, err := getEmptyBlock(req.Slot)
 	if err != nil {
+		log.WithError(err).Error("Failed to build block: could not get empty block")
 		return nil, status.Errorf(codes.Internal, "Could not prepare block: %v", err)
 	}
 	// Set slot, graffiti, randao reveal, and parent root.
@@ -94,6 +97,7 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
 	// Set proposer index.
 	idx, err := helpers.BeaconProposerIndex(ctx, head)
 	if err != nil {
+		log.WithError(err).Error("Failed to build block: could not calculate proposer index")
 		return nil, fmt.Errorf("could not calculate proposer index %w", err)
 	}
 	sBlk.SetProposerIndex(idx)
@@ -104,14 +108,17 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
 	}
 
 	resp, err := vs.BuildBlockParallel(ctx, sBlk, head, req.SkipMevBoost, builderBoostFactor)
+	if err != nil {
+		log.WithError(err).Error("Failed to build block: could not build block in parallel")
+		return nil, errors.Wrap(err, "could not build block in parallel")
+	}
+
 	log.WithFields(logrus.Fields{
-		"slot":               req.Slot,
 		"sinceSlotStartTime": time.Since(t),
 		"validator":          sBlk.Block().ProposerIndex(),
+		"parentRoot":         fmt.Sprintf("%#x", parentRoot),
 	}).Info("Finished building block")
-	if err != nil {
-		return nil, errors.Wrap(err, "could not build block in parallel")
-	}
+
 	return resp, nil
 }
 
diff --git a/beacon-chain/sync/pending_blocks_queue.go b/beacon-chain/sync/pending_blocks_queue.go
index e51a23741df9..a6caf2989afe 100644
--- a/beacon-chain/sync/pending_blocks_queue.go
+++ b/beacon-chain/sync/pending_blocks_queue.go
@@ -306,55 +306,120 @@ func (s *Service) sendBatchRootRequest(ctx context.Context, roots [][32]byte, ra
 	ctx, span := prysmTrace.StartSpan(ctx, "sendBatchRootRequest")
 	defer span.End()
 
+	// Exit early if there are no roots to request.
+	if len(roots) == 0 {
+		return nil
+	}
+
+	// Remove duplicates (if any) from the list of roots.
 	roots = dedupRoots(roots)
-	s.pendingQueueLock.RLock()
-	for i := len(roots) - 1; i >= 0; i-- {
-		r := roots[i]
-		if s.seenPendingBlocks[r] || s.cfg.chain.BlockBeingSynced(r) {
-			roots = append(roots[:i], roots[i+1:]...)
-		} else {
+
+	// Iterate through the list of roots in reverse, and filter out roots that are already
+	// seen in pending blocks or being synced.
+	func() {
+		s.pendingQueueLock.RLock()
+		defer s.pendingQueueLock.RUnlock()
+
+		for i := len(roots) - 1; i >= 0; i-- {
+			r := roots[i]
+			if s.seenPendingBlocks[r] || s.cfg.chain.BlockBeingSynced(r) {
+				roots = append(roots[:i], roots[i+1:]...)
+				continue
+			}
+
 			log.WithField("blockRoot", fmt.Sprintf("%#x", r)).Debug("Requesting block by root")
 		}
-	}
-	s.pendingQueueLock.RUnlock()
+	}()
 
+	// Nothing to do, exit early.
 	if len(roots) == 0 {
 		return nil
 	}
+
+	// Fetch best peers to request blocks from.
 	bestPeers := s.getBestPeers()
+
+	// Filter out peers that do not custody a superset of our columns.
+	// (Very likely, this keeps only supernode peers.)
+	// TODO: Change this to be able to fetch from all peers.
+	headSlot := s.cfg.chain.HeadSlot()
+	peerDASIsActive := coreTime.PeerDASIsActive(headSlot)
+
+	if peerDASIsActive {
+		var err error
+		bestPeers, err = s.cfg.p2p.GetValidCustodyPeers(bestPeers)
+		if err != nil {
+			return errors.Wrap(err, "get valid custody peers")
+		}
+	}
+
+	// No suitable peer, exit early.
 	if len(bestPeers) == 0 {
+		log.WithField("roots", fmt.Sprintf("%#x", roots)).Debug("Send batch root request: No suitable peers")
 		return nil
 	}
-	// Randomly choose a peer to query from our best peers. If that peer cannot return
-	// all the requested blocks, we randomly select another peer.
-	pid := bestPeers[randGen.Int()%len(bestPeers)]
-	for i := 0; i < numOfTries; i++ {
+
+	// Randomly choose a peer to query from our best peers.
+	// If that peer cannot return all the requested blocks,
+	// we randomly select another peer.
+	randomIndex := randGen.Int() % len(bestPeers)
+	pid := bestPeers[randomIndex]
+
+	for range numOfTries {
 		req := p2ptypes.BeaconBlockByRootsReq(roots)
-		currentEpoch := slots.ToEpoch(s.cfg.clock.CurrentSlot())
+
+		// Get the current epoch.
+		currentSlot := s.cfg.clock.CurrentSlot()
+		currentEpoch := slots.ToEpoch(currentSlot)
+
+		// Trim the request to the maximum number of blocks we can request, if needed.
 		maxReqBlock := params.MaxRequestBlock(currentEpoch)
-		if uint64(len(roots)) > maxReqBlock {
+		rootCount := uint64(len(roots))
+		if rootCount > maxReqBlock {
 			req = roots[:maxReqBlock]
 		}
+
+		// Send the request to the peer.
 		if err := s.sendRecentBeaconBlocksRequest(ctx, &req, pid); err != nil {
 			tracing.AnnotateError(span, err)
 			log.WithError(err).Debug("Could not send recent block request")
 		}
-		newRoots := make([][32]byte, 0, len(roots))
-		s.pendingQueueLock.RLock()
-		for _, rt := range roots {
-			if !s.seenPendingBlocks[rt] {
-				newRoots = append(newRoots, rt)
+
+		// Filter out roots that are already seen in pending blocks.
+		newRoots := make([][32]byte, 0, rootCount)
+		func() {
+			s.pendingQueueLock.RLock()
+			defer s.pendingQueueLock.RUnlock()
+
+			for _, rt := range roots {
+				if !s.seenPendingBlocks[rt] {
+					newRoots = append(newRoots, rt)
+				}
 			}
-		}
-		s.pendingQueueLock.RUnlock()
+		}()
+
+		// Exit early if all roots have been seen.
+		// This is the happy path.
 		if len(newRoots) == 0 {
-			break
+			return nil
 		}
-		// Choosing a new peer with the leftover set of
-		// roots to request.
+
+		// There are still some roots that have not been seen.
+		// Choose a new peer with the leftover set of roots to request.
 		roots = newRoots
-		pid = bestPeers[randGen.Int()%len(bestPeers)]
+
+		// Choose a new peer to query.
+		randomIndex = randGen.Int() % len(bestPeers)
+		pid = bestPeers[randomIndex]
 	}
+
+	// Some roots are still missing after all allowed tries.
+	// This is the unhappy path.
+	log.WithFields(logrus.Fields{
+		"roots": fmt.Sprintf("%#x", roots),
+		"tries": numOfTries,
+	}).Debug("Send batch root request: Some roots are still missing after all allowed tries")
+
 	return nil
 }
diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go
index 3439f051ef8c..e50f29d0af49 100644
--- a/beacon-chain/sync/service.go
+++ b/beacon-chain/sync/service.go
@@ -170,7 +170,6 @@ type Service struct {
 	receivedDataColumnsFromRoot     map[[fieldparams.RootLength]byte]map[uint64]bool
 	receivedDataColumnsFromRootLock sync.RWMutex
 	ctxMap                          ContextByteVersions
-	sampler                         DataColumnSampler
 }
 
 // NewService initializes new regular sync service.
@@ -360,12 +359,6 @@ func (s *Service) startTasksPostInitialSync() {
 		// Start the fork watcher.
 		go s.forkWatcher()
 
-		// Start data columns sampling if peerDAS is enabled.
-		if params.PeerDASEnabled() {
-			s.sampler = newDataColumnSampler1D(s.cfg.p2p, s.cfg.clock, s.ctxMap, s.cfg.stateNotifier, s.newColumnVerifier)
-			go s.sampler.Run(s.ctx)
-		}
-
 	case <-s.ctx.Done():
 		log.Debug("Context closed, exiting goroutine")
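The following standalone sketch (not part of the patch) illustrates the two rules the changes above rely on: peerdas.SubnetSamplingSize() picks max(SamplesPerSlot, CustodySubnetCount()), so a node never samples fewer subnets than it custodies, and the slot-end DA log prints "all" instead of a full sorted slice whenever a column map covers every column. The constants below are assumed stand-ins for values Prysm reads from params.BeaconConfig(), and the helper names other than uint64MapToSortedSlice are illustrative only.

package main

import (
	"fmt"
	"slices"
)

// Assumed stand-ins for params.BeaconConfig() values; chosen for illustration only.
const (
	samplesPerSlot     uint64 = 8
	custodyRequirement uint64 = 4
	numberOfColumns    uint64 = 128
)

// subnetSamplingSize mirrors peerdas.SubnetSamplingSize(): sample at least
// samplesPerSlot subnets, and never fewer than the node custodies.
func subnetSamplingSize(custodySubnetCount uint64) uint64 {
	return max(samplesPerSlot, custodySubnetCount)
}

// uint64MapToSortedSlice mirrors the helper added in process_block.go.
func uint64MapToSortedSlice(input map[uint64]bool) []uint64 {
	output := make([]uint64, 0, len(input))
	for idx := range input {
		output = append(output, idx)
	}
	slices.Sort(output)
	return output
}

// describeColumns reproduces the logging rule in areDataColumnsAvailable:
// report "all" when the map spans every column, otherwise a sorted slice.
func describeColumns(columns map[uint64]bool) interface{} {
	if uint64(len(columns)) >= numberOfColumns {
		return "all"
	}
	return uint64MapToSortedSlice(columns)
}

func main() {
	// A node with the minimum custody requirement still samples more subnets.
	fmt.Println("subnets to sample:", subnetSamplingSize(custodyRequirement)) // 8

	// Two of the three expected columns are still missing at the end of the slot.
	expected := map[uint64]bool{7: true, 3: true, 42: true}
	missing := map[uint64]bool{42: true, 3: true}
	fmt.Println("columnsExpected:", describeColumns(expected)) // [3 7 42]
	fmt.Println("columnsWaiting:", describeColumns(missing))   // [3 42]
}

With these example values, a node custodying only the minimum number of subnets still samples more of them, which is why the DA check in process_block.go and the persistent column subnet subscriptions in subnets.go both switch from CustodySubnetCount() to SubnetSamplingSize().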