wip - init-sync skip available blob req
kasey committed Apr 1, 2024
1 parent 53fdd2d commit 2e7b7e2
Showing 10 changed files with 284 additions and 179 deletions.
5 changes: 5 additions & 0 deletions beacon-chain/das/availability.go
@@ -94,6 +94,11 @@ func (s *LazilyPersistentStore) IsDataAvailable(ctx context.Context, current pri
entry := s.cache.ensure(key)
defer s.cache.delete(key)
root := b.Root()
sumz, err := s.store.WaitForSummarizer(ctx)
if err == nil {
entry.setDiskSummary(sumz.Summary(root))
}

// Verify we have all the expected sidecars, and fail fast if any are missing or inconsistent.
// We don't try to salvage problematic batches because this indicates a misbehaving peer and we'd rather
// ignore their response and decrease their peer score.
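The hunk above tolerates an unavailable summarizer: the error from WaitForSummarizer is deliberately ignored, and the entry keeps a zero-value summary that reports nothing as being on disk. A minimal standalone sketch of that fallback, using stand-in types rather than the Prysm API:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// blobSummary is a stand-in type; its zero value reports no blobs on disk.
type blobSummary struct{ onDisk [6]bool }

func (s blobSummary) HasIndex(i int) bool { return i >= 0 && i < len(s.onDisk) && s.onDisk[i] }

// waitForSummary blocks until a summary is published or the context expires.
func waitForSummary(ctx context.Context, ch <-chan blobSummary) (blobSummary, error) {
	select {
	case s := <-ch:
		return s, nil
	case <-ctx.Done():
		return blobSummary{}, ctx.Err()
	}
}

func main() {
	ch := make(chan blobSummary) // never written to: simulates a cache that is still warming
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	var sum blobSummary
	// As in the hunk: only overwrite the zero-value summary on success,
	// otherwise proceed as if nothing were on disk yet.
	if s, err := waitForSummary(ctx, ch); err == nil {
		sum = s
	}
	fmt.Println(sum.HasIndex(0)) // false: every sidecar still has to be fetched and verified
}
```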
16 changes: 15 additions & 1 deletion beacon-chain/das/cache.go
@@ -4,6 +4,7 @@ import (
"bytes"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
@@ -59,7 +60,12 @@ func (c *cache) delete(key cacheKey) {

// cacheEntry holds a fixed-length cache of BlobSidecars.
type cacheEntry struct {
scs [fieldparams.MaxBlobsPerBlock]*blocks.ROBlob
scs [fieldparams.MaxBlobsPerBlock]*blocks.ROBlob
diskSummary filesystem.BlobStorageSummary
}

func (e *cacheEntry) setDiskSummary(sum filesystem.BlobStorageSummary) {
e.diskSummary = sum
}

// stash adds an item to the in-memory cache of BlobSidecars.
@@ -81,9 +87,17 @@ func (e *cacheEntry) stash(sc *blocks.ROBlob) error {
// the cache do not match those found in the block. If err is nil, then all expected
// commitments were found in the cache and the sidecar slice return value can be used
// to perform a DA check against the cached sidecars.
// filter only returns blobs that need to be checked. Blobs already available on disk will be excluded.
func (e *cacheEntry) filter(root [32]byte, kc safeCommitmentArray) ([]blocks.ROBlob, error) {
if e.diskSummary.AllAvailable(kc.count()) {
return nil, nil
}
scs := make([]blocks.ROBlob, kc.count())
for i := uint64(0); i < fieldparams.MaxBlobsPerBlock; i++ {
// We already have this blob, we don't need to write it or validate it.
if e.diskSummary.HasIndex(i) {
continue
}
if kc[i] == nil {
if e.scs[i] != nil {
return nil, errors.Wrapf(errCommitmentMismatch, "root=%#x, index=%#x, commitment=%#x, no block commitment", root, i, e.scs[i].KzgCommitment)
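A compact sketch of the filtering rule this hunk adds, with stand-in types rather than the Prysm ones: if every committed blob is already on disk there is nothing left to check, and otherwise any index the disk summary already has is skipped rather than re-validated or re-written.

```go
package main

import "fmt"

// onDiskSummary is a stand-in for the per-block summary of blob indices already persisted.
type onDiskSummary map[uint64]bool

func (s onDiskSummary) allAvailable(count uint64) bool {
	for i := uint64(0); i < count; i++ {
		if !s[i] {
			return false
		}
	}
	return true
}

// filterMissing returns only the cached sidecars whose index is not yet on disk.
func filterMissing(cached map[uint64]string, disk onDiskSummary, committed uint64) []string {
	if disk.allAvailable(committed) {
		return nil // the whole block is already available; no DA work left
	}
	need := make([]string, 0, committed)
	for i := uint64(0); i < committed; i++ {
		if disk[i] {
			continue // already persisted; nothing to write or validate
		}
		if sc, ok := cached[i]; ok {
			need = append(need, sc)
		}
	}
	return need
}

func main() {
	cached := map[uint64]string{0: "sidecar-0", 1: "sidecar-1", 2: "sidecar-2"}
	fmt.Println(filterMissing(cached, onDiskSummary{1: true}, 3)) // [sidecar-0 sidecar-2]
}
```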
2 changes: 1 addition & 1 deletion beacon-chain/db/filesystem/blob.go
@@ -121,7 +121,7 @@ var ErrBlobStorageSummarizerUnavailable = errors.New("BlobStorage not initialize
// BlobStorageSummarizer is not ready immediately on node startup because it needs to sample the blob filesystem to
// determine which blobs are available.
func (bs *BlobStorage) WaitForSummarizer(ctx context.Context) (BlobStorageSummarizer, error) {
if bs.pruner == nil {
if bs == nil || bs.pruner == nil {
return nil, ErrBlobStorageSummarizerUnavailable
}
return bs.pruner.waitForCache(ctx)
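The extra `bs == nil` check makes the method safe to call through a nil *BlobStorage. In Go, a method with a pointer receiver can be invoked on a nil pointer as long as its body guards the dereference; a tiny sketch of that pattern, with hypothetical names:

```go
package main

import (
	"errors"
	"fmt"
)

var errUnavailable = errors.New("store not initialized")

type store struct{ ready bool }

// Ping can be called even when s is nil: the receiver is just a pointer value,
// and the body checks it before dereferencing, mirroring the bs == nil guard above.
func (s *store) Ping() error {
	if s == nil || !s.ready {
		return errUnavailable
	}
	return nil
}

func main() {
	var s *store          // nil pointer
	fmt.Println(s.Ping()) // store not initialized, no panic
}
```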
13 changes: 13 additions & 0 deletions beacon-chain/db/filesystem/cache.go
@@ -2,6 +2,7 @@ package filesystem

import (
"sync"
"testing"

fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
@@ -117,3 +118,15 @@ func (s *blobStorageCache) updateMetrics(delta float64) {
blobDiskCount.Set(s.nBlobs)
blobDiskSize.Set(s.nBlobs * bytesPerSidecar)
}

func NewMockBlobStorageSummarizer(t *testing.T, set map[[32]byte][]int) BlobStorageSummarizer {
c := newBlobStorageCache()
for k, v := range set {
for i := range v {
if err := c.ensure(rootString(k), 0, uint64(v[i])); err != nil {
t.Fatal(err)
}
}
}
return c
}
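A fragment of a hypothetical in-package test showing how the new mock might be used; it relies only on the Summary and HasIndex behavior seen elsewhere in this diff.

```go
func TestMockBlobStorageSummarizer(t *testing.T) {
	root := [32]byte{0x01}
	// Report indices 0 and 2 as already on disk for this root.
	sum := NewMockBlobStorageSummarizer(t, map[[32]byte][]int{root: {0, 2}})
	s := sum.Summary(root)
	if !s.HasIndex(0) || !s.HasIndex(2) {
		t.Fatal("expected indices 0 and 2 to be reported as available")
	}
	if s.HasIndex(1) {
		t.Fatal("index 1 was never stored")
	}
}
```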
4 changes: 2 additions & 2 deletions beacon-chain/db/filesystem/ephemeral.go
@@ -12,7 +12,7 @@ import (
// improving test performance and simplifying cleanup.
func NewEphemeralBlobStorage(t testing.TB) *BlobStorage {
fs := afero.NewMemMapFs()
pruner, err := newBlobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest)
pruner, err := newBlobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest, withWarmedCache())
if err != nil {
t.Fatal("test setup issue", err)
}
@@ -23,7 +23,7 @@ func NewEphemeralBlobStorage(t testing.TB) *BlobStorage {
// in order to interact with it outside the parameters of the BlobStorage api.
func NewEphemeralBlobStorageWithFs(t testing.TB) (afero.Fs, *BlobStorage, error) {
fs := afero.NewMemMapFs()
pruner, err := newBlobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest)
pruner, err := newBlobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest, withWarmedCache())
if err != nil {
t.Fatal("test setup issue", err)
}
34 changes: 27 additions & 7 deletions beacon-chain/db/filesystem/pruner.go
@@ -33,17 +33,32 @@ type blobPruner struct {
prunedBefore atomic.Uint64
windowSize primitives.Slot
cache *blobStorageCache
cacheWarmed chan struct{}
cacheReady chan struct{}
warmed bool
fs afero.Fs
}

func newBlobPruner(fs afero.Fs, retain primitives.Epoch) (*blobPruner, error) {
type prunerOpt func(*blobPruner) error

func withWarmedCache() prunerOpt {
return func(p *blobPruner) error {
return p.warmCache()
}
}

func newBlobPruner(fs afero.Fs, retain primitives.Epoch, opts ...prunerOpt) (*blobPruner, error) {
r, err := slots.EpochStart(retain + retentionBuffer)
if err != nil {
return nil, errors.Wrap(err, "could not set retentionSlots")
}
cw := make(chan struct{})
return &blobPruner{fs: fs, windowSize: r, cache: newBlobStorageCache(), cacheWarmed: cw}, nil
p := &blobPruner{fs: fs, windowSize: r, cache: newBlobStorageCache(), cacheReady: cw}
for _, o := range opts {
if err := o(p); err != nil {
return nil, err
}
}
return p, nil
}
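This is the classic functional-options pattern, with the twist that an option may return an error: withWarmedCache does the warmup work at construction time, which is how the ephemeral test storage constructors above get a ready cache. A small self-contained sketch of the pattern, with hypothetical names:

```go
package main

import "fmt"

type widget struct {
	warmed bool
}

type option func(*widget) error

// withWarmup mirrors withWarmedCache: it does real work during construction
// and reports failure back through the constructor.
func withWarmup() option {
	return func(w *widget) error {
		w.warmed = true // stand-in for scanning a filesystem
		return nil
	}
}

func newWidget(opts ...option) (*widget, error) {
	w := &widget{}
	for _, o := range opts {
		if err := o(w); err != nil {
			return nil, fmt.Errorf("applying option: %w", err)
		}
	}
	return w, nil
}

func main() {
	w, err := newWidget(withWarmup())
	fmt.Println(w.warmed, err) // true <nil>
}
```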

// notify updates the pruner's view of root->blob mappings. This allows the pruner to build a cache
@@ -57,6 +72,8 @@ func (p *blobPruner) notify(root [32]byte, latest primitives.Slot, idx uint64) e
return nil
}
go func() {
p.Lock()
defer p.Unlock()
if err := p.prune(primitives.Slot(pruned)); err != nil {
log.WithError(err).Errorf("Failed to prune blobs from slot %d", latest)
}
@@ -74,16 +91,21 @@ func windowMin(latest, offset primitives.Slot) primitives.Slot {
}

func (p *blobPruner) warmCache() error {
p.Lock()
defer p.Unlock()
if err := p.prune(0); err != nil {
return err
}
close(p.cacheWarmed)
if !p.warmed {
p.warmed = true
close(p.cacheReady)
}
return nil
}

func (p *blobPruner) waitForCache(ctx context.Context) (*blobStorageCache, error) {
select {
case <-p.cacheWarmed:
case <-p.cacheReady:
return p.cache, nil
case <-ctx.Done():
return nil, ctx.Err()
@@ -94,8 +116,6 @@ func (p *blobPruner) waitForCache(ctx context.Context) (*blobStorageCache, error
// It deletes blobs older than currentEpoch - (retentionEpochs+bufferEpochs).
// This is so that we keep a slight buffer and blobs are deleted after n+2 epochs.
func (p *blobPruner) prune(pruneBefore primitives.Slot) error {
p.Lock()
defer p.Unlock()
start := time.Now()
totalPruned, totalErr := 0, 0
// Customize logging/metrics behavior for the initial cache warmup when slot=0.
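Two related changes run through these pruner hunks: readiness is now signalled by closing cacheReady exactly once (the warmed flag prevents a second close, which would panic), and the mutex moved out of prune into its callers (notify's goroutine and warmCache), so prune always runs under the caller's lock. A standalone sketch of the close-once signal plus context-aware wait, with stand-in names:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// readySignal is a stand-in for the cacheReady channel and warmed flag above.
type readySignal struct {
	sync.Mutex
	ready  chan struct{}
	warmed bool
}

func newReadySignal() *readySignal { return &readySignal{ready: make(chan struct{})} }

// warm is safe to call more than once; only the first call closes the channel.
func (r *readySignal) warm() {
	r.Lock()
	defer r.Unlock()
	if !r.warmed {
		r.warmed = true
		close(r.ready) // a second close would panic, hence the flag
	}
}

// wait blocks until warm has run or the context is cancelled.
func (r *readySignal) wait(ctx context.Context) error {
	select {
	case <-r.ready:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	r := newReadySignal()
	go func() {
		time.Sleep(10 * time.Millisecond)
		r.warm()
		r.warm() // no panic on the second call
	}()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(r.wait(ctx)) // <nil>
}
```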