diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index f9ec14700eb1..abfb6090b6d6 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -331,8 +331,7 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start primitives.Slot } if coreTime.PeerDASIsActive(start) { - connectedPeers := f.p2p.Peers().Connected() - response.err = f.fetchDataColumnsFromPeers(ctx, response.bwb, connectedPeers) + response.err = f.fetchDataColumnsFromPeers(ctx, response.bwb, nil) return response } @@ -850,7 +849,23 @@ func (f *blocksFetcher) requestDataColumnsFromPeers( peers[i], peers[j] = peers[j], peers[i] }) + var columnsLog interface{} = "all" + columnsCount := uint64(len(request.Columns)) + numberOfColumns := params.BeaconConfig().NumberOfColumns + if columnsCount < numberOfColumns { + columnsLog = request.Columns + } + + log := log.WithFields(logrus.Fields{ + "start": request.StartSlot, + "count": request.Count, + "columns": columnsLog, + "items": request.Count * columnsCount, + }) + for _, peer := range peers { + log := log.WithField("peer", peer) + if ctx.Err() != nil { return nil, "", ctx.Err() } @@ -861,12 +876,9 @@ func (f *blocksFetcher) requestDataColumnsFromPeers( defer l.Unlock() log.WithFields(logrus.Fields{ - "peer": peer, - "start": request.StartSlot, - "count": request.Count, "capacity": f.rateLimiter.Remaining(peer.String()), "score": f.p2p.Peers().Scorers().BlockProviderScorer().FormatScorePretty(peer), - }).Debug("Requesting data columns") + }).Debug("Data columns by range - requesting") // We're intentionally abusing the block rate limit here, treating data column requests as if they were block requests. 
// Since column requests take more bandwidth than blocks, we should improve how we account for the different kinds @@ -883,32 +895,28 @@ func (f *blocksFetcher) requestDataColumnsFromPeers( }() if err != nil { - log.WithError(err).WithField("peer", peer).Warning("Could not wait for bandwidth") + log.WithError(err).Warning("Data columns by range - could not wait for bandwidth") continue } roDataColumns, err := prysmsync.SendDataColumnsByRangeRequest(ctx, f.clock, f.p2p, peer, f.ctxMap, request) if err != nil { - log.WithField("peer", peer).WithError(err).Warning("Could not send data columns by range request") + log.WithError(err).Warning("Data columns by range - could not send data columns by range request") continue } // If the peer did not return any data columns, go to the next peer. if len(roDataColumns) == 0 { - log.WithFields(logrus.Fields{ - "peer": peer, - "start": request.StartSlot, - "count": request.Count, - }).Debug("Peer did not returned any data columns") + log.Debug("Data columns by range - peer did not return any data columns") continue } - // We have received at least one data columns from the peer. + // We have received at least one data column from the peer. This is the happy path. return roDataColumns, peer, nil } - // No peer returned any data columns. + // No peer returned any data columns. This is the unhappy path. 
return nil, "", nil } @@ -1008,15 +1016,13 @@ func (f *blocksFetcher) retrieveMissingDataColumnsFromPeers( indicesFromRoot map[[fieldparams.RootLength]byte][]int, peers []peer.ID, ) error { - const delay = 5 * time.Second - - columnsCount := 0 - for _, columns := range missingColumnsFromRoot { - columnsCount += len(columns) - } + const ( + delay = 5 * time.Second + batchSize = 512 + ) start := time.Now() - log.WithField("columnsCount", columnsCount).Debug("Retrieving missing data columns from peers - start") + log.Debug("Retrieving missing data columns from peers - start") for len(missingColumnsFromRoot) > 0 { if ctx.Err() != nil { @@ -1041,8 +1047,30 @@ func (f *blocksFetcher) retrieveMissingDataColumnsFromPeers( } } + // Get a sorted slice of missing data columns. + missingDataColumnsSlice := sortedSliceFromMap(missingDataColumns) + missingDataColumnsCount := uint64(len(missingDataColumnsSlice)) + + numberOfColumns := params.BeaconConfig().NumberOfColumns + var requestedColumnsLog interface{} = "all" + + if missingDataColumnsCount < numberOfColumns { + requestedColumnsLog = missingDataColumnsSlice + } + + // Reduce blocks count until the total number of elements is less than the batch size. + for missingDataColumnsCount*blocksCount > batchSize { + blocksCount /= 2 + } + + // If no peer is specified, get all connected peers. + peersToFilter := peers + if peersToFilter == nil { + peersToFilter = f.p2p.Peers().Connected() + } + // Filter peers. - filteredPeers, err := f.peersWithSlotAndDataColumns(peers, lastSlot, missingDataColumns) + filteredPeers, err := f.peersWithSlotAndDataColumns(peersToFilter, lastSlot, missingDataColumns) if err != nil { return errors.Wrap(err, "peers with slot and data columns") } @@ -1050,9 +1078,10 @@ func (f *blocksFetcher) retrieveMissingDataColumnsFromPeers( if len(filteredPeers) == 0 { log. 
WithFields(logrus.Fields{ - "peers": filteredPeers, - "delay": delay, - "targetSlot": lastSlot, + "peers": peersToFilter, + "filteredPeers": filteredPeers, + "delay": delay, + "targetSlot": lastSlot, }). Warning("No peers available to retrieve missing data columns, retrying later") @@ -1067,14 +1096,14 @@ func (f *blocksFetcher) retrieveMissingDataColumnsFromPeers( request := &p2ppb.DataColumnSidecarsByRangeRequest{ StartSlot: startSlot, Count: blocksCount, - Columns: sortedSliceFromMap(missingDataColumns), + Columns: missingDataColumnsSlice, } // Get all the blocks and data columns we should retrieve. blockFromRoot := blockFromRoot(bwb[firstIndex : lastIndex+1]) // Iterate requests over all peers, and exits as soon as at least one data column is retrieved. - roDataColumns, peer, err := f.requestDataColumnsFromPeers(ctx, request, filteredPeers) + roDataColumns, _, err := f.requestDataColumnsFromPeers(ctx, request, filteredPeers) if err != nil { return errors.Wrap(err, "request data columns from peers") } @@ -1082,11 +1111,12 @@ func (f *blocksFetcher) retrieveMissingDataColumnsFromPeers( if len(roDataColumns) == 0 { log. WithFields(logrus.Fields{ - "peers": filteredPeers, - "delay": delay, - "startSlot": startSlot, - "count": blocksCount, - "columns": sortedSliceFromMap(missingDataColumns), + "peers": peers, + "filteredPeers": filteredPeers, + "delay": delay, + "start": startSlot, + "count": blocksCount, + "columns": requestedColumnsLog, }). Warning("No data columns returned from any peer, retrying later") @@ -1096,27 +1126,6 @@ func (f *blocksFetcher) retrieveMissingDataColumnsFromPeers( // Process the retrieved data columns. 
processRetrievedDataColumns(roDataColumns, blockFromRoot, indicesFromRoot, missingColumnsFromRoot, bwb, f.cv) - - if len(missingColumnsFromRoot) > 0 { - numberOfColumns := params.BeaconConfig().NumberOfColumns - - for root, missingColumns := range missingColumnsFromRoot { - missingColumnsCount := uint64(len(missingColumns)) - var missingColumnsLog interface{} = "all" - - if missingColumnsCount < numberOfColumns { - missingColumnsLog = sortedSliceFromMap(missingColumns) - } - - slot := blockFromRoot[root].Block().Slot() - log.WithFields(logrus.Fields{ - "peer": peer, - "root": fmt.Sprintf("%#x", root), - "slot": slot, - "missingColumns": missingColumnsLog, - }).Debug("Peer did not returned all requested data columns") - } - } } log.WithField("duration", time.Since(start)).Debug("Retrieving missing data columns from peers - success") diff --git a/beacon-chain/sync/rpc_send_request.go b/beacon-chain/sync/rpc_send_request.go index edbb47ded974..0306092f099c 100644 --- a/beacon-chain/sync/rpc_send_request.go +++ b/beacon-chain/sync/rpc_send_request.go @@ -354,14 +354,14 @@ func SendDataColumnsByRangeRequest( columnsLog = columns } - log.WithFields(logrus.Fields{ + log := log.WithFields(logrus.Fields{ "peer": pid, "topic": topic, "startSlot": req.StartSlot, "count": req.Count, "columns": columnsLog, "totalCount": req.Count * uint64(len(req.Columns)), - }).Debug("Sending data column by range request") + }) stream, err := p2pApi.Send(ctx, req, topic, pid) if err != nil { @@ -391,19 +391,19 @@ func SendDataColumnsByRangeRequest( } if err != nil { - log.WithError(err).WithField("peer", pid).Debug("Error reading chunked data column sidecar") + log.WithError(err).Debug("Error reading chunked data column sidecar") break } if roDataColumn == nil { - log.WithError(err).WithField("peer", pid).Debug("Validation error") + log.WithError(err).Debug("Validation error") continue } if i >= max { // The response MUST contain no more than `reqCount` blocks. 
// (`reqCount` is already capped by `maxRequestDataColumnSideCar`.) - log.WithError(err).WithField("peer", pid).Debug("Response contains more data column sidecars than maximum") + log.WithError(err).Debug("Response contains more data column sidecars than maximum") break } diff --git a/beacon-chain/sync/rpc_status.go b/beacon-chain/sync/rpc_status.go index 0ad9ffd21f33..75d370854cdc 100644 --- a/beacon-chain/sync/rpc_status.go +++ b/beacon-chain/sync/rpc_status.go @@ -191,7 +191,7 @@ func (s *Service) reValidatePeer(ctx context.Context, id peer.ID) error { } // Do not return an error for ping requests. if err := s.sendPingRequest(ctx, id); err != nil && !isUnwantedError(err) { - log.WithError(err).Debug("Could not ping peer") + log.WithError(err).WithField("pid", id).Debug("Could not ping peer") } return nil }