From 5279dd1644f4cc58827b4d7f4f31beceb00faa57 Mon Sep 17 00:00:00 2001 From: Kasey Kirkham Date: Fri, 26 Jan 2024 16:11:11 -0600 Subject: [PATCH] retrieve and save blobs during backfill --- beacon-chain/node/node.go | 18 +-- beacon-chain/sync/backfill/BUILD.bazel | 10 ++ beacon-chain/sync/backfill/batch.go | 46 ++++++- beacon-chain/sync/backfill/batch_test.go | 21 ++++ beacon-chain/sync/backfill/blobs.go | 134 +++++++++++++++++++++ beacon-chain/sync/backfill/metrics.go | 51 +++++++- beacon-chain/sync/backfill/pool.go | 28 +++-- beacon-chain/sync/backfill/pool_test.go | 17 ++- beacon-chain/sync/backfill/service.go | 98 +++++++++++---- beacon-chain/sync/backfill/service_test.go | 15 ++- beacon-chain/sync/backfill/status.go | 9 +- beacon-chain/sync/backfill/status_test.go | 3 +- beacon-chain/sync/backfill/verify.go | 37 +++++- beacon-chain/sync/backfill/worker.go | 66 ++++++++-- beacon-chain/sync/rpc_send_request.go | 25 ++-- beacon-chain/verification/blob.go | 3 + beacon-chain/verification/initializer.go | 2 +- 17 files changed, 503 insertions(+), 80 deletions(-) create mode 100644 beacon-chain/sync/backfill/batch_test.go create mode 100644 beacon-chain/sync/backfill/blobs.go diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index 2e5be1e9294b..8c600dc2bcc6 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -229,14 +229,6 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco if err != nil { return nil, errors.Wrap(err, "backfill status initialization error") } - pa := peers.NewAssigner(beacon.fetchP2P().Peers(), beacon.forkChoicer) - bf, err := backfill.NewService(ctx, bfs, beacon.clockWaiter, beacon.fetchP2P(), pa, beacon.BackfillOpts...) - if err != nil { - return nil, errors.Wrap(err, "error initializing backfill service") - } - if err := beacon.services.RegisterService(bf); err != nil { - return nil, errors.Wrap(err, "error registering backfill service") - } log.Debugln("Starting State Gen") if err := beacon.startStateGen(ctx, bfs, beacon.forkChoicer); err != nil { @@ -251,6 +243,16 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco beacon.verifyInitWaiter = verification.NewInitializerWaiter( beacon.clockWaiter, forkchoice.NewROForkChoice(beacon.forkChoicer), beacon.stateGen) + pa := peers.NewAssigner(beacon.fetchP2P().Peers(), beacon.forkChoicer) + beacon.BackfillOpts = append(beacon.BackfillOpts, backfill.WithVerifierWaiter(beacon.verifyInitWaiter)) + bf, err := backfill.NewService(ctx, bfs, beacon.BlobStorage, beacon.clockWaiter, beacon.fetchP2P(), pa, beacon.BackfillOpts...) + if err != nil { + return nil, errors.Wrap(err, "error initializing backfill service") + } + if err := beacon.services.RegisterService(bf); err != nil { + return nil, errors.Wrap(err, "error registering backfill service") + } + log.Debugln("Registering POW Chain Service") if err := beacon.registerPOWChainService(); err != nil { return nil, err diff --git a/beacon-chain/sync/backfill/BUILD.bazel b/beacon-chain/sync/backfill/BUILD.bazel index 6abde4fa563c..4bf25d24ec5b 100644 --- a/beacon-chain/sync/backfill/BUILD.bazel +++ b/beacon-chain/sync/backfill/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = [ "batch.go", "batcher.go", + "blobs.go", "metrics.go", "pool.go", "service.go", @@ -17,12 +18,15 @@ go_library( deps = [ "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/core/signing:go_default_library", + "//beacon-chain/das:go_default_library", "//beacon-chain/db:go_default_library", + "//beacon-chain/db/filesystem:go_default_library", "//beacon-chain/p2p:go_default_library", "//beacon-chain/p2p/peers:go_default_library", "//beacon-chain/startup:go_default_library", "//beacon-chain/state:go_default_library", "//beacon-chain/sync:go_default_library", + "//beacon-chain/verification:go_default_library", "//config/fieldparams:go_default_library", "//config/params:go_default_library", "//consensus-types/blocks:go_default_library", @@ -34,6 +38,7 @@ go_library( "//proto/dbval:go_default_library", "//proto/prysm/v1alpha1:go_default_library", "//runtime:go_default_library", + "//runtime/version:go_default_library", "//time/slots:go_default_library", "@com_github_libp2p_go_libp2p//core/peer:go_default_library", "@com_github_pkg_errors//:go_default_library", @@ -46,6 +51,7 @@ go_library( go_test( name = "go_default_test", srcs = [ + "batch_test.go", "batcher_test.go", "pool_test.go", "service_test.go", @@ -56,10 +62,14 @@ go_test( deps = [ "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/core/signing:go_default_library", + "//beacon-chain/das:go_default_library", "//beacon-chain/db:go_default_library", + "//beacon-chain/db/filesystem:go_default_library", "//beacon-chain/p2p/testing:go_default_library", "//beacon-chain/startup:go_default_library", "//beacon-chain/state:go_default_library", + "//beacon-chain/sync:go_default_library", + "//beacon-chain/verification:go_default_library", "//config/fieldparams:go_default_library", "//config/params:go_default_library", "//consensus-types/blocks:go_default_library", diff --git a/beacon-chain/sync/backfill/batch.go b/beacon-chain/sync/backfill/batch.go index f6c04a2c756e..8e33f5c61e3f 100644 --- a/beacon-chain/sync/backfill/batch.go +++ b/beacon-chain/sync/backfill/batch.go @@ -2,10 +2,13 @@ package backfill import ( "fmt" + "sort" "time" "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/das" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" log "github.com/sirupsen/logrus" @@ -33,6 +36,8 @@ func (s batchState) String() string { return "import_complete" case batchEndSequence: return "end_sequence" + case batchBlobSync: + return "blob_sync" default: return "unknown" } @@ -43,6 +48,7 @@ const ( batchInit batchSequenced batchErrRetryable + batchBlobSync batchImportable batchImportComplete batchEndSequence @@ -57,10 +63,13 @@ type batch struct { retries int begin primitives.Slot end primitives.Slot // half-open interval, [begin, end), ie >= start, < end. - results VerifiedROBlocks + results verifiedROBlocks err error state batchState - pid peer.ID + busy peer.ID + blockPid peer.ID + blobPid peer.ID + bs *blobSync } func (b batch) logFields() log.Fields { @@ -72,7 +81,9 @@ func (b batch) logFields() log.Fields { "retries": b.retries, "begin": b.begin, "end": b.end, - "pid": b.pid, + "busyPid": b.busy, + "blockPid": b.blockPid, + "blobPid": b.blobPid, } } @@ -101,7 +112,7 @@ func (b batch) ensureParent(expected [32]byte) error { return nil } -func (b batch) request() *eth.BeaconBlocksByRangeRequest { +func (b batch) blockRequest() *eth.BeaconBlocksByRangeRequest { return ð.BeaconBlocksByRangeRequest{ StartSlot: b.begin, Count: uint64(b.end - b.begin), @@ -109,6 +120,13 @@ func (b batch) request() *eth.BeaconBlocksByRangeRequest { } } +func (b batch) blobRequest() *eth.BlobSidecarsByRangeRequest { + return ð.BlobSidecarsByRangeRequest{ + StartSlot: b.begin, + Count: uint64(b.end - b.begin), + } +} + func (b batch) withState(s batchState) batch { if s == batchSequenced { b.scheduled = time.Now() @@ -130,7 +148,7 @@ func (b batch) withState(s batchState) batch { } func (b batch) withPeer(p peer.ID) batch { - b.pid = p + b.blockPid = p backfillBatchTimeWaiting.Observe(float64(time.Since(b.scheduled).Milliseconds())) return b } @@ -139,3 +157,21 @@ func (b batch) withRetryableError(err error) batch { b.err = err return b.withState(batchErrRetryable) } + +func (b batch) blobsNeeded() int { + return b.bs.blobsNeeded() +} + +func (b batch) blobResponseValidator() sync.BlobResponseValidation { + return b.bs.validateNext +} + +func (b batch) availabilityStore() das.AvailabilityStore { + return b.bs.store +} + +func sortBatchDesc(todo []batch) { + sort.Slice(todo, func(i, j int) bool { + return todo[j].end < todo[i].end + }) +} diff --git a/beacon-chain/sync/backfill/batch_test.go b/beacon-chain/sync/backfill/batch_test.go new file mode 100644 index 000000000000..ecfa94f95f8c --- /dev/null +++ b/beacon-chain/sync/backfill/batch_test.go @@ -0,0 +1,21 @@ +package backfill + +import ( + "testing" + + "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v4/testing/require" +) + +func TestSortBatchDesc(t *testing.T) { + orderIn := []primitives.Slot{100, 10000, 1} + orderOut := []primitives.Slot{10000, 100, 1} + batches := make([]batch, len(orderIn)) + for i := range orderIn { + batches[i] = batch{end: orderIn[i]} + } + sortBatchDesc(batches) + for i := range orderOut { + require.Equal(t, orderOut[i], batches[i].end) + } +} diff --git a/beacon-chain/sync/backfill/blobs.go b/beacon-chain/sync/backfill/blobs.go new file mode 100644 index 000000000000..1dbd1e734f7f --- /dev/null +++ b/beacon-chain/sync/backfill/blobs.go @@ -0,0 +1,134 @@ +package backfill + +import ( + "context" + + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/das" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/verification" + fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" + "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" +) + +var ( + errUnexpectedResponseSize = errors.New("received more blobs than expected for the requested range") + errUnexpectedCommitment = errors.New("BlobSidecar commitment does not match block") + errUnexpectedResponseContent = errors.New("BlobSidecar response does not include expected values in expected order") + errBatchVerifierMismatch = errors.New("the list of blocks passed to the availability check does not match what was verified") +) + +type blobSummary struct { + blockRoot [32]byte + index uint64 + commitment [48]byte + signature [fieldparams.BLSSignatureLength]byte +} + +func newBlobSync(cs, retentionStart primitives.Slot, vbs verifiedROBlocks, nbv verification.NewBlobVerifier, st *filesystem.BlobStorage) (*blobSync, error) { + todo, err := vbs.blobIdents(retentionStart) + if err != nil { + return nil, err + } + bbv := newBlobBatchVerifier(nbv) + as := das.NewLazilyPersistentStore(st, bbv) + return &blobSync{cs: cs, expected: todo, bbv: bbv, store: as}, nil +} + +type blobVerifierMap map[[32]byte][fieldparams.MaxBlobsPerBlock]verification.BlobVerifier + +type blobSync struct { + store das.AvailabilityStore + expected []blobSummary + next int + bbv *blobBatchVerifier + cs primitives.Slot +} + +func (bs *blobSync) blobsNeeded() int { + return len(bs.expected) - bs.next +} + +func (bs *blobSync) validateNext(rb blocks.ROBlob) error { + if bs.next >= len(bs.expected) { + return errUnexpectedResponseSize + } + next := bs.expected[bs.next] + bs.next += 1 + // Get the super cheap verifications out of the way before we init a verifier. + if next.blockRoot != rb.BlockRoot() { + return errors.Wrapf(errUnexpectedResponseContent, "next expected root=%#x, saw=%#x", next.blockRoot, rb.BlockRoot()) + } + if next.index != rb.Index { + return errors.Wrapf(errUnexpectedResponseContent, "next expected root=%#x, saw=%#x for root=%#x", next.index, rb.Index, next.blockRoot) + } + if next.commitment != bytesutil.ToBytes48(rb.KzgCommitment) { + return errors.Wrapf(errUnexpectedResponseContent, "next expected commitment=%#x, saw=%#x for root=%#x", next.commitment, rb.KzgCommitment, rb.BlockRoot()) + } + + if bytesutil.ToBytes96(rb.SignedBlockHeader.Signature) != next.signature { + return verification.ErrInvalidProposerSignature + } + v := bs.bbv.newVerifier(rb) + if err := v.BlobIndexInBounds(); err != nil { + return err + } + v.SatisfyRequirement(verification.RequireValidProposerSignature) + if err := v.SidecarInclusionProven(); err != nil { + return err + } + if err := v.SidecarKzgProofVerified(); err != nil { + return err + } + if err := bs.store.Persist(bs.cs, rb); err != nil { + return err + } + + return nil +} + +func newBlobBatchVerifier(nbv verification.NewBlobVerifier) *blobBatchVerifier { + return &blobBatchVerifier{newBlobVerifier: nbv, verifiers: make(blobVerifierMap)} +} + +type blobBatchVerifier struct { + newBlobVerifier verification.NewBlobVerifier + verifiers blobVerifierMap +} + +func (bbv *blobBatchVerifier) newVerifier(rb blocks.ROBlob) verification.BlobVerifier { + m := bbv.verifiers[rb.BlockRoot()] + m[rb.Index] = bbv.newBlobVerifier(rb, verification.BackfillSidecarRequirements) + bbv.verifiers[rb.BlockRoot()] = m + return m[rb.Index] +} + +func (bbv blobBatchVerifier) VerifiedROBlobs(_ context.Context, blk blocks.ROBlock, _ []blocks.ROBlob) ([]blocks.VerifiedROBlob, error) { + m, ok := bbv.verifiers[blk.Root()] + if !ok { + return nil, errors.Wrapf(verification.ErrMissingVerification, "no record of verifiers for root %#x", blk.Root()) + } + c, err := blk.Block().Body().BlobKzgCommitments() + if err != nil { + return nil, errors.Wrapf(errUnexpectedCommitment, "error reading commitments from block root %#x", blk.Root()) + } + vbs := make([]blocks.VerifiedROBlob, len(c)) + for i := range c { + if m[i] == nil { + return nil, errors.Wrapf(errBatchVerifierMismatch, "do not have verifier for block root %#x idx %d", blk.Root(), i) + } + vb, err := m[i].VerifiedROBlob() + if err != nil { + return nil, err + } + if bytesutil.ToBytes48(vb.KzgCommitment) != bytesutil.ToBytes48(c[i]) { + return nil, errors.Wrapf(errBatchVerifierMismatch, "commitments do not match, verified=%#x da check=%#x for root %#x", vb.KzgCommitment, c[i], vb.BlockRoot()) + } + vbs[i] = vb + } + return vbs, nil +} + +var _ das.BlobBatchVerifier = &blobBatchVerifier{} diff --git a/beacon-chain/sync/backfill/metrics.go b/beacon-chain/sync/backfill/metrics.go index 97ecf4b2ef45..1bfaa7a2b874 100644 --- a/beacon-chain/sync/backfill/metrics.go +++ b/beacon-chain/sync/backfill/metrics.go @@ -3,6 +3,9 @@ package backfill import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync" + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" + "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" ) var ( @@ -30,10 +33,28 @@ var ( Help: "Number of backfill batches downloaded and imported.", }, ) - backfillBatchApproximateBytes = promauto.NewCounter( + backfillBlocksApproximateBytes = promauto.NewCounter( prometheus.CounterOpts{ - Name: "backfill_batch_bytes_downloaded", - Help: "Count of bytes downloaded from peers", + Name: "backfill_blocks_bytes_downloaded", + Help: "BeaconBlock bytes downloaded from peers for backfill.", + }, + ) + backfillBlobsApproximateBytes = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "backfill_blobs_bytes_downloaded", + Help: "BlobSidecar bytes downloaded from peers for backfill.", + }, + ) + backfillBlobsDownloadCount = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "backfill_blobs_download_count", + Help: "Number of BlobSidecar values downloaded from peers for backfill.", + }, + ) + backfillBlocksDownloadCount = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "backfill_blocks_download_count", + Help: "Number of BeaconBlock values downloaded from peers for backfill.", }, ) backfillBatchTimeRoundtrip = promauto.NewHistogram( @@ -50,13 +71,20 @@ var ( Buckets: []float64{50, 100, 300, 1000, 2000}, }, ) - backfillBatchTimeDownloading = promauto.NewHistogram( + backfillBatchTimeDownloadingBlocks = promauto.NewHistogram( prometheus.HistogramOpts{ - Name: "backfill_batch_time_download", + Name: "backfill_batch_blocks_time_download", Help: "Time batch spent downloading blocks from peer.", Buckets: []float64{100, 300, 1000, 2000, 4000, 8000}, }, ) + backfillBatchTimeDownloadingBlobs = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: "backfill_batch_blobs_time_download", + Help: "Time batch spent downloading blobs from peer.", + Buckets: []float64{100, 300, 1000, 2000, 4000, 8000}, + }, + ) backfillBatchTimeVerifying = promauto.NewHistogram( prometheus.HistogramOpts{ Name: "backfill_batch_time_verify", @@ -65,3 +93,16 @@ var ( }, ) ) + +func blobValidationMetrics(_ blocks.ROBlob) error { + backfillBlobsDownloadCount.Inc() + return nil +} + +func blockValidationMetrics(interfaces.ReadOnlySignedBeaconBlock) error { + backfillBlocksDownloadCount.Inc() + return nil +} + +var _ sync.BlobResponseValidation = blobValidationMetrics +var _ sync.BeaconBlockProcessor = blockValidationMetrics diff --git a/beacon-chain/sync/backfill/pool.go b/beacon-chain/sync/backfill/pool.go index 1024895e7090..e4121458dcf6 100644 --- a/beacon-chain/sync/backfill/pool.go +++ b/beacon-chain/sync/backfill/pool.go @@ -7,15 +7,18 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem" "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/peers" "github.com/prysmaticlabs/prysm/v4/beacon-chain/startup" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/verification" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" log "github.com/sirupsen/logrus" ) type batchWorkerPool interface { - spawn(ctx context.Context, n int, clock *startup.Clock, a PeerAssigner, v *verifier) + spawn(ctx context.Context, n int, clock *startup.Clock, a PeerAssigner, v *verifier, cm sync.ContextByteVersions, blobVerifier verification.NewBlobVerifier, bfs *filesystem.BlobStorage) todo(b batch) complete() (batch, error) } @@ -24,11 +27,11 @@ type worker interface { run(context.Context) } -type newWorker func(id workerId, in, out chan batch, c *startup.Clock, v *verifier) worker +type newWorker func(id workerId, in, out chan batch, c *startup.Clock, v *verifier, cm sync.ContextByteVersions, nbv verification.NewBlobVerifier, bfs *filesystem.BlobStorage) worker func defaultNewWorker(p p2p.P2P) newWorker { - return func(id workerId, in, out chan batch, c *startup.Clock, v *verifier) worker { - return newP2pWorker(id, p, in, out, c, v) + return func(id workerId, in, out chan batch, c *startup.Clock, v *verifier, cm sync.ContextByteVersions, nbv verification.NewBlobVerifier, bfs *filesystem.BlobStorage) worker { + return newP2pWorker(id, p, in, out, c, v, cm, nbv, bfs) } } @@ -60,11 +63,11 @@ func newP2PBatchWorkerPool(p p2p.P2P, maxBatches int) *p2pBatchWorkerPool { } } -func (p *p2pBatchWorkerPool) spawn(ctx context.Context, n int, c *startup.Clock, a PeerAssigner, v *verifier) { +func (p *p2pBatchWorkerPool) spawn(ctx context.Context, n int, c *startup.Clock, a PeerAssigner, v *verifier, cm sync.ContextByteVersions, nbv verification.NewBlobVerifier, bfs *filesystem.BlobStorage) { p.ctx, p.cancel = context.WithCancel(ctx) go p.batchRouter(a) for i := 0; i < n; i++ { - go p.newWorker(workerId(i), p.toWorkers, p.fromWorkers, c, v).run(p.ctx) + go p.newWorker(workerId(i), p.toWorkers, p.fromWorkers, c, v, cm, nbv, bfs).run(p.ctx) } } @@ -106,14 +109,21 @@ func (p *p2pBatchWorkerPool) batchRouter(pa PeerAssigner) { select { case b := <-p.toRouter: todo = append(todo, b) + // sort batches in descending order so that we'll always process the dependent batches first + sortBatchDesc(todo) case <-rt.C: // Worker assignments can fail if assignBatch can't find a suitable peer. // This ticker exists to periodically break out of the channel select // to retry failed assignments. case b := <-p.fromWorkers: - pid := b.pid + pid := b.busy busy[pid] = false - p.fromRouter <- b + if b.state == batchBlobSync { + todo = append(todo, b) + sortBatchDesc(todo) + } else { + p.fromRouter <- b + } case <-p.ctx.Done(): log.WithError(p.ctx.Err()).Info("p2pBatchWorkerPool context canceled, shutting down") p.shutdown(p.ctx.Err()) @@ -135,7 +145,7 @@ func (p *p2pBatchWorkerPool) batchRouter(pa PeerAssigner) { } for _, pid := range assigned { busy[pid] = true - todo[0].pid = pid + todo[0].busy = pid p.toWorkers <- todo[0].withPeer(pid) if todo[0].begin < earliest { earliest = todo[0].begin diff --git a/beacon-chain/sync/backfill/pool_test.go b/beacon-chain/sync/backfill/pool_test.go index efcfef2bc80e..bd20b570111a 100644 --- a/beacon-chain/sync/backfill/pool_test.go +++ b/beacon-chain/sync/backfill/pool_test.go @@ -6,8 +6,13 @@ import ( "time" "github.com/libp2p/go-libp2p/core/peer" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem" p2ptest "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing" "github.com/prysmaticlabs/prysm/v4/beacon-chain/startup" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/verification" + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" + "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" "github.com/prysmaticlabs/prysm/v4/testing/require" "github.com/prysmaticlabs/prysm/v4/testing/util" ) @@ -28,6 +33,10 @@ func (m mockAssigner) Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error) { var _ PeerAssigner = &mockAssigner{} +func mockNewBlobVerifier(_ blocks.ROBlob, _ []verification.Requirement) verification.BlobVerifier { + return &verification.MockBlobVerifier{} +} + func TestPoolDetectAllEnded(t *testing.T) { nw := 5 p2p := p2ptest.NewTestP2P(t) @@ -40,7 +49,11 @@ func TestPoolDetectAllEnded(t *testing.T) { require.NoError(t, err) v, err := newBackfillVerifier(st.GenesisValidatorsRoot(), keys) require.NoError(t, err) - pool.spawn(ctx, nw, startup.NewClock(time.Now(), [32]byte{}), ma, v) + + ctxMap, err := sync.ContextByteVersionsForValRoot(bytesutil.ToBytes32(st.GenesisValidatorsRoot())) + require.NoError(t, err) + bfs := filesystem.NewEphemeralBlobStorage(t) + pool.spawn(ctx, nw, startup.NewClock(time.Now(), [32]byte{}), ma, v, ctxMap, mockNewBlobVerifier, bfs) br := batcher{min: 10, size: 10} endSeq := br.before(0) require.Equal(t, batchEndSequence, endSeq.state) @@ -59,7 +72,7 @@ type mockPool struct { todoChan chan batch } -func (m *mockPool) spawn(_ context.Context, _ int, _ *startup.Clock, _ PeerAssigner, _ *verifier) { +func (m *mockPool) spawn(_ context.Context, _ int, _ *startup.Clock, _ PeerAssigner, _ *verifier, _ sync.ContextByteVersions, _ verification.NewBlobVerifier, _ *filesystem.BlobStorage) { } func (m *mockPool) todo(b batch) { diff --git a/beacon-chain/sync/backfill/service.go b/beacon-chain/sync/backfill/service.go index 41ed8a1a878f..476a1171a360 100644 --- a/beacon-chain/sync/backfill/service.go +++ b/beacon-chain/sync/backfill/service.go @@ -6,8 +6,12 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem" "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/v4/beacon-chain/startup" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/verification" + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" "github.com/prysmaticlabs/prysm/v4/proto/dbval" @@ -17,19 +21,24 @@ import ( ) type Service struct { - ctx context.Context - store *Store - ms minimumSlotter - cw startup.ClockWaiter - enabled bool // service is disabled by default while feature is experimental - nWorkers int - batchSeq *batchSequencer - batchSize uint64 - pool batchWorkerPool - verifier *verifier - p2p p2p.P2P - pa PeerAssigner - batchImporter batchImporter + ctx context.Context + enabled bool // service is disabled by default while feature is experimental + clock *startup.Clock + store *Store + ms minimumSlotter + cw startup.ClockWaiter + verifierWaiter InitializerWaiter + newBlobVerifier verification.NewBlobVerifier + nWorkers int + batchSeq *batchSequencer + batchSize uint64 + pool batchWorkerPool + verifier *verifier + ctxMap sync.ContextByteVersions + p2p p2p.P2P + pa PeerAssigner + batchImporter batchImporter + blobStore *filesystem.BlobStorage } var _ runtime.Service = (*Service)(nil) @@ -57,6 +66,22 @@ func WithBatchSize(n uint64) ServiceOption { } } +// InitializerWaiter is an interface that is satisfied by verification.InitializerWaiter. +// Using this interface enables node init to satisfy this requirement for the backfill service +// while also allowing backfill to mock it in tests. +type InitializerWaiter interface { + WaitForInitializer(ctx context.Context) (*verification.Initializer, error) +} + +// WithVerifierWaiter sets the verification.InitializerWaiter +// for the backfill Service. +func WithVerifierWaiter(viw InitializerWaiter) ServiceOption { + return func(s *Service) error { + s.verifierWaiter = viw + return nil + } +} + type minimumSlotter interface { minimumSlot() primitives.Slot setClock(*startup.Clock) @@ -86,9 +111,9 @@ func (d defaultMinimumSlotter) setClock(c *startup.Clock) { var _ minimumSlotter = &defaultMinimumSlotter{} -type batchImporter func(ctx context.Context, b batch, su *Store) (*dbval.BackfillStatus, error) +type batchImporter func(ctx context.Context, current primitives.Slot, b batch, su *Store) (*dbval.BackfillStatus, error) -func defaultBatchImporter(ctx context.Context, b batch, su *Store) (*dbval.BackfillStatus, error) { +func defaultBatchImporter(ctx context.Context, current primitives.Slot, b batch, su *Store) (*dbval.BackfillStatus, error) { status := su.status() if err := b.ensureParent(bytesutil.ToBytes32(status.LowParentRoot)); err != nil { return status, err @@ -96,7 +121,7 @@ func defaultBatchImporter(ctx context.Context, b batch, su *Store) (*dbval.Backf // Import blocks to db and update db state to reflect the newly imported blocks. // Other parts of the beacon node may use the same StatusUpdater instance // via the coverage.AvailableBlocker interface to safely determine if a given slot has been backfilled. - status, err := su.fillBack(ctx, b.results) + status, err := su.fillBack(ctx, current, b.results, b.availabilityStore()) if err != nil { log.WithError(err).Fatal("Non-recoverable db error in backfill service, quitting.") } @@ -104,7 +129,7 @@ func defaultBatchImporter(ctx context.Context, b batch, su *Store) (*dbval.Backf } // PeerAssigner describes a type that provides an Assign method, which can assign the best peer -// to service an RPC request. The Assign method takes a map of peers that should be excluded, +// to service an RPC blockRequest. The Assign method takes a map of peers that should be excluded, // allowing the caller to avoid making multiple concurrent requests to the same peer. type PeerAssigner interface { Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error) @@ -112,10 +137,11 @@ type PeerAssigner interface { // NewService initializes the backfill Service. Like all implementations of the Service interface, // the service won't begin its runloop until Start() is called. -func NewService(ctx context.Context, su *Store, cw startup.ClockWaiter, p p2p.P2P, pa PeerAssigner, opts ...ServiceOption) (*Service, error) { +func NewService(ctx context.Context, su *Store, bStore *filesystem.BlobStorage, cw startup.ClockWaiter, p p2p.P2P, pa PeerAssigner, opts ...ServiceOption) (*Service, error) { s := &Service{ ctx: ctx, store: su, + blobStore: bStore, cw: cw, ms: &defaultMinimumSlotter{cw: cw, ctx: ctx}, p2p: p, @@ -132,16 +158,22 @@ func NewService(ctx context.Context, su *Store, cw startup.ClockWaiter, p p2p.P2 return s, nil } -func (s *Service) initVerifier(ctx context.Context) (*verifier, error) { +func (s *Service) initVerifier(ctx context.Context) (*verifier, sync.ContextByteVersions, error) { cps, err := s.store.originState(ctx) if err != nil { - return nil, err + return nil, nil, err } keys, err := cps.PublicKeys() if err != nil { - return nil, errors.Wrap(err, "Unable to retrieve public keys for all validators in the origin state") + return nil, nil, errors.Wrap(err, "Unable to retrieve public keys for all validators in the origin state") } - return newBackfillVerifier(cps.GenesisValidatorsRoot(), keys) + vr := cps.GenesisValidatorsRoot() + ctxMap, err := sync.ContextByteVersionsForValRoot(bytesutil.ToBytes32(vr)) + if err != nil { + return nil, nil, errors.Wrapf(err, "unable to initialize context version map using genesis validator root = %#x", vr) + } + v, err := newBackfillVerifier(vr, keys) + return v, ctxMap, err } func (s *Service) updateComplete() bool { @@ -166,12 +198,13 @@ func (s *Service) importBatches(ctx context.Context) { } backfillBatchesImported.Add(float64(imported)) }() + current := s.clock.CurrentSlot() for i := range importable { ib := importable[i] if len(ib.results) == 0 { log.WithFields(ib.logFields()).Error("Batch with no results, skipping importer.") } - _, err := s.batchImporter(ctx, ib, s.store) + _, err := s.batchImporter(ctx, current, ib, s.store) if err != nil { log.WithError(err).WithFields(ib.logFields()).Debug("Backfill batch failed to import.") s.downscore(ib) @@ -225,7 +258,14 @@ func (s *Service) Start() { if err != nil { log.WithError(err).Fatal("Backfill service failed to start while waiting for genesis data.") } + s.clock = clock s.ms.setClock(clock) + v, err := s.verifierWaiter.WaitForInitializer(ctx) + s.newBlobVerifier = newBlobVerifierFromInitializer(v) + + if err != nil { + log.WithError(err).Fatal("could not initialize blob verifier in backfill service") + } if s.store.isGenesisSync() { log.Info("Exiting backfill service as the node has been initialized with a genesis state or the backfill status is missing") @@ -239,11 +279,11 @@ func (s *Service) Start() { Info("Exiting backfill service; minimum block retention slot > lowest backfilled block.") return } - s.verifier, err = s.initVerifier(ctx) + s.verifier, s.ctxMap, err = s.initVerifier(ctx) if err != nil { log.WithError(err).Fatal("Unable to initialize backfill verifier, quitting.") } - s.pool.spawn(ctx, s.nWorkers, clock, s.pa, s.verifier) + s.pool.spawn(ctx, s.nWorkers, clock, s.pa, s.verifier, s.ctxMap, s.newBlobVerifier, s.blobStore) s.batchSeq = newBatchSequencer(s.nWorkers, s.ms.minimumSlot(), primitives.Slot(status.LowSlot), primitives.Slot(s.batchSize)) if err = s.initBatches(); err != nil { @@ -278,7 +318,7 @@ func (s *Service) initBatches() error { } func (s *Service) downscore(b batch) { - s.p2p.Peers().Scorers().BadResponsesScorer().Increment(b.pid) + s.p2p.Peers().Scorers().BadResponsesScorer().Increment(b.blockPid) } func (s *Service) Stop() error { @@ -304,3 +344,9 @@ func minimumBackfillSlot(current primitives.Slot) primitives.Slot { } return current - offset } + +func newBlobVerifierFromInitializer(ini *verification.Initializer) verification.NewBlobVerifier { + return func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier { + return ini.NewBlobVerifier(b, reqs) + } +} diff --git a/beacon-chain/sync/backfill/service_test.go b/beacon-chain/sync/backfill/service_test.go index c723f28e1313..f530d2f09dff 100644 --- a/beacon-chain/sync/backfill/service_test.go +++ b/beacon-chain/sync/backfill/service_test.go @@ -6,9 +6,11 @@ import ( "time" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem" p2ptest "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing" "github.com/prysmaticlabs/prysm/v4/beacon-chain/startup" "github.com/prysmaticlabs/prysm/v4/beacon-chain/state" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/verification" "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v4/proto/dbval" @@ -29,6 +31,13 @@ func (m mockMinimumSlotter) minimumSlot() primitives.Slot { func (m mockMinimumSlotter) setClock(*startup.Clock) { } +type mockInitalizerWaiter struct { +} + +func (mi *mockInitalizerWaiter) WaitForInitializer(ctx context.Context) (*verification.Initializer, error) { + return &verification.Initializer{}, nil +} + func TestServiceInit(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*300) defer cancel() @@ -52,11 +61,13 @@ func TestServiceInit(t *testing.T) { require.NoError(t, cw.SetClock(startup.NewClock(time.Now(), [32]byte{}))) pool := &mockPool{todoChan: make(chan batch, nWorkers), finishedChan: make(chan batch, nWorkers)} p2pt := p2ptest.NewTestP2P(t) - srv, err := NewService(ctx, su, cw, p2pt, &mockAssigner{}, WithBatchSize(batchSize), WithWorkerCount(nWorkers), WithEnableBackfill(true)) + bfs := filesystem.NewEphemeralBlobStorage(t) + srv, err := NewService(ctx, su, bfs, cw, p2pt, &mockAssigner{}, + WithBatchSize(batchSize), WithWorkerCount(nWorkers), WithEnableBackfill(true), WithVerifierWaiter(&mockInitalizerWaiter{})) require.NoError(t, err) srv.ms = mockMinimumSlotter{min: primitives.Slot(high - batchSize*uint64(nBatches))} srv.pool = pool - srv.batchImporter = func(context.Context, batch, *Store) (*dbval.BackfillStatus, error) { + srv.batchImporter = func(context.Context, primitives.Slot, batch, *Store) (*dbval.BackfillStatus, error) { return &dbval.BackfillStatus{}, nil } go srv.Start() diff --git a/beacon-chain/sync/backfill/status.go b/beacon-chain/sync/backfill/status.go index 2ec809caae58..71460c41a7d6 100644 --- a/beacon-chain/sync/backfill/status.go +++ b/beacon-chain/sync/backfill/status.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/das" "github.com/prysmaticlabs/prysm/v4/beacon-chain/db" "github.com/prysmaticlabs/prysm/v4/beacon-chain/state" "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" @@ -73,7 +74,7 @@ func (s *Store) status() *dbval.BackfillStatus { // fillBack saves the slice of blocks and updates the BackfillStatus LowSlot/Root/ParentRoot tracker to the values // from the first block in the slice. This method assumes that the block slice has been fully validated and // sorted in slot order by the calling function. -func (s *Store) fillBack(ctx context.Context, blocks []blocks.ROBlock) (*dbval.BackfillStatus, error) { +func (s *Store) fillBack(ctx context.Context, current primitives.Slot, blocks []blocks.ROBlock, store das.AvailabilityStore) (*dbval.BackfillStatus, error) { status := s.status() if len(blocks) == 0 { return status, nil @@ -87,6 +88,12 @@ func (s *Store) fillBack(ctx context.Context, blocks []blocks.ROBlock) (*dbval.B status.LowParentRoot, highest.Root(), status.LowSlot, highest.Block().Slot()) } + for i := range blocks { + if err := store.IsDataAvailable(ctx, current, blocks[i]); err != nil { + return nil, err + } + } + if err := s.store.SaveROBlocks(ctx, blocks, false); err != nil { return nil, errors.Wrapf(err, "error saving backfill blocks") } diff --git a/beacon-chain/sync/backfill/status_test.go b/beacon-chain/sync/backfill/status_test.go index 0ee020827557..7e7df64442a1 100644 --- a/beacon-chain/sync/backfill/status_test.go +++ b/beacon-chain/sync/backfill/status_test.go @@ -5,6 +5,7 @@ import ( "context" "testing" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/das" "github.com/prysmaticlabs/prysm/v4/beacon-chain/db" "github.com/prysmaticlabs/prysm/v4/beacon-chain/state" "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" @@ -138,7 +139,7 @@ func TestStatusUpdater_FillBack(t *testing.T) { require.NoError(t, err) s := &Store{bs: &dbval.BackfillStatus{LowSlot: 100, LowParentRoot: rob.RootSlice()}, store: mdb} require.Equal(t, false, s.AvailableBlock(95)) - _, err = s.fillBack(ctx, []blocks.ROBlock{rob}) + _, err = s.fillBack(ctx, 0, []blocks.ROBlock{rob}, &das.MockAvailabilityStore{}) require.NoError(t, err) require.Equal(t, true, s.AvailableBlock(95)) } diff --git a/beacon-chain/sync/backfill/verify.go b/beacon-chain/sync/backfill/verify.go index 1d3f78ef856e..2e7bc42a9212 100644 --- a/beacon-chain/sync/backfill/verify.go +++ b/beacon-chain/sync/backfill/verify.go @@ -9,7 +9,9 @@ import ( "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v4/crypto/bls" + "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" "github.com/prysmaticlabs/prysm/v4/network/forks" + "github.com/prysmaticlabs/prysm/v4/runtime/version" "github.com/prysmaticlabs/prysm/v4/time/slots" ) @@ -17,8 +19,37 @@ var errInvalidBatchChain = errors.New("parent_root of block does not match the p var errProposerIndexTooHigh = errors.New("proposer index not present in origin state") var errUnknownDomain = errors.New("runtime error looking up signing domain for fork") -// VerifiedROBlocks represents a slice of blocks that have passed signature verification. -type VerifiedROBlocks []blocks.ROBlock +// verifiedROBlocks represents a slice of blocks that have passed signature verification. +type verifiedROBlocks []blocks.ROBlock + +func (v verifiedROBlocks) blobIdents(retentionStart primitives.Slot) ([]blobSummary, error) { + // early return if the newest block is outside the retention window + if len(v) > 0 && v[len(v)-1].Block().Slot() < retentionStart { + return nil, nil + } + bs := make([]blobSummary, 0) + for i := range v { + if v[i].Block().Slot() < retentionStart { + continue + } + if v[i].Block().Version() < version.Deneb { + continue + } + c, err := v[i].Block().Body().BlobKzgCommitments() + if err != nil { + return nil, errors.Wrapf(err, "unexpected error checking commitments for block root %#x", v[i].Root()) + } + if len(c) == 0 { + continue + } + for ci := range c { + bs = append(bs, blobSummary{ + blockRoot: v[i].Root(), signature: v[i].Signature(), + index: uint64(ci), commitment: bytesutil.ToBytes48(c[ci])}) + } + } + return bs, nil +} type verifier struct { keys [][fieldparams.BLSPubkeyLength]byte @@ -27,7 +58,7 @@ type verifier struct { } // TODO: rewrite this to use ROBlock. -func (vr verifier) verify(blks []interfaces.ReadOnlySignedBeaconBlock) (VerifiedROBlocks, error) { +func (vr verifier) verify(blks []interfaces.ReadOnlySignedBeaconBlock) (verifiedROBlocks, error) { var err error result := make([]blocks.ROBlock, len(blks)) sigSet := bls.NewSet() diff --git a/beacon-chain/sync/backfill/worker.go b/beacon-chain/sync/backfill/worker.go index 954575c37895..44caadb54083 100644 --- a/beacon-chain/sync/backfill/worker.go +++ b/beacon-chain/sync/backfill/worker.go @@ -4,9 +4,12 @@ import ( "context" "time" + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem" "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/v4/beacon-chain/startup" "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/verification" log "github.com/sirupsen/logrus" ) @@ -19,6 +22,9 @@ type p2pWorker struct { p2p p2p.P2P v *verifier c *startup.Clock + cm sync.ContextByteVersions + nbv verification.NewBlobVerifier + bfs *filesystem.BlobStorage } func (w *p2pWorker) run(ctx context.Context) { @@ -26,7 +32,11 @@ func (w *p2pWorker) run(ctx context.Context) { select { case b := <-w.todo: log.WithFields(b.logFields()).WithField("backfill_worker", w.id).Debug("Backfill worker received batch.") - w.done <- w.handle(ctx, b) + if b.state == batchBlobSync { + w.done <- w.handleBlobs(ctx, b) + } else { + w.done <- w.handleBlocks(ctx, b) + } case <-ctx.Done(): log.WithField("backfill_worker", w.id).Info("Backfill worker exiting after context canceled.") return @@ -34,11 +44,17 @@ func (w *p2pWorker) run(ctx context.Context) { } } -func (w *p2pWorker) handle(ctx context.Context, b batch) batch { +func (w *p2pWorker) handleBlocks(ctx context.Context, b batch) batch { + cs := w.c.CurrentSlot() + blobRetentionStart, err := sync.BlobsByRangeMinStartSlot(cs) + if err != nil { + return b.withRetryableError(errors.Wrap(err, "configuration issue, could not compute minimum blob retention slot")) + } + b.blockPid = b.busy start := time.Now() - results, err := sync.SendBeaconBlocksByRangeRequest(ctx, w.c, w.p2p, b.pid, b.request(), nil) + results, err := sync.SendBeaconBlocksByRangeRequest(ctx, w.c, w.p2p, b.blockPid, b.blockRequest(), blockValidationMetrics) dlt := time.Now() - backfillBatchTimeDownloading.Observe(float64(dlt.Sub(start).Milliseconds())) + backfillBatchTimeDownloadingBlocks.Observe(float64(dlt.Sub(start).Milliseconds())) if err != nil { log.WithError(err).WithFields(b.logFields()).Debug("Batch requesting failed") return b.withRetryableError(err) @@ -56,13 +72,46 @@ func (w *p2pWorker) handle(ctx context.Context, b batch) batch { for i := range vb { bdl += vb[i].SizeSSZ() } - backfillBatchApproximateBytes.Add(float64(bdl)) - log.WithField("dlbytes", bdl).Debug("backfill batch bytes downloaded") + backfillBlocksApproximateBytes.Add(float64(bdl)) + log.WithFields(b.logFields()).WithField("dlbytes", bdl).Debug("backfill batch block bytes downloaded") b.results = vb + b.bs, err = newBlobSync(cs, blobRetentionStart, b.results, w.nbv, w.bfs) + if err != nil { + return b.withRetryableError(err) + } + if b.blobsNeeded() > 0 { + return b.withState(batchBlobSync) + } + return b.withState(batchImportable) +} + +func (w *p2pWorker) handleBlobs(ctx context.Context, b batch) batch { + b.blobPid = b.busy + start := time.Now() + // we don't need to use the response for anything other than metrics, because blobResponseValidation + // adds each of them to a batch AvailabilityStore once it is checked. + blobs, err := sync.SendBlobsByRangeRequest(ctx, w.c, w.p2p, b.blobPid, w.cm, b.blobRequest(), b.blobResponseValidator(), blobValidationMetrics) + if err != nil { + b.bs = nil + return b.withRetryableError(err) + } + dlt := time.Now() + backfillBatchTimeDownloadingBlobs.Observe(float64(dlt.Sub(start).Milliseconds())) + if len(blobs) > 0 { + // All blobs are the same size, so we can compute 1 and use it for all in the batch. + sz := blobs[0].SizeSSZ() * len(blobs) + backfillBlobsApproximateBytes.Add(float64(sz)) + log.WithFields(b.logFields()).WithField("dlbytes", sz).Debug("backfill batch blob bytes downloaded") + } + if b.blobsNeeded() > 0 { + log.WithFields(b.logFields()).WithField("blobs_missing", b.blobsNeeded()).Error("batch still missing blobs after downloading from peer") + b.bs = nil + return b.withState(batchErrRetryable) + } return b.withState(batchImportable) } -func newP2pWorker(id workerId, p p2p.P2P, todo, done chan batch, c *startup.Clock, v *verifier) *p2pWorker { +func newP2pWorker(id workerId, p p2p.P2P, todo, done chan batch, c *startup.Clock, v *verifier, cm sync.ContextByteVersions, nbv verification.NewBlobVerifier, bfs *filesystem.BlobStorage) *p2pWorker { return &p2pWorker{ id: id, todo: todo, @@ -70,5 +119,8 @@ func newP2pWorker(id workerId, p p2p.P2P, todo, done chan batch, c *startup.Cloc p2p: p, v: v, c: c, + cm: cm, + nbv: nbv, + bfs: bfs, } } diff --git a/beacon-chain/sync/rpc_send_request.go b/beacon-chain/sync/rpc_send_request.go index 9d1e8e33cc65..37e15964e42d 100644 --- a/beacon-chain/sync/rpc_send_request.go +++ b/beacon-chain/sync/rpc_send_request.go @@ -155,7 +155,7 @@ func SendBeaconBlocksByRootRequest( return blocks, nil } -func SendBlobsByRangeRequest(ctx context.Context, tor blockchain.TemporalOracle, p2pApi p2p.SenderEncoder, pid peer.ID, ctxMap ContextByteVersions, req *pb.BlobSidecarsByRangeRequest) ([]blocks.ROBlob, error) { +func SendBlobsByRangeRequest(ctx context.Context, tor blockchain.TemporalOracle, p2pApi p2p.SenderEncoder, pid peer.ID, ctxMap ContextByteVersions, req *pb.BlobSidecarsByRangeRequest, bvs ...BlobResponseValidation) ([]blocks.ROBlob, error) { topic, err := p2p.TopicFromMessage(p2p.BlobSidecarsByRangeName, slots.ToEpoch(tor.CurrentSlot())) if err != nil { return nil, err @@ -175,8 +175,11 @@ func SendBlobsByRangeRequest(ctx context.Context, tor blockchain.TemporalOracle, if max > req.Count*fieldparams.MaxBlobsPerBlock { max = req.Count * fieldparams.MaxBlobsPerBlock } - blobVal := composeBlobValidations(blobValidatorFromRangeReq(req), newSequentialBlobValidator()) - return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, blobVal, max) + vfuncs := []BlobResponseValidation{blobValidatorFromRangeReq(req), newSequentialBlobValidator()} + if len(bvs) > 0 { + vfuncs = append(vfuncs, bvs...) + } + return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, composeBlobValidations(vfuncs...), max) } func SendBlobSidecarByRoot( @@ -205,9 +208,11 @@ func SendBlobSidecarByRoot( return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, blobValidatorFromRootReq(req), max) } -type blobResponseValidation func(blocks.ROBlob) error +// BlobResponseValidation represents a function that can validate aspects of a single unmarshaled blob +// that was received from a peer in response to an rpc request. +type BlobResponseValidation func(blocks.ROBlob) error -func composeBlobValidations(vf ...blobResponseValidation) blobResponseValidation { +func composeBlobValidations(vf ...BlobResponseValidation) BlobResponseValidation { return func(blob blocks.ROBlob) error { for i := range vf { if err := vf[i](blob); err != nil { @@ -264,14 +269,14 @@ func (sbv *seqBlobValid) nextValid(blob blocks.ROBlob) error { return nil } -func newSequentialBlobValidator() blobResponseValidation { +func newSequentialBlobValidator() BlobResponseValidation { sbv := &seqBlobValid{} return func(blob blocks.ROBlob) error { return sbv.nextValid(blob) } } -func blobValidatorFromRootReq(req *p2ptypes.BlobSidecarsByRootReq) blobResponseValidation { +func blobValidatorFromRootReq(req *p2ptypes.BlobSidecarsByRootReq) BlobResponseValidation { blobIds := make(map[[32]byte]map[uint64]bool) for _, sc := range *req { blockRoot := bytesutil.ToBytes32(sc.BlockRoot) @@ -293,7 +298,7 @@ func blobValidatorFromRootReq(req *p2ptypes.BlobSidecarsByRootReq) blobResponseV } } -func blobValidatorFromRangeReq(req *pb.BlobSidecarsByRangeRequest) blobResponseValidation { +func blobValidatorFromRangeReq(req *pb.BlobSidecarsByRangeRequest) BlobResponseValidation { end := req.StartSlot + primitives.Slot(req.Count) return func(sc blocks.ROBlob) error { if sc.Slot() < req.StartSlot || sc.Slot() >= end { @@ -303,7 +308,7 @@ func blobValidatorFromRangeReq(req *pb.BlobSidecarsByRangeRequest) blobResponseV } } -func readChunkEncodedBlobs(stream network.Stream, encoding encoder.NetworkEncoding, ctxMap ContextByteVersions, vf blobResponseValidation, max uint64) ([]blocks.ROBlob, error) { +func readChunkEncodedBlobs(stream network.Stream, encoding encoder.NetworkEncoding, ctxMap ContextByteVersions, vf BlobResponseValidation, max uint64) ([]blocks.ROBlob, error) { sidecars := make([]blocks.ROBlob, 0) // Attempt an extra read beyond max to check if the peer is violating the spec by // sending more than MAX_REQUEST_BLOB_SIDECARS, or more blobs than requested. @@ -327,7 +332,7 @@ func readChunkEncodedBlobs(stream network.Stream, encoding encoder.NetworkEncodi return sidecars, nil } -func readChunkedBlobSidecar(stream network.Stream, encoding encoder.NetworkEncoding, ctxMap ContextByteVersions, vf blobResponseValidation) (blocks.ROBlob, error) { +func readChunkedBlobSidecar(stream network.Stream, encoding encoder.NetworkEncoding, ctxMap ContextByteVersions, vf BlobResponseValidation) (blocks.ROBlob, error) { var b blocks.ROBlob pb := ðpb.BlobSidecar{} decode := encoding.DecodeWithMaxLength diff --git a/beacon-chain/verification/blob.go b/beacon-chain/verification/blob.go index dfd8f5acf48f..bd7a503ea6ac 100644 --- a/beacon-chain/verification/blob.go +++ b/beacon-chain/verification/blob.go @@ -63,6 +63,9 @@ var InitsyncSidecarRequirements = []Requirement{ RequireSidecarInclusionProven, } +// BackfillSidecarRequirements is the same as InitsyncSidecarRequirements +var BackfillSidecarRequirements = InitsyncSidecarRequirements + var ( ErrBlobInvalid = errors.New("blob failed verification") // ErrBlobIndexInvalid means RequireBlobIndexInBounds failed. diff --git a/beacon-chain/verification/initializer.go b/beacon-chain/verification/initializer.go index 7abf842c8bda..2e91c9e3d51e 100644 --- a/beacon-chain/verification/initializer.go +++ b/beacon-chain/verification/initializer.go @@ -40,7 +40,7 @@ type sharedResources struct { // Initializer is used to create different Verifiers. // Verifiers require access to stateful data structures, like caches, -// and it is Initializer's job to provides access to those. +// and it is Initializer's job to provide access to those. type Initializer struct { shared *sharedResources }