Skip to content

Commit

Permalink
Initsync skip local blobs (#13827)
Browse files Browse the repository at this point in the history
* wip - init-sync skip available blob req

* satisfy deep source

* gaz

* don't need to sort blobs; simplify blobRequest stack

* wip debug log to watch blob skip behavior

* unit tests for new blob req generator

* refactor to reduce blob req func count

* log when WaitForSummarizer fails

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
  • Loading branch information
2 people authored and nisdas committed Jun 19, 2024
1 parent 7b91d79 commit a0e37c4
Show file tree
Hide file tree
Showing 12 changed files with 460 additions and 254 deletions.
9 changes: 9 additions & 0 deletions beacon-chain/das/availability.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ func (s *LazilyPersistentStore) IsDataAvailable(ctx context.Context, current pri
entry := s.cache.ensure(key)
defer s.cache.delete(key)
root := b.Root()
sumz, err := s.store.WaitForSummarizer(ctx)
if err != nil {
log.WithField("root", fmt.Sprintf("%#x", b.Root())).
WithError(err).
Debug("Failed to receive BlobStorageSummarizer within IsDataAvailable")
} else {
entry.setDiskSummary(sumz.Summary(root))
}

// Verify we have all the expected sidecars, and fail fast if any are missing or inconsistent.
// We don't try to salvage problematic batches because this indicates a misbehaving peer and we'd rather
// ignore their response and decrease their peer score.
Expand Down
16 changes: 15 additions & 1 deletion beacon-chain/das/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
Expand Down Expand Up @@ -59,7 +60,12 @@ func (c *cache) delete(key cacheKey) {

// cacheEntry holds a fixed-length cache of BlobSidecars.
type cacheEntry struct {
scs [fieldparams.MaxBlobsPerBlock]*blocks.ROBlob
scs [fieldparams.MaxBlobsPerBlock]*blocks.ROBlob
diskSummary filesystem.BlobStorageSummary
}

func (e *cacheEntry) setDiskSummary(sum filesystem.BlobStorageSummary) {
e.diskSummary = sum
}

// stash adds an item to the in-memory cache of BlobSidecars.
Expand All @@ -81,9 +87,17 @@ func (e *cacheEntry) stash(sc *blocks.ROBlob) error {
// the cache do not match those found in the block. If err is nil, then all expected
// commitments were found in the cache and the sidecar slice return value can be used
// to perform a DA check against the cached sidecars.
// filter only returns blobs that need to be checked. Blobs already available on disk will be excluded.
func (e *cacheEntry) filter(root [32]byte, kc safeCommitmentArray) ([]blocks.ROBlob, error) {
if e.diskSummary.AllAvailable(kc.count()) {
return nil, nil
}
scs := make([]blocks.ROBlob, kc.count())
for i := uint64(0); i < fieldparams.MaxBlobsPerBlock; i++ {
// We already have this blob, we don't need to write it or validate it.
if e.diskSummary.HasIndex(i) {
continue
}
if kc[i] == nil {
if e.scs[i] != nil {
return nil, errors.Wrapf(errCommitmentMismatch, "root=%#x, index=%#x, commitment=%#x, no block commitment", root, i, e.scs[i].KzgCommitment)
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/db/filesystem/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ go_library(
srcs = [
"blob.go",
"cache.go",
"ephemeral.go",
"log.go",
"metrics.go",
"mock.go",
"pruner.go",
],
importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem",
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/db/filesystem/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ var ErrBlobStorageSummarizerUnavailable = errors.New("BlobStorage not initialize
// BlobStorageSummarizer is not ready immediately on node startup because it needs to sample the blob filesystem to
// determine which blobs are available.
func (bs *BlobStorage) WaitForSummarizer(ctx context.Context) (BlobStorageSummarizer, error) {
if bs.pruner == nil {
if bs == nil || bs.pruner == nil {
return nil, ErrBlobStorageSummarizerUnavailable
}
return bs.pruner.waitForCache(ctx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
// improving test performance and simplifying cleanup.
func NewEphemeralBlobStorage(t testing.TB) *BlobStorage {
fs := afero.NewMemMapFs()
pruner, err := newBlobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest)
pruner, err := newBlobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest, withWarmedCache())
if err != nil {
t.Fatal("test setup issue", err)
}
Expand All @@ -23,7 +23,7 @@ func NewEphemeralBlobStorage(t testing.TB) *BlobStorage {
// in order to interact with it outside the parameters of the BlobStorage api.
func NewEphemeralBlobStorageWithFs(t testing.TB) (afero.Fs, *BlobStorage, error) {
fs := afero.NewMemMapFs()
pruner, err := newBlobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest)
pruner, err := newBlobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest, withWarmedCache())
if err != nil {
t.Fatal("test setup issue", err)
}
Expand Down Expand Up @@ -61,3 +61,15 @@ func NewEphemeralBlobStorageWithMocker(_ testing.TB) (*BlobMocker, *BlobStorage)
bs := &BlobStorage{fs: fs}
return &BlobMocker{fs: fs, bs: bs}, bs
}

func NewMockBlobStorageSummarizer(t *testing.T, set map[[32]byte][]int) BlobStorageSummarizer {
c := newBlobStorageCache()
for k, v := range set {
for i := range v {
if err := c.ensure(rootString(k), 0, uint64(v[i])); err != nil {
t.Fatal(err)
}
}
}
return c
}
34 changes: 27 additions & 7 deletions beacon-chain/db/filesystem/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,32 @@ type blobPruner struct {
prunedBefore atomic.Uint64
windowSize primitives.Slot
cache *blobStorageCache
cacheWarmed chan struct{}
cacheReady chan struct{}
warmed bool
fs afero.Fs
}

func newBlobPruner(fs afero.Fs, retain primitives.Epoch) (*blobPruner, error) {
type prunerOpt func(*blobPruner) error

func withWarmedCache() prunerOpt {
return func(p *blobPruner) error {
return p.warmCache()
}
}

func newBlobPruner(fs afero.Fs, retain primitives.Epoch, opts ...prunerOpt) (*blobPruner, error) {
r, err := slots.EpochStart(retain + retentionBuffer)
if err != nil {
return nil, errors.Wrap(err, "could not set retentionSlots")
}
cw := make(chan struct{})
return &blobPruner{fs: fs, windowSize: r, cache: newBlobStorageCache(), cacheWarmed: cw}, nil
p := &blobPruner{fs: fs, windowSize: r, cache: newBlobStorageCache(), cacheReady: cw}
for _, o := range opts {
if err := o(p); err != nil {
return nil, err
}
}
return p, nil
}

// notify updates the pruner's view of root->blob mappings. This allows the pruner to build a cache
Expand All @@ -57,6 +72,8 @@ func (p *blobPruner) notify(root [32]byte, latest primitives.Slot, idx uint64) e
return nil
}
go func() {
p.Lock()
defer p.Unlock()
if err := p.prune(primitives.Slot(pruned)); err != nil {
log.WithError(err).Errorf("Failed to prune blobs from slot %d", latest)
}
Expand All @@ -74,16 +91,21 @@ func windowMin(latest, offset primitives.Slot) primitives.Slot {
}

func (p *blobPruner) warmCache() error {
p.Lock()
defer p.Unlock()
if err := p.prune(0); err != nil {
return err
}
close(p.cacheWarmed)
if !p.warmed {
p.warmed = true
close(p.cacheReady)
}
return nil
}

func (p *blobPruner) waitForCache(ctx context.Context) (*blobStorageCache, error) {
select {
case <-p.cacheWarmed:
case <-p.cacheReady:
return p.cache, nil
case <-ctx.Done():
return nil, ctx.Err()
Expand All @@ -94,8 +116,6 @@ func (p *blobPruner) waitForCache(ctx context.Context) (*blobStorageCache, error
// It deletes blobs older than currentEpoch - (retentionEpochs+bufferEpochs).
// This is so that we keep a slight buffer and blobs are deleted after n+2 epochs.
func (p *blobPruner) prune(pruneBefore primitives.Slot) error {
p.Lock()
defer p.Unlock()
start := time.Now()
totalPruned, totalErr := 0, 0
// Customize logging/metrics behavior for the initial cache warmup when slot=0.
Expand Down
3 changes: 1 addition & 2 deletions beacon-chain/sync/initial-sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ go_library(
"//beacon-chain/verification:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/params:go_default_library",
"//consensus-types:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
Expand Down Expand Up @@ -83,10 +82,10 @@ go_test(
"//beacon-chain/p2p/types:go_default_library",
"//beacon-chain/startup:go_default_library",
"//beacon-chain/sync:go_default_library",
"//beacon-chain/sync/verify:go_default_library",
"//beacon-chain/verification:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/features:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
Expand Down
Loading

0 comments on commit a0e37c4

Please sign in to comment.