From 533843d7f9e9639552cd927fc0d10bfd1c845560 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Tue, 15 Oct 2024 12:08:25 +0200 Subject: [PATCH] Data columns initial sync: Rework. (#14522) --- .../sync/initial-sync/blocks_fetcher.go | 862 ++++++++---------- .../sync/initial-sync/blocks_fetcher_test.go | 545 ++++++----- .../sync/initial-sync/blocks_fetcher_utils.go | 60 +- .../initial-sync/blocks_fetcher_utils_test.go | 46 + .../sync/rpc_data_column_sidecars_by_root.go | 18 +- beacon-chain/sync/validate_data_column.go | 5 +- beacon-chain/sync/verify/blob.go | 12 +- 7 files changed, 831 insertions(+), 717 deletions(-) diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index 98aec160a177..16d662451577 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -3,7 +3,6 @@ package initialsync import ( "context" "fmt" - "math" "sort" "strings" "sync" @@ -38,11 +37,12 @@ import ( ) const ( - // maxPendingRequests limits how many concurrent fetch request one can initiate. maxPendingRequests = 64 // peersPercentagePerRequest caps percentage of peers to be used in a request. peersPercentagePerRequest = 0.75 + // peersPercentagePerRequestDataColumns caps percentage of peers to be used in a data columns request. + peersPercentagePerRequestDataColumns = 1. // handshakePollingInterval is a polling interval for checking the number of received handshakes. handshakePollingInterval = 5 * time.Second // peerLocksPollingInterval is a polling interval for checking if there are stale peer locks. @@ -318,8 +318,11 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start primitives.Slot if f.mode == modeStopOnFinalizedEpoch { highestFinalizedSlot := params.BeaconConfig().SlotsPerEpoch.Mul(uint64(targetEpoch + 1)) if start > highestFinalizedSlot { - response.err = fmt.Errorf("%w, slot: %d, highest finalized slot: %d", - errSlotIsTooHigh, start, highestFinalizedSlot) + response.err = fmt.Errorf( + "%w, slot: %d, highest finalized slot: %d", + errSlotIsTooHigh, start, highestFinalizedSlot, + ) + return response } } @@ -482,16 +485,6 @@ func (r *blobRange) Request() *p2ppb.BlobSidecarsByRangeRequest { } } -func (r *blobRange) RequestDataColumns() *p2ppb.DataColumnSidecarsByRangeRequest { - if r == nil { - return nil - } - return &p2ppb.DataColumnSidecarsByRangeRequest{ - StartSlot: r.low, - Count: uint64(r.high.SubSlot(r.low)) + 1, - } -} - var errBlobVerification = errors.New("peer unable to serve aligned BlobSidecarsByRange and BeaconBlockSidecarsByRange responses") var errMissingBlobsForBlockCommitments = errors.Wrap(errBlobVerification, "blobs unavailable for processing block with kzg commitments") @@ -621,120 +614,120 @@ func sortedSliceFromMap(m map[uint64]bool) []uint64 { return result } -// blocksWithMissingDataColumnsBoundaries finds the first and last block in `bwb` that: -// - are in the blob retention period, -// - contain at least one blob, and -// - have at least one missing data column. -func (f *blocksFetcher) blocksWithMissingDataColumnsBoundaries( - bwb []blocks.BlockWithROBlobs, - currentSlot primitives.Slot, - localCustodyColumns map[uint64]bool, -) (bool, int, int, error) { - // Get, regarding the current slot, the minimum slot for which we should serve data columns. - columnWindowStart, err := prysmsync.DataColumnsRPCMinValidSlot(currentSlot) - if err != nil { - return false, 0, 0, errors.Wrap(err, "data columns RPC min valid slot") - } +type bwbSlice struct { + start, end int + dataColumns map[uint64]bool +} - // Find the first block with a slot higher than or equal to columnWindowStart, - firstWindowIndex := -1 - for i := range bwb { - if bwb[i].Block.Block().Slot() >= columnWindowStart { - firstWindowIndex = i - break - } +// buildBwbSlices builds slices of `bwb` that aims to optimize the count of +// by range requests needed to fetch missing data columns. +func buildBwbSlices( + bwbs []blocks.BlockWithROBlobs, + missingColumnsByRoot map[[fieldparams.RootLength]byte]map[uint64]bool, +) ([]bwbSlice, error) { + // Return early if there are no blocks to process. + if len(bwbs) == 0 { + return []bwbSlice{}, nil } - if firstWindowIndex == -1 { - // There is no block with slot greater than or equal to columnWindowStart. - return false, 0, 0, nil + // It's safe to get the first item of the slice since we've already checked that it's not empty. + firstROBlock := bwbs[0].Block + firstBlockRoot := firstROBlock.Root() + + previousMissingDataColumns := map[uint64]bool{} + + if missing, ok := missingColumnsByRoot[firstBlockRoot]; ok { + previousMissingDataColumns = missing } - // Find the first block which contains blob commitments and for which some data columns are missing. - firstIndex := -1 - for i := firstWindowIndex; i < len(bwb); i++ { - // Is there any blob commitment in this block? - commits, err := bwb[i].Block.Block().Body().BlobKzgCommitments() - if err != nil { - return false, 0, 0, errors.Wrap(err, "blob KZG commitments") - } + previousBlockSlot := firstROBlock.Block().Slot() + previousStartIndex := 0 - if len(commits) == 0 { - continue - } + const offset = 1 - // Is there at least one column we should custody that is not in our store? - root := bwb[i].Block.Root() - allColumnsAreAvailable := f.bs.Summary(root).AllDataColumnsAvailable(localCustodyColumns) + result := make([]bwbSlice, 0, 1) + for currentIndexWithoutOffest, bwb := range bwbs[offset:] { + currentIndex := currentIndexWithoutOffest + offset + // Extract the ROBlock from the blockWithROBlob. + currentROBlock := bwb.Block - if !allColumnsAreAvailable { - firstIndex = i - break - } - } + // Extract the current block from the current ROBlock. + currentBlock := currentROBlock.Block() - if firstIndex == -1 { - // There is no block with at least one missing data column. - return false, 0, 0, nil - } + // Extract the slot from the block. + currentBlockSlot := currentBlock.Slot() - // Find the last block which contains blob commitments and for which some data columns are missing. - lastIndex := len(bwb) - 1 - for i := lastIndex; i >= firstIndex; i-- { - // Is there any blob commitment in this block? - commits, err := bwb[i].Block.Block().Body().BlobKzgCommitments() + if currentBlockSlot < previousBlockSlot { + return nil, errors.New("blocks are not sorted by slot") + } + + // Extract KZG commitments count from the current block body + currentBlockkzgCommitments, err := currentBlock.Body().BlobKzgCommitments() if err != nil { - return false, 0, 0, errors.Wrap(err, "blob KZG commitments") + return nil, errors.Wrap(err, "blob KZG commitments") } - if len(commits) == 0 { + // Compute the count of KZG commitments. + currentBlockKzgCommitmentCount := len(currentBlockkzgCommitments) + + // Skip blocks without commitments. + if currentBlockKzgCommitmentCount == 0 { + previousBlockSlot = currentBlockSlot continue } - // Is there at least one column we should custody that is not in our store? - root := bwb[i].Block.Root() - allColumnsAreAvailable := f.bs.Summary(root).AllDataColumnsAvailable(localCustodyColumns) + // Extract the current block root from the current ROBlock. + currentBlockRoot := currentROBlock.Root() - if !allColumnsAreAvailable { - lastIndex = i - break - } - } + // Get the missing data columns for the current block. + missingDataColumns := missingColumnsByRoot[currentBlockRoot] - return true, firstIndex, lastIndex, nil -} + // Compute if the missing data columns differ. + missingDataColumnsDiffer := uint64MapDiffer(previousMissingDataColumns, missingDataColumns) -// custodyAllNeededColumns filter `inputPeers` that custody all columns in `columns`. -func (f *blocksFetcher) custodyAllNeededColumns(inputPeers map[peer.ID]bool, columns map[uint64]bool) (map[peer.ID]bool, error) { - outputPeers := make(map[peer.ID]bool, len(inputPeers)) + // Check if there is a gap or if the missing data columns differ. + if missingDataColumnsDiffer { + // Append the slice to the result. + slice := bwbSlice{ + start: previousStartIndex, + end: currentIndex - 1, + dataColumns: previousMissingDataColumns, + } -loop: - for peer := range inputPeers { - // Get the node ID from the peer ID. - nodeID, err := p2p.ConvertPeerIDToNodeID(peer) - if err != nil { - return nil, errors.Wrap(err, "convert peer ID to node ID") + result = append(result, slice) + + previousStartIndex = currentIndex + previousMissingDataColumns = missingDataColumns } - // Get the custody columns count from the peer. - custodyCount := f.p2p.DataColumnsCustodyCountFromRemotePeer(peer) + previousBlockSlot = currentBlockSlot + } - // Get the custody columns from the peer. - remoteCustodyColumns, err := peerdas.CustodyColumns(nodeID, custodyCount) - if err != nil { - return nil, errors.Wrap(err, "custody columns") - } + // Append the last slice to the result. + lastSlice := bwbSlice{ + start: previousStartIndex, + end: len(bwbs) - 1, + dataColumns: previousMissingDataColumns, + } - for column := range columns { - if !remoteCustodyColumns[column] { - continue loop - } - } + result = append(result, lastSlice) + + return result, nil +} + +// uint64MapDiffer returns true if the two maps differ. +func uint64MapDiffer(left, right map[uint64]bool) bool { + if len(left) != len(right) { + return true + } - outputPeers[peer] = true + for k := range left { + if !right[k] { + return true + } } - return outputPeers, nil + return false } // custodyColumns returns the columns we should custody. @@ -754,49 +747,66 @@ func (f *blocksFetcher) custodyColumns() (map[uint64]bool, error) { return localCustodyColumns, nil } -// missingColumnsFromRoot returns the missing columns indexed by root. +// missingColumnsFromRoot computes the columns corresponding to blocks in `bwbs` that +// we should custody and that are not in our store. +// The result is indexed by root. func (f *blocksFetcher) missingColumnsFromRoot( custodyColumns map[uint64]bool, - bwb []blocks.BlockWithROBlobs, + minSlot primitives.Slot, + bwbs []blocks.BlockWithROBlobs, ) (map[[fieldparams.RootLength]byte]map[uint64]bool, error) { - result := make(map[[fieldparams.RootLength]byte]map[uint64]bool) - for i := 0; i < len(bwb); i++ { - block := bwb[i].Block + missingColumnsByRoot := make(map[[fieldparams.RootLength]byte]map[uint64]bool) + for _, bwb := range bwbs { + // Extract the roblock from the roblock with RO blobs. + roblock := bwb.Block + + // Extract the block from the roblock. + block := roblock.Block() - // Retrieve the blob KZG commitments. - commitments, err := block.Block().Body().BlobKzgCommitments() + // Extract the slot of the block. + blockSlot := block.Slot() + + // Skip if the block slot is lower than the column window start. + if blockSlot < minSlot { + continue + } + + // Retrieve the blob KZG kzgCommitments. + kzgCommitments, err := roblock.Block().Body().BlobKzgCommitments() if err != nil { return nil, errors.Wrap(err, "blob KZG commitments") } - // Skip if there are no commitments. - if len(commitments) == 0 { + // Skip if there are no KZG commitments. + if len(kzgCommitments) == 0 { continue } - // Retrieve the root. - root := block.Root() + // Extract the block root. + root := roblock.Root() + + // Retrieve the summary for the root. + summary := f.bs.Summary(root) + // Compute the set of missing columns. for column := range custodyColumns { - // If there is at least one commitment for this block and if a column we should custody - // is not in our store, then we should retrieve it. - if !f.bs.Summary(root).HasDataColumnIndex(column) { - if _, ok := result[root]; !ok { - result[root] = make(map[uint64]bool) + if !summary.HasDataColumnIndex(column) { + if _, ok := missingColumnsByRoot[root]; !ok { + missingColumnsByRoot[root] = make(map[uint64]bool) } - result[root][column] = true + missingColumnsByRoot[root][column] = true } } } - return result, nil + return missingColumnsByRoot, nil } // indicesFromRoot returns the indices indexed by root. -func indicesFromRoot(bwb []blocks.BlockWithROBlobs) map[[fieldparams.RootLength]byte][]int { - result := make(map[[fieldparams.RootLength]byte][]int, len(bwb)) - for i := 0; i < len(bwb); i++ { - root := bwb[i].Block.Root() +func indicesFromRoot(bwbs []blocks.BlockWithROBlobs) map[[fieldparams.RootLength]byte][]int { + result := make(map[[fieldparams.RootLength]byte][]int, len(bwbs)) + for i := 0; i < len(bwbs); i++ { + root := bwbs[i].Block.Root() result[root] = append(result[root], i) } @@ -814,421 +824,343 @@ func blockFromRoot(bwb []blocks.BlockWithROBlobs) map[[fieldparams.RootLength]by return result } -// minInt returns the minimum integer in a slice. -func minInt(slice []int) int { - min := math.MaxInt - for _, item := range slice { - if item < min { - min = item - } - } +// fetchDataColumnsFromPeers looks at the blocks in `bwb` and retrieves all +// data columns for with the block has blob commitments, and for which our store is missing data columns +// we should custody. +// This function mutates `bwb` by adding the retrieved data columns. +// Prerequisite: bwb is sorted by slot. +func (f *blocksFetcher) fetchDataColumnsFromPeers( + ctx context.Context, + bwbs []blocks.BlockWithROBlobs, + peers []peer.ID, +) error { + // Time to wait if no peers are available. + const ( + delay = 5 * time.Second // Time to wait before retrying to fetch data columns. + maxIdentifier = 1_000 // Max identifier for the request. + ) - return min -} + // Generate random identifier. + identifier := f.rand.Intn(maxIdentifier) + log := log.WithField("reqIdentifier", identifier) -// maxInt returns the maximum integer in a slice. -func maxInt(slice []int) int { - max := math.MinInt - for _, item := range slice { - if item > max { - max = item - } + // Compute the columns we should custody. + localCustodyColumns, err := f.custodyColumns() + if err != nil { + return errors.Wrap(err, "custody columns") } - return max -} + // Compute the current slot. + currentSlot := f.clock.CurrentSlot() -// requestDataColumnsFromPeers send `request` to each peer in `peers` until a peer returns at least one data column. -func (f *blocksFetcher) requestDataColumnsFromPeers( - ctx context.Context, - request *p2ppb.DataColumnSidecarsByRangeRequest, - peers map[peer.ID]bool, -) ([]blocks.RODataColumn, peer.ID, error) { - peersSlice := make([]peer.ID, 0, len(peers)) - for peer := range peers { - peersSlice = append(peersSlice, peer) + // Compute the minimum slot for which we should serve data columns. + minimumSlot, err := prysmsync.DataColumnsRPCMinValidSlot(currentSlot) + if err != nil { + return errors.Wrap(err, "data columns RPC min valid slot") } - // Shuffle peers to avoid always querying the same peers - f.rand.Shuffle(len(peersSlice), func(i, j int) { - peersSlice[i], peersSlice[j] = peersSlice[j], peersSlice[i] - }) - - var columnsLog interface{} = "all" - columnsCount := uint64(len(request.Columns)) - numberOfColumns := params.BeaconConfig().NumberOfColumns - if columnsCount < numberOfColumns { - columnsLog = request.Columns + // Compute all missing data columns indexed by root. + missingColumnsByRoot, err := f.missingColumnsFromRoot(localCustodyColumns, minimumSlot, bwbs) + if err != nil { + return errors.Wrap(err, "missing columns from root") } - log := log.WithFields(logrus.Fields{ - "start": request.StartSlot, - "count": request.Count, - "columns": columnsLog, - "items": request.Count * columnsCount, - }) - - for _, peer := range peersSlice { - log := log.WithField("peer", peer) - - if ctx.Err() != nil { - return nil, "", ctx.Err() - } - - err := func() error { - l := f.peerLock(peer) - l.Lock() - defer l.Unlock() - - log.WithFields(logrus.Fields{ - "capacity": f.rateLimiter.Remaining(peer.String()), - "score": f.p2p.Peers().Scorers().BlockProviderScorer().FormatScorePretty(peer), - }).Debug("Data columns by range - requesting") - - // We're intentionally abusing the block rate limit here, treating data column requests as if they were block requests. - // Since column requests take more bandwidth than blocks, we should improve how we account for the different kinds - // of requests, more in proportion to the cost of serving them. - if f.rateLimiter.Remaining(peer.String()) < int64(request.Count) { - if err := f.waitForBandwidth(peer, request.Count); err != nil { - return errors.Wrap(err, "wait for bandwidth") - } - } - - f.rateLimiter.Add(peer.String(), int64(request.Count)) + // Return early if there are no missing data columns. + if len(missingColumnsByRoot) == 0 { + return nil + } - return nil - }() + // Log the start of the process. + start := time.Now() + log.Debug("Fetch data columns from peers - start") + for len(missingColumnsByRoot) > 0 { + // Compute the optimal slices of `bwb` to minimize the number of by range returned columns. + bwbsSlices, err := buildBwbSlices(bwbs, missingColumnsByRoot) if err != nil { - log.WithError(err).Warning("Data columns by range - could not wait for bandwidth") - continue + return errors.Wrap(err, "build bwb slices") } - roDataColumns, err := prysmsync.SendDataColumnsByRangeRequest(ctx, f.clock, f.p2p, peer, f.ctxMap, request) - if err != nil { - log.WithError(err).Warning("Data columns by range - could not send data columns by range request") - continue - } + outerLoop: + for _, bwbsSlice := range bwbsSlices { + lastSlot := bwbs[bwbsSlice.end].Block.Block().Slot() + dataColumnsSlice := sortedSliceFromMap(bwbsSlice.dataColumns) + dataColumnCount := uint64(len(dataColumnsSlice)) - // If the peer did not return any data columns, go to the next peer. - if len(roDataColumns) == 0 { - log.Debug("Data columns by range - peer did not returned any data columns") + // Filter out slices that are already complete. + if dataColumnCount == 0 { + continue + } - continue - } + // If no peer is specified, get all connected peers. + peersToFilter := peers + if peersToFilter == nil { + peersToFilter = f.p2p.Peers().Connected() + } - // We have received at least one data columns from the peer. This is the happy path. - return roDataColumns, peer, nil - } + // Compute the block count of the request. + startSlot := bwbs[bwbsSlice.start].Block.Block().Slot() + endSlot := bwbs[bwbsSlice.end].Block.Block().Slot() + blockCount := uint64(endSlot - startSlot + 1) - // No peer returned any data columns. This this the unhappy path. - return nil, "", nil -} + filteredPeers, err := f.waitForPeersForDataColumns(ctx, peersToFilter, lastSlot, bwbsSlice.dataColumns, blockCount) + if err != nil { + return errors.Wrap(err, "wait for peers for data columns") + } -// firstLastIndices returns the first and last indices where we have missing columns. -func firstLastIndices( - missingColumnsFromRoot map[[fieldparams.RootLength]byte]map[uint64]bool, - indicesFromRoot map[[fieldparams.RootLength]byte][]int, -) (int, int) { - firstIndex, lastIndex := math.MaxInt, -1 - for root := range missingColumnsFromRoot { - indices := indicesFromRoot[root] - - index := minInt(indices) - if index < firstIndex { - firstIndex = index - } + // Build the request. + request := &p2ppb.DataColumnSidecarsByRangeRequest{ + StartSlot: startSlot, + Count: blockCount, + Columns: dataColumnsSlice, + } - index = maxInt(indices) - if index > lastIndex { - lastIndex = index - } - } + // Get `bwbs` indices indexed by root. + indicesByRoot := indicesFromRoot(bwbs) - return firstIndex, lastIndex -} + // Get blocks indexed by root. + blocksByRoot := blockFromRoot(bwbs) -// processRetrievedDataColumns processes the retrieved data columns. -// This function: -// - Mutate `bwb` by adding the retrieved data columns. -// - Mutate `missingColumnsFromRoot` by removing the columns that have been retrieved. -func processRetrievedDataColumns( - roDataColumns []blocks.RODataColumn, - blockFromRoot map[[fieldparams.RootLength]byte]blocks.ROBlock, - indicesFromRoot map[[fieldparams.RootLength]byte][]int, - missingColumnsFromRoot map[[fieldparams.RootLength]byte]map[uint64]bool, - bwb []blocks.BlockWithROBlobs, - colVerifier verification.NewColumnVerifier, -) { - retrievedColumnsFromRoot := make(map[[fieldparams.RootLength]byte]map[uint64]bool) - - // Verify and populate columns - for i := range roDataColumns { - dataColumn := roDataColumns[i] - - root := dataColumn.BlockRoot() - columnIndex := dataColumn.ColumnIndex - - missingColumns, ok := missingColumnsFromRoot[root] - if !ok { - continue - } + // Prepare nice log fields. + var columnsLog interface{} = "all" + numberOfColuns := params.BeaconConfig().NumberOfColumns + if dataColumnCount < numberOfColuns { + columnsLog = dataColumnsSlice + } - if !missingColumns[columnIndex] { - continue - } + log := log.WithFields(logrus.Fields{ + "start": request.StartSlot, + "count": request.Count, + "columns": columnsLog, + }) - // Verify the data column. - if err := verify.ColumnAlignsWithBlock(dataColumn, blockFromRoot[root], colVerifier); err != nil { - // TODO: Should we downscore the peer for that? - continue - } + // Retrieve the missing data columns from the peers. + for _, peer := range filteredPeers { + success := f.fetchDataColumnFromPeer(ctx, bwbs, missingColumnsByRoot, blocksByRoot, indicesByRoot, peer, request) - // Populate the block with the data column. - for _, index := range indicesFromRoot[root] { - if bwb[index].Columns == nil { - bwb[index].Columns = make([]blocks.RODataColumn, 0) + // If we have successfully retrieved some data columns, continue to the next slice. + if success { + continue outerLoop + } } - bwb[index].Columns = append(bwb[index].Columns, dataColumn) + log.WithField("peers", filteredPeers).Warning("Fetch data columns from peers - no peers among this list returned any valid data columns") } - // Populate the retrieved columns. - if _, ok := retrievedColumnsFromRoot[root]; !ok { - retrievedColumnsFromRoot[root] = make(map[uint64]bool) + if len(missingColumnsByRoot) > 0 { + log.Debug("Fetch data columns from peers - continue") } + } - retrievedColumnsFromRoot[root][columnIndex] = true + // Sort data columns by index. + sortBwbsByColumnIndex(bwbs) - // Remove the column from the missing columns. - delete(missingColumnsFromRoot[root], columnIndex) - if len(missingColumnsFromRoot[root]) == 0 { - delete(missingColumnsFromRoot, root) - } + log.WithField("duration", time.Since(start)).Debug("Fetch data columns from peers - success") + return nil +} + +// sortBwbsByColumnIndex sorts `bwbs` by column index. +func sortBwbsByColumnIndex(bwbs []blocks.BlockWithROBlobs) { + for _, bwb := range bwbs { + sort.Slice(bwb.Columns, func(i, j int) bool { + return bwb.Columns[i].ColumnIndex < bwb.Columns[j].ColumnIndex + }) } } -// retrieveMissingDataColumnsFromPeers retrieves the missing data columns from the peers. -// This function: -// - Mutate `bwb` by adding the retrieved data columns. -// - Mutate `missingColumnsFromRoot` by removing the columns that have been retrieved. -// This function returns when all the missing data columns have been retrieved, -// or when the context is canceled. -func (f *blocksFetcher) retrieveMissingDataColumnsFromPeers( +// waitForPeersForDataColumns filters `peers` to only include peers that are: +// - synced up to `lastSlot`, +// - custody all columns in `dataColumns`, and +// - have bandwidth to serve `blockCount` blocks. +// It waits until at least one peer is available. +func (f *blocksFetcher) waitForPeersForDataColumns( ctx context.Context, - bwb []blocks.BlockWithROBlobs, - missingColumnsFromRoot map[[fieldparams.RootLength]byte]map[uint64]bool, - indicesFromRoot map[[fieldparams.RootLength]byte][]int, peers []peer.ID, -) error { - const ( - delay = 5 * time.Second - batchSize = 512 - ) - - start := time.Now() - log.Debug("Retrieving missing data columns from peers - start") - - for len(missingColumnsFromRoot) > 0 { - if ctx.Err() != nil { - return ctx.Err() - } - - // Get the first and last indices where we have missing columns. - firstIndex, lastIndex := firstLastIndices(missingColumnsFromRoot, indicesFromRoot) - - // Get the first and the last slot. - firstSlot := bwb[firstIndex].Block.Block().Slot() - lastSlot := bwb[lastIndex].Block.Block().Slot() - - // Get the number of blocks to retrieve. - blocksCount := uint64(lastSlot - firstSlot + 1) - - // Get the missing data columns. - missingDataColumns := make(map[uint64]bool) - for _, columns := range missingColumnsFromRoot { - for column := range columns { - missingDataColumns[column] = true - } - } - - // Get a sorted slice of missing data columns. - missingDataColumnsSlice := sortedSliceFromMap(missingDataColumns) - missingDataColumnsCount := uint64(len(missingDataColumnsSlice)) + lastSlot primitives.Slot, + dataColumns map[uint64]bool, + blockCount uint64, +) ([]peer.ID, error) { + // Time to wait before retrying to find new peers. + const delay = 5 * time.Second + + // Filter peers that custody all columns we need and that are synced to the epoch. + filteredPeers, descriptions, err := f.peersWithSlotAndDataColumns(ctx, peers, lastSlot, dataColumns, blockCount) + if err != nil { + return nil, errors.Wrap(err, "peers with slot and data columns") + } - numberOfColumns := params.BeaconConfig().NumberOfColumns - var requestedColumnsLog interface{} = "all" + // Compute data columns count + dataColumnCount := uint64(len(dataColumns)) - if missingDataColumnsCount < numberOfColumns { - requestedColumnsLog = missingDataColumnsSlice - } + // Sort columns. + columnsSlice := sortedSliceFromMap(dataColumns) - // Reduce blocks count until the total number of elements is less than the batch size. - for missingDataColumnsCount*blocksCount > batchSize { - blocksCount /= 2 - lastSlot = firstSlot + primitives.Slot(blocksCount-1) + // Build a nice log field. + var columnsLog interface{} = "all" + numberOfColuns := params.BeaconConfig().NumberOfColumns + if dataColumnCount < numberOfColuns { + columnsLog = columnsSlice + } + + // Wait if no suitable peers are available. + for len(filteredPeers) == 0 { + log. + WithFields(logrus.Fields{ + "peers": peers, + "waitDuration": delay, + "targetSlot": lastSlot, + "columns": columnsLog, + }). + Warning("Fetch data columns from peers - no peers available to retrieve missing data columns, retrying later") + + for _, description := range descriptions { + log.Debug(description) } - // If no peer is specified, get all connected peers. - peersToFilter := peers - if peersToFilter == nil { - peersToFilter = f.p2p.Peers().Connected() - } + time.Sleep(delay) - // Filter peers. - filteredPeers, descriptions, err := f.peersWithSlotAndDataColumns(peersToFilter, lastSlot, missingDataColumns) + filteredPeers, descriptions, err = f.peersWithSlotAndDataColumns(ctx, peers, lastSlot, dataColumns, blockCount) if err != nil { - return errors.Wrap(err, "peers with slot and data columns") - } - - if len(filteredPeers) == 0 { - log. - WithFields(logrus.Fields{ - "peers": peersToFilter, - "filteredPeers": filteredPeers, - "waitDuration": delay, - "targetSlot": lastSlot, - }). - Warning("No peers available to retrieve missing data columns, retrying later") - - // If no peers are available, log the descriptions to help debugging. - for _, description := range descriptions { - log.Debug(description) - } - - time.Sleep(delay) - continue - } - - // Get the first slot for which we should retrieve data columns. - startSlot := bwb[firstIndex].Block.Block().Slot() - - // Build the request. - request := &p2ppb.DataColumnSidecarsByRangeRequest{ - StartSlot: startSlot, - Count: blocksCount, - Columns: missingDataColumnsSlice, + return nil, errors.Wrap(err, "peers with slot and data columns") } + } - // Get all the blocks and data columns we should retrieve. - blockFromRoot := blockFromRoot(bwb[firstIndex : lastIndex+1]) - - // Iterate requests over all peers, and exits as soon as at least one data column is retrieved. - roDataColumns, peer, err := f.requestDataColumnsFromPeers(ctx, request, filteredPeers) - if err != nil { - return errors.Wrap(err, "request data columns from peers") - } + return filteredPeers, nil +} - if len(roDataColumns) == 0 { - log. - WithFields(logrus.Fields{ - "peers": peersToFilter, - "filteredPeers": filteredPeers, - "delay": delay, - "start": startSlot, - "count": blocksCount, - "columns": requestedColumnsLog, - }). - Warning("No data columns returned from any peer, retrying later") - - time.Sleep(delay) - continue - } +// processDataColumn mutates `bwbs` argument by adding the data column, +// and mutates `missingColumnsByRoot` by removing the data column if the +// data column passes all the check. +func processDataColumn( + bwbs []blocks.BlockWithROBlobs, + missingColumnsByRoot map[[fieldparams.RootLength]byte]map[uint64]bool, + columnVerifier verification.NewColumnVerifier, + blocksByRoot map[[fieldparams.RootLength]byte]blocks.ROBlock, + indicesByRoot map[[fieldparams.RootLength]byte][]int, + dataColumn blocks.RODataColumn, +) bool { + // Extract the block root from the data column. + blockRoot := dataColumn.BlockRoot() - // Process the retrieved data columns. - processRetrievedDataColumns(roDataColumns, blockFromRoot, indicesFromRoot, missingColumnsFromRoot, bwb, f.cv) + // Find the position of the block in `bwbs` that corresponds to this block root. + indices, ok := indicesByRoot[blockRoot] + if !ok { + // The peer returned a data column that we did not expect. + // This is among others possible when the peer is not on the same fork. + return false + } - // Log missing columns after request. - if len(missingColumnsFromRoot) > 0 { - for root, missingColumns := range missingColumnsFromRoot { - slot := blockFromRoot[root].Block().Slot() + // Extract the block from the block root. + block, ok := blocksByRoot[blockRoot] + if !ok { + // This should never happen. + log.WithField("blockRoot", fmt.Sprintf("%#x", blockRoot)).Error("Fetch data columns from peers - block not found") + return false + } - // It's normal to have missing columns for slots higher than the last requested slot. - // Skip logging those. - if slot > lastSlot { - continue - } + // Verify the data column. + if err := verify.ColumnAlignsWithBlock(dataColumn, block, columnVerifier); err != nil { + log.WithError(err).WithFields(logrus.Fields{ + "root": fmt.Sprintf("%#x", blockRoot), + "slot": block.Block().Slot(), + "column": dataColumn.ColumnIndex, + }).Warning("Fetch data columns from peers - fetched data column does not align with block") - missingColumnsCount := uint64(len(missingColumns)) - var missingColumnsLog interface{} = "all" + // TODO: Should we downscore the peer for that? + return false + } - if missingColumnsCount < numberOfColumns { - missingColumnsLog = sortedSliceFromMap(missingColumns) - } + // Populate the corresponding items in `bwbs`. + for _, index := range indices { + bwbs[index].Columns = append(bwbs[index].Columns, dataColumn) + } - log.WithFields(logrus.Fields{ - "peer": peer, - "root": fmt.Sprintf("%#x", root), - "slot": slot, - "missingColumns": missingColumnsLog, - "requestedColumns": requestedColumnsLog, - "requestedStart": startSlot, - "requestedCount": blocksCount, - }).Debug("Peer did not return all requested data columns") - } - } + // Remove the column from the missing columns. + delete(missingColumnsByRoot[blockRoot], dataColumn.ColumnIndex) + if len(missingColumnsByRoot[blockRoot]) == 0 { + delete(missingColumnsByRoot, blockRoot) } - log.WithField("duration", time.Since(start)).Debug("Retrieving missing data columns from peers - success") - return nil + return true } -// fetchDataColumnsFromPeers looks at the blocks in `bwb` and retrieves all -// data columns for with the block has blob commitments, and for which our store is missing data columns -// we should custody. -// This function mutates `bwb` by adding the retrieved data columns. -// Preqrequisite: bwb is sorted by slot. -func (f *blocksFetcher) fetchDataColumnsFromPeers( +// fetchDataColumnsFromPeer sends `request` to `peer`, then mutates: +// - `bwbs` by adding the fetched data columns, +// - `missingColumnsByRoot` by removing the fetched data columns. +func (f *blocksFetcher) fetchDataColumnFromPeer( ctx context.Context, - bwb []blocks.BlockWithROBlobs, - peers []peer.ID, -) error { - ctx, span := trace.StartSpan(ctx, "initialsync.fetchColumnsFromPeer") - defer span.End() + bwbs []blocks.BlockWithROBlobs, + missingColumnsByRoot map[[fieldparams.RootLength]byte]map[uint64]bool, + blocksByRoot map[[fieldparams.RootLength]byte]blocks.ROBlock, + indicesByRoot map[[fieldparams.RootLength]byte][]int, + peer peer.ID, + request *p2ppb.DataColumnSidecarsByRangeRequest, +) bool { + // Define useful log field. + log := log.WithField("peer", peer) + + // Wait for peer bandwidth if needed. + if err := func() error { + l := f.peerLock(peer) + l.Lock() + defer l.Unlock() + + remaining := uint64(f.rateLimiter.Remaining(peer.String())) + + // We're intentionally abusing the block rate limit here, treating data column requests as if they were block requests. + // Since column requests take more bandwidth than blocks, we should improve how we account for the different kinds + // of requests, more in proportion to the cost of serving them. + if remaining < request.Count { + log.Debug("Fetch data columns from peers - wait for bandwidth") + if err := f.waitForBandwidth(peer, request.Count); err != nil { + return errors.Wrap(err, "wait for bandwidth") + } + } - // Get the current slot. - currentSlot := f.clock.CurrentSlot() + f.rateLimiter.Add(peer.String(), int64(request.Count)) - // If there is no data columns before deneb. Early return. - if slots.ToEpoch(currentSlot) < params.BeaconConfig().DenebForkEpoch { return nil + }(); err != nil { + log.WithError(err).Warning("Fetch data columns from peers - could not wait for bandwidth") + return false } - // Get the columns we custody. - localCustodyColumns, err := f.custodyColumns() + // Send the request to the peer. + requestStart := time.Now() + roDataColumns, err := prysmsync.SendDataColumnsByRangeRequest(ctx, f.clock, f.p2p, peer, f.ctxMap, request) if err != nil { - return errors.Wrap(err, "custody columns") + log.WithError(err).Warning("Fetch data columns from peers - could not send data columns by range request") + return false } - // Find the first and last block in `bwb` that: - // - are in the blob retention period, - // - contain at least one blob, and - // - have at least one missing data column. - someColumnsAreMissing, firstIndex, lastIndex, err := f.blocksWithMissingDataColumnsBoundaries(bwb, currentSlot, localCustodyColumns) - if err != nil { - return errors.Wrap(err, "blocks with missing data columns boundaries") - } + requestDuration := time.Since(requestStart) - // If there is no block with missing data columns, early return. - if !someColumnsAreMissing { - return nil + if len(roDataColumns) == 0 { + log.Debug("Fetch data columns from peers - peer did not return any data columns") + return false } - // Get all missing columns indexed by root. - missingColumnsFromRoot, err := f.missingColumnsFromRoot(localCustodyColumns, bwb[firstIndex:lastIndex+1]) - if err != nil { - return errors.Wrap(err, "missing columns from root") - } + globalSuccess := false - // Get all indices indexed by root. - indicesFromRoot := indicesFromRoot(bwb) + for _, dataColumn := range roDataColumns { + success := processDataColumn(bwbs, missingColumnsByRoot, f.cv, blocksByRoot, indicesByRoot, dataColumn) + if success { + globalSuccess = true + } + } - // Retrieve the missing data columns from the peers. - if err := f.retrieveMissingDataColumnsFromPeers(ctx, bwb, missingColumnsFromRoot, indicesFromRoot, peers); err != nil { - return errors.Wrap(err, "retrieve missing data columns from peers") + if !globalSuccess { + log.Debug("Fetch data columns from peers - peer did not return any valid data columns") + return false } - return nil + totalDuration := time.Since(requestStart) + log.WithFields(logrus.Fields{ + "reqDuration": requestDuration, + "totalDuration": totalDuration, + }).Debug("Fetch data columns from peers - got some columns") + + return true } // requestBlocks is a wrapper for handling BeaconBlocksByRangeRequest requests/streams. diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go index 7e0622755f85..a12349f5846c 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go @@ -1328,12 +1328,6 @@ type blockParams struct { hasBlobs bool } -func rootFromUint64(u uint64) [fieldparams.RootLength]byte { - var root [fieldparams.RootLength]byte - binary.LittleEndian.PutUint64(root[:], u) - return root -} - func createPeer(t *testing.T, privateKeyOffset int, custodyCount uint64) (*enr.Record, peer.ID) { privateKeyBytes := make([]byte, 32) for i := 0; i < 32; i++ { @@ -1356,52 +1350,6 @@ func createPeer(t *testing.T, privateKeyOffset int, custodyCount uint64) (*enr.R return record, peerID } -func TestCustodyAllNeededColumns(t *testing.T) { - const dataColumnsCount = 31 - - p2p := p2ptest.NewTestP2P(t) - - dataColumns := make(map[uint64]bool, dataColumnsCount) - for i := range dataColumnsCount { - dataColumns[uint64(i)] = true - } - - custodyCounts := [...]uint64{ - 4 * params.BeaconConfig().CustodyRequirement, - 32 * params.BeaconConfig().CustodyRequirement, - 4 * params.BeaconConfig().CustodyRequirement, - 32 * params.BeaconConfig().CustodyRequirement, - } - - expected := make(map[peer.ID]bool) - - peersID := make(map[peer.ID]bool, len(custodyCounts)) - for _, custodyCount := range custodyCounts { - peerRecord, peerID := createPeer(t, len(peersID), custodyCount) - peersID[peerID] = true - p2p.Peers().Add(peerRecord, peerID, nil, network.DirOutbound) - if custodyCount == 32*params.BeaconConfig().CustodyRequirement { - expected[peerID] = true - } - } - - blocksFetcher := newBlocksFetcher( - context.Background(), - &blocksFetcherConfig{ - p2p: p2p, - }, - ) - - actual, err := blocksFetcher.custodyAllNeededColumns(peersID, dataColumns) - require.NoError(t, err) - - require.Equal(t, len(expected), len(actual)) - for peerID := range expected { - _, ok := actual[peerID] - require.Equal(t, true, ok) - } -} - func TestCustodyColumns(t *testing.T) { blocksFetcher := newBlocksFetcher(context.Background(), &blocksFetcherConfig{ p2p: p2ptest.NewTestP2P(t), @@ -1415,24 +1363,6 @@ func TestCustodyColumns(t *testing.T) { require.Equal(t, int(expected), len(actual)) } -func TestMinInt(t *testing.T) { - input := []int{1, 2, 3, 4, 5, 5, 4, 3, 2, 1} - const expected = 1 - - actual := minInt(input) - - require.Equal(t, expected, actual) -} - -func TestMaxInt(t *testing.T) { - input := []int{1, 2, 3, 4, 5, 5, 4, 3, 2, 1} - const expected = 5 - - actual := maxInt(input) - - require.Equal(t, expected, actual) -} - // deterministicRandomness returns a random bytes array based on the seed func deterministicRandomness(t *testing.T, seed int64) [32]byte { buf := new(bytes.Buffer) @@ -1592,28 +1522,196 @@ func defaultMockChain(t *testing.T, currentSlot uint64) (*mock.ChainService, *st return chain, clock } -func TestFirstLastIndices(t *testing.T) { - missingColumnsFromRoot := map[[fieldparams.RootLength]byte]map[uint64]bool{ - rootFromUint64(42): {1: true, 3: true, 5: true}, - rootFromUint64(43): {2: true, 4: true, 6: true}, - rootFromUint64(44): {7: true, 8: true, 9: true}, +func TestBuildBwbSlices(t *testing.T) { + areBwbSlicesEqual := func(lefts, rights []bwbSlice) bool { + if len(lefts) != len(rights) { + return false + } + + for i := range lefts { + left, right := lefts[i], rights[i] + if left.start != right.start { + return false + } + + if left.end != right.end { + return false + } + + if len(left.dataColumns) != len(right.dataColumns) { + return false + } + + for dataColumn := range left.dataColumns { + if _, ok := right.dataColumns[dataColumn]; !ok { + return false + } + } + } + + return true } - indicesFromRoot := map[[fieldparams.RootLength]byte][]int{ - rootFromUint64(42): {5, 6, 7}, - rootFromUint64(43): {8, 9}, - rootFromUint64(44): {3, 2, 1}, + type missingColumnsWithCommitment struct { + areCommitments bool + missingColumns map[uint64]bool } - const ( - expectedFirst = 1 - expectedLast = 9 - ) + testCases := []struct { + name string + + // input + missingColumnsWithCommitments []*missingColumnsWithCommitment + + // output + bwbSlices []bwbSlice + }{ + { + name: "no item", + missingColumnsWithCommitments: []*missingColumnsWithCommitment{}, + bwbSlices: []bwbSlice{}, + }, + { + name: "one item, - no missing columns", + missingColumnsWithCommitments: []*missingColumnsWithCommitment{{areCommitments: true, missingColumns: map[uint64]bool{}}}, + bwbSlices: []bwbSlice{{start: 0, end: 0, dataColumns: map[uint64]bool{}}}, + }, + { + name: "one item - some missing columns", + missingColumnsWithCommitments: []*missingColumnsWithCommitment{{areCommitments: true, missingColumns: map[uint64]bool{1: true, 3: true, 5: true}}}, + bwbSlices: []bwbSlice{{start: 0, end: 0, dataColumns: map[uint64]bool{1: true, 3: true, 5: true}}}, + }, + { + name: "two items - no break", + missingColumnsWithCommitments: []*missingColumnsWithCommitment{ + {areCommitments: true, missingColumns: map[uint64]bool{1: true, 3: true, 5: true}}, + {areCommitments: true, missingColumns: map[uint64]bool{1: true, 3: true, 5: true}}, + }, + bwbSlices: []bwbSlice{{start: 0, end: 1, dataColumns: map[uint64]bool{1: true, 3: true, 5: true}}}, + }, + { + name: "three items - no break", + missingColumnsWithCommitments: []*missingColumnsWithCommitment{ + {areCommitments: true, missingColumns: map[uint64]bool{1: true, 3: true, 5: true}}, + {areCommitments: true, missingColumns: map[uint64]bool{1: true, 3: true, 5: true}}, + {areCommitments: true, missingColumns: map[uint64]bool{1: true, 3: true, 5: true}}, + }, + bwbSlices: []bwbSlice{{start: 0, end: 2, dataColumns: map[uint64]bool{1: true, 3: true, 5: true}}}, + }, + { + name: "five items - columns break", + missingColumnsWithCommitments: []*missingColumnsWithCommitment{ + {areCommitments: true, missingColumns: map[uint64]bool{1: true, 3: true, 5: true}}, + {areCommitments: true, missingColumns: map[uint64]bool{1: true, 3: true, 5: true}}, + {areCommitments: true, missingColumns: map[uint64]bool{1: true, 3: true}}, + {areCommitments: true, missingColumns: map[uint64]bool{1: true, 3: true}}, + {areCommitments: true, missingColumns: map[uint64]bool{}}, + }, + bwbSlices: []bwbSlice{ + {start: 0, end: 1, dataColumns: map[uint64]bool{1: true, 3: true, 5: true}}, + {start: 2, end: 3, dataColumns: map[uint64]bool{1: true, 3: true}}, + {start: 4, end: 4, dataColumns: map[uint64]bool{}}, + }, + }, + { + name: "seven items - gap", + missingColumnsWithCommitments: []*missingColumnsWithCommitment{ + {areCommitments: true, missingColumns: map[uint64]bool{1: true, 3: true, 5: true}}, // 0 + {areCommitments: true, missingColumns: map[uint64]bool{1: true, 3: true, 5: true}}, // 1 + nil, + nil, + {areCommitments: true, missingColumns: map[uint64]bool{1: true, 3: true, 5: true}}, // 2 + {areCommitments: true, missingColumns: map[uint64]bool{1: true, 3: true, 5: true}}, // 3 + {areCommitments: true, missingColumns: map[uint64]bool{1: true, 3: true, 5: true}}, // 4 + }, + bwbSlices: []bwbSlice{ + {start: 0, end: 4, dataColumns: map[uint64]bool{1: true, 3: true, 5: true}}, + }, + }, + { + name: "seven items - only breaks", + missingColumnsWithCommitments: []*missingColumnsWithCommitment{ + {areCommitments: true, missingColumns: map[uint64]bool{}}, // 0 + {areCommitments: true, missingColumns: map[uint64]bool{1: true, 3: true, 5: true}}, // 1 + nil, + {areCommitments: true, missingColumns: map[uint64]bool{1: true, 3: true, 5: true}}, // 2 + {areCommitments: true, missingColumns: map[uint64]bool{2: true}}, // 3 + {areCommitments: true, missingColumns: map[uint64]bool{}}, // 4 + }, + bwbSlices: []bwbSlice{ + {start: 0, end: 0, dataColumns: map[uint64]bool{}}, + {start: 1, end: 2, dataColumns: map[uint64]bool{1: true, 3: true, 5: true}}, + {start: 3, end: 3, dataColumns: map[uint64]bool{2: true}}, + {start: 4, end: 4, dataColumns: map[uint64]bool{}}, + }, + }, + { + name: "thirteen items - some blocks without commitments", + missingColumnsWithCommitments: []*missingColumnsWithCommitment{ + {areCommitments: true, missingColumns: map[uint64]bool{1: true, 3: true, 5: true}}, // 0 + {areCommitments: true, missingColumns: map[uint64]bool{1: true, 3: true, 5: true}}, // 1 + nil, + {areCommitments: true, missingColumns: map[uint64]bool{1: true, 3: true, 5: true}}, // 2 + {areCommitments: true, missingColumns: map[uint64]bool{2: true, 4: true}}, // 3 + {areCommitments: false, missingColumns: nil}, // 4 + {areCommitments: false, missingColumns: nil}, // 5 + {areCommitments: true, missingColumns: map[uint64]bool{2: true, 4: true}}, // 6 + nil, + nil, + {areCommitments: true, missingColumns: map[uint64]bool{1: true}}, // 7 + {areCommitments: true, missingColumns: map[uint64]bool{1: true}}, // 8 + {areCommitments: false, missingColumns: nil}, // 9 + {areCommitments: false, missingColumns: nil}, // 10 + + }, + bwbSlices: []bwbSlice{ + {start: 0, end: 2, dataColumns: map[uint64]bool{1: true, 3: true, 5: true}}, + {start: 3, end: 6, dataColumns: map[uint64]bool{2: true, 4: true}}, + {start: 7, end: 10, dataColumns: map[uint64]bool{1: true}}, + }, + }, + } + + // We don't care about the actual content of commitments, so we use a fake commitment. + fakeCommitment := make([]byte, 48) + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + bwbs := make([]blocks.BlockWithROBlobs, 0, len(tt.missingColumnsWithCommitments)) + missingColumnsByRoot := make(map[[fieldparams.RootLength]byte]map[uint64]bool, len(tt.missingColumnsWithCommitments)) + for i, missingColumnsWithCommitments := range tt.missingColumnsWithCommitments { + if missingColumnsWithCommitments == nil { + continue + } - actualFirst, actualLast := firstLastIndices(missingColumnsFromRoot, indicesFromRoot) + missingColumns := missingColumnsWithCommitments.missingColumns - require.Equal(t, expectedFirst, actualFirst) - require.Equal(t, expectedLast, actualLast) + pbSignedBeaconBlock := util.NewBeaconBlockDeneb() + + signedBeaconBlock, err := blocks.NewSignedBeaconBlock(pbSignedBeaconBlock) + require.NoError(t, err) + + signedBeaconBlock.SetSlot(primitives.Slot(i)) + + if missingColumnsWithCommitments.areCommitments { + err := signedBeaconBlock.SetBlobKzgCommitments([][]byte{fakeCommitment}) + require.NoError(t, err) + } + + roBlock, err := blocks.NewROBlock(signedBeaconBlock) + require.NoError(t, err) + + bwb := blocks.BlockWithROBlobs{Block: roBlock} + bwbs = append(bwbs, bwb) + + blockRoot := bwb.Block.Root() + missingColumnsByRoot[blockRoot] = missingColumns + } + bwbSlices, err := buildBwbSlices(bwbs, missingColumnsByRoot) + require.NoError(t, err) + require.Equal(t, true, areBwbSlicesEqual(tt.bwbSlices, bwbSlices)) + }) + } } func TestFetchDataColumnsFromPeers(t *testing.T) { @@ -1636,15 +1734,16 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { // Current slot. currentSlot uint64 - // Blocks with blobs parameters. + // Blocks with blobs parameters that will be used as `bwb` parameter. blocksParams []blockParams - // - Position in the slice: Stored data columns in the store for the - // nth position in the input bwb. - // - Key : Column index - // - Value : Always true + // What data columns do we store for the block in the same position in blocksParams. + // len(storedDataColumns) has to be the same than len(blocksParams). storedDataColumns []map[int]bool + // Each item in the list represents a peer. + // We can specify what the peer will respond to each data column by range request. + // For the exact same data columns by range request, the peer will respond in the order they are specified. peersParams []peerParams // OUTPUTS @@ -1657,9 +1756,9 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { name: "Deneb fork epoch not reached", denebForkEpoch: primitives.Epoch(math.MaxUint64), blocksParams: []blockParams{ - {slot: 1, hasBlobs: true}, - {slot: 2, hasBlobs: true}, - {slot: 3, hasBlobs: true}, + {slot: 1, hasBlobs: true}, // Before deneb fork epoch + {slot: 2, hasBlobs: true}, // Before deneb fork epoch + {slot: 3, hasBlobs: true}, // Before deneb fork epoch }, addedRODataColumns: [][]int{nil, nil, nil}, }, @@ -1669,10 +1768,10 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { eip7954ForkEpoch: 1, currentSlot: 40, blocksParams: []blockParams{ - {slot: 25, hasBlobs: false}, - {slot: 26, hasBlobs: false}, - {slot: 27, hasBlobs: false}, - {slot: 28, hasBlobs: false}, + {slot: 25, hasBlobs: false}, // Before EIP-7954 fork epoch + {slot: 26, hasBlobs: false}, // Before EIP-7954 fork epoch + {slot: 27, hasBlobs: false}, // Before EIP-7954 fork epoch + {slot: 28, hasBlobs: false}, // Before EIP-7954 fork epoch }, addedRODataColumns: [][]int{nil, nil, nil, nil}, }, @@ -1682,9 +1781,9 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { eip7954ForkEpoch: 1, currentSlot: 40, blocksParams: []blockParams{ - {slot: 25, hasBlobs: false}, - {slot: 26, hasBlobs: true}, - {slot: 27, hasBlobs: true}, + {slot: 25, hasBlobs: false}, // Before EIP-7954 fork epoch + {slot: 26, hasBlobs: true}, // Before EIP-7954 fork epoch + {slot: 27, hasBlobs: true}, // Before EIP-7954 fork epoch {slot: 32, hasBlobs: false}, {slot: 33, hasBlobs: false}, }, @@ -1696,9 +1795,9 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { eip7954ForkEpoch: 1, currentSlot: 40, blocksParams: []blockParams{ - {slot: 25, hasBlobs: false}, - {slot: 26, hasBlobs: true}, - {slot: 27, hasBlobs: true}, + {slot: 25, hasBlobs: false}, // Before EIP-7954 fork epoch + {slot: 26, hasBlobs: true}, // Before EIP-7954 fork epoch + {slot: 27, hasBlobs: true}, // Before EIP-7954 fork epoch {slot: 32, hasBlobs: false}, {slot: 33, hasBlobs: true}, }, @@ -1717,8 +1816,8 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { eip7954ForkEpoch: 1, currentSlot: 40, blocksParams: []blockParams{ - {slot: 25, hasBlobs: false}, - {slot: 27, hasBlobs: true}, + {slot: 25, hasBlobs: false}, // Before EIP-7954 fork epoch + {slot: 27, hasBlobs: true}, // Before EIP-7954 fork epoch {slot: 32, hasBlobs: false}, {slot: 33, hasBlobs: true}, {slot: 34, hasBlobs: true}, @@ -1729,184 +1828,192 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { {slot: 39, hasBlobs: false}, }, storedDataColumns: []map[int]bool{ - nil, - nil, - nil, - {6: true, 38: true, 70: true, 102: true}, - {6: true, 70: true}, - nil, - {6: true, 38: true, 70: true, 102: true}, - {38: true, 102: true}, - {6: true, 38: true, 70: true, 102: true}, - nil, + nil, // Slot 25 + nil, // Slot 27 + nil, // Slot 32 + {6: true, 38: true}, // Slot 33 + {6: true, 38: true}, // Slot 34 + nil, // Slot 35 + {6: true, 38: true}, // Slot 36 + {38: true, 102: true}, // Slot 37 + {6: true, 38: true, 70: true, 102: true}, // Slot 38 + nil, // Slot 39 }, peersParams: []peerParams{ { + // This peer custodies all the columns we need but + // will never respond any column. csc: 128, toRespond: map[string][][]responseParams{ (ðpb.DataColumnSidecarsByRangeRequest{ - StartSlot: 34, + StartSlot: 33, Count: 4, - Columns: []uint64{6, 38, 70, 102}, - }).String(): { - { - {slot: 34, columnIndex: 6}, - {slot: 34, columnIndex: 38}, - {slot: 34, columnIndex: 70}, - {slot: 34, columnIndex: 102}, - {slot: 36, columnIndex: 6}, - {slot: 36, columnIndex: 38}, - {slot: 36, columnIndex: 70}, - {slot: 36, columnIndex: 102}, - {slot: 37, columnIndex: 6}, - {slot: 37, columnIndex: 38}, - {slot: 37, columnIndex: 70}, - {slot: 37, columnIndex: 102}, - }, - }, + Columns: []uint64{70, 102}, + }).String(): {{}}, + (ðpb.DataColumnSidecarsByRangeRequest{ + StartSlot: 37, + Count: 1, + Columns: []uint64{6, 70}, + }).String(): {{}}, }, }, { csc: 128, toRespond: map[string][][]responseParams{ (ðpb.DataColumnSidecarsByRangeRequest{ - StartSlot: 34, + StartSlot: 33, Count: 4, - Columns: []uint64{6, 38, 70, 102}, + Columns: []uint64{70, 102}, }).String(): { { - {slot: 34, columnIndex: 6}, - {slot: 34, columnIndex: 38}, + {slot: 33, columnIndex: 70}, + {slot: 33, columnIndex: 102}, {slot: 34, columnIndex: 70}, {slot: 34, columnIndex: 102}, - {slot: 36, columnIndex: 6}, - {slot: 36, columnIndex: 38}, {slot: 36, columnIndex: 70}, {slot: 36, columnIndex: 102}, + }, + }, + (ðpb.DataColumnSidecarsByRangeRequest{ + StartSlot: 37, + Count: 1, + Columns: []uint64{6, 70}, + }).String(): { + { {slot: 37, columnIndex: 6}, - {slot: 37, columnIndex: 38}, {slot: 37, columnIndex: 70}, - {slot: 37, columnIndex: 102}, }, }, }, }, { + // This peer custodies all the columns we need but + // will never respond any column. csc: 128, toRespond: map[string][][]responseParams{ (ðpb.DataColumnSidecarsByRangeRequest{ - StartSlot: 34, + StartSlot: 33, Count: 4, - Columns: []uint64{6, 38, 70, 102}, - }).String(): { - {}, - }, + Columns: []uint64{70, 102}, + }).String(): {{}}, + (ðpb.DataColumnSidecarsByRangeRequest{ + StartSlot: 37, + Count: 1, + Columns: []uint64{6, 70}, + }).String(): {{}}, }, }, { - csc: 128, - toRespond: map[string][][]responseParams{ - (ðpb.DataColumnSidecarsByRangeRequest{ - StartSlot: 34, - Count: 4, - Columns: []uint64{6, 38, 70, 102}, - }).String(): { - {}, - }, - }, + // This peer should not be requested. + csc: 2, + toRespond: map[string][][]responseParams{}, }, }, addedRODataColumns: [][]int{ - nil, - nil, - nil, - nil, - {38, 102}, - nil, - nil, - {6, 70}, - nil, - nil, + nil, // Slot 25 + nil, // Slot 27 + nil, // Slot 32 + {70, 102}, // Slot 33 + {70, 102}, // Slot 34 + nil, // Slot 35 + {70, 102}, // Slot 36 + {6, 70}, // Slot 37 + nil, // Slot 38 + nil, // Slot 39 }, }, { - name: "Some blocks with blobs with missing data columns - several rounds needed", + name: "Some blocks with blobs with missing data columns - partial responses", denebForkEpoch: 0, eip7954ForkEpoch: 1, currentSlot: 40, blocksParams: []blockParams{ - {slot: 25, hasBlobs: false}, - {slot: 27, hasBlobs: true}, - {slot: 32, hasBlobs: false}, {slot: 33, hasBlobs: true}, {slot: 34, hasBlobs: true}, {slot: 35, hasBlobs: false}, - {slot: 37, hasBlobs: true}, - {slot: 38, hasBlobs: true}, - {slot: 39, hasBlobs: false}, + {slot: 36, hasBlobs: true}, }, storedDataColumns: []map[int]bool{ - nil, - nil, - nil, - {6: true, 38: true, 70: true, 102: true}, - {6: true, 70: true}, - nil, - {38: true, 102: true}, - {6: true, 38: true, 70: true, 102: true}, - nil, + {6: true, 38: true}, // Slot 33 + {6: true, 38: true}, // Slot 34 + nil, // Slot 35 + {6: true, 38: true}, // Slot 36 }, peersParams: []peerParams{ { csc: 128, toRespond: map[string][][]responseParams{ (ðpb.DataColumnSidecarsByRangeRequest{ - StartSlot: 34, + StartSlot: 33, Count: 4, - Columns: []uint64{6, 38, 70, 102}, + Columns: []uint64{70, 102}, }).String(): { { - {slot: 34, columnIndex: 38}, + {slot: 33, columnIndex: 70}, + {slot: 34, columnIndex: 70}, + {slot: 36, columnIndex: 70}, }, }, (ðpb.DataColumnSidecarsByRangeRequest{ - StartSlot: 34, + StartSlot: 33, Count: 4, - Columns: []uint64{6, 70, 102}, + Columns: []uint64{70}, }).String(): { { + {slot: 33, columnIndex: 70}, + {slot: 34, columnIndex: 70}, + {slot: 36, columnIndex: 70}, + }, + }, + (ðpb.DataColumnSidecarsByRangeRequest{ + StartSlot: 33, + Count: 4, + Columns: []uint64{102}, + }).String(): {{}}, + }, + }, + { + csc: 128, + toRespond: map[string][][]responseParams{ + (ðpb.DataColumnSidecarsByRangeRequest{ + StartSlot: 33, + Count: 4, + Columns: []uint64{70, 102}, + }).String(): { + { + {slot: 33, columnIndex: 102}, {slot: 34, columnIndex: 102}, + {slot: 36, columnIndex: 102}, }, }, (ðpb.DataColumnSidecarsByRangeRequest{ - StartSlot: 37, - Count: 1, - Columns: []uint64{6, 70}, + StartSlot: 33, + Count: 4, + Columns: []uint64{102}, }).String(): { { - {slot: 37, columnIndex: 6}, - {slot: 37, columnIndex: 70}, + {slot: 33, columnIndex: 102}, + {slot: 34, columnIndex: 102}, + {slot: 36, columnIndex: 102}, }, }, + (ðpb.DataColumnSidecarsByRangeRequest{ + StartSlot: 33, + Count: 4, + Columns: []uint64{70}, + }).String(): {{}}, }, }, - {csc: 0}, - {csc: 0}, }, addedRODataColumns: [][]int{ - nil, - nil, - nil, - nil, - {38, 102}, - nil, - {6, 70}, - nil, - nil, + {70, 102}, // Slot 33 + {70, 102}, // Slot 34 + nil, // Slot 35 + {70, 102}, // Slot 36 }, }, { - name: "Some blocks with blobs with missing data columns - no peers response at first", + name: "Some blocks with blobs with missing data columns - first response is invalid", denebForkEpoch: 0, eip7954ForkEpoch: 1, currentSlot: 40, @@ -1925,26 +2032,18 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { Count: 1, Columns: []uint64{6, 70}, }).String(): { - nil, { - {slot: 38, columnIndex: 6}, + {slot: 38, columnIndex: 6, alterate: true}, {slot: 38, columnIndex: 70}, }, }, - }, - }, - { - csc: 128, - toRespond: map[string][][]responseParams{ (ðpb.DataColumnSidecarsByRangeRequest{ StartSlot: 38, Count: 1, - Columns: []uint64{6, 70}, + Columns: []uint64{6}, }).String(): { - nil, { {slot: 38, columnIndex: 6}, - {slot: 38, columnIndex: 70}, }, }, }, @@ -1955,7 +2054,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { }, }, { - name: "Some blocks with blobs with missing data columns - first response is invalid", + name: "Some blocks with blobs with missing data columns - first response is empty", denebForkEpoch: 0, eip7954ForkEpoch: 1, currentSlot: 40, @@ -1974,25 +2073,17 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { Count: 1, Columns: []uint64{6, 70}, }).String(): { - { - {slot: 38, columnIndex: 6, alterate: true}, - {slot: 38, columnIndex: 70}, - }, - }, - (ðpb.DataColumnSidecarsByRangeRequest{ - StartSlot: 38, - Count: 1, - Columns: []uint64{6}, - }).String(): { + {}, { {slot: 38, columnIndex: 6}, + {slot: 38, columnIndex: 70}, }, }, }, }, }, addedRODataColumns: [][]int{ - {70, 6}, + {6, 70}, }, }, } diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go b/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go index 88689398be89..38a48a642194 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go @@ -6,7 +6,9 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas" coreTime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p" p2pTypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types" "github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags" "github.com/prysmaticlabs/prysm/v5/config/params" @@ -369,21 +371,54 @@ func (f *blocksFetcher) calculateHeadAndTargetEpochs() (headEpoch, targetEpoch p return headEpoch, targetEpoch, peers } +// custodyAllNeededColumns filter `inputPeers` that custody all columns in `columns`. +func (f *blocksFetcher) custodyAllNeededColumns(inputPeers map[peer.ID]bool, columns map[uint64]bool) (map[peer.ID]bool, error) { + outputPeers := make(map[peer.ID]bool, len(inputPeers)) + +loop: + for peer := range inputPeers { + // Get the node ID from the peer ID. + nodeID, err := p2p.ConvertPeerIDToNodeID(peer) + if err != nil { + return nil, errors.Wrap(err, "convert peer ID to node ID") + } + + // Get the custody columns count from the peer. + custodyCount := f.p2p.DataColumnsCustodyCountFromRemotePeer(peer) + + // Get the custody columns from the peer. + remoteCustodyColumns, err := peerdas.CustodyColumns(nodeID, custodyCount) + if err != nil { + return nil, errors.Wrap(err, "custody columns") + } + + for column := range columns { + if !remoteCustodyColumns[column] { + continue loop + } + } + + outputPeers[peer] = true + } + + return outputPeers, nil +} + // peersWithSlotAndDataColumns returns a list of peers that should custody all needed data columns for the given slot. func (f *blocksFetcher) peersWithSlotAndDataColumns( + ctx context.Context, peers []peer.ID, targetSlot primitives.Slot, dataColumns map[uint64]bool, -) (map[peer.ID]bool, []string, error) { + count uint64, +) ([]peer.ID, []string, error) { peersCount := len(peers) - // TODO: Uncomment when we are not in devnet any more. - // TODO: Find a way to have this uncommented without being in devnet. - // // Filter peers based on the percentage of peers to be used in a request. - // peers = f.filterPeers(ctx, peers, peersPercentagePerRequest) + // Filter peers based on the percentage of peers to be used in a request. + peers = f.filterPeers(ctx, peers, peersPercentagePerRequestDataColumns) // // Filter peers on bandwidth. - // peers = f.hasSufficientBandwidth(peers, blocksCount) + peers = f.hasSufficientBandwidth(peers, count) // Select peers which custody ALL wanted columns. // Basically, it is very unlikely that a non-supernode peer will have custody of all columns. @@ -443,5 +478,16 @@ func (f *blocksFetcher) peersWithSlotAndDataColumns( } } - return finalPeers, descriptions, nil + // Convert the map to a slice. + finalPeersSlice := make([]peer.ID, 0, len(finalPeers)) + for peer := range finalPeers { + finalPeersSlice = append(finalPeersSlice, peer) + } + + // Shuffle the peers. + f.rand.Shuffle(len(finalPeersSlice), func(i, j int) { + finalPeersSlice[i], finalPeersSlice[j] = finalPeersSlice[j], finalPeersSlice[i] + }) + + return finalPeersSlice, descriptions, nil } diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go index dccf46320f93..93c2055f3d81 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go @@ -643,3 +643,49 @@ func TestBlocksFetcher_currentHeadAndTargetEpochs(t *testing.T) { }) } } + +func TestCustodyAllNeededColumns(t *testing.T) { + const dataColumnsCount = 31 + + p2p := p2pt.NewTestP2P(t) + + dataColumns := make(map[uint64]bool, dataColumnsCount) + for i := range dataColumnsCount { + dataColumns[uint64(i)] = true + } + + custodyCounts := [...]uint64{ + 4 * params.BeaconConfig().CustodyRequirement, + 32 * params.BeaconConfig().CustodyRequirement, + 4 * params.BeaconConfig().CustodyRequirement, + 32 * params.BeaconConfig().CustodyRequirement, + } + + expected := make(map[peer.ID]bool) + + peersID := make(map[peer.ID]bool, len(custodyCounts)) + for _, custodyCount := range custodyCounts { + peerRecord, peerID := createPeer(t, len(peersID), custodyCount) + peersID[peerID] = true + p2p.Peers().Add(peerRecord, peerID, nil, network.DirOutbound) + if custodyCount == 32*params.BeaconConfig().CustodyRequirement { + expected[peerID] = true + } + } + + blocksFetcher := newBlocksFetcher( + context.Background(), + &blocksFetcherConfig{ + p2p: p2p, + }, + ) + + actual, err := blocksFetcher.custodyAllNeededColumns(peersID, dataColumns) + require.NoError(t, err) + + require.Equal(t, len(expected), len(actual)) + for peerID := range expected { + _, ok := actual[peerID] + require.Equal(t, true, ok) + } +} diff --git a/beacon-chain/sync/rpc_data_column_sidecars_by_root.go b/beacon-chain/sync/rpc_data_column_sidecars_by_root.go index b2b919577e3c..030ea33b180f 100644 --- a/beacon-chain/sync/rpc_data_column_sidecars_by_root.go +++ b/beacon-chain/sync/rpc_data_column_sidecars_by_root.go @@ -81,11 +81,12 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int requestedColumnsByRoot[root][columnIndex] = true } - requestedColumnsByRootLog := make(map[[fieldparams.RootLength]byte]interface{}) + requestedColumnsByRootLog := make(map[string]interface{}) for root, columns := range requestedColumnsByRoot { - requestedColumnsByRootLog[root] = "all" + rootStr := fmt.Sprintf("%#x", root) + requestedColumnsByRootLog[rootStr] = "all" if uint64(len(columns)) != numberOfColumns { - requestedColumnsByRootLog[root] = uint64MapToSortedSlice(columns) + requestedColumnsByRootLog[rootStr] = uint64MapToSortedSlice(columns) } } @@ -124,18 +125,9 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int log := log.WithFields(logrus.Fields{ "peer": remotePeer, "custody": custody, + "columns": requestedColumnsByRootLog, }) - i := 0 - for root, columns := range requestedColumnsByRootLog { - log = log.WithFields(logrus.Fields{ - fmt.Sprintf("root%d", i): fmt.Sprintf("%#x", root), - fmt.Sprintf("columns%d", i): columns, - }) - - i++ - } - log.Debug("Serving data column sidecar by root request") // Subscribe to the data column feed. diff --git a/beacon-chain/sync/validate_data_column.go b/beacon-chain/sync/validate_data_column.go index 4982ab351c44..340aea4b4587 100644 --- a/beacon-chain/sync/validate_data_column.go +++ b/beacon-chain/sync/validate_data_column.go @@ -157,12 +157,15 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs validationTime := s.cfg.clock.Now().Sub(receivedTime) peerGossipScore := s.cfg.p2p.Peers().Scorers().GossipScorer().Score(pid) + + pidString := pid.String() + log. WithFields(logging.DataColumnFields(ds)). WithFields(logrus.Fields{ "sinceSlotStartTime": sinceSlotStartTime, "validationTime": validationTime, - "peer": pid[len(pid)-6:], + "peer": pidString[len(pidString)-6:], "peerGossipScore": peerGossipScore, }). Debug("Accepted data column sidecar gossip") diff --git a/beacon-chain/sync/verify/blob.go b/beacon-chain/sync/verify/blob.go index 59edcb38017e..8fdd089205af 100644 --- a/beacon-chain/sync/verify/blob.go +++ b/beacon-chain/sync/verify/blob.go @@ -53,10 +53,12 @@ func BlobAlignsWithBlock(blob blocks.ROBlob, block blocks.ROBlock) error { } func ColumnAlignsWithBlock(col blocks.RODataColumn, block blocks.ROBlock, colVerifier verification.NewColumnVerifier) error { + // Exit early if the block is not at least a Deneb block. if block.Version() < version.Deneb { return nil } + // Check if the block root in the column sidecar matches the block root. if col.BlockRoot() != block.Root() { return ErrColumnBlockMisaligned } @@ -64,25 +66,27 @@ func ColumnAlignsWithBlock(col blocks.RODataColumn, block blocks.ROBlock, colVer // Verify commitment byte values match commitments, err := block.Block().Body().BlobKzgCommitments() if err != nil { - return err + return errors.Wrap(err, "blob KZG commitments") } if !reflect.DeepEqual(commitments, col.KzgCommitments) { return errors.Wrapf(ErrMismatchedColumnCommitments, "commitment %#v != block commitment %#v for block root %#x at slot %d ", col.KzgCommitments, commitments, block.Root(), col.Slot()) } + vf := colVerifier(col, verification.InitsyncColumnSidecarRequirements) if err := vf.DataColumnIndexInBounds(); err != nil { - return err + return errors.Wrap(err, "data column index out of bounds") } // Filter out columns which did not pass the KZG inclusion proof verification. if err := vf.SidecarInclusionProven(); err != nil { - return err + return errors.Wrap(err, "inclusion proof verification") } // Filter out columns which did not pass the KZG proof verification. if err := vf.SidecarKzgProofVerified(); err != nil { - return err + return errors.Wrap(err, "KZG proof verification") } + return nil }