From 9be69fbd078e7fefe96c123bf61024e732911651 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Wed, 16 Oct 2024 14:42:17 +0200 Subject: [PATCH] PeerDAS: Fix major bug in `dataColumnSidecarsByRangeRPCHandler` and allow syncing from full nodes. (#14532) * `validateDataColumnsByRange`: `current` ==> `currentSlot`. * `validateRequest`: Extract `remotePeer` variable. * `dataColumnSidecarsByRangeRPCHandler`: Small non functional refactor. * `streamDataColumnBatch`: Fix major bug. Before this commit, the node was unable to respond with a data column index higher than the count of stored data columns. For example, if there is 8 data columns stored for a given block, the node was able to respond for data columns indices 1, 3, and 5, but not for 10, 16 or 127. The issue was visible only for full nodes, since super nodes always store 128 data columns. * Initial sync: Fetch data columns from all peers. (Not only from supernodes.) * Nishant's comment: Fix `lastSlot` and `endSlot` duplication. * Address Nishant's comment. --- beacon-chain/sync/initial-sync/BUILD.bazel | 2 - .../sync/initial-sync/blocks_fetcher.go | 409 ++++++++++++------ .../sync/initial-sync/blocks_fetcher_test.go | 240 +++++----- .../sync/initial-sync/blocks_fetcher_utils.go | 272 +++++++++--- .../initial-sync/blocks_fetcher_utils_test.go | 229 ++++++++-- .../sync/rpc_data_column_sidecars_by_range.go | 73 +++- 6 files changed, 877 insertions(+), 348 deletions(-) diff --git a/beacon-chain/sync/initial-sync/BUILD.bazel b/beacon-chain/sync/initial-sync/BUILD.bazel index da6ec0c57ae3..aef3e6502286 100644 --- a/beacon-chain/sync/initial-sync/BUILD.bazel +++ b/beacon-chain/sync/initial-sync/BUILD.bazel @@ -98,7 +98,6 @@ go_test( "//consensus-types/primitives:go_default_library", "//container/leaky-bucket:go_default_library", "//container/slice:go_default_library", - "//crypto/ecdsa:go_default_library", "//crypto/hash:go_default_library", "//encoding/bytesutil:go_default_library", "//network/forks:go_default_library", @@ -111,7 +110,6 @@ go_test( "//time/slots:go_default_library", "@com_github_consensys_gnark_crypto//ecc/bls12-381/fr:go_default_library", "@com_github_crate_crypto_go_kzg_4844//:go_default_library", - "@com_github_ethereum_go_ethereum//p2p/enode:go_default_library", "@com_github_ethereum_go_ethereum//p2p/enr:go_default_library", "@com_github_libp2p_go_libp2p//:go_default_library", "@com_github_libp2p_go_libp2p//core:go_default_library", diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index 16d662451577..deb9998ec88f 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -3,6 +3,7 @@ package initialsync import ( "context" "fmt" + "slices" "sort" "strings" "sync" @@ -41,8 +42,6 @@ const ( maxPendingRequests = 64 // peersPercentagePerRequest caps percentage of peers to be used in a request. peersPercentagePerRequest = 0.75 - // peersPercentagePerRequestDataColumns caps percentage of peers to be used in a data columns request. - peersPercentagePerRequestDataColumns = 1. // handshakePollingInterval is a polling interval for checking the number of received handshakes. handshakePollingInterval = 5 * time.Second // peerLocksPollingInterval is a polling interval for checking if there are stale peer locks. @@ -293,6 +292,11 @@ func (f *blocksFetcher) scheduleRequest(ctx context.Context, start primitives.Sl // handleRequest parses fetch request and forwards it to response builder. 
func (f *blocksFetcher) handleRequest(ctx context.Context, start primitives.Slot, count uint64) *fetchRequestResponse { + const ( + delay = 5 * time.Second + batchSize = 32 + ) + ctx, span := trace.StartSpan(ctx, "initialsync.handleRequest") defer span.End() @@ -334,7 +338,7 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start primitives.Slot } if coreTime.PeerDASIsActive(start) { - response.err = f.fetchDataColumnsFromPeers(ctx, response.bwb, nil) + response.err = f.fetchDataColumnsFromPeers(ctx, response.bwb, nil, delay, batchSize) return response } @@ -621,10 +625,13 @@ type bwbSlice struct { // buildBwbSlices builds slices of `bwb` that aims to optimize the count of // by range requests needed to fetch missing data columns. -func buildBwbSlices( - bwbs []blocks.BlockWithROBlobs, - missingColumnsByRoot map[[fieldparams.RootLength]byte]map[uint64]bool, -) ([]bwbSlice, error) { +func buildBwbSlices(wrappedBwbsMissingColumns *bwbsMissingColumns) ([]bwbSlice, error) { + wrappedBwbsMissingColumns.mu.Lock() + defer wrappedBwbsMissingColumns.mu.Unlock() + + bwbs := wrappedBwbsMissingColumns.bwbs + missingColumnsByRoot := wrappedBwbsMissingColumns.missingColumnsByRoot + // Return early if there are no blocks to process. if len(bwbs) == 0 { return []bwbSlice{}, nil @@ -634,10 +641,12 @@ func buildBwbSlices( firstROBlock := bwbs[0].Block firstBlockRoot := firstROBlock.Root() - previousMissingDataColumns := map[uint64]bool{} + previousMissingDataColumns := make(map[uint64]bool, len(missingColumnsByRoot[firstBlockRoot])) if missing, ok := missingColumnsByRoot[firstBlockRoot]; ok { - previousMissingDataColumns = missing + for key, value := range missing { + previousMissingDataColumns[key] = value + } } previousBlockSlot := firstROBlock.Block().Slot() @@ -680,7 +689,10 @@ func buildBwbSlices( currentBlockRoot := currentROBlock.Root() // Get the missing data columns for the current block. - missingDataColumns := missingColumnsByRoot[currentBlockRoot] + missingDataColumns := make(map[uint64]bool, len(missingColumnsByRoot[currentBlockRoot])) + for key, value := range missingColumnsByRoot[currentBlockRoot] { + missingDataColumns[key] = value + } // Compute if the missing data columns differ. missingDataColumnsDiffer := uint64MapDiffer(previousMissingDataColumns, missingDataColumns) @@ -824,6 +836,103 @@ func blockFromRoot(bwb []blocks.BlockWithROBlobs) map[[fieldparams.RootLength]by return result } +// computeMissingDataColumnsCount returns the count of missing columns. +func computeMissingDataColumnsCount(missingColumnsByRoot map[[fieldparams.RootLength]byte]map[uint64]bool) int { + count := 0 + for _, columns := range missingColumnsByRoot { + count += len(columns) + } + return count +} + +func (f *blocksFetcher) fetchBwbSliceFromPeers( + ctx context.Context, + identifier int, + wrappedBwbsMissingColumns *bwbsMissingColumns, + peers []peer.ID, + batchSize uint64, + bwbSlice bwbSlice) error { + // Filter out slices that are already complete. + if len(bwbSlice.dataColumns) == 0 { + return nil + } + + // Compute the start and end slot of the request. + startSlot, endSlot := func() (primitives.Slot, primitives.Slot) { + mu := &wrappedBwbsMissingColumns.mu + + mu.RLock() + defer mu.RUnlock() + + bwbs := wrappedBwbsMissingColumns.bwbs + + startSlot := bwbs[bwbSlice.start].Block.Block().Slot() + endSlot := bwbs[bwbSlice.end].Block.Block().Slot() + + return startSlot, endSlot + }() + + // Compute the block count of the request. 
+ blockCount := uint64(endSlot - startSlot + 1) + + // Get all admissible peers with the data columns they custody. + dataColumnsByAdmissiblePeer, err := f.waitForPeersForDataColumns(identifier, peers, endSlot, bwbSlice.dataColumns, blockCount) + if err != nil { + return errors.Wrap(err, "wait for peers for data columns") + } + + // Select the peers that will be requested. + dataColumnsToFetchByPeer, err := selectPeersToFetchDataColumnsFrom(bwbSlice.dataColumns, dataColumnsByAdmissiblePeer) + if err != nil { + // This should never happen. + return errors.Wrap(err, "select peers to fetch data columns from") + } + + var wg sync.WaitGroup + + for peer, dataColumnsToFetch := range dataColumnsToFetchByPeer { + // Extract peer custody columns. + peerCustodyColumns := dataColumnsByAdmissiblePeer[peer] + + indicesByRoot, blocksByRoot := func() (map[[fieldparams.RootLength]byte][]int, map[[fieldparams.RootLength]byte]blocks.ROBlock) { + mu := &wrappedBwbsMissingColumns.mu + + mu.RLock() + defer mu.RUnlock() + + // Get `bwbs` indices indexed by root. + // Get blocks indexed by root. + + bwbs := wrappedBwbsMissingColumns.bwbs + return indicesFromRoot(bwbs), blockFromRoot(bwbs) + }() + + // Sort data columns. + slices.Sort[[]uint64](dataColumnsToFetch) + + // Build the requests. + requests := buildDataColumnSidecarsByRangeRequests(startSlot, blockCount, dataColumnsToFetch, batchSize) + + for _, request := range requests { + // Fetch the missing data columns from the peers. + wg.Add(1) + go f.fetchDataColumnFromPeer(ctx, &wg, identifier, wrappedBwbsMissingColumns, blocksByRoot, indicesByRoot, peer, peerCustodyColumns, request) + } + } + + // Wait for all requests to finish. + wg.Wait() + + return nil +} + +type bwbsMissingColumns struct { + mu sync.RWMutex + + bwbs []blocks.BlockWithROBlobs + missingColumnsByRoot map[[fieldparams.RootLength]byte]map[uint64]bool +} + // fetchDataColumnsFromPeers looks at the blocks in `bwb` and retrieves all // data columns for with the block has blob commitments, and for which our store is missing data columns // we should custody. @@ -833,16 +942,17 @@ func (f *blocksFetcher) fetchDataColumnsFromPeers( ctx context.Context, bwbs []blocks.BlockWithROBlobs, peers []peer.ID, + delay time.Duration, + batchSize uint64, ) error { // Time to wait if no peers are available. const ( - delay = 5 * time.Second // Time to wait before retrying to fetch data columns. - maxIdentifier = 1_000 // Max identifier for the request. + maxIdentifier = 1_000 // Max identifier for the request. + maxAllowedStall = 5 // Number of trials before giving up. ) // Generate random identifier. identifier := f.rand.Intn(maxIdentifier) - log := log.WithField("reqIdentifier", identifier) // Compute the columns we should custody. localCustodyColumns, err := f.custodyColumns() @@ -870,85 +980,70 @@ func (f *blocksFetcher) fetchDataColumnsFromPeers( return nil } + // Compute the number of missing data columns. + previousMissingDataColumnsCount := computeMissingDataColumnsCount(missingColumnsByRoot) + + // Count the number of retries for the same amount of missing data columns. + stallCount := 0 + + // Add log fields. + log := log.WithFields(logrus.Fields{ + "identifier": identifier, + "initialMissingColumnsCount": previousMissingDataColumnsCount, + }) + // Log the start of the process. 
start := time.Now() log.Debug("Fetch data columns from peers - start") + wrappedBwbsMissingColumns := &bwbsMissingColumns{ + bwbs: bwbs, + missingColumnsByRoot: missingColumnsByRoot, + } + for len(missingColumnsByRoot) > 0 { // Compute the optimal slices of `bwb` to minimize the number of by range returned columns. - bwbsSlices, err := buildBwbSlices(bwbs, missingColumnsByRoot) + bwbSlices, err := buildBwbSlices(wrappedBwbsMissingColumns) if err != nil { return errors.Wrap(err, "build bwb slices") } - outerLoop: - for _, bwbsSlice := range bwbsSlices { - lastSlot := bwbs[bwbsSlice.end].Block.Block().Slot() - dataColumnsSlice := sortedSliceFromMap(bwbsSlice.dataColumns) - dataColumnCount := uint64(len(dataColumnsSlice)) - - // Filter out slices that are already complete. - if dataColumnCount == 0 { - continue + for _, bwbSlice := range bwbSlices { + if err := f.fetchBwbSliceFromPeers(ctx, identifier, wrappedBwbsMissingColumns, peers, batchSize, bwbSlice); err != nil { + return errors.Wrap(err, "fetch BWB slice from peers") } + } - // If no peer is specified, get all connected peers. - peersToFilter := peers - if peersToFilter == nil { - peersToFilter = f.p2p.Peers().Connected() - } - - // Compute the block count of the request. - startSlot := bwbs[bwbsSlice.start].Block.Block().Slot() - endSlot := bwbs[bwbsSlice.end].Block.Block().Slot() - blockCount := uint64(endSlot - startSlot + 1) - - filteredPeers, err := f.waitForPeersForDataColumns(ctx, peersToFilter, lastSlot, bwbsSlice.dataColumns, blockCount) - if err != nil { - return errors.Wrap(err, "wait for peers for data columns") - } - - // Build the request. - request := &p2ppb.DataColumnSidecarsByRangeRequest{ - StartSlot: startSlot, - Count: blockCount, - Columns: dataColumnsSlice, - } - - // Get `bwbs` indices indexed by root. - indicesByRoot := indicesFromRoot(bwbs) - - // Get blocks indexed by root. - blocksByRoot := blockFromRoot(bwbs) + missingDataColumnsCount := computeMissingDataColumnsCount(missingColumnsByRoot) + if missingDataColumnsCount == previousMissingDataColumnsCount { + stallCount++ + } else { + stallCount = 0 + } - // Prepare nice log fields. - var columnsLog interface{} = "all" - numberOfColuns := params.BeaconConfig().NumberOfColumns - if dataColumnCount < numberOfColuns { - columnsLog = dataColumnsSlice - } + previousMissingDataColumnsCount = missingDataColumnsCount + if missingDataColumnsCount > 0 { log := log.WithFields(logrus.Fields{ - "start": request.StartSlot, - "count": request.Count, - "columns": columnsLog, + "remainingMissingColumnsCount": missingDataColumnsCount, + "stallCount": stallCount, + "maxAllowedStall": maxAllowedStall, }) - // Retrieve the missing data columns from the peers. - for _, peer := range filteredPeers { - success := f.fetchDataColumnFromPeer(ctx, bwbs, missingColumnsByRoot, blocksByRoot, indicesByRoot, peer, request) - - // If we have successfully retrieved some data columns, continue to the next slice. - if success { - continue outerLoop - } + if stallCount >= maxAllowedStall { + // It is very likely `bwbs` contains orphaned blocks, for which no peer has the data columns. + // We give up and let the state machine handle the situation. 
+ const message = "Fetch data columns from peers - no progress, giving up" + log.Warning(message) + return errors.New(message) } - log.WithField("peers", filteredPeers).Warning("Fetch data columns from peers - no peers among this list returned any valid data columns") - } + time.Sleep(delay) - if len(missingColumnsByRoot) > 0 { - log.Debug("Fetch data columns from peers - continue") + log.WithFields(logrus.Fields{ + "remainingMissingColumnsCount": missingDataColumnsCount, + "stallCount": stallCount, + }).Debug("Fetch data columns from peers - continue") } } @@ -972,68 +1067,99 @@ func sortBwbsByColumnIndex(bwbs []blocks.BlockWithROBlobs) { // - synced up to `lastSlot`, // - custody all columns in `dataColumns`, and // - have bandwidth to serve `blockCount` blocks. -// It waits until at least one peer is available. +// It waits until at least one peer is available for all needed columns. +// It returns a map, where the key of the map is the peer, the value is the custody columns of the peer. func (f *blocksFetcher) waitForPeersForDataColumns( - ctx context.Context, + reqIdentifier int, peers []peer.ID, lastSlot primitives.Slot, - dataColumns map[uint64]bool, + neededDataColumns map[uint64]bool, blockCount uint64, -) ([]peer.ID, error) { +) (map[peer.ID]map[uint64]bool, error) { // Time to wait before retrying to find new peers. const delay = 5 * time.Second - // Filter peers that custody all columns we need and that are synced to the epoch. - filteredPeers, descriptions, err := f.peersWithSlotAndDataColumns(ctx, peers, lastSlot, dataColumns, blockCount) + var computeDataColumnsWithoutPeers = func(neededColumns map[uint64]bool, peersByColumn map[uint64][]peer.ID) map[uint64]bool { + result := make(map[uint64]bool) + for column := range neededColumns { + if _, ok := peersByColumn[column]; !ok { + result[column] = true + } + } + + return result + } + + // Get the peers that are admissible for the data columns. + dataColumnsByAdmissiblePeer, admissiblePeersByDataColumn, descriptions, err := f.admissiblePeersForDataColumn(peers, lastSlot, neededDataColumns, blockCount) if err != nil { return nil, errors.Wrap(err, "peers with slot and data columns") } - // Compute data columns count - dataColumnCount := uint64(len(dataColumns)) + dataColumnsWithoutPeers := computeDataColumnsWithoutPeers(neededDataColumns, admissiblePeersByDataColumn) - // Sort columns. - columnsSlice := sortedSliceFromMap(dataColumns) + // Wait if no suitable peers are available. + for len(dataColumnsWithoutPeers) > 0 { + // Sort columns. + neededDataColumnsSlice := sortedSliceFromMap(neededDataColumns) - // Build a nice log field. - var columnsLog interface{} = "all" - numberOfColuns := params.BeaconConfig().NumberOfColumns - if dataColumnCount < numberOfColuns { - columnsLog = columnsSlice - } + // Build a nice log fields. + numberOfColumns := params.BeaconConfig().NumberOfColumns + + var neededDataColumnsLog interface{} = "all" + neededDataColumnCount := uint64(len(neededDataColumns)) + if neededDataColumnCount < numberOfColumns { + neededDataColumnsLog = neededDataColumnsSlice + } + + var dataColumnsWithoutPeersLog interface{} = "all" + dataColumnsWithoutPeersCount := uint64(len(dataColumnsWithoutPeers)) + if dataColumnsWithoutPeersCount < numberOfColumns { + dataColumnsWithoutPeersLog = uint64MapToSortedSlice(dataColumnsWithoutPeers) + } - // Wait if no suitable peers are available. - for len(filteredPeers) == 0 { log. 
WithFields(logrus.Fields{ - "peers": peers, - "waitDuration": delay, - "targetSlot": lastSlot, - "columns": columnsLog, + "waitDuration": delay, + "targetSlot": lastSlot, + "neededDataColumns": neededDataColumnsLog, + "identifier": reqIdentifier, + "columnsWithoutPeers": dataColumnsWithoutPeersLog, }). - Warning("Fetch data columns from peers - no peers available to retrieve missing data columns, retrying later") + Warning("Fetch data columns from peers - no peers available to retrieve some missing data columns, retrying later") for _, description := range descriptions { log.Debug(description) } + for pid, peerDataColumns := range dataColumnsByAdmissiblePeer { + var peerDataColumnsLog interface{} = "all" + peerDataColumnsCount := uint64(len(peerDataColumns)) + if peerDataColumnsCount < numberOfColumns { + peerDataColumnsLog = uint64MapToSortedSlice(peerDataColumns) + } + + log.Debugf("peer %s: custody columns: %v", pid, peerDataColumnsLog) + } + time.Sleep(delay) - filteredPeers, descriptions, err = f.peersWithSlotAndDataColumns(ctx, peers, lastSlot, dataColumns, blockCount) + dataColumnsByAdmissiblePeer, admissiblePeersByDataColumn, descriptions, err = f.admissiblePeersForDataColumn(peers, lastSlot, neededDataColumns, blockCount) if err != nil { return nil, errors.Wrap(err, "peers with slot and data columns") } + + dataColumnsWithoutPeers = computeDataColumnsWithoutPeers(neededDataColumns, admissiblePeersByDataColumn) } - return filteredPeers, nil + return dataColumnsByAdmissiblePeer, nil } // processDataColumn mutates `bwbs` argument by adding the data column, // and mutates `missingColumnsByRoot` by removing the data column if the // data column passes all the check. func processDataColumn( - bwbs []blocks.BlockWithROBlobs, - missingColumnsByRoot map[[fieldparams.RootLength]byte]map[uint64]bool, + wrappedBwbsMissingColumns *bwbsMissingColumns, columnVerifier verification.NewColumnVerifier, blocksByRoot map[[fieldparams.RootLength]byte]blocks.ROBlock, indicesByRoot map[[fieldparams.RootLength]byte][]int, @@ -1071,15 +1197,25 @@ func processDataColumn( } // Populate the corresponding items in `bwbs`. - for _, index := range indices { - bwbs[index].Columns = append(bwbs[index].Columns, dataColumn) - } + func() { + mu := &wrappedBwbsMissingColumns.mu - // Remove the column from the missing columns. - delete(missingColumnsByRoot[blockRoot], dataColumn.ColumnIndex) - if len(missingColumnsByRoot[blockRoot]) == 0 { - delete(missingColumnsByRoot, blockRoot) - } + mu.Lock() + defer mu.Unlock() + + bwbs := wrappedBwbsMissingColumns.bwbs + missingColumnsByRoot := wrappedBwbsMissingColumns.missingColumnsByRoot + + for _, index := range indices { + bwbs[index].Columns = append(bwbs[index].Columns, dataColumn) + } + + // Remove the column from the missing columns. + delete(missingColumnsByRoot[blockRoot], dataColumn.ColumnIndex) + if len(missingColumnsByRoot[blockRoot]) == 0 { + delete(missingColumnsByRoot, blockRoot) + } + }() return true } @@ -1089,15 +1225,41 @@ func processDataColumn( // - `missingColumnsByRoot` by removing the fetched data columns. 
func (f *blocksFetcher) fetchDataColumnFromPeer( ctx context.Context, - bwbs []blocks.BlockWithROBlobs, - missingColumnsByRoot map[[fieldparams.RootLength]byte]map[uint64]bool, + wg *sync.WaitGroup, + identifier int, + wrappedBwbsMissingColumns *bwbsMissingColumns, blocksByRoot map[[fieldparams.RootLength]byte]blocks.ROBlock, indicesByRoot map[[fieldparams.RootLength]byte][]int, peer peer.ID, + peerCustodyColumns map[uint64]bool, request *p2ppb.DataColumnSidecarsByRangeRequest, -) bool { +) { + defer wg.Done() + + // Extract the number of columns. + numberOfColumns := params.BeaconConfig().NumberOfColumns + + requestedColumnsCount := uint64(len(request.Columns)) + var requestedColumnsLog interface{} = "all" + if requestedColumnsCount < numberOfColumns { + requestedColumnsLog = request.Columns + } + + peerCustodyColumnsCount := uint64(len(peerCustodyColumns)) + var peerCustodyColumnsLog interface{} = "all" + if peerCustodyColumnsCount < numberOfColumns { + peerCustodyColumnsLog = uint64MapToSortedSlice(peerCustodyColumns) + } + // Define useful log field. - log := log.WithField("peer", peer) + log := log.WithFields(logrus.Fields{ + "peer": peer, + "identifier": identifier, + "start": request.StartSlot, + "count": request.Count, + "requestedColumns": requestedColumnsLog, + "custodyColumns": peerCustodyColumnsLog, + }) // Wait for peer bandwidth if needed. if err := func() error { @@ -1122,45 +1284,36 @@ func (f *blocksFetcher) fetchDataColumnFromPeer( return nil }(); err != nil { log.WithError(err).Warning("Fetch data columns from peers - could not wait for bandwidth") - return false + return } // Send the request to the peer. - requestStart := time.Now() roDataColumns, err := prysmsync.SendDataColumnsByRangeRequest(ctx, f.clock, f.p2p, peer, f.ctxMap, request) if err != nil { log.WithError(err).Warning("Fetch data columns from peers - could not send data columns by range request") - return false + return } - requestDuration := time.Since(requestStart) - if len(roDataColumns) == 0 { - log.Debug("Fetch data columns from peers - peer did not return any data columns") - return false + log.Debug("Fetch data columns from peers - no data column returned") + return } globalSuccess := false for _, dataColumn := range roDataColumns { - success := processDataColumn(bwbs, missingColumnsByRoot, f.cv, blocksByRoot, indicesByRoot, dataColumn) + success := processDataColumn(wrappedBwbsMissingColumns, f.cv, blocksByRoot, indicesByRoot, dataColumn) if success { globalSuccess = true } } if !globalSuccess { - log.Debug("Fetch data columns from peers - peer did not return any valid data columns") - return false + log.Debug("Fetch data columns from peers - no valid data column returned") + return } - totalDuration := time.Since(requestStart) - log.WithFields(logrus.Fields{ - "reqDuration": requestDuration, - "totalDuration": totalDuration, - }).Debug("Fetch data columns from peers - got some columns") - - return true + log.Debug("Fetch data columns from peers - got some columns") } // requestBlocks is a wrapper for handling BeaconBlocksByRangeRequest requests/streams. 
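Note: the retry flow that this patch adds to fetchDataColumnsFromPeers can be summarized by the minimal sketch below. This is not the Prysm code itself; the names fetchWithStallDetection, fetchRound and previousMissing are illustrative only. The idea is that the fetcher keeps retrying while progress is being made, and gives up once the count of missing columns has not decreased for maxAllowedStall consecutive rounds, instead of looping forever as the previous implementation could.

package sketch

import (
	"errors"
	"time"
)

// fetchWithStallDetection retries fetchRound until no data columns are missing,
// giving up after maxAllowedStall consecutive rounds without progress.
// fetchRound performs one fetch round and returns the number of still-missing columns.
func fetchWithStallDetection(fetchRound func() (int, error), delay time.Duration) error {
	const maxAllowedStall = 5

	previousMissing := -1
	stallCount := 0

	for {
		missing, err := fetchRound()
		if err != nil {
			return err
		}

		if missing == 0 {
			// Every missing data column has been retrieved.
			return nil
		}

		if missing == previousMissing {
			// No progress in this round.
			stallCount++
		} else {
			stallCount = 0
		}
		previousMissing = missing

		if stallCount >= maxAllowedStall {
			// Very likely orphaned blocks for which no peer has the columns:
			// stop and let the caller (the sync state machine) handle it.
			return errors.New("no progress, giving up")
		}

		time.Sleep(delay)
	}
}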
diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go index ddc15e4bae00..33f437f5f8ef 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go @@ -15,7 +15,6 @@ import ( "github.com/consensys/gnark-crypto/ecc/bls12-381/fr" GoKZG "github.com/crate-crypto/go-kzg-4844" - "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/libp2p/go-libp2p" libp2pcore "github.com/libp2p/go-libp2p/core" @@ -41,7 +40,6 @@ import ( "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" leakybucket "github.com/prysmaticlabs/prysm/v5/container/leaky-bucket" "github.com/prysmaticlabs/prysm/v5/container/slice" - ecdsaprysm "github.com/prysmaticlabs/prysm/v5/crypto/ecdsa" "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" "github.com/prysmaticlabs/prysm/v5/network/forks" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" @@ -1328,28 +1326,6 @@ type blockParams struct { hasBlobs bool } -func createPeer(t *testing.T, privateKeyOffset int, custodyCount uint64) (*enr.Record, peer.ID) { - privateKeyBytes := make([]byte, 32) - for i := 0; i < 32; i++ { - privateKeyBytes[i] = byte(privateKeyOffset + i) - } - - unmarshalledPrivateKey, err := crypto.UnmarshalSecp256k1PrivateKey(privateKeyBytes) - require.NoError(t, err) - - privateKey, err := ecdsaprysm.ConvertFromInterfacePrivKey(unmarshalledPrivateKey) - require.NoError(t, err) - - peerID, err := peer.IDFromPrivateKey(unmarshalledPrivateKey) - require.NoError(t, err) - - record := &enr.Record{} - record.Set(peerdas.Csc(custodyCount)) - record.Set(enode.Secp256k1(privateKey.PublicKey)) - - return record, peerID -} - func TestCustodyColumns(t *testing.T) { blocksFetcher := newBlocksFetcher(context.Background(), &blocksFetcherConfig{ p2p: p2ptest.NewTestP2P(t), @@ -1446,7 +1422,7 @@ func createAndConnectPeer( // Get the response to send. items, ok := peerParams.toRespond[reqString] - require.Equal(t, true, ok) + require.Equal(t, true, ok, "no response to send for request %s", reqString) for _, responseParams := range items[countFromRequest[reqString]] { // Get data columns sidecars for this slot. @@ -1707,7 +1683,13 @@ func TestBuildBwbSlices(t *testing.T) { blockRoot := bwb.Block.Root() missingColumnsByRoot[blockRoot] = missingColumns } - bwbSlices, err := buildBwbSlices(bwbs, missingColumnsByRoot) + + wrappedBwbsMissingColumns := &bwbsMissingColumns{ + bwbs: bwbs, + missingColumnsByRoot: missingColumnsByRoot, + } + + bwbSlices, err := buildBwbSlices(wrappedBwbsMissingColumns) require.NoError(t, err) require.Equal(t, true, areBwbSlicesEqual(tt.bwbSlices, bwbSlices)) }) @@ -1718,6 +1700,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { const ( blobsCount = 6 peersHeadSlot = 100 + delay = 0 * time.Second ) testCases := []struct { @@ -1746,11 +1729,15 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { // For the exact same data columns by range request, the peer will respond in the order they are specified. peersParams []peerParams + // The max count of data columns that will be requested in each batch. + batchSize uint64 + // OUTPUTS // ------- // Data columns that should be added to `bwb`. 
addedRODataColumns [][]int + isError bool }{ { name: "Deneb fork epoch not reached", @@ -1760,7 +1747,9 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { {slot: 2, hasBlobs: true}, // Before deneb fork epoch {slot: 3, hasBlobs: true}, // Before deneb fork epoch }, + batchSize: 32, addedRODataColumns: [][]int{nil, nil, nil}, + isError: false, }, { name: "All blocks are before EIP-7954 fork epoch", @@ -1773,7 +1762,9 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { {slot: 27, hasBlobs: false}, // Before EIP-7954 fork epoch {slot: 28, hasBlobs: false}, // Before EIP-7954 fork epoch }, + batchSize: 32, addedRODataColumns: [][]int{nil, nil, nil, nil}, + isError: false, }, { name: "All blocks with commitments before are EIP-7954 fork epoch", @@ -1787,6 +1778,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { {slot: 32, hasBlobs: false}, {slot: 33, hasBlobs: false}, }, + batchSize: 32, addedRODataColumns: [][]int{nil, nil, nil, nil, nil}, }, { @@ -1808,7 +1800,9 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { nil, {6: true, 38: true, 70: true, 102: true}, }, + batchSize: 32, addedRODataColumns: [][]int{nil, nil, nil, nil, nil}, + isError: false, }, { name: "Some blocks with blobs with missing data columns - one round needed", @@ -1841,21 +1835,8 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { }, peersParams: []peerParams{ { - // This peer custodies all the columns we need but - // will never respond any column. - csc: 128, - toRespond: map[string][][]responseParams{ - (ðpb.DataColumnSidecarsByRangeRequest{ - StartSlot: 33, - Count: 4, - Columns: []uint64{70, 102}, - }).String(): {{}}, - (ðpb.DataColumnSidecarsByRangeRequest{ - StartSlot: 37, - Count: 1, - Columns: []uint64{6, 70}, - }).String(): {{}}, - }, + csc: 0, + toRespond: map[string][][]responseParams{}, }, { csc: 128, @@ -1887,28 +1868,36 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { }, }, { - // This peer custodies all the columns we need but - // will never respond any column. csc: 128, toRespond: map[string][][]responseParams{ (ðpb.DataColumnSidecarsByRangeRequest{ StartSlot: 33, Count: 4, Columns: []uint64{70, 102}, - }).String(): {{}}, + }).String(): { + { + {slot: 33, columnIndex: 70}, + {slot: 33, columnIndex: 102}, + {slot: 34, columnIndex: 70}, + {slot: 34, columnIndex: 102}, + {slot: 36, columnIndex: 70}, + {slot: 36, columnIndex: 102}, + }, + }, (ðpb.DataColumnSidecarsByRangeRequest{ StartSlot: 37, Count: 1, Columns: []uint64{6, 70}, - }).String(): {{}}, + }).String(): { + { + {slot: 37, columnIndex: 6}, + {slot: 37, columnIndex: 70}, + }, + }, }, }, - { - // This peer should not be requested. 
- csc: 2, - toRespond: map[string][][]responseParams{}, - }, }, + batchSize: 32, addedRODataColumns: [][]int{ nil, // Slot 25 nil, // Slot 27 @@ -1921,6 +1910,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { nil, // Slot 38 nil, // Slot 39 }, + isError: false, }, { name: "Some blocks with blobs with missing data columns - partial responses", @@ -1954,31 +1944,10 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { {slot: 36, columnIndex: 70}, }, }, - (ðpb.DataColumnSidecarsByRangeRequest{ - StartSlot: 33, - Count: 4, - Columns: []uint64{70}, - }).String(): { - { - {slot: 33, columnIndex: 70}, - {slot: 34, columnIndex: 70}, - {slot: 36, columnIndex: 70}, - }, - }, (ðpb.DataColumnSidecarsByRangeRequest{ StartSlot: 33, Count: 4, Columns: []uint64{102}, - }).String(): {{}}, - }, - }, - { - csc: 128, - toRespond: map[string][][]responseParams{ - (ðpb.DataColumnSidecarsByRangeRequest{ - StartSlot: 33, - Count: 4, - Columns: []uint64{70, 102}, }).String(): { { {slot: 33, columnIndex: 102}, @@ -1986,25 +1955,10 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { {slot: 36, columnIndex: 102}, }, }, - (ðpb.DataColumnSidecarsByRangeRequest{ - StartSlot: 33, - Count: 4, - Columns: []uint64{102}, - }).String(): { - { - {slot: 33, columnIndex: 102}, - {slot: 34, columnIndex: 102}, - {slot: 36, columnIndex: 102}, - }, - }, - (ðpb.DataColumnSidecarsByRangeRequest{ - StartSlot: 33, - Count: 4, - Columns: []uint64{70}, - }).String(): {{}}, }, }, }, + batchSize: 32, addedRODataColumns: [][]int{ {70, 102}, // Slot 33 {70, 102}, // Slot 34 @@ -2020,9 +1974,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { blocksParams: []blockParams{ {slot: 38, hasBlobs: true}, }, - storedDataColumns: []map[int]bool{ - {38: true, 102: true}, - }, + storedDataColumns: []map[int]bool{{38: true, 102: true}}, peersParams: []peerParams{ { csc: 128, @@ -2049,42 +2001,115 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { }, }, }, - addedRODataColumns: [][]int{ - {6, 70}, + batchSize: 32, + addedRODataColumns: [][]int{{6, 70}}, + isError: false, + }, + { + name: "Some blocks with blobs with missing data columns - first response is empty", + denebForkEpoch: 0, + eip7954ForkEpoch: 1, + currentSlot: 40, + blocksParams: []blockParams{{slot: 38, hasBlobs: true}}, + storedDataColumns: []map[int]bool{{38: true, 102: true}}, + peersParams: []peerParams{ + { + csc: 128, + toRespond: map[string][][]responseParams{ + (ðpb.DataColumnSidecarsByRangeRequest{ + StartSlot: 38, + Count: 1, + Columns: []uint64{6, 70}, + }).String(): { + {}, + { + {slot: 38, columnIndex: 6}, + {slot: 38, columnIndex: 70}, + }, + }, + }, + }, }, + batchSize: 32, + addedRODataColumns: [][]int{{6, 70}}, + isError: false, }, { - name: "Some blocks with blobs with missing data columns - first response is empty", + name: "Some blocks with blobs with missing data columns - no response at all", + denebForkEpoch: 0, + eip7954ForkEpoch: 1, + currentSlot: 40, + blocksParams: []blockParams{{slot: 38, hasBlobs: true}}, + storedDataColumns: []map[int]bool{{38: true, 102: true}}, + peersParams: []peerParams{ + { + csc: 128, + toRespond: map[string][][]responseParams{ + (ðpb.DataColumnSidecarsByRangeRequest{ + StartSlot: 38, + Count: 1, + Columns: []uint64{6, 70}, + }).String(): {{}, {}, {}, {}, {}, {}, {}, {}, {}, {}}, + }, + }, + }, + batchSize: 32, + addedRODataColumns: [][]int{{}}, + isError: true, + }, + { + name: "Some blocks with blobs with missing data columns - request has to be split", denebForkEpoch: 0, eip7954ForkEpoch: 1, currentSlot: 40, blocksParams: 
[]blockParams{ - {slot: 38, hasBlobs: true}, + {slot: 32, hasBlobs: true}, {slot: 33, hasBlobs: true}, {slot: 34, hasBlobs: true}, {slot: 35, hasBlobs: true}, // 4 + {slot: 36, hasBlobs: true}, {slot: 37, hasBlobs: true}, // 6 }, storedDataColumns: []map[int]bool{ - {38: true, 102: true}, + nil, nil, nil, nil, // 4 + nil, nil, // 6 + }, peersParams: []peerParams{ { csc: 128, toRespond: map[string][][]responseParams{ (ðpb.DataColumnSidecarsByRangeRequest{ - StartSlot: 38, - Count: 1, - Columns: []uint64{6, 70}, + StartSlot: 32, + Count: 4, + Columns: []uint64{6, 38, 70, 102}, }).String(): { - {}, { - {slot: 38, columnIndex: 6}, - {slot: 38, columnIndex: 70}, + {slot: 32, columnIndex: 6}, {slot: 32, columnIndex: 38}, {slot: 32, columnIndex: 70}, {slot: 32, columnIndex: 102}, + {slot: 33, columnIndex: 6}, {slot: 33, columnIndex: 38}, {slot: 33, columnIndex: 70}, {slot: 33, columnIndex: 102}, + {slot: 34, columnIndex: 6}, {slot: 34, columnIndex: 38}, {slot: 34, columnIndex: 70}, {slot: 34, columnIndex: 102}, + {slot: 35, columnIndex: 6}, {slot: 35, columnIndex: 38}, {slot: 35, columnIndex: 70}, {slot: 35, columnIndex: 102}, + }, + }, + (ðpb.DataColumnSidecarsByRangeRequest{ + StartSlot: 36, + Count: 2, + Columns: []uint64{6, 38, 70, 102}, + }).String(): { + { + {slot: 36, columnIndex: 6}, {slot: 36, columnIndex: 38}, {slot: 36, columnIndex: 70}, {slot: 36, columnIndex: 102}, + {slot: 37, columnIndex: 6}, {slot: 37, columnIndex: 38}, {slot: 37, columnIndex: 70}, {slot: 37, columnIndex: 102}, }, }, }, }, }, + batchSize: 4, addedRODataColumns: [][]int{ - {6, 70}, - }, + {6, 38, 70, 102}, // Slot 32 + {6, 38, 70, 102}, // Slot 33 + {6, 38, 70, 102}, // Slot 34 + {6, 38, 70, 102}, // Slot 35 + {6, 38, 70, 102}, // Slot 36 + {6, 38, 70, 102}, // Slot 37 + }, + isError: false, }, } @@ -2222,8 +2247,13 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { }) // Fetch the data columns from the peers. - err = blocksFetcher.fetchDataColumnsFromPeers(ctx, bwb, peersID) - require.NoError(t, err) + err = blocksFetcher.fetchDataColumnsFromPeers(ctx, bwb, peersID, delay, tc.batchSize) + if !tc.isError { + require.NoError(t, err) + } else { + require.NotNil(t, err) + return + } // Check the added RO data columns. for i := range bwb { diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go b/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go index 38a48a642194..58e5cc432d59 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go @@ -3,6 +3,8 @@ package initialsync import ( "context" "fmt" + "slices" + "time" "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" @@ -211,6 +213,11 @@ func findForkReqRangeSize() uint64 { // findForkWithPeer loads some blocks from a peer in an attempt to find alternative blocks. func (f *blocksFetcher) findForkWithPeer(ctx context.Context, pid peer.ID, slot primitives.Slot) (*forkData, error) { + const ( + delay = 5 * time.Second + batchSize = 32 + ) + reqCount := findForkReqRangeSize() // Safe-guard, since previous epoch is used when calculating. 
if uint64(slot) < reqCount { @@ -282,7 +289,7 @@ func (f *blocksFetcher) findForkWithPeer(ctx context.Context, pid peer.ID, slot return nil, errors.Wrap(err, "invalid blocks received in findForkWithPeer") } if coreTime.PeerDASIsActive(block.Block().Slot()) { - if err := f.fetchDataColumnsFromPeers(ctx, bwb, []peer.ID{pid}); err != nil { + if err := f.fetchDataColumnsFromPeers(ctx, bwb, []peer.ID{pid}, delay, batchSize); err != nil { return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findForkWithPeer") } } else { @@ -302,6 +309,10 @@ func (f *blocksFetcher) findForkWithPeer(ctx context.Context, pid peer.ID, slot // findAncestor tries to figure out common ancestor slot that connects a given root to known block. func (f *blocksFetcher) findAncestor(ctx context.Context, pid peer.ID, b interfaces.ReadOnlySignedBeaconBlock) (*forkData, error) { + const ( + delay = 5 * time.Second + batchSize = 32 + ) outBlocks := []interfaces.ReadOnlySignedBeaconBlock{b} for i := uint64(0); i < backtrackingMaxHops; i++ { parentRoot := outBlocks[len(outBlocks)-1].Block().ParentRoot() @@ -312,7 +323,7 @@ func (f *blocksFetcher) findAncestor(ctx context.Context, pid peer.ID, b interfa return nil, errors.Wrap(err, "received invalid blocks in findAncestor") } if coreTime.PeerDASIsActive(b.Block().Slot()) { - if err := f.fetchDataColumnsFromPeers(ctx, bwb, []peer.ID{pid}); err != nil { + if err := f.fetchDataColumnsFromPeers(ctx, bwb, []peer.ID{pid}, delay, batchSize); err != nil { return nil, errors.Wrap(err, "unable to retrieve columns for blocks found in findAncestor") } } else { @@ -371,12 +382,12 @@ func (f *blocksFetcher) calculateHeadAndTargetEpochs() (headEpoch, targetEpoch p return headEpoch, targetEpoch, peers } -// custodyAllNeededColumns filter `inputPeers` that custody all columns in `columns`. -func (f *blocksFetcher) custodyAllNeededColumns(inputPeers map[peer.ID]bool, columns map[uint64]bool) (map[peer.ID]bool, error) { - outputPeers := make(map[peer.ID]bool, len(inputPeers)) +// custodyColumnFromPeer compute all costody columns indexed by peer. +func (f *blocksFetcher) custodyDataColumnsFromPeer(peers map[peer.ID]bool) (map[peer.ID]map[uint64]bool, error) { + peerCount := len(peers) -loop: - for peer := range inputPeers { + custodyDataColumnsByPeer := make(map[peer.ID]map[uint64]bool, peerCount) + for peer := range peers { // Get the node ID from the peer ID. nodeID, err := p2p.ConvertPeerIDToNodeID(peer) if err != nil { @@ -387,59 +398,125 @@ loop: custodyCount := f.p2p.DataColumnsCustodyCountFromRemotePeer(peer) // Get the custody columns from the peer. - remoteCustodyColumns, err := peerdas.CustodyColumns(nodeID, custodyCount) + custodyDataColumns, err := peerdas.CustodyColumns(nodeID, custodyCount) if err != nil { return nil, errors.Wrap(err, "custody columns") } - for column := range columns { - if !remoteCustodyColumns[column] { - continue loop + custodyDataColumnsByPeer[peer] = custodyDataColumns + } + + return custodyDataColumnsByPeer, nil +} + +// uint64MapToSortedSlice produces a sorted uint64 slice from a map. +func uint64MapToSortedSlice(input map[uint64]bool) []uint64 { + output := make([]uint64, 0, len(input)) + for idx := range input { + output = append(output, idx) + } + + slices.Sort[[]uint64](output) + return output +} + +// `filterPeerWhichCustodyAtLeastOneDataColumn` filters peers which custody at least one data column +// specified in `neededDataColumns`. It returns also a list of descriptions for non admissible peers. 
+func filterPeerWhichCustodyAtLeastOneDataColumn( + neededDataColumns map[uint64]bool, + inputDataColumnsByPeer map[peer.ID]map[uint64]bool, +) (map[peer.ID]map[uint64]bool, []string) { + // Get the count of needed data columns. + neededDataColumnsCount := uint64(len(neededDataColumns)) + + // Create pretty needed data columns for logs. + var neededDataColumnsLog interface{} = "all" + numberOfColumns := params.BeaconConfig().NumberOfColumns + + if neededDataColumnsCount < numberOfColumns { + neededDataColumnsLog = uint64MapToSortedSlice(neededDataColumns) + } + + outputDataColumnsByPeer := make(map[peer.ID]map[uint64]bool, len(inputDataColumnsByPeer)) + descriptions := make([]string, 0) + +outerLoop: + for peer, peerCustodyDataColumns := range inputDataColumnsByPeer { + for neededDataColumn := range neededDataColumns { + if peerCustodyDataColumns[neededDataColumn] { + outputDataColumnsByPeer[peer] = peerCustodyDataColumns + + continue outerLoop } } - outputPeers[peer] = true + peerCustodyColumnsCount := uint64(len(peerCustodyDataColumns)) + var peerCustodyColumnsLog interface{} = "all" + + if peerCustodyColumnsCount < numberOfColumns { + peerCustodyColumnsLog = uint64MapToSortedSlice(peerCustodyDataColumns) + } + + description := fmt.Sprintf( + "peer %s: does not custody any needed column, custody columns: %v, needed columns: %v", + peer, peerCustodyColumnsLog, neededDataColumnsLog, + ) + + descriptions = append(descriptions, description) } - return outputPeers, nil + return outputDataColumnsByPeer, descriptions } -// peersWithSlotAndDataColumns returns a list of peers that should custody all needed data columns for the given slot. -func (f *blocksFetcher) peersWithSlotAndDataColumns( - ctx context.Context, +// admissiblePeersForDataColumn returns a map of peers that: +// - custody at least one column listed in `neededDataColumns`, +// - are synced to `targetSlot`, and +// - have enough bandwidth to serve data columns corresponding to `count` blocks. +// It returns: +// - A map, where the key of the map is the peer, the value is the custody columns of the peer. +// - A map, where the key of the map is the data column, the value is the peer that custody the data column. +// - A slice of descriptions for non admissible peers. +// - An error if any. +func (f *blocksFetcher) admissiblePeersForDataColumn( peers []peer.ID, targetSlot primitives.Slot, - dataColumns map[uint64]bool, + neededDataColumns map[uint64]bool, count uint64, -) ([]peer.ID, []string, error) { - peersCount := len(peers) +) (map[peer.ID]map[uint64]bool, map[uint64][]peer.ID, []string, error) { + // If no peer is specified, get all connected peers. + inputPeers := peers + if inputPeers == nil { + inputPeers = f.p2p.Peers().Connected() + } - // Filter peers based on the percentage of peers to be used in a request. - peers = f.filterPeers(ctx, peers, peersPercentagePerRequestDataColumns) + inputPeerCount := len(inputPeers) + neededDataColumnsCount := uint64(len(neededDataColumns)) - // // Filter peers on bandwidth. - peers = f.hasSufficientBandwidth(peers, count) + // Create description slice for non admissible peers. + descriptions := make([]string, 0, inputPeerCount) - // Select peers which custody ALL wanted columns. - // Basically, it is very unlikely that a non-supernode peer will have custody of all columns. - // TODO: Modify to retrieve data columns from all possible peers. - // TODO: If a peer does respond some of the request columns, do not re-request responded columns. + // Filter peers on bandwidth. 
+ peersWithSufficientBandwidth := f.hasSufficientBandwidth(inputPeers, count) + + // Convert peers with sufficient bandwidth to a map. + peerWithSufficientBandwidthMap := make(map[peer.ID]bool, len(peersWithSufficientBandwidth)) + for _, peer := range peersWithSufficientBandwidth { + peerWithSufficientBandwidthMap[peer] = true + } + + for _, peer := range inputPeers { + if !peerWithSufficientBandwidthMap[peer] { + description := fmt.Sprintf("peer %s: does not have sufficient bandwidth", peer) + descriptions = append(descriptions, description) + } + } // Compute the target epoch from the target slot. targetEpoch := slots.ToEpoch(targetSlot) - peersWithAdmissibleHeadEpoch := make(map[peer.ID]bool, peersCount) - descriptions := make([]string, 0, peersCount) - - // Filter out peers with head epoch lower than our target epoch. - // Technically, we should be able to use the head slot from the peer. - // However, our vision of the head slot of the peer is updated twice per epoch - // via P2P messages. So it is likely that we think the peer is lagging behind - // while it is actually not. - // ==> We use the head epoch as a proxy instead. - // However, if the peer is actually lagging for a few slots, - // we may requests some data columns it doesn't have yet. - for _, peer := range peers { + // Filter peers with head epoch lower than our target epoch. + peersWithAdmissibleHeadEpoch := make(map[peer.ID]bool, inputPeerCount) + for _, peer := range peersWithSufficientBandwidth { peerChainState, err := f.p2p.Peers().ChainState(peer) if err != nil { @@ -457,7 +534,7 @@ func (f *blocksFetcher) peersWithSlotAndDataColumns( peerHeadEpoch := slots.ToEpoch(peerChainState.HeadSlot) if peerHeadEpoch < targetEpoch { - description := fmt.Sprintf("peer %s: head epoch %d < target epoch %d", peer, peerHeadEpoch, targetEpoch) + description := fmt.Sprintf("peer %s: peer head epoch %d < our target epoch %d", peer, peerHeadEpoch, targetEpoch) descriptions = append(descriptions, description) continue } @@ -465,29 +542,112 @@ func (f *blocksFetcher) peersWithSlotAndDataColumns( peersWithAdmissibleHeadEpoch[peer] = true } - // Filter out peers that do not have all the data columns needed. - finalPeers, err := f.custodyAllNeededColumns(peersWithAdmissibleHeadEpoch, dataColumns) + // Compute custody columns for each peer. + dataColumnsByPeerWithAdmissibleHeadEpoch, err := f.custodyDataColumnsFromPeer(peersWithAdmissibleHeadEpoch) if err != nil { - return nil, nil, errors.Wrap(err, "custody all needed columns") + return nil, nil, nil, errors.Wrap(err, "custody columns from peer") } - for peer := range peersWithAdmissibleHeadEpoch { - if _, ok := finalPeers[peer]; !ok { - description := fmt.Sprintf("peer %s: does not custody all needed columns", peer) - descriptions = append(descriptions, description) + // Filter peers which custody at least one needed data column. + dataColumnsByAdmissiblePeer, localDescriptions := filterPeerWhichCustodyAtLeastOneDataColumn(neededDataColumns, dataColumnsByPeerWithAdmissibleHeadEpoch) + descriptions = append(descriptions, localDescriptions...) + + // Compute a map from needed data columns to their peers. 
+ admissiblePeersByDataColumn := make(map[uint64][]peer.ID, neededDataColumnsCount) + for peer, peerCustodyDataColumns := range dataColumnsByAdmissiblePeer { + for dataColumn := range peerCustodyDataColumns { + admissiblePeersByDataColumn[dataColumn] = append(admissiblePeersByDataColumn[dataColumn], peer) + } + } + + return dataColumnsByAdmissiblePeer, admissiblePeersByDataColumn, descriptions, nil +} + +// selectPeersToFetchDataColumnsFrom implements greedy algorithm in order to select peers to fetch data columns from. +// https://en.wikipedia.org/wiki/Set_cover_problem#Greedy_algorithm +func selectPeersToFetchDataColumnsFrom( + neededDataColumns map[uint64]bool, + dataColumnsByPeer map[peer.ID]map[uint64]bool, +) (map[peer.ID][]uint64, error) { + dataColumnsFromSelectedPeers := make(map[peer.ID][]uint64) + + // Filter `dataColumnsByPeer` to only contain needed data columns. + neededDataColumnsByPeer := make(map[peer.ID]map[uint64]bool, len(dataColumnsByPeer)) + for pid, dataColumns := range dataColumnsByPeer { + for dataColumn := range dataColumns { + if neededDataColumns[dataColumn] { + if _, ok := neededDataColumnsByPeer[pid]; !ok { + neededDataColumnsByPeer[pid] = make(map[uint64]bool, len(neededDataColumns)) + } + + neededDataColumnsByPeer[pid][dataColumn] = true + } } } - // Convert the map to a slice. - finalPeersSlice := make([]peer.ID, 0, len(finalPeers)) - for peer := range finalPeers { - finalPeersSlice = append(finalPeersSlice, peer) + for len(neededDataColumns) > 0 { + // Check if at least one peer remains. If not, it means that we don't have enough peers to fetch all needed data columns. + if len(neededDataColumnsByPeer) == 0 { + missingDataColumnsSortedSlice := uint64MapToSortedSlice(neededDataColumns) + return dataColumnsFromSelectedPeers, errors.Errorf("no peer to fetch the following data columns: %v", missingDataColumnsSortedSlice) + } + + // Select the peer that custody the most needed data columns (greedy selection). + var bestPeer peer.ID + for peer, dataColumns := range neededDataColumnsByPeer { + if len(dataColumns) > len(neededDataColumnsByPeer[bestPeer]) { + bestPeer = peer + } + } + + dataColumnsSortedSlice := uint64MapToSortedSlice(neededDataColumnsByPeer[bestPeer]) + dataColumnsFromSelectedPeers[bestPeer] = dataColumnsSortedSlice + + // Remove the selected peer from the list of peers. + delete(neededDataColumnsByPeer, bestPeer) + + // Remove the selected peer's data columns from the list of needed data columns. + for _, dataColumn := range dataColumnsSortedSlice { + delete(neededDataColumns, dataColumn) + } + + // Remove the selected peer's data columns from the list of needed data columns by peer. + for _, dataColumn := range dataColumnsSortedSlice { + for peer, dataColumns := range neededDataColumnsByPeer { + delete(dataColumns, dataColumn) + + if len(dataColumns) == 0 { + delete(neededDataColumnsByPeer, peer) + } + } + } } - // Shuffle the peers. - f.rand.Shuffle(len(finalPeersSlice), func(i, j int) { - finalPeersSlice[i], finalPeersSlice[j] = finalPeersSlice[j], finalPeersSlice[i] - }) + return dataColumnsFromSelectedPeers, nil +} + +// buildDataColumnSidecarsByRangeRequests builds a list of data column sidecars by range requests. +// Each request contains at most `batchSize` items. 
+func buildDataColumnSidecarsByRangeRequests( + startSlot primitives.Slot, + count uint64, + columns []uint64, + batchSize uint64, +) []*p2ppb.DataColumnSidecarsByRangeRequest { + batches := make([]*p2ppb.DataColumnSidecarsByRangeRequest, 0) + + for i := uint64(0); i < count; i += batchSize { + localStartSlot := startSlot + primitives.Slot(i) + localCount := min(batchSize, uint64(startSlot)+count-uint64(localStartSlot)) + + batch := &p2ppb.DataColumnSidecarsByRangeRequest{ + StartSlot: localStartSlot, + Count: localCount, + Columns: columns, + } + + batches = append(batches, batch) + } - return finalPeersSlice, descriptions, nil + return batches } diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go index 93c2055f3d81..867f6db84aff 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go @@ -2,6 +2,7 @@ package initialsync import ( "context" + "errors" "fmt" "sync" "testing" @@ -644,48 +645,210 @@ func TestBlocksFetcher_currentHeadAndTargetEpochs(t *testing.T) { } } -func TestCustodyAllNeededColumns(t *testing.T) { - const dataColumnsCount = 31 +func TestSelectPeersToFetchDataColumnsFrom(t *testing.T) { + testCases := []struct { + name string - p2p := p2pt.NewTestP2P(t) + // Inputs + neededDataColumns map[uint64]bool + dataColumnsByPeer map[peer.ID]map[uint64]bool - dataColumns := make(map[uint64]bool, dataColumnsCount) - for i := range dataColumnsCount { - dataColumns[uint64(i)] = true + // Expected outputs + dataColumnsToFetchByPeer map[peer.ID][]uint64 + err error + }{ + { + name: "no data columns needed", + neededDataColumns: map[uint64]bool{}, + dataColumnsByPeer: map[peer.ID]map[uint64]bool{ + peer.ID("peer1"): {1: true, 2: true}, + peer.ID("peer2"): {3: true, 4: true}, + }, + dataColumnsToFetchByPeer: map[peer.ID][]uint64{}, + err: nil, + }, + { + name: "one peer has all data columns needed", + neededDataColumns: map[uint64]bool{1: true, 3: true, 5: true}, + dataColumnsByPeer: map[peer.ID]map[uint64]bool{ + peer.ID("peer1"): {2: true, 4: true}, + peer.ID("peer2"): {1: true, 3: true, 5: true, 7: true, 9: true}, + peer.ID("peer3"): {6: true}, + }, + dataColumnsToFetchByPeer: map[peer.ID][]uint64{ + peer.ID("peer2"): {1, 3, 5}, + }, + err: nil, + }, + { + name: "multiple peers are needed - 1", + neededDataColumns: map[uint64]bool{1: true, 3: true, 5: true, 7: true, 9: true}, + dataColumnsByPeer: map[peer.ID]map[uint64]bool{ + peer.ID("peer1"): {3: true, 7: true}, + peer.ID("peer2"): {1: true, 3: true, 5: true, 9: true, 10: true}, + peer.ID("peer3"): {6: true, 10: true, 12: true, 14: true, 16: true, 18: true, 20: true}, + peer.ID("peer4"): {9: true}, + }, + dataColumnsToFetchByPeer: map[peer.ID][]uint64{ + peer.ID("peer2"): {1, 3, 5, 9}, + peer.ID("peer1"): {7}, + }, + err: nil, + }, + { + name: "multiple peers are needed - 2", + neededDataColumns: map[uint64]bool{1: true, 3: true, 5: true, 7: true, 9: true}, + dataColumnsByPeer: map[peer.ID]map[uint64]bool{ + peer.ID("peer1"): {9: true, 10: true}, + peer.ID("peer2"): {3: true, 7: true}, + peer.ID("peer3"): {1: true, 5: true}, + }, + dataColumnsToFetchByPeer: map[peer.ID][]uint64{ + peer.ID("peer1"): {9}, + peer.ID("peer2"): {3, 7}, + peer.ID("peer3"): {1, 5}, + }, + err: nil, + }, + { + name: "some columns are not owned by any peer", + neededDataColumns: map[uint64]bool{1: true, 3: true, 5: true, 7: true, 9: true}, + dataColumnsByPeer: map[peer.ID]map[uint64]bool{ + 
peer.ID("peer1"): {9: true, 10: true}, + peer.ID("peer2"): {2: true, 6: true}, + peer.ID("peer3"): {1: true, 5: true}, + }, + dataColumnsToFetchByPeer: map[peer.ID][]uint64{ + peer.ID("peer1"): {9}, + peer.ID("peer3"): {1, 5}, + }, + err: errors.New("no peer to fetch the following data columns: [3 7]"), + }, } - custodyCounts := [...]uint64{ - 4 * params.BeaconConfig().CustodyRequirement, - 32 * params.BeaconConfig().CustodyRequirement, - 4 * params.BeaconConfig().CustodyRequirement, - 32 * params.BeaconConfig().CustodyRequirement, - } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actual, err := selectPeersToFetchDataColumnsFrom(tc.neededDataColumns, tc.dataColumnsByPeer) - expected := make(map[peer.ID]bool) + if tc.err != nil { + require.Equal(t, tc.err.Error(), err.Error()) + } else { + require.NoError(t, err) + } - peersID := make(map[peer.ID]bool, len(custodyCounts)) - for _, custodyCount := range custodyCounts { - peerRecord, peerID := createPeer(t, len(peersID), custodyCount) - peersID[peerID] = true - p2p.Peers().Add(peerRecord, peerID, nil, network.DirOutbound) - if custodyCount == 32*params.BeaconConfig().CustodyRequirement { - expected[peerID] = true - } + expected := tc.dataColumnsToFetchByPeer + require.Equal(t, len(expected), len(actual)) + + for peerID, expectedDataColumns := range expected { + actualDataColumns, ok := actual[peerID] + require.Equal(t, true, ok) + require.DeepSSZEqual(t, expectedDataColumns, actualDataColumns) + } + }) } - blocksFetcher := newBlocksFetcher( - context.Background(), - &blocksFetcherConfig{ - p2p: p2p, - }, - ) +} - actual, err := blocksFetcher.custodyAllNeededColumns(peersID, dataColumns) - require.NoError(t, err) +func TestBuildDataColumnSidecarsByRangeRequest(t *testing.T) { + const batchSize = 32 + testCases := []struct { + name string + startSlot primitives.Slot + count uint64 + columns []uint64 + expected []*ethpb.DataColumnSidecarsByRangeRequest + }{ + { + name: "one item - 1", + startSlot: 20, + count: 10, + columns: []uint64{1, 2, 3, 4, 5}, + expected: []*ethpb.DataColumnSidecarsByRangeRequest{ + { + StartSlot: 20, + Count: 10, + Columns: []uint64{1, 2, 3, 4, 5}, + }, + }, + }, + { + name: "one item - 2", + startSlot: 20, + count: 32, + columns: []uint64{1, 2, 3, 4, 5}, + expected: []*ethpb.DataColumnSidecarsByRangeRequest{ + { + StartSlot: 20, + Count: 32, + Columns: []uint64{1, 2, 3, 4, 5}, + }, + }, + }, + { + name: "two items - 1", + startSlot: 20, + count: 33, + columns: []uint64{1, 2, 3, 4, 5}, + expected: []*ethpb.DataColumnSidecarsByRangeRequest{ + { + StartSlot: 20, + Count: 32, + Columns: []uint64{1, 2, 3, 4, 5}, + }, + { + StartSlot: 52, + Count: 1, + Columns: []uint64{1, 2, 3, 4, 5}, + }, + }, + }, + { + name: "two items - 2", + startSlot: 20, + count: 64, + columns: []uint64{1, 2, 3, 4, 5}, + expected: []*ethpb.DataColumnSidecarsByRangeRequest{ + { + StartSlot: 20, + Count: 32, + Columns: []uint64{1, 2, 3, 4, 5}, + }, + { + StartSlot: 52, + Count: 32, + Columns: []uint64{1, 2, 3, 4, 5}, + }, + }, + }, + { + name: "three items", + startSlot: 20, + count: 66, + columns: []uint64{1, 2, 3, 4, 5}, + expected: []*ethpb.DataColumnSidecarsByRangeRequest{ + { + StartSlot: 20, + Count: 32, + Columns: []uint64{1, 2, 3, 4, 5}, + }, + { + StartSlot: 52, + Count: 32, + Columns: []uint64{1, 2, 3, 4, 5}, + }, + { + StartSlot: 84, + Count: 2, + Columns: []uint64{1, 2, 3, 4, 5}, + }, + }, + }, + } - require.Equal(t, len(expected), len(actual)) - for peerID := range expected { - _, ok := actual[peerID] - 
require.Equal(t, true, ok) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actual := buildDataColumnSidecarsByRangeRequests(tc.startSlot, tc.count, tc.columns, batchSize) + require.DeepSSZEqual(t, tc.expected, actual) + }) } } diff --git a/beacon-chain/sync/rpc_data_column_sidecars_by_range.go b/beacon-chain/sync/rpc_data_column_sidecars_by_range.go index d677b6dba6c1..d40fb8653e5a 100644 --- a/beacon-chain/sync/rpc_data_column_sidecars_by_range.go +++ b/beacon-chain/sync/rpc_data_column_sidecars_by_range.go @@ -19,31 +19,42 @@ import ( "github.com/sirupsen/logrus" ) -func (s *Service) streamDataColumnBatch(ctx context.Context, batch blockBatch, wQuota uint64, wantedIndexes map[uint64]bool, stream libp2pcore.Stream) (uint64, error) { +func (s *Service) streamDataColumnBatch(ctx context.Context, batch blockBatch, wQuota uint64, wantedDataColumnIndices map[uint64]bool, stream libp2pcore.Stream) (uint64, error) { + _, span := trace.StartSpan(ctx, "sync.streamDataColumnBatch") + defer span.End() + // Defensive check to guard against underflow. if wQuota == 0 { return 0, nil } - _, span := trace.StartSpan(ctx, "sync.streamDataColumnBatch") - defer span.End() - for _, b := range batch.canonical() { - root := b.Root() - idxs, err := s.cfg.blobStorage.ColumnIndices(b.Root()) + + for _, block := range batch.canonical() { + // Get the block blockRoot. + blockRoot := block.Root() + + // Retrieve stored data columns indices for this block root. + storedDataColumnsIndices, err := s.cfg.blobStorage.ColumnIndices(blockRoot) + if err != nil { s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream) - return wQuota, errors.Wrapf(err, "could not retrieve sidecars for block root %#x", root) + return wQuota, errors.Wrapf(err, "could not retrieve data columns indice for block root %#x", blockRoot) } - for i, l := uint64(0), uint64(len(idxs)); i < l; i++ { - // index not available or unwanted, skip - if !idxs[i] || !wantedIndexes[i] { + + for dataColumnIndex := range wantedDataColumnIndices { + isDataColumnStored := storedDataColumnsIndices[dataColumnIndex] + + // Skip if the data column is not stored. + if !isDataColumnStored { continue } + // We won't check for file not found since the .Indices method should normally prevent that from happening. - sc, err := s.cfg.blobStorage.GetColumn(b.Root(), i) + sc, err := s.cfg.blobStorage.GetColumn(blockRoot, dataColumnIndex) if err != nil { s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream) - return wQuota, errors.Wrapf(err, "could not retrieve data column sidecar: index %d, block root %#x", i, root) + return wQuota, errors.Wrapf(err, "could not retrieve data column sidecar: index %d, block root %#x", dataColumnIndex, blockRoot) } + SetStreamWriteDeadline(stream, defaultWriteDuration) if chunkErr := WriteDataColumnSidecarChunk(stream, s.cfg.chain, s.cfg.p2p.Encoding(), sc); chunkErr != nil { log.WithError(chunkErr).Debug("Could not send a chunked response") @@ -51,24 +62,28 @@ func (s *Service) streamDataColumnBatch(ctx context.Context, batch blockBatch, w tracing.AnnotateError(span, chunkErr) return wQuota, chunkErr } + s.rateLimiter.add(stream, 1) wQuota -= 1 + // Stop streaming results once the quota of writes for the request is consumed. 
 			if wQuota == 0 {
 				return 0, nil
 			}
 		}
 	}
+
 	return wQuota, nil
 }
 
 // dataColumnSidecarsByRangeRPCHandler looks up the request data columns from the database from a given start slot index
 func (s *Service) dataColumnSidecarsByRangeRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {
-	var err error
 	ctx, span := trace.StartSpan(ctx, "sync.DataColumnSidecarsByRangeHandler")
 	defer span.End()
+
 	ctx, cancel := context.WithTimeout(ctx, respTimeout)
 	defer cancel()
+
 	SetRPCStreamDeadlines(stream)
 
 	r, ok := msg.(*pb.DataColumnSidecarsByRangeRequest)
@@ -93,7 +108,6 @@ func (s *Service) dataColumnSidecarsByRangeRPCHandler(ctx context.Context, msg i
 	requestedColumnsCount := uint64(len(requestedColumns))
 
 	// Format log fields.
-
 	var (
 		custodyColumnsLog    interface{} = "all"
 		requestedColumnsLog  interface{} = "all"
@@ -121,10 +135,11 @@ func (s *Service) dataColumnSidecarsByRangeRPCHandler(ctx context.Context, msg i
 	if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
 		return err
 	}
+
 	rp, err := validateDataColumnsByRange(r, s.cfg.chain.CurrentSlot())
 	if err != nil {
 		s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
-		s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
+		s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
 		tracing.AnnotateError(span, err)
 		return err
 	}
@@ -132,6 +147,7 @@ func (s *Service) dataColumnSidecarsByRangeRPCHandler(ctx context.Context, msg i
 	// Ticker to stagger out large requests.
 	ticker := time.NewTicker(time.Second)
 	defer ticker.Stop()
+
 	batcher, err := newBlockRangeBatcher(rp, s.cfg.beaconDB, s.rateLimiter, s.cfg.chain.IsCanonical, ticker)
 	if err != nil {
 		log.WithError(err).Info("Error in DataColumnSidecarsByRange batch")
@@ -139,8 +155,9 @@ func (s *Service) dataColumnSidecarsByRangeRPCHandler(ctx context.Context, msg i
 		tracing.AnnotateError(span, err)
 		return err
 	}
+
 	// Derive the wanted columns for the request.
-	wantedColumns := map[uint64]bool{}
+	wantedColumns := make(map[uint64]bool, len(r.Columns))
 	for _, c := range r.Columns {
 		wantedColumns[c] = true
 	}
@@ -182,18 +199,19 @@ func columnBatchLimit() uint64 {
 
 // TODO: Generalize between data columns and blobs, while the validation parameters used are different they
 // are the same value in the config. Can this be safely abstracted ?
-func validateDataColumnsByRange(r *pb.DataColumnSidecarsByRangeRequest, current primitives.Slot) (rangeParams, error) {
+func validateDataColumnsByRange(r *pb.DataColumnSidecarsByRangeRequest, currentSlot primitives.Slot) (rangeParams, error) {
 	if r.Count == 0 {
 		return rangeParams{}, errors.Wrap(p2ptypes.ErrInvalidRequest, "invalid request Count parameter")
 	}
+
 	rp := rangeParams{
 		start: r.StartSlot,
 		size:  r.Count,
 	}
 
 	// Peers may overshoot the current slot when in initial sync, so we don't want to penalize them by treating the
 	// request as an error. So instead we return a set of params that acts as a noop.
-	if rp.start > current {
-		return rangeParams{start: current, end: current, size: 0}, nil
+	if rp.start > currentSlot {
+		return rangeParams{start: currentSlot, end: currentSlot, size: 0}, nil
 	}
 
 	var err error
@@ -202,10 +220,13 @@ func validateDataColumnsByRange(r *pb.DataColumnSidecarsByRangeRequest, current
 		return rangeParams{}, errors.Wrap(p2ptypes.ErrInvalidRequest, "overflow start + count -1")
 	}
 
-	maxRequest := params.MaxRequestBlock(slots.ToEpoch(current))
+	// Get current epoch from current slot.
+	currentEpoch := slots.ToEpoch(currentSlot)
+
+	maxRequest := params.MaxRequestBlock(currentEpoch)
 	// Allow some wiggle room, up to double the MaxRequestBlocks past the current slot,
 	// to give nodes syncing close to the head of the chain some margin for error.
-	maxStart, err := current.SafeAdd(maxRequest * 2)
+	maxStart, err := currentSlot.SafeAdd(maxRequest * 2)
 	if err != nil {
 		return rangeParams{}, errors.Wrap(p2ptypes.ErrInvalidRequest, "current + maxRequest * 2 > max uint")
 	}
@@ -214,20 +235,23 @@ func validateDataColumnsByRange(r *pb.DataColumnSidecarsByRangeRequest, current
 	// [max(current_epoch - MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS, DENEB_FORK_EPOCH), current_epoch]
 	// where current_epoch is defined by the current wall-clock time,
 	// and clients MUST support serving requests of data columns on this range.
-	minStartSlot, err := DataColumnsRPCMinValidSlot(current)
+	minStartSlot, err := DataColumnsRPCMinValidSlot(currentSlot)
 	if err != nil {
 		return rangeParams{}, errors.Wrap(p2ptypes.ErrInvalidRequest, "DataColumnsRPCMinValidSlot error")
 	}
+
 	if rp.start > maxStart {
 		return rangeParams{}, errors.Wrap(p2ptypes.ErrInvalidRequest, "start > maxStart")
 	}
+
 	if rp.start < minStartSlot {
 		rp.start = minStartSlot
 	}
-	if rp.end > current {
-		rp.end = current
+	if rp.end > currentSlot {
+		rp.end = currentSlot
 	}
+
 	if rp.end < rp.start {
 		rp.end = rp.start
 	}
@@ -236,6 +260,7 @@ func validateDataColumnsByRange(r *pb.DataColumnSidecarsByRangeRequest, current
 	if limit > maxRequest {
 		limit = maxRequest
 	}
+
 	if rp.size > limit {
 		rp.size = limit
 	}
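For reference, the behavior that TestBuildDataColumnSidecarsByRangeRequest pins down is a simple splitting of a (startSlot, count) span into by-range requests of at most batchSize slots, repeating the same column indices in every request: start 20, count 66, batch 32 yields requests (20, 32), (52, 32), (84, 2). The snippet below is a minimal standalone sketch of that logic only; it uses simplified stand-in types (uint64 slots and a local struct instead of primitives.Slot and *ethpb.DataColumnSidecarsByRangeRequest), and the actual helper, buildDataColumnSidecarsByRangeRequests in the fetcher code, may differ in details.

package main

import "fmt"

// byRangeRequest is a hypothetical stand-in for ethpb.DataColumnSidecarsByRangeRequest.
type byRangeRequest struct {
	StartSlot uint64
	Count     uint64
	Columns   []uint64
}

// splitByRange splits a (startSlot, count) span into requests of at most batchSize
// slots each, carrying the same column indices in every request. This mirrors the
// expectations encoded in the test cases above; it is a sketch, not the Prysm code.
func splitByRange(startSlot, count uint64, columns []uint64, batchSize uint64) []*byRangeRequest {
	var requests []*byRangeRequest
	for remaining := count; remaining > 0; {
		size := remaining
		if size > batchSize {
			size = batchSize
		}
		requests = append(requests, &byRangeRequest{StartSlot: startSlot, Count: size, Columns: columns})
		startSlot += size
		remaining -= size
	}
	return requests
}

func main() {
	// The "three items" case: start 20, count 66, batch 32.
	for _, r := range splitByRange(20, 66, []uint64{1, 2, 3, 4, 5}, 32) {
		fmt.Printf("start=%d count=%d columns=%v\n", r.StartSlot, r.Count, r.Columns)
	}
}

Running this prints start=20 count=32, start=52 count=32, and start=84 count=2, matching the expected requests in the "three items" test case.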