diff --git a/beacon-chain/sync/initial-sync/BUILD.bazel b/beacon-chain/sync/initial-sync/BUILD.bazel index da6ec0c57ae..aef3e650228 100644 --- a/beacon-chain/sync/initial-sync/BUILD.bazel +++ b/beacon-chain/sync/initial-sync/BUILD.bazel @@ -98,7 +98,6 @@ go_test( "//consensus-types/primitives:go_default_library", "//container/leaky-bucket:go_default_library", "//container/slice:go_default_library", - "//crypto/ecdsa:go_default_library", "//crypto/hash:go_default_library", "//encoding/bytesutil:go_default_library", "//network/forks:go_default_library", @@ -111,7 +110,6 @@ go_test( "//time/slots:go_default_library", "@com_github_consensys_gnark_crypto//ecc/bls12-381/fr:go_default_library", "@com_github_crate_crypto_go_kzg_4844//:go_default_library", - "@com_github_ethereum_go_ethereum//p2p/enode:go_default_library", "@com_github_ethereum_go_ethereum//p2p/enr:go_default_library", "@com_github_libp2p_go_libp2p//:go_default_library", "@com_github_libp2p_go_libp2p//core:go_default_library", diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index 16d66245157..deb9998ec88 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -3,6 +3,7 @@ package initialsync import ( "context" "fmt" + "slices" "sort" "strings" "sync" @@ -41,8 +42,6 @@ const ( maxPendingRequests = 64 // peersPercentagePerRequest caps percentage of peers to be used in a request. peersPercentagePerRequest = 0.75 - // peersPercentagePerRequestDataColumns caps percentage of peers to be used in a data columns request. - peersPercentagePerRequestDataColumns = 1. // handshakePollingInterval is a polling interval for checking the number of received handshakes. handshakePollingInterval = 5 * time.Second // peerLocksPollingInterval is a polling interval for checking if there are stale peer locks. @@ -293,6 +292,11 @@ func (f *blocksFetcher) scheduleRequest(ctx context.Context, start primitives.Sl // handleRequest parses fetch request and forwards it to response builder. func (f *blocksFetcher) handleRequest(ctx context.Context, start primitives.Slot, count uint64) *fetchRequestResponse { + const ( + delay = 5 * time.Second + batchSize = 32 + ) + ctx, span := trace.StartSpan(ctx, "initialsync.handleRequest") defer span.End() @@ -334,7 +338,7 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start primitives.Slot } if coreTime.PeerDASIsActive(start) { - response.err = f.fetchDataColumnsFromPeers(ctx, response.bwb, nil) + response.err = f.fetchDataColumnsFromPeers(ctx, response.bwb, nil, delay, batchSize) return response } @@ -621,10 +625,13 @@ type bwbSlice struct { // buildBwbSlices builds slices of `bwb` that aims to optimize the count of // by range requests needed to fetch missing data columns. -func buildBwbSlices( - bwbs []blocks.BlockWithROBlobs, - missingColumnsByRoot map[[fieldparams.RootLength]byte]map[uint64]bool, -) ([]bwbSlice, error) { +func buildBwbSlices(wrappedBwbsMissingColumns *bwbsMissingColumns) ([]bwbSlice, error) { + wrappedBwbsMissingColumns.mu.Lock() + defer wrappedBwbsMissingColumns.mu.Unlock() + + bwbs := wrappedBwbsMissingColumns.bwbs + missingColumnsByRoot := wrappedBwbsMissingColumns.missingColumnsByRoot + // Return early if there are no blocks to process. 
if len(bwbs) == 0 { return []bwbSlice{}, nil @@ -634,10 +641,12 @@ func buildBwbSlices( firstROBlock := bwbs[0].Block firstBlockRoot := firstROBlock.Root() - previousMissingDataColumns := map[uint64]bool{} + previousMissingDataColumns := make(map[uint64]bool, len(missingColumnsByRoot[firstBlockRoot])) if missing, ok := missingColumnsByRoot[firstBlockRoot]; ok { - previousMissingDataColumns = missing + for key, value := range missing { + previousMissingDataColumns[key] = value + } } previousBlockSlot := firstROBlock.Block().Slot() @@ -680,7 +689,10 @@ func buildBwbSlices( currentBlockRoot := currentROBlock.Root() // Get the missing data columns for the current block. - missingDataColumns := missingColumnsByRoot[currentBlockRoot] + missingDataColumns := make(map[uint64]bool, len(missingColumnsByRoot[currentBlockRoot])) + for key, value := range missingColumnsByRoot[currentBlockRoot] { + missingDataColumns[key] = value + } // Compute if the missing data columns differ. missingDataColumnsDiffer := uint64MapDiffer(previousMissingDataColumns, missingDataColumns) @@ -824,6 +836,103 @@ func blockFromRoot(bwb []blocks.BlockWithROBlobs) map[[fieldparams.RootLength]by return result } +// computeMissingDataColumnsCount returns the count of missing columns. +func computeMissingDataColumnsCount(missingColumnsByRoot map[[fieldparams.RootLength]byte]map[uint64]bool) int { + count := 0 + for _, columns := range missingColumnsByRoot { + count += len(columns) + } + return count +} + +func (f *blocksFetcher) fetchBwbSliceFromPeers( + ctx context.Context, + identifier int, + wrappedBwbsMissingColumns *bwbsMissingColumns, + peers []peer.ID, + batchSize uint64, + bwbSlice bwbSlice) error { + // Filter out slices that are already complete. + if len(bwbSlice.dataColumns) == 0 { + return nil + } + + // Compute the start and end slot of the request. + startSlot, endSlot := func() (primitives.Slot, primitives.Slot) { + mu := &wrappedBwbsMissingColumns.mu + + mu.RLock() + defer mu.RUnlock() + + bwbs := wrappedBwbsMissingColumns.bwbs + + startSlot := bwbs[bwbSlice.start].Block.Block().Slot() + endSlot := bwbs[bwbSlice.end].Block.Block().Slot() + + return startSlot, endSlot + }() + + // Compute the block count of the request. + blockCount := uint64(endSlot - startSlot + 1) + + // Get all admissible peers with the data columns they custody. + dataColumnsByAdmissiblePeer, err := f.waitForPeersForDataColumns(identifier, peers, endSlot, bwbSlice.dataColumns, blockCount) + if err != nil { + return errors.Wrap(err, "wait for peers for data columns") + } + + // Select the peers that will be requested. + dataColumnsToFetchByPeer, err := selectPeersToFetchDataColumnsFrom(bwbSlice.dataColumns, dataColumnsByAdmissiblePeer) + if err != nil { + // This should never happen. + return errors.Wrap(err, "select peers to fetch data columns from") + } + + var wg sync.WaitGroup + + for peer, dataColumnsToFetch := range dataColumnsToFetchByPeer { + // Extract peer custody columns. + peerCustodyColumns := dataColumnsByAdmissiblePeer[peer] + + indicesByRoot, blocksByRoot := func() (map[[fieldparams.RootLength]byte][]int, map[[fieldparams.RootLength]byte]blocks.ROBlock) { + mu := &wrappedBwbsMissingColumns.mu + + mu.RLock() + defer mu.RUnlock() + + // Get `bwbs` indices indexed by root. + // Get blocks indexed by root. + + bwbs := wrappedBwbsMissingColumns.bwbs + return indicesFromRoot(bwbs), blockFromRoot(bwbs) + }() + + // Sort data columns. + slices.Sort[[]uint64](dataColumnsToFetch) + + // Build the requests. 
+ requests := buildDataColumnSidecarsByRangeRequests(startSlot, blockCount, dataColumnsToFetch, batchSize) + + for _, request := range requests { + // Fetch the missing data columns from the peers. + wg.Add(1) + go f.fetchDataColumnFromPeer(ctx, &wg, identifier, wrappedBwbsMissingColumns, blocksByRoot, indicesByRoot, peer, peerCustodyColumns, request) + } + } + + // Wait for all requests to finish. + wg.Wait() + + return nil +} + +type bwbsMissingColumns struct { + mu sync.RWMutex + + bwbs []blocks.BlockWithROBlobs + missingColumnsByRoot map[[fieldparams.RootLength]byte]map[uint64]bool +} + // fetchDataColumnsFromPeers looks at the blocks in `bwb` and retrieves all // data columns for with the block has blob commitments, and for which our store is missing data columns // we should custody. @@ -833,16 +942,17 @@ func (f *blocksFetcher) fetchDataColumnsFromPeers( ctx context.Context, bwbs []blocks.BlockWithROBlobs, peers []peer.ID, + delay time.Duration, + batchSize uint64, ) error { // Time to wait if no peers are available. const ( - delay = 5 * time.Second // Time to wait before retrying to fetch data columns. - maxIdentifier = 1_000 // Max identifier for the request. + maxIdentifier = 1_000 // Max identifier for the request. + maxAllowedStall = 5 // Number of trials before giving up. ) // Generate random identifier. identifier := f.rand.Intn(maxIdentifier) - log := log.WithField("reqIdentifier", identifier) // Compute the columns we should custody. localCustodyColumns, err := f.custodyColumns() @@ -870,85 +980,70 @@ func (f *blocksFetcher) fetchDataColumnsFromPeers( return nil } + // Compute the number of missing data columns. + previousMissingDataColumnsCount := computeMissingDataColumnsCount(missingColumnsByRoot) + + // Count the number of retries for the same amount of missing data columns. + stallCount := 0 + + // Add log fields. + log := log.WithFields(logrus.Fields{ + "identifier": identifier, + "initialMissingColumnsCount": previousMissingDataColumnsCount, + }) + // Log the start of the process. start := time.Now() log.Debug("Fetch data columns from peers - start") + wrappedBwbsMissingColumns := &bwbsMissingColumns{ + bwbs: bwbs, + missingColumnsByRoot: missingColumnsByRoot, + } + for len(missingColumnsByRoot) > 0 { // Compute the optimal slices of `bwb` to minimize the number of by range returned columns. - bwbsSlices, err := buildBwbSlices(bwbs, missingColumnsByRoot) + bwbSlices, err := buildBwbSlices(wrappedBwbsMissingColumns) if err != nil { return errors.Wrap(err, "build bwb slices") } - outerLoop: - for _, bwbsSlice := range bwbsSlices { - lastSlot := bwbs[bwbsSlice.end].Block.Block().Slot() - dataColumnsSlice := sortedSliceFromMap(bwbsSlice.dataColumns) - dataColumnCount := uint64(len(dataColumnsSlice)) - - // Filter out slices that are already complete. - if dataColumnCount == 0 { - continue + for _, bwbSlice := range bwbSlices { + if err := f.fetchBwbSliceFromPeers(ctx, identifier, wrappedBwbsMissingColumns, peers, batchSize, bwbSlice); err != nil { + return errors.Wrap(err, "fetch BWB slice from peers") } + } - // If no peer is specified, get all connected peers. - peersToFilter := peers - if peersToFilter == nil { - peersToFilter = f.p2p.Peers().Connected() - } - - // Compute the block count of the request. 
- startSlot := bwbs[bwbsSlice.start].Block.Block().Slot() - endSlot := bwbs[bwbsSlice.end].Block.Block().Slot() - blockCount := uint64(endSlot - startSlot + 1) - - filteredPeers, err := f.waitForPeersForDataColumns(ctx, peersToFilter, lastSlot, bwbsSlice.dataColumns, blockCount) - if err != nil { - return errors.Wrap(err, "wait for peers for data columns") - } - - // Build the request. - request := &p2ppb.DataColumnSidecarsByRangeRequest{ - StartSlot: startSlot, - Count: blockCount, - Columns: dataColumnsSlice, - } - - // Get `bwbs` indices indexed by root. - indicesByRoot := indicesFromRoot(bwbs) - - // Get blocks indexed by root. - blocksByRoot := blockFromRoot(bwbs) + missingDataColumnsCount := computeMissingDataColumnsCount(missingColumnsByRoot) + if missingDataColumnsCount == previousMissingDataColumnsCount { + stallCount++ + } else { + stallCount = 0 + } - // Prepare nice log fields. - var columnsLog interface{} = "all" - numberOfColuns := params.BeaconConfig().NumberOfColumns - if dataColumnCount < numberOfColuns { - columnsLog = dataColumnsSlice - } + previousMissingDataColumnsCount = missingDataColumnsCount + if missingDataColumnsCount > 0 { log := log.WithFields(logrus.Fields{ - "start": request.StartSlot, - "count": request.Count, - "columns": columnsLog, + "remainingMissingColumnsCount": missingDataColumnsCount, + "stallCount": stallCount, + "maxAllowedStall": maxAllowedStall, }) - // Retrieve the missing data columns from the peers. - for _, peer := range filteredPeers { - success := f.fetchDataColumnFromPeer(ctx, bwbs, missingColumnsByRoot, blocksByRoot, indicesByRoot, peer, request) - - // If we have successfully retrieved some data columns, continue to the next slice. - if success { - continue outerLoop - } + if stallCount >= maxAllowedStall { + // It is very likely `bwbs` contains orphaned blocks, for which no peer has the data columns. + // We give up and let the state machine handle the situation. + const message = "Fetch data columns from peers - no progress, giving up" + log.Warning(message) + return errors.New(message) } - log.WithField("peers", filteredPeers).Warning("Fetch data columns from peers - no peers among this list returned any valid data columns") - } + time.Sleep(delay) - if len(missingColumnsByRoot) > 0 { - log.Debug("Fetch data columns from peers - continue") + log.WithFields(logrus.Fields{ + "remainingMissingColumnsCount": missingDataColumnsCount, + "stallCount": stallCount, + }).Debug("Fetch data columns from peers - continue") } } @@ -972,68 +1067,99 @@ func sortBwbsByColumnIndex(bwbs []blocks.BlockWithROBlobs) { // - synced up to `lastSlot`, // - custody all columns in `dataColumns`, and // - have bandwidth to serve `blockCount` blocks. -// It waits until at least one peer is available. +// It waits until at least one peer is available for all needed columns. +// It returns a map, where the key of the map is the peer, the value is the custody columns of the peer. func (f *blocksFetcher) waitForPeersForDataColumns( - ctx context.Context, + reqIdentifier int, peers []peer.ID, lastSlot primitives.Slot, - dataColumns map[uint64]bool, + neededDataColumns map[uint64]bool, blockCount uint64, -) ([]peer.ID, error) { +) (map[peer.ID]map[uint64]bool, error) { // Time to wait before retrying to find new peers. const delay = 5 * time.Second - // Filter peers that custody all columns we need and that are synced to the epoch. 
- filteredPeers, descriptions, err := f.peersWithSlotAndDataColumns(ctx, peers, lastSlot, dataColumns, blockCount) + var computeDataColumnsWithoutPeers = func(neededColumns map[uint64]bool, peersByColumn map[uint64][]peer.ID) map[uint64]bool { + result := make(map[uint64]bool) + for column := range neededColumns { + if _, ok := peersByColumn[column]; !ok { + result[column] = true + } + } + + return result + } + + // Get the peers that are admissible for the data columns. + dataColumnsByAdmissiblePeer, admissiblePeersByDataColumn, descriptions, err := f.admissiblePeersForDataColumn(peers, lastSlot, neededDataColumns, blockCount) if err != nil { return nil, errors.Wrap(err, "peers with slot and data columns") } - // Compute data columns count - dataColumnCount := uint64(len(dataColumns)) + dataColumnsWithoutPeers := computeDataColumnsWithoutPeers(neededDataColumns, admissiblePeersByDataColumn) - // Sort columns. - columnsSlice := sortedSliceFromMap(dataColumns) + // Wait if no suitable peers are available. + for len(dataColumnsWithoutPeers) > 0 { + // Sort columns. + neededDataColumnsSlice := sortedSliceFromMap(neededDataColumns) - // Build a nice log field. - var columnsLog interface{} = "all" - numberOfColuns := params.BeaconConfig().NumberOfColumns - if dataColumnCount < numberOfColuns { - columnsLog = columnsSlice - } + // Build a nice log fields. + numberOfColumns := params.BeaconConfig().NumberOfColumns + + var neededDataColumnsLog interface{} = "all" + neededDataColumnCount := uint64(len(neededDataColumns)) + if neededDataColumnCount < numberOfColumns { + neededDataColumnsLog = neededDataColumnsSlice + } + + var dataColumnsWithoutPeersLog interface{} = "all" + dataColumnsWithoutPeersCount := uint64(len(dataColumnsWithoutPeers)) + if dataColumnsWithoutPeersCount < numberOfColumns { + dataColumnsWithoutPeersLog = uint64MapToSortedSlice(dataColumnsWithoutPeers) + } - // Wait if no suitable peers are available. - for len(filteredPeers) == 0 { log. WithFields(logrus.Fields{ - "peers": peers, - "waitDuration": delay, - "targetSlot": lastSlot, - "columns": columnsLog, + "waitDuration": delay, + "targetSlot": lastSlot, + "neededDataColumns": neededDataColumnsLog, + "identifier": reqIdentifier, + "columnsWithoutPeers": dataColumnsWithoutPeersLog, }). 
- Warning("Fetch data columns from peers - no peers available to retrieve missing data columns, retrying later") + Warning("Fetch data columns from peers - no peers available to retrieve some missing data columns, retrying later") for _, description := range descriptions { log.Debug(description) } + for pid, peerDataColumns := range dataColumnsByAdmissiblePeer { + var peerDataColumnsLog interface{} = "all" + peerDataColumnsCount := uint64(len(peerDataColumns)) + if peerDataColumnsCount < numberOfColumns { + peerDataColumnsLog = uint64MapToSortedSlice(peerDataColumns) + } + + log.Debugf("peer %s: custody columns: %v", pid, peerDataColumnsLog) + } + time.Sleep(delay) - filteredPeers, descriptions, err = f.peersWithSlotAndDataColumns(ctx, peers, lastSlot, dataColumns, blockCount) + dataColumnsByAdmissiblePeer, admissiblePeersByDataColumn, descriptions, err = f.admissiblePeersForDataColumn(peers, lastSlot, neededDataColumns, blockCount) if err != nil { return nil, errors.Wrap(err, "peers with slot and data columns") } + + dataColumnsWithoutPeers = computeDataColumnsWithoutPeers(neededDataColumns, admissiblePeersByDataColumn) } - return filteredPeers, nil + return dataColumnsByAdmissiblePeer, nil } // processDataColumn mutates `bwbs` argument by adding the data column, // and mutates `missingColumnsByRoot` by removing the data column if the // data column passes all the check. func processDataColumn( - bwbs []blocks.BlockWithROBlobs, - missingColumnsByRoot map[[fieldparams.RootLength]byte]map[uint64]bool, + wrappedBwbsMissingColumns *bwbsMissingColumns, columnVerifier verification.NewColumnVerifier, blocksByRoot map[[fieldparams.RootLength]byte]blocks.ROBlock, indicesByRoot map[[fieldparams.RootLength]byte][]int, @@ -1071,15 +1197,25 @@ func processDataColumn( } // Populate the corresponding items in `bwbs`. - for _, index := range indices { - bwbs[index].Columns = append(bwbs[index].Columns, dataColumn) - } + func() { + mu := &wrappedBwbsMissingColumns.mu - // Remove the column from the missing columns. - delete(missingColumnsByRoot[blockRoot], dataColumn.ColumnIndex) - if len(missingColumnsByRoot[blockRoot]) == 0 { - delete(missingColumnsByRoot, blockRoot) - } + mu.Lock() + defer mu.Unlock() + + bwbs := wrappedBwbsMissingColumns.bwbs + missingColumnsByRoot := wrappedBwbsMissingColumns.missingColumnsByRoot + + for _, index := range indices { + bwbs[index].Columns = append(bwbs[index].Columns, dataColumn) + } + + // Remove the column from the missing columns. + delete(missingColumnsByRoot[blockRoot], dataColumn.ColumnIndex) + if len(missingColumnsByRoot[blockRoot]) == 0 { + delete(missingColumnsByRoot, blockRoot) + } + }() return true } @@ -1089,15 +1225,41 @@ func processDataColumn( // - `missingColumnsByRoot` by removing the fetched data columns. func (f *blocksFetcher) fetchDataColumnFromPeer( ctx context.Context, - bwbs []blocks.BlockWithROBlobs, - missingColumnsByRoot map[[fieldparams.RootLength]byte]map[uint64]bool, + wg *sync.WaitGroup, + identifier int, + wrappedBwbsMissingColumns *bwbsMissingColumns, blocksByRoot map[[fieldparams.RootLength]byte]blocks.ROBlock, indicesByRoot map[[fieldparams.RootLength]byte][]int, peer peer.ID, + peerCustodyColumns map[uint64]bool, request *p2ppb.DataColumnSidecarsByRangeRequest, -) bool { +) { + defer wg.Done() + + // Extract the number of columns. 
+ numberOfColumns := params.BeaconConfig().NumberOfColumns + + requestedColumnsCount := uint64(len(request.Columns)) + var requestedColumnsLog interface{} = "all" + if requestedColumnsCount < numberOfColumns { + requestedColumnsLog = request.Columns + } + + peerCustodyColumnsCount := uint64(len(peerCustodyColumns)) + var peerCustodyColumnsLog interface{} = "all" + if peerCustodyColumnsCount < numberOfColumns { + peerCustodyColumnsLog = uint64MapToSortedSlice(peerCustodyColumns) + } + // Define useful log field. - log := log.WithField("peer", peer) + log := log.WithFields(logrus.Fields{ + "peer": peer, + "identifier": identifier, + "start": request.StartSlot, + "count": request.Count, + "requestedColumns": requestedColumnsLog, + "custodyColumns": peerCustodyColumnsLog, + }) // Wait for peer bandwidth if needed. if err := func() error { @@ -1122,45 +1284,36 @@ func (f *blocksFetcher) fetchDataColumnFromPeer( return nil }(); err != nil { log.WithError(err).Warning("Fetch data columns from peers - could not wait for bandwidth") - return false + return } // Send the request to the peer. - requestStart := time.Now() roDataColumns, err := prysmsync.SendDataColumnsByRangeRequest(ctx, f.clock, f.p2p, peer, f.ctxMap, request) if err != nil { log.WithError(err).Warning("Fetch data columns from peers - could not send data columns by range request") - return false + return } - requestDuration := time.Since(requestStart) - if len(roDataColumns) == 0 { - log.Debug("Fetch data columns from peers - peer did not return any data columns") - return false + log.Debug("Fetch data columns from peers - no data column returned") + return } globalSuccess := false for _, dataColumn := range roDataColumns { - success := processDataColumn(bwbs, missingColumnsByRoot, f.cv, blocksByRoot, indicesByRoot, dataColumn) + success := processDataColumn(wrappedBwbsMissingColumns, f.cv, blocksByRoot, indicesByRoot, dataColumn) if success { globalSuccess = true } } if !globalSuccess { - log.Debug("Fetch data columns from peers - peer did not return any valid data columns") - return false + log.Debug("Fetch data columns from peers - no valid data column returned") + return } - totalDuration := time.Since(requestStart) - log.WithFields(logrus.Fields{ - "reqDuration": requestDuration, - "totalDuration": totalDuration, - }).Debug("Fetch data columns from peers - got some columns") - - return true + log.Debug("Fetch data columns from peers - got some columns") } // requestBlocks is a wrapper for handling BeaconBlocksByRangeRequest requests/streams. 
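As an illustrative aside, here is a minimal standalone Go sketch of the batching arithmetic that the fetcher now relies on through buildDataColumnSidecarsByRangeRequests (introduced in blocks_fetcher_utils.go below). The names byRangeRequest and splitIntoBatches are simplified stand-ins, and plain uint64 replaces primitives.Slot; it is a sketch of the idea, not the real API.

package main

import "fmt"

// byRangeRequest is a simplified stand-in for ethpb.DataColumnSidecarsByRangeRequest.
type byRangeRequest struct {
	startSlot uint64
	count     uint64
	columns   []uint64
}

// splitIntoBatches mirrors the arithmetic of buildDataColumnSidecarsByRangeRequests:
// the [startSlot, startSlot+count) range is cut into requests of at most batchSize slots each.
func splitIntoBatches(startSlot, count uint64, columns []uint64, batchSize uint64) []byRangeRequest {
	batches := make([]byRangeRequest, 0)
	for i := uint64(0); i < count; i += batchSize {
		localStart := startSlot + i
		localCount := min(batchSize, startSlot+count-localStart) // built-in min requires Go 1.21+
		batches = append(batches, byRangeRequest{startSlot: localStart, count: localCount, columns: columns})
	}
	return batches
}

func main() {
	// startSlot=20, count=66, batchSize=32 yields {20, 32}, {52, 32} and {84, 2},
	// matching the "three items" case of TestBuildDataColumnSidecarsByRangeRequest further down.
	for _, b := range splitIntoBatches(20, 66, []uint64{1, 2, 3, 4, 5}, 32) {
		fmt.Printf("start=%d count=%d columns=%v\n", b.startSlot, b.count, b.columns)
	}
}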
diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go index ddc15e4bae0..33f437f5f8e 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go @@ -15,7 +15,6 @@ import ( "github.com/consensys/gnark-crypto/ecc/bls12-381/fr" GoKZG "github.com/crate-crypto/go-kzg-4844" - "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/libp2p/go-libp2p" libp2pcore "github.com/libp2p/go-libp2p/core" @@ -41,7 +40,6 @@ import ( "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" leakybucket "github.com/prysmaticlabs/prysm/v5/container/leaky-bucket" "github.com/prysmaticlabs/prysm/v5/container/slice" - ecdsaprysm "github.com/prysmaticlabs/prysm/v5/crypto/ecdsa" "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" "github.com/prysmaticlabs/prysm/v5/network/forks" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" @@ -1328,28 +1326,6 @@ type blockParams struct { hasBlobs bool } -func createPeer(t *testing.T, privateKeyOffset int, custodyCount uint64) (*enr.Record, peer.ID) { - privateKeyBytes := make([]byte, 32) - for i := 0; i < 32; i++ { - privateKeyBytes[i] = byte(privateKeyOffset + i) - } - - unmarshalledPrivateKey, err := crypto.UnmarshalSecp256k1PrivateKey(privateKeyBytes) - require.NoError(t, err) - - privateKey, err := ecdsaprysm.ConvertFromInterfacePrivKey(unmarshalledPrivateKey) - require.NoError(t, err) - - peerID, err := peer.IDFromPrivateKey(unmarshalledPrivateKey) - require.NoError(t, err) - - record := &enr.Record{} - record.Set(peerdas.Csc(custodyCount)) - record.Set(enode.Secp256k1(privateKey.PublicKey)) - - return record, peerID -} - func TestCustodyColumns(t *testing.T) { blocksFetcher := newBlocksFetcher(context.Background(), &blocksFetcherConfig{ p2p: p2ptest.NewTestP2P(t), @@ -1446,7 +1422,7 @@ func createAndConnectPeer( // Get the response to send. items, ok := peerParams.toRespond[reqString] - require.Equal(t, true, ok) + require.Equal(t, true, ok, "no response to send for request %s", reqString) for _, responseParams := range items[countFromRequest[reqString]] { // Get data columns sidecars for this slot. @@ -1707,7 +1683,13 @@ func TestBuildBwbSlices(t *testing.T) { blockRoot := bwb.Block.Root() missingColumnsByRoot[blockRoot] = missingColumns } - bwbSlices, err := buildBwbSlices(bwbs, missingColumnsByRoot) + + wrappedBwbsMissingColumns := &bwbsMissingColumns{ + bwbs: bwbs, + missingColumnsByRoot: missingColumnsByRoot, + } + + bwbSlices, err := buildBwbSlices(wrappedBwbsMissingColumns) require.NoError(t, err) require.Equal(t, true, areBwbSlicesEqual(tt.bwbSlices, bwbSlices)) }) @@ -1718,6 +1700,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { const ( blobsCount = 6 peersHeadSlot = 100 + delay = 0 * time.Second ) testCases := []struct { @@ -1746,11 +1729,15 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { // For the exact same data columns by range request, the peer will respond in the order they are specified. peersParams []peerParams + // The max count of data columns that will be requested in each batch. + batchSize uint64 + // OUTPUTS // ------- // Data columns that should be added to `bwb`. 
addedRODataColumns [][]int + isError bool }{ { name: "Deneb fork epoch not reached", @@ -1760,7 +1747,9 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { {slot: 2, hasBlobs: true}, // Before deneb fork epoch {slot: 3, hasBlobs: true}, // Before deneb fork epoch }, + batchSize: 32, addedRODataColumns: [][]int{nil, nil, nil}, + isError: false, }, { name: "All blocks are before EIP-7954 fork epoch", @@ -1773,7 +1762,9 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { {slot: 27, hasBlobs: false}, // Before EIP-7954 fork epoch {slot: 28, hasBlobs: false}, // Before EIP-7954 fork epoch }, + batchSize: 32, addedRODataColumns: [][]int{nil, nil, nil, nil}, + isError: false, }, { name: "All blocks with commitments before are EIP-7954 fork epoch", @@ -1787,6 +1778,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { {slot: 32, hasBlobs: false}, {slot: 33, hasBlobs: false}, }, + batchSize: 32, addedRODataColumns: [][]int{nil, nil, nil, nil, nil}, }, { @@ -1808,7 +1800,9 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { nil, {6: true, 38: true, 70: true, 102: true}, }, + batchSize: 32, addedRODataColumns: [][]int{nil, nil, nil, nil, nil}, + isError: false, }, { name: "Some blocks with blobs with missing data columns - one round needed", @@ -1841,21 +1835,8 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { }, peersParams: []peerParams{ { - // This peer custodies all the columns we need but - // will never respond any column. - csc: 128, - toRespond: map[string][][]responseParams{ - (ðpb.DataColumnSidecarsByRangeRequest{ - StartSlot: 33, - Count: 4, - Columns: []uint64{70, 102}, - }).String(): {{}}, - (ðpb.DataColumnSidecarsByRangeRequest{ - StartSlot: 37, - Count: 1, - Columns: []uint64{6, 70}, - }).String(): {{}}, - }, + csc: 0, + toRespond: map[string][][]responseParams{}, }, { csc: 128, @@ -1887,28 +1868,36 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { }, }, { - // This peer custodies all the columns we need but - // will never respond any column. csc: 128, toRespond: map[string][][]responseParams{ (ðpb.DataColumnSidecarsByRangeRequest{ StartSlot: 33, Count: 4, Columns: []uint64{70, 102}, - }).String(): {{}}, + }).String(): { + { + {slot: 33, columnIndex: 70}, + {slot: 33, columnIndex: 102}, + {slot: 34, columnIndex: 70}, + {slot: 34, columnIndex: 102}, + {slot: 36, columnIndex: 70}, + {slot: 36, columnIndex: 102}, + }, + }, (ðpb.DataColumnSidecarsByRangeRequest{ StartSlot: 37, Count: 1, Columns: []uint64{6, 70}, - }).String(): {{}}, + }).String(): { + { + {slot: 37, columnIndex: 6}, + {slot: 37, columnIndex: 70}, + }, + }, }, }, - { - // This peer should not be requested. 
- csc: 2, - toRespond: map[string][][]responseParams{}, - }, }, + batchSize: 32, addedRODataColumns: [][]int{ nil, // Slot 25 nil, // Slot 27 @@ -1921,6 +1910,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { nil, // Slot 38 nil, // Slot 39 }, + isError: false, }, { name: "Some blocks with blobs with missing data columns - partial responses", @@ -1954,31 +1944,10 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { {slot: 36, columnIndex: 70}, }, }, - (ðpb.DataColumnSidecarsByRangeRequest{ - StartSlot: 33, - Count: 4, - Columns: []uint64{70}, - }).String(): { - { - {slot: 33, columnIndex: 70}, - {slot: 34, columnIndex: 70}, - {slot: 36, columnIndex: 70}, - }, - }, (ðpb.DataColumnSidecarsByRangeRequest{ StartSlot: 33, Count: 4, Columns: []uint64{102}, - }).String(): {{}}, - }, - }, - { - csc: 128, - toRespond: map[string][][]responseParams{ - (ðpb.DataColumnSidecarsByRangeRequest{ - StartSlot: 33, - Count: 4, - Columns: []uint64{70, 102}, }).String(): { { {slot: 33, columnIndex: 102}, @@ -1986,25 +1955,10 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { {slot: 36, columnIndex: 102}, }, }, - (ðpb.DataColumnSidecarsByRangeRequest{ - StartSlot: 33, - Count: 4, - Columns: []uint64{102}, - }).String(): { - { - {slot: 33, columnIndex: 102}, - {slot: 34, columnIndex: 102}, - {slot: 36, columnIndex: 102}, - }, - }, - (ðpb.DataColumnSidecarsByRangeRequest{ - StartSlot: 33, - Count: 4, - Columns: []uint64{70}, - }).String(): {{}}, }, }, }, + batchSize: 32, addedRODataColumns: [][]int{ {70, 102}, // Slot 33 {70, 102}, // Slot 34 @@ -2020,9 +1974,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { blocksParams: []blockParams{ {slot: 38, hasBlobs: true}, }, - storedDataColumns: []map[int]bool{ - {38: true, 102: true}, - }, + storedDataColumns: []map[int]bool{{38: true, 102: true}}, peersParams: []peerParams{ { csc: 128, @@ -2049,42 +2001,115 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { }, }, }, - addedRODataColumns: [][]int{ - {6, 70}, + batchSize: 32, + addedRODataColumns: [][]int{{6, 70}}, + isError: false, + }, + { + name: "Some blocks with blobs with missing data columns - first response is empty", + denebForkEpoch: 0, + eip7954ForkEpoch: 1, + currentSlot: 40, + blocksParams: []blockParams{{slot: 38, hasBlobs: true}}, + storedDataColumns: []map[int]bool{{38: true, 102: true}}, + peersParams: []peerParams{ + { + csc: 128, + toRespond: map[string][][]responseParams{ + (ðpb.DataColumnSidecarsByRangeRequest{ + StartSlot: 38, + Count: 1, + Columns: []uint64{6, 70}, + }).String(): { + {}, + { + {slot: 38, columnIndex: 6}, + {slot: 38, columnIndex: 70}, + }, + }, + }, + }, }, + batchSize: 32, + addedRODataColumns: [][]int{{6, 70}}, + isError: false, }, { - name: "Some blocks with blobs with missing data columns - first response is empty", + name: "Some blocks with blobs with missing data columns - no response at all", + denebForkEpoch: 0, + eip7954ForkEpoch: 1, + currentSlot: 40, + blocksParams: []blockParams{{slot: 38, hasBlobs: true}}, + storedDataColumns: []map[int]bool{{38: true, 102: true}}, + peersParams: []peerParams{ + { + csc: 128, + toRespond: map[string][][]responseParams{ + (ðpb.DataColumnSidecarsByRangeRequest{ + StartSlot: 38, + Count: 1, + Columns: []uint64{6, 70}, + }).String(): {{}, {}, {}, {}, {}, {}, {}, {}, {}, {}}, + }, + }, + }, + batchSize: 32, + addedRODataColumns: [][]int{{}}, + isError: true, + }, + { + name: "Some blocks with blobs with missing data columns - request has to be split", denebForkEpoch: 0, eip7954ForkEpoch: 1, currentSlot: 40, blocksParams: 
[]blockParams{ - {slot: 38, hasBlobs: true}, + {slot: 32, hasBlobs: true}, {slot: 33, hasBlobs: true}, {slot: 34, hasBlobs: true}, {slot: 35, hasBlobs: true}, // 4 + {slot: 36, hasBlobs: true}, {slot: 37, hasBlobs: true}, // 6 }, storedDataColumns: []map[int]bool{ - {38: true, 102: true}, + nil, nil, nil, nil, // 4 + nil, nil, // 6 + }, peersParams: []peerParams{ { csc: 128, toRespond: map[string][][]responseParams{ (ðpb.DataColumnSidecarsByRangeRequest{ - StartSlot: 38, - Count: 1, - Columns: []uint64{6, 70}, + StartSlot: 32, + Count: 4, + Columns: []uint64{6, 38, 70, 102}, }).String(): { - {}, { - {slot: 38, columnIndex: 6}, - {slot: 38, columnIndex: 70}, + {slot: 32, columnIndex: 6}, {slot: 32, columnIndex: 38}, {slot: 32, columnIndex: 70}, {slot: 32, columnIndex: 102}, + {slot: 33, columnIndex: 6}, {slot: 33, columnIndex: 38}, {slot: 33, columnIndex: 70}, {slot: 33, columnIndex: 102}, + {slot: 34, columnIndex: 6}, {slot: 34, columnIndex: 38}, {slot: 34, columnIndex: 70}, {slot: 34, columnIndex: 102}, + {slot: 35, columnIndex: 6}, {slot: 35, columnIndex: 38}, {slot: 35, columnIndex: 70}, {slot: 35, columnIndex: 102}, + }, + }, + (ðpb.DataColumnSidecarsByRangeRequest{ + StartSlot: 36, + Count: 2, + Columns: []uint64{6, 38, 70, 102}, + }).String(): { + { + {slot: 36, columnIndex: 6}, {slot: 36, columnIndex: 38}, {slot: 36, columnIndex: 70}, {slot: 36, columnIndex: 102}, + {slot: 37, columnIndex: 6}, {slot: 37, columnIndex: 38}, {slot: 37, columnIndex: 70}, {slot: 37, columnIndex: 102}, }, }, }, }, }, + batchSize: 4, addedRODataColumns: [][]int{ - {6, 70}, - }, + {6, 38, 70, 102}, // Slot 32 + {6, 38, 70, 102}, // Slot 33 + {6, 38, 70, 102}, // Slot 34 + {6, 38, 70, 102}, // Slot 35 + {6, 38, 70, 102}, // Slot 36 + {6, 38, 70, 102}, // Slot 37 + }, + isError: false, }, } @@ -2222,8 +2247,13 @@ func TestFetchDataColumnsFromPeers(t *testing.T) { }) // Fetch the data columns from the peers. - err = blocksFetcher.fetchDataColumnsFromPeers(ctx, bwb, peersID) - require.NoError(t, err) + err = blocksFetcher.fetchDataColumnsFromPeers(ctx, bwb, peersID, delay, tc.batchSize) + if !tc.isError { + require.NoError(t, err) + } else { + require.NotNil(t, err) + return + } // Check the added RO data columns. for i := range bwb { diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go b/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go index 38a48a64219..58e5cc432d5 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go @@ -3,6 +3,8 @@ package initialsync import ( "context" "fmt" + "slices" + "time" "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" @@ -211,6 +213,11 @@ func findForkReqRangeSize() uint64 { // findForkWithPeer loads some blocks from a peer in an attempt to find alternative blocks. func (f *blocksFetcher) findForkWithPeer(ctx context.Context, pid peer.ID, slot primitives.Slot) (*forkData, error) { + const ( + delay = 5 * time.Second + batchSize = 32 + ) + reqCount := findForkReqRangeSize() // Safe-guard, since previous epoch is used when calculating. 
 if uint64(slot) < reqCount {
@@ -282,7 +289,7 @@ func (f *blocksFetcher) findForkWithPeer(ctx context.Context, pid peer.ID, slot
 			return nil, errors.Wrap(err, "invalid blocks received in findForkWithPeer")
 		}
 		if coreTime.PeerDASIsActive(block.Block().Slot()) {
-			if err := f.fetchDataColumnsFromPeers(ctx, bwb, []peer.ID{pid}); err != nil {
+			if err := f.fetchDataColumnsFromPeers(ctx, bwb, []peer.ID{pid}, delay, batchSize); err != nil {
 				return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findForkWithPeer")
 			}
 		} else {
@@ -302,6 +309,10 @@ func (f *blocksFetcher) findForkWithPeer(ctx context.Context, pid peer.ID, slot
 // findAncestor tries to figure out common ancestor slot that connects a given root to known block.
 func (f *blocksFetcher) findAncestor(ctx context.Context, pid peer.ID, b interfaces.ReadOnlySignedBeaconBlock) (*forkData, error) {
+	const (
+		delay     = 5 * time.Second
+		batchSize = 32
+	)
 	outBlocks := []interfaces.ReadOnlySignedBeaconBlock{b}
 	for i := uint64(0); i < backtrackingMaxHops; i++ {
 		parentRoot := outBlocks[len(outBlocks)-1].Block().ParentRoot()
@@ -312,7 +323,7 @@ func (f *blocksFetcher) findAncestor(ctx context.Context, pid peer.ID, b interfa
 			return nil, errors.Wrap(err, "received invalid blocks in findAncestor")
 		}
 		if coreTime.PeerDASIsActive(b.Block().Slot()) {
-			if err := f.fetchDataColumnsFromPeers(ctx, bwb, []peer.ID{pid}); err != nil {
+			if err := f.fetchDataColumnsFromPeers(ctx, bwb, []peer.ID{pid}, delay, batchSize); err != nil {
 				return nil, errors.Wrap(err, "unable to retrieve columns for blocks found in findAncestor")
 			}
 		} else {
@@ -371,12 +382,12 @@ func (f *blocksFetcher) calculateHeadAndTargetEpochs() (headEpoch, targetEpoch p
 	return headEpoch, targetEpoch, peers
 }
 
-// custodyAllNeededColumns filter `inputPeers` that custody all columns in `columns`.
-func (f *blocksFetcher) custodyAllNeededColumns(inputPeers map[peer.ID]bool, columns map[uint64]bool) (map[peer.ID]bool, error) {
-	outputPeers := make(map[peer.ID]bool, len(inputPeers))
+// custodyDataColumnsFromPeer computes the custody data columns of each peer, indexed by peer.
+func (f *blocksFetcher) custodyDataColumnsFromPeer(peers map[peer.ID]bool) (map[peer.ID]map[uint64]bool, error) {
+	peerCount := len(peers)
 
-loop:
-	for peer := range inputPeers {
+	custodyDataColumnsByPeer := make(map[peer.ID]map[uint64]bool, peerCount)
+	for peer := range peers {
 		// Get the node ID from the peer ID.
 		nodeID, err := p2p.ConvertPeerIDToNodeID(peer)
 		if err != nil {
@@ -387,59 +398,125 @@ loop:
 		custodyCount := f.p2p.DataColumnsCustodyCountFromRemotePeer(peer)
 
 		// Get the custody columns from the peer.
-		remoteCustodyColumns, err := peerdas.CustodyColumns(nodeID, custodyCount)
+		custodyDataColumns, err := peerdas.CustodyColumns(nodeID, custodyCount)
 		if err != nil {
 			return nil, errors.Wrap(err, "custody columns")
 		}
 
-		for column := range columns {
-			if !remoteCustodyColumns[column] {
-				continue loop
+		custodyDataColumnsByPeer[peer] = custodyDataColumns
+	}
+
+	return custodyDataColumnsByPeer, nil
+}
+
+// uint64MapToSortedSlice produces a sorted uint64 slice from a map.
+func uint64MapToSortedSlice(input map[uint64]bool) []uint64 {
+	output := make([]uint64, 0, len(input))
+	for idx := range input {
+		output = append(output, idx)
+	}
+
+	slices.Sort[[]uint64](output)
+	return output
+}
+
+// `filterPeerWhichCustodyAtLeastOneDataColumn` filters the peers that custody at least one of the data columns
+// specified in `neededDataColumns`. It also returns a list of descriptions for non-admissible peers.
+func filterPeerWhichCustodyAtLeastOneDataColumn( + neededDataColumns map[uint64]bool, + inputDataColumnsByPeer map[peer.ID]map[uint64]bool, +) (map[peer.ID]map[uint64]bool, []string) { + // Get the count of needed data columns. + neededDataColumnsCount := uint64(len(neededDataColumns)) + + // Create pretty needed data columns for logs. + var neededDataColumnsLog interface{} = "all" + numberOfColumns := params.BeaconConfig().NumberOfColumns + + if neededDataColumnsCount < numberOfColumns { + neededDataColumnsLog = uint64MapToSortedSlice(neededDataColumns) + } + + outputDataColumnsByPeer := make(map[peer.ID]map[uint64]bool, len(inputDataColumnsByPeer)) + descriptions := make([]string, 0) + +outerLoop: + for peer, peerCustodyDataColumns := range inputDataColumnsByPeer { + for neededDataColumn := range neededDataColumns { + if peerCustodyDataColumns[neededDataColumn] { + outputDataColumnsByPeer[peer] = peerCustodyDataColumns + + continue outerLoop } } - outputPeers[peer] = true + peerCustodyColumnsCount := uint64(len(peerCustodyDataColumns)) + var peerCustodyColumnsLog interface{} = "all" + + if peerCustodyColumnsCount < numberOfColumns { + peerCustodyColumnsLog = uint64MapToSortedSlice(peerCustodyDataColumns) + } + + description := fmt.Sprintf( + "peer %s: does not custody any needed column, custody columns: %v, needed columns: %v", + peer, peerCustodyColumnsLog, neededDataColumnsLog, + ) + + descriptions = append(descriptions, description) } - return outputPeers, nil + return outputDataColumnsByPeer, descriptions } -// peersWithSlotAndDataColumns returns a list of peers that should custody all needed data columns for the given slot. -func (f *blocksFetcher) peersWithSlotAndDataColumns( - ctx context.Context, +// admissiblePeersForDataColumn returns a map of peers that: +// - custody at least one column listed in `neededDataColumns`, +// - are synced to `targetSlot`, and +// - have enough bandwidth to serve data columns corresponding to `count` blocks. +// It returns: +// - A map, where the key of the map is the peer, the value is the custody columns of the peer. +// - A map, where the key of the map is the data column, the value is the peer that custody the data column. +// - A slice of descriptions for non admissible peers. +// - An error if any. +func (f *blocksFetcher) admissiblePeersForDataColumn( peers []peer.ID, targetSlot primitives.Slot, - dataColumns map[uint64]bool, + neededDataColumns map[uint64]bool, count uint64, -) ([]peer.ID, []string, error) { - peersCount := len(peers) +) (map[peer.ID]map[uint64]bool, map[uint64][]peer.ID, []string, error) { + // If no peer is specified, get all connected peers. + inputPeers := peers + if inputPeers == nil { + inputPeers = f.p2p.Peers().Connected() + } - // Filter peers based on the percentage of peers to be used in a request. - peers = f.filterPeers(ctx, peers, peersPercentagePerRequestDataColumns) + inputPeerCount := len(inputPeers) + neededDataColumnsCount := uint64(len(neededDataColumns)) - // // Filter peers on bandwidth. - peers = f.hasSufficientBandwidth(peers, count) + // Create description slice for non admissible peers. + descriptions := make([]string, 0, inputPeerCount) - // Select peers which custody ALL wanted columns. - // Basically, it is very unlikely that a non-supernode peer will have custody of all columns. - // TODO: Modify to retrieve data columns from all possible peers. - // TODO: If a peer does respond some of the request columns, do not re-request responded columns. + // Filter peers on bandwidth. 
+ peersWithSufficientBandwidth := f.hasSufficientBandwidth(inputPeers, count) + + // Convert peers with sufficient bandwidth to a map. + peerWithSufficientBandwidthMap := make(map[peer.ID]bool, len(peersWithSufficientBandwidth)) + for _, peer := range peersWithSufficientBandwidth { + peerWithSufficientBandwidthMap[peer] = true + } + + for _, peer := range inputPeers { + if !peerWithSufficientBandwidthMap[peer] { + description := fmt.Sprintf("peer %s: does not have sufficient bandwidth", peer) + descriptions = append(descriptions, description) + } + } // Compute the target epoch from the target slot. targetEpoch := slots.ToEpoch(targetSlot) - peersWithAdmissibleHeadEpoch := make(map[peer.ID]bool, peersCount) - descriptions := make([]string, 0, peersCount) - - // Filter out peers with head epoch lower than our target epoch. - // Technically, we should be able to use the head slot from the peer. - // However, our vision of the head slot of the peer is updated twice per epoch - // via P2P messages. So it is likely that we think the peer is lagging behind - // while it is actually not. - // ==> We use the head epoch as a proxy instead. - // However, if the peer is actually lagging for a few slots, - // we may requests some data columns it doesn't have yet. - for _, peer := range peers { + // Filter peers with head epoch lower than our target epoch. + peersWithAdmissibleHeadEpoch := make(map[peer.ID]bool, inputPeerCount) + for _, peer := range peersWithSufficientBandwidth { peerChainState, err := f.p2p.Peers().ChainState(peer) if err != nil { @@ -457,7 +534,7 @@ func (f *blocksFetcher) peersWithSlotAndDataColumns( peerHeadEpoch := slots.ToEpoch(peerChainState.HeadSlot) if peerHeadEpoch < targetEpoch { - description := fmt.Sprintf("peer %s: head epoch %d < target epoch %d", peer, peerHeadEpoch, targetEpoch) + description := fmt.Sprintf("peer %s: peer head epoch %d < our target epoch %d", peer, peerHeadEpoch, targetEpoch) descriptions = append(descriptions, description) continue } @@ -465,29 +542,112 @@ func (f *blocksFetcher) peersWithSlotAndDataColumns( peersWithAdmissibleHeadEpoch[peer] = true } - // Filter out peers that do not have all the data columns needed. - finalPeers, err := f.custodyAllNeededColumns(peersWithAdmissibleHeadEpoch, dataColumns) + // Compute custody columns for each peer. + dataColumnsByPeerWithAdmissibleHeadEpoch, err := f.custodyDataColumnsFromPeer(peersWithAdmissibleHeadEpoch) if err != nil { - return nil, nil, errors.Wrap(err, "custody all needed columns") + return nil, nil, nil, errors.Wrap(err, "custody columns from peer") } - for peer := range peersWithAdmissibleHeadEpoch { - if _, ok := finalPeers[peer]; !ok { - description := fmt.Sprintf("peer %s: does not custody all needed columns", peer) - descriptions = append(descriptions, description) + // Filter peers which custody at least one needed data column. + dataColumnsByAdmissiblePeer, localDescriptions := filterPeerWhichCustodyAtLeastOneDataColumn(neededDataColumns, dataColumnsByPeerWithAdmissibleHeadEpoch) + descriptions = append(descriptions, localDescriptions...) + + // Compute a map from needed data columns to their peers. 
+ admissiblePeersByDataColumn := make(map[uint64][]peer.ID, neededDataColumnsCount) + for peer, peerCustodyDataColumns := range dataColumnsByAdmissiblePeer { + for dataColumn := range peerCustodyDataColumns { + admissiblePeersByDataColumn[dataColumn] = append(admissiblePeersByDataColumn[dataColumn], peer) + } + } + + return dataColumnsByAdmissiblePeer, admissiblePeersByDataColumn, descriptions, nil +} + +// selectPeersToFetchDataColumnsFrom implements greedy algorithm in order to select peers to fetch data columns from. +// https://en.wikipedia.org/wiki/Set_cover_problem#Greedy_algorithm +func selectPeersToFetchDataColumnsFrom( + neededDataColumns map[uint64]bool, + dataColumnsByPeer map[peer.ID]map[uint64]bool, +) (map[peer.ID][]uint64, error) { + dataColumnsFromSelectedPeers := make(map[peer.ID][]uint64) + + // Filter `dataColumnsByPeer` to only contain needed data columns. + neededDataColumnsByPeer := make(map[peer.ID]map[uint64]bool, len(dataColumnsByPeer)) + for pid, dataColumns := range dataColumnsByPeer { + for dataColumn := range dataColumns { + if neededDataColumns[dataColumn] { + if _, ok := neededDataColumnsByPeer[pid]; !ok { + neededDataColumnsByPeer[pid] = make(map[uint64]bool, len(neededDataColumns)) + } + + neededDataColumnsByPeer[pid][dataColumn] = true + } } } - // Convert the map to a slice. - finalPeersSlice := make([]peer.ID, 0, len(finalPeers)) - for peer := range finalPeers { - finalPeersSlice = append(finalPeersSlice, peer) + for len(neededDataColumns) > 0 { + // Check if at least one peer remains. If not, it means that we don't have enough peers to fetch all needed data columns. + if len(neededDataColumnsByPeer) == 0 { + missingDataColumnsSortedSlice := uint64MapToSortedSlice(neededDataColumns) + return dataColumnsFromSelectedPeers, errors.Errorf("no peer to fetch the following data columns: %v", missingDataColumnsSortedSlice) + } + + // Select the peer that custody the most needed data columns (greedy selection). + var bestPeer peer.ID + for peer, dataColumns := range neededDataColumnsByPeer { + if len(dataColumns) > len(neededDataColumnsByPeer[bestPeer]) { + bestPeer = peer + } + } + + dataColumnsSortedSlice := uint64MapToSortedSlice(neededDataColumnsByPeer[bestPeer]) + dataColumnsFromSelectedPeers[bestPeer] = dataColumnsSortedSlice + + // Remove the selected peer from the list of peers. + delete(neededDataColumnsByPeer, bestPeer) + + // Remove the selected peer's data columns from the list of needed data columns. + for _, dataColumn := range dataColumnsSortedSlice { + delete(neededDataColumns, dataColumn) + } + + // Remove the selected peer's data columns from the list of needed data columns by peer. + for _, dataColumn := range dataColumnsSortedSlice { + for peer, dataColumns := range neededDataColumnsByPeer { + delete(dataColumns, dataColumn) + + if len(dataColumns) == 0 { + delete(neededDataColumnsByPeer, peer) + } + } + } } - // Shuffle the peers. - f.rand.Shuffle(len(finalPeersSlice), func(i, j int) { - finalPeersSlice[i], finalPeersSlice[j] = finalPeersSlice[j], finalPeersSlice[i] - }) + return dataColumnsFromSelectedPeers, nil +} + +// buildDataColumnSidecarsByRangeRequests builds a list of data column sidecars by range requests. +// Each request contains at most `batchSize` items. 
+func buildDataColumnSidecarsByRangeRequests( + startSlot primitives.Slot, + count uint64, + columns []uint64, + batchSize uint64, +) []*p2ppb.DataColumnSidecarsByRangeRequest { + batches := make([]*p2ppb.DataColumnSidecarsByRangeRequest, 0) + + for i := uint64(0); i < count; i += batchSize { + localStartSlot := startSlot + primitives.Slot(i) + localCount := min(batchSize, uint64(startSlot)+count-uint64(localStartSlot)) + + batch := &p2ppb.DataColumnSidecarsByRangeRequest{ + StartSlot: localStartSlot, + Count: localCount, + Columns: columns, + } + + batches = append(batches, batch) + } - return finalPeersSlice, descriptions, nil + return batches } diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go index 93c2055f3d8..867f6db84af 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go @@ -2,6 +2,7 @@ package initialsync import ( "context" + "errors" "fmt" "sync" "testing" @@ -644,48 +645,210 @@ func TestBlocksFetcher_currentHeadAndTargetEpochs(t *testing.T) { } } -func TestCustodyAllNeededColumns(t *testing.T) { - const dataColumnsCount = 31 +func TestSelectPeersToFetchDataColumnsFrom(t *testing.T) { + testCases := []struct { + name string - p2p := p2pt.NewTestP2P(t) + // Inputs + neededDataColumns map[uint64]bool + dataColumnsByPeer map[peer.ID]map[uint64]bool - dataColumns := make(map[uint64]bool, dataColumnsCount) - for i := range dataColumnsCount { - dataColumns[uint64(i)] = true + // Expected outputs + dataColumnsToFetchByPeer map[peer.ID][]uint64 + err error + }{ + { + name: "no data columns needed", + neededDataColumns: map[uint64]bool{}, + dataColumnsByPeer: map[peer.ID]map[uint64]bool{ + peer.ID("peer1"): {1: true, 2: true}, + peer.ID("peer2"): {3: true, 4: true}, + }, + dataColumnsToFetchByPeer: map[peer.ID][]uint64{}, + err: nil, + }, + { + name: "one peer has all data columns needed", + neededDataColumns: map[uint64]bool{1: true, 3: true, 5: true}, + dataColumnsByPeer: map[peer.ID]map[uint64]bool{ + peer.ID("peer1"): {2: true, 4: true}, + peer.ID("peer2"): {1: true, 3: true, 5: true, 7: true, 9: true}, + peer.ID("peer3"): {6: true}, + }, + dataColumnsToFetchByPeer: map[peer.ID][]uint64{ + peer.ID("peer2"): {1, 3, 5}, + }, + err: nil, + }, + { + name: "multiple peers are needed - 1", + neededDataColumns: map[uint64]bool{1: true, 3: true, 5: true, 7: true, 9: true}, + dataColumnsByPeer: map[peer.ID]map[uint64]bool{ + peer.ID("peer1"): {3: true, 7: true}, + peer.ID("peer2"): {1: true, 3: true, 5: true, 9: true, 10: true}, + peer.ID("peer3"): {6: true, 10: true, 12: true, 14: true, 16: true, 18: true, 20: true}, + peer.ID("peer4"): {9: true}, + }, + dataColumnsToFetchByPeer: map[peer.ID][]uint64{ + peer.ID("peer2"): {1, 3, 5, 9}, + peer.ID("peer1"): {7}, + }, + err: nil, + }, + { + name: "multiple peers are needed - 2", + neededDataColumns: map[uint64]bool{1: true, 3: true, 5: true, 7: true, 9: true}, + dataColumnsByPeer: map[peer.ID]map[uint64]bool{ + peer.ID("peer1"): {9: true, 10: true}, + peer.ID("peer2"): {3: true, 7: true}, + peer.ID("peer3"): {1: true, 5: true}, + }, + dataColumnsToFetchByPeer: map[peer.ID][]uint64{ + peer.ID("peer1"): {9}, + peer.ID("peer2"): {3, 7}, + peer.ID("peer3"): {1, 5}, + }, + err: nil, + }, + { + name: "some columns are not owned by any peer", + neededDataColumns: map[uint64]bool{1: true, 3: true, 5: true, 7: true, 9: true}, + dataColumnsByPeer: map[peer.ID]map[uint64]bool{ + 
peer.ID("peer1"): {9: true, 10: true}, + peer.ID("peer2"): {2: true, 6: true}, + peer.ID("peer3"): {1: true, 5: true}, + }, + dataColumnsToFetchByPeer: map[peer.ID][]uint64{ + peer.ID("peer1"): {9}, + peer.ID("peer3"): {1, 5}, + }, + err: errors.New("no peer to fetch the following data columns: [3 7]"), + }, } - custodyCounts := [...]uint64{ - 4 * params.BeaconConfig().CustodyRequirement, - 32 * params.BeaconConfig().CustodyRequirement, - 4 * params.BeaconConfig().CustodyRequirement, - 32 * params.BeaconConfig().CustodyRequirement, - } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actual, err := selectPeersToFetchDataColumnsFrom(tc.neededDataColumns, tc.dataColumnsByPeer) - expected := make(map[peer.ID]bool) + if tc.err != nil { + require.Equal(t, tc.err.Error(), err.Error()) + } else { + require.NoError(t, err) + } - peersID := make(map[peer.ID]bool, len(custodyCounts)) - for _, custodyCount := range custodyCounts { - peerRecord, peerID := createPeer(t, len(peersID), custodyCount) - peersID[peerID] = true - p2p.Peers().Add(peerRecord, peerID, nil, network.DirOutbound) - if custodyCount == 32*params.BeaconConfig().CustodyRequirement { - expected[peerID] = true - } + expected := tc.dataColumnsToFetchByPeer + require.Equal(t, len(expected), len(actual)) + + for peerID, expectedDataColumns := range expected { + actualDataColumns, ok := actual[peerID] + require.Equal(t, true, ok) + require.DeepSSZEqual(t, expectedDataColumns, actualDataColumns) + } + }) } - blocksFetcher := newBlocksFetcher( - context.Background(), - &blocksFetcherConfig{ - p2p: p2p, - }, - ) +} - actual, err := blocksFetcher.custodyAllNeededColumns(peersID, dataColumns) - require.NoError(t, err) +func TestBuildDataColumnSidecarsByRangeRequest(t *testing.T) { + const batchSize = 32 + testCases := []struct { + name string + startSlot primitives.Slot + count uint64 + columns []uint64 + expected []*ethpb.DataColumnSidecarsByRangeRequest + }{ + { + name: "one item - 1", + startSlot: 20, + count: 10, + columns: []uint64{1, 2, 3, 4, 5}, + expected: []*ethpb.DataColumnSidecarsByRangeRequest{ + { + StartSlot: 20, + Count: 10, + Columns: []uint64{1, 2, 3, 4, 5}, + }, + }, + }, + { + name: "one item - 2", + startSlot: 20, + count: 32, + columns: []uint64{1, 2, 3, 4, 5}, + expected: []*ethpb.DataColumnSidecarsByRangeRequest{ + { + StartSlot: 20, + Count: 32, + Columns: []uint64{1, 2, 3, 4, 5}, + }, + }, + }, + { + name: "two items - 1", + startSlot: 20, + count: 33, + columns: []uint64{1, 2, 3, 4, 5}, + expected: []*ethpb.DataColumnSidecarsByRangeRequest{ + { + StartSlot: 20, + Count: 32, + Columns: []uint64{1, 2, 3, 4, 5}, + }, + { + StartSlot: 52, + Count: 1, + Columns: []uint64{1, 2, 3, 4, 5}, + }, + }, + }, + { + name: "two items - 2", + startSlot: 20, + count: 64, + columns: []uint64{1, 2, 3, 4, 5}, + expected: []*ethpb.DataColumnSidecarsByRangeRequest{ + { + StartSlot: 20, + Count: 32, + Columns: []uint64{1, 2, 3, 4, 5}, + }, + { + StartSlot: 52, + Count: 32, + Columns: []uint64{1, 2, 3, 4, 5}, + }, + }, + }, + { + name: "three items", + startSlot: 20, + count: 66, + columns: []uint64{1, 2, 3, 4, 5}, + expected: []*ethpb.DataColumnSidecarsByRangeRequest{ + { + StartSlot: 20, + Count: 32, + Columns: []uint64{1, 2, 3, 4, 5}, + }, + { + StartSlot: 52, + Count: 32, + Columns: []uint64{1, 2, 3, 4, 5}, + }, + { + StartSlot: 84, + Count: 2, + Columns: []uint64{1, 2, 3, 4, 5}, + }, + }, + }, + } - require.Equal(t, len(expected), len(actual)) - for peerID := range expected { - _, ok := actual[peerID] - 
require.Equal(t, true, ok) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actual := buildDataColumnSidecarsByRangeRequests(tc.startSlot, tc.count, tc.columns, batchSize) + require.DeepSSZEqual(t, tc.expected, actual) + }) } } diff --git a/beacon-chain/sync/rpc_data_column_sidecars_by_range.go b/beacon-chain/sync/rpc_data_column_sidecars_by_range.go index d677b6dba6c..d40fb8653e5 100644 --- a/beacon-chain/sync/rpc_data_column_sidecars_by_range.go +++ b/beacon-chain/sync/rpc_data_column_sidecars_by_range.go @@ -19,31 +19,42 @@ import ( "github.com/sirupsen/logrus" ) -func (s *Service) streamDataColumnBatch(ctx context.Context, batch blockBatch, wQuota uint64, wantedIndexes map[uint64]bool, stream libp2pcore.Stream) (uint64, error) { +func (s *Service) streamDataColumnBatch(ctx context.Context, batch blockBatch, wQuota uint64, wantedDataColumnIndices map[uint64]bool, stream libp2pcore.Stream) (uint64, error) { + _, span := trace.StartSpan(ctx, "sync.streamDataColumnBatch") + defer span.End() + // Defensive check to guard against underflow. if wQuota == 0 { return 0, nil } - _, span := trace.StartSpan(ctx, "sync.streamDataColumnBatch") - defer span.End() - for _, b := range batch.canonical() { - root := b.Root() - idxs, err := s.cfg.blobStorage.ColumnIndices(b.Root()) + + for _, block := range batch.canonical() { + // Get the block blockRoot. + blockRoot := block.Root() + + // Retrieve stored data columns indices for this block root. + storedDataColumnsIndices, err := s.cfg.blobStorage.ColumnIndices(blockRoot) + if err != nil { s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream) - return wQuota, errors.Wrapf(err, "could not retrieve sidecars for block root %#x", root) + return wQuota, errors.Wrapf(err, "could not retrieve data columns indice for block root %#x", blockRoot) } - for i, l := uint64(0), uint64(len(idxs)); i < l; i++ { - // index not available or unwanted, skip - if !idxs[i] || !wantedIndexes[i] { + + for dataColumnIndex := range wantedDataColumnIndices { + isDataColumnStored := storedDataColumnsIndices[dataColumnIndex] + + // Skip if the data column is not stored. + if !isDataColumnStored { continue } + // We won't check for file not found since the .Indices method should normally prevent that from happening. - sc, err := s.cfg.blobStorage.GetColumn(b.Root(), i) + sc, err := s.cfg.blobStorage.GetColumn(blockRoot, dataColumnIndex) if err != nil { s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream) - return wQuota, errors.Wrapf(err, "could not retrieve data column sidecar: index %d, block root %#x", i, root) + return wQuota, errors.Wrapf(err, "could not retrieve data column sidecar: index %d, block root %#x", dataColumnIndex, blockRoot) } + SetStreamWriteDeadline(stream, defaultWriteDuration) if chunkErr := WriteDataColumnSidecarChunk(stream, s.cfg.chain, s.cfg.p2p.Encoding(), sc); chunkErr != nil { log.WithError(chunkErr).Debug("Could not send a chunked response") @@ -51,24 +62,28 @@ func (s *Service) streamDataColumnBatch(ctx context.Context, batch blockBatch, w tracing.AnnotateError(span, chunkErr) return wQuota, chunkErr } + s.rateLimiter.add(stream, 1) wQuota -= 1 + // Stop streaming results once the quota of writes for the request is consumed. 
if wQuota == 0 { return 0, nil } } } + return wQuota, nil } // dataColumnSidecarsByRangeRPCHandler looks up the request data columns from the database from a given start slot index func (s *Service) dataColumnSidecarsByRangeRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { - var err error ctx, span := trace.StartSpan(ctx, "sync.DataColumnSidecarsByRangeHandler") defer span.End() + ctx, cancel := context.WithTimeout(ctx, respTimeout) defer cancel() + SetRPCStreamDeadlines(stream) r, ok := msg.(*pb.DataColumnSidecarsByRangeRequest) @@ -93,7 +108,6 @@ func (s *Service) dataColumnSidecarsByRangeRPCHandler(ctx context.Context, msg i requestedColumnsCount := uint64(len(requestedColumns)) // Format log fields. - var ( custodyColumnsLog interface{} = "all" requestedColumnsLog interface{} = "all" @@ -121,10 +135,11 @@ func (s *Service) dataColumnSidecarsByRangeRPCHandler(ctx context.Context, msg i if err := s.rateLimiter.validateRequest(stream, 1); err != nil { return err } + rp, err := validateDataColumnsByRange(r, s.cfg.chain.CurrentSlot()) if err != nil { s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream) - s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer()) + s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer) tracing.AnnotateError(span, err) return err } @@ -132,6 +147,7 @@ func (s *Service) dataColumnSidecarsByRangeRPCHandler(ctx context.Context, msg i // Ticker to stagger out large requests. ticker := time.NewTicker(time.Second) defer ticker.Stop() + batcher, err := newBlockRangeBatcher(rp, s.cfg.beaconDB, s.rateLimiter, s.cfg.chain.IsCanonical, ticker) if err != nil { log.WithError(err).Info("Error in DataColumnSidecarsByRange batch") @@ -139,8 +155,9 @@ func (s *Service) dataColumnSidecarsByRangeRPCHandler(ctx context.Context, msg i tracing.AnnotateError(span, err) return err } + // Derive the wanted columns for the request. - wantedColumns := map[uint64]bool{} + wantedColumns := make(map[uint64]bool, len(r.Columns)) for _, c := range r.Columns { wantedColumns[c] = true } @@ -182,18 +199,19 @@ func columnBatchLimit() uint64 { // TODO: Generalize between data columns and blobs, while the validation parameters used are different they // are the same value in the config. Can this be safely abstracted ? -func validateDataColumnsByRange(r *pb.DataColumnSidecarsByRangeRequest, current primitives.Slot) (rangeParams, error) { +func validateDataColumnsByRange(r *pb.DataColumnSidecarsByRangeRequest, currentSlot primitives.Slot) (rangeParams, error) { if r.Count == 0 { return rangeParams{}, errors.Wrap(p2ptypes.ErrInvalidRequest, "invalid request Count parameter") } + rp := rangeParams{ start: r.StartSlot, size: r.Count, } // Peers may overshoot the current slot when in initial sync, so we don't want to penalize them by treating the // request as an error. So instead we return a set of params that acts as a noop. - if rp.start > current { - return rangeParams{start: current, end: current, size: 0}, nil + if rp.start > currentSlot { + return rangeParams{start: currentSlot, end: currentSlot, size: 0}, nil } var err error @@ -202,10 +220,13 @@ func validateDataColumnsByRange(r *pb.DataColumnSidecarsByRangeRequest, current return rangeParams{}, errors.Wrap(p2ptypes.ErrInvalidRequest, "overflow start + count -1") } - maxRequest := params.MaxRequestBlock(slots.ToEpoch(current)) + // Get current epoch from current slot. 
+ currentEpoch := slots.ToEpoch(currentSlot) + + maxRequest := params.MaxRequestBlock(currentEpoch) // Allow some wiggle room, up to double the MaxRequestBlocks past the current slot, // to give nodes syncing close to the head of the chain some margin for error. - maxStart, err := current.SafeAdd(maxRequest * 2) + maxStart, err := currentSlot.SafeAdd(maxRequest * 2) if err != nil { return rangeParams{}, errors.Wrap(p2ptypes.ErrInvalidRequest, "current + maxRequest * 2 > max uint") } @@ -214,20 +235,23 @@ func validateDataColumnsByRange(r *pb.DataColumnSidecarsByRangeRequest, current // [max(current_epoch - MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS, DENEB_FORK_EPOCH), current_epoch] // where current_epoch is defined by the current wall-clock time, // and clients MUST support serving requests of data columns on this range. - minStartSlot, err := DataColumnsRPCMinValidSlot(current) + minStartSlot, err := DataColumnsRPCMinValidSlot(currentSlot) if err != nil { return rangeParams{}, errors.Wrap(p2ptypes.ErrInvalidRequest, "DataColumnsRPCMinValidSlot error") } + if rp.start > maxStart { return rangeParams{}, errors.Wrap(p2ptypes.ErrInvalidRequest, "start > maxStart") } + if rp.start < minStartSlot { rp.start = minStartSlot } - if rp.end > current { - rp.end = current + if rp.end > currentSlot { + rp.end = currentSlot } + if rp.end < rp.start { rp.end = rp.start } @@ -236,6 +260,7 @@ func validateDataColumnsByRange(r *pb.DataColumnSidecarsByRangeRequest, current if limit > maxRequest { limit = maxRequest } + if rp.size > limit { rp.size = limit }
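As a closing illustration, here is a minimal standalone Go sketch of the greedy set-cover selection performed by selectPeersToFetchDataColumnsFrom: repeatedly pick the peer that custodies the most still-needed columns. Peer IDs are plain strings here, and the sketch breaks out of the loop where the real function returns an error listing the uncovered columns; treat it as a simplified model, not the actual implementation.

package main

import (
	"fmt"
	"sort"
)

// greedySelect picks, at each step, the peer covering the most still-needed columns.
// Note: both input maps are mutated, as in the real implementation.
func greedySelect(needed map[uint64]bool, columnsByPeer map[string]map[uint64]bool) map[string][]uint64 {
	selected := make(map[string][]uint64)

	for len(needed) > 0 {
		bestPeer, bestCount := "", 0
		for peer, columns := range columnsByPeer {
			count := 0
			for column := range columns {
				if needed[column] {
					count++
				}
			}
			if count > bestCount {
				bestPeer, bestCount = peer, count
			}
		}

		// No remaining peer custodies any still-needed column.
		// (The real function returns an error listing the uncovered columns here.)
		if bestCount == 0 {
			break
		}

		picked := make([]uint64, 0, bestCount)
		for column := range columnsByPeer[bestPeer] {
			if needed[column] {
				picked = append(picked, column)
				delete(needed, column)
			}
		}
		sort.Slice(picked, func(i, j int) bool { return picked[i] < picked[j] })

		selected[bestPeer] = picked
		delete(columnsByPeer, bestPeer)
	}

	return selected
}

func main() {
	needed := map[uint64]bool{1: true, 3: true, 5: true, 7: true, 9: true}
	columnsByPeer := map[string]map[uint64]bool{
		"peer1": {3: true, 7: true},
		"peer2": {1: true, 3: true, 5: true, 9: true, 10: true},
	}

	// peer2 is selected first (it covers columns 1, 3, 5 and 9), then peer1 covers the remaining column 7,
	// matching the "multiple peers are needed - 1" case of TestSelectPeersToFetchDataColumnsFrom.
	fmt.Println(greedySelect(needed, columnsByPeer))
}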