Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initsync skip local blobs #13827

Merged
merged 10 commits into from
Apr 5, 2024
9 changes: 9 additions & 0 deletions beacon-chain/das/availability.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ func (s *LazilyPersistentStore) IsDataAvailable(ctx context.Context, current pri
entry := s.cache.ensure(key)
defer s.cache.delete(key)
root := b.Root()
sumz, err := s.store.WaitForSummarizer(ctx)
if err != nil {
log.WithField("root", fmt.Sprintf("%#x", b.Root())).
WithError(err).
Debug("Failed to receive BlobStorageSummarizer within IsDataAvailable")
} else {
entry.setDiskSummary(sumz.Summary(root))
}

// Verify we have all the expected sidecars, and fail fast if any are missing or inconsistent.
// We don't try to salvage problematic batches because this indicates a misbehaving peer and we'd rather
// ignore their response and decrease their peer score.
Expand Down
16 changes: 15 additions & 1 deletion beacon-chain/das/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
Expand Down Expand Up @@ -59,7 +60,12 @@ func (c *cache) delete(key cacheKey) {

// cacheEntry holds a fixed-length cache of BlobSidecars.
type cacheEntry struct {
scs [fieldparams.MaxBlobsPerBlock]*blocks.ROBlob
scs [fieldparams.MaxBlobsPerBlock]*blocks.ROBlob
diskSummary filesystem.BlobStorageSummary
}

func (e *cacheEntry) setDiskSummary(sum filesystem.BlobStorageSummary) {
e.diskSummary = sum
}

// stash adds an item to the in-memory cache of BlobSidecars.
Expand All @@ -81,9 +87,17 @@ func (e *cacheEntry) stash(sc *blocks.ROBlob) error {
// the cache do not match those found in the block. If err is nil, then all expected
// commitments were found in the cache and the sidecar slice return value can be used
// to perform a DA check against the cached sidecars.
// filter only returns blobs that need to be checked. Blobs already available on disk will be excluded.
func (e *cacheEntry) filter(root [32]byte, kc safeCommitmentArray) ([]blocks.ROBlob, error) {
if e.diskSummary.AllAvailable(kc.count()) {
return nil, nil
}
scs := make([]blocks.ROBlob, kc.count())
for i := uint64(0); i < fieldparams.MaxBlobsPerBlock; i++ {
// We already have this blob, we don't need to write it or validate it.
if e.diskSummary.HasIndex(i) {
continue
}
if kc[i] == nil {
if e.scs[i] != nil {
return nil, errors.Wrapf(errCommitmentMismatch, "root=%#x, index=%#x, commitment=%#x, no block commitment", root, i, e.scs[i].KzgCommitment)
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/db/filesystem/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ go_library(
srcs = [
"blob.go",
"cache.go",
"ephemeral.go",
"log.go",
"metrics.go",
"mock.go",
"pruner.go",
],
importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem",
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/db/filesystem/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ var ErrBlobStorageSummarizerUnavailable = errors.New("BlobStorage not initialize
// BlobStorageSummarizer is not ready immediately on node startup because it needs to sample the blob filesystem to
// determine which blobs are available.
func (bs *BlobStorage) WaitForSummarizer(ctx context.Context) (BlobStorageSummarizer, error) {
if bs.pruner == nil {
if bs == nil || bs.pruner == nil {
return nil, ErrBlobStorageSummarizerUnavailable
}
return bs.pruner.waitForCache(ctx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
// improving test performance and simplifying cleanup.
func NewEphemeralBlobStorage(t testing.TB) *BlobStorage {
fs := afero.NewMemMapFs()
pruner, err := newBlobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest)
pruner, err := newBlobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest, withWarmedCache())
if err != nil {
t.Fatal("test setup issue", err)
}
Expand All @@ -23,7 +23,7 @@ func NewEphemeralBlobStorage(t testing.TB) *BlobStorage {
// in order to interact with it outside the parameters of the BlobStorage api.
func NewEphemeralBlobStorageWithFs(t testing.TB) (afero.Fs, *BlobStorage, error) {
fs := afero.NewMemMapFs()
pruner, err := newBlobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest)
pruner, err := newBlobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest, withWarmedCache())
if err != nil {
t.Fatal("test setup issue", err)
}
Expand Down Expand Up @@ -61,3 +61,15 @@ func NewEphemeralBlobStorageWithMocker(_ testing.TB) (*BlobMocker, *BlobStorage)
bs := &BlobStorage{fs: fs}
return &BlobMocker{fs: fs, bs: bs}, bs
}

func NewMockBlobStorageSummarizer(t *testing.T, set map[[32]byte][]int) BlobStorageSummarizer {
c := newBlobStorageCache()
for k, v := range set {
for i := range v {
if err := c.ensure(rootString(k), 0, uint64(v[i])); err != nil {
t.Fatal(err)
}
}
}
return c
}
34 changes: 27 additions & 7 deletions beacon-chain/db/filesystem/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,32 @@ type blobPruner struct {
prunedBefore atomic.Uint64
windowSize primitives.Slot
cache *blobStorageCache
cacheWarmed chan struct{}
cacheReady chan struct{}
warmed bool
fs afero.Fs
}

func newBlobPruner(fs afero.Fs, retain primitives.Epoch) (*blobPruner, error) {
type prunerOpt func(*blobPruner) error

func withWarmedCache() prunerOpt {
return func(p *blobPruner) error {
return p.warmCache()
}
}

func newBlobPruner(fs afero.Fs, retain primitives.Epoch, opts ...prunerOpt) (*blobPruner, error) {
r, err := slots.EpochStart(retain + retentionBuffer)
if err != nil {
return nil, errors.Wrap(err, "could not set retentionSlots")
}
cw := make(chan struct{})
return &blobPruner{fs: fs, windowSize: r, cache: newBlobStorageCache(), cacheWarmed: cw}, nil
p := &blobPruner{fs: fs, windowSize: r, cache: newBlobStorageCache(), cacheReady: cw}
for _, o := range opts {
if err := o(p); err != nil {
return nil, err
}
}
return p, nil
}

// notify updates the pruner's view of root->blob mappings. This allows the pruner to build a cache
Expand All @@ -57,6 +72,8 @@ func (p *blobPruner) notify(root [32]byte, latest primitives.Slot, idx uint64) e
return nil
}
go func() {
p.Lock()
defer p.Unlock()
if err := p.prune(primitives.Slot(pruned)); err != nil {
log.WithError(err).Errorf("Failed to prune blobs from slot %d", latest)
}
Expand All @@ -74,16 +91,21 @@ func windowMin(latest, offset primitives.Slot) primitives.Slot {
}

func (p *blobPruner) warmCache() error {
p.Lock()
defer p.Unlock()
if err := p.prune(0); err != nil {
return err
}
close(p.cacheWarmed)
if !p.warmed {
p.warmed = true
close(p.cacheReady)
}
return nil
}

func (p *blobPruner) waitForCache(ctx context.Context) (*blobStorageCache, error) {
select {
case <-p.cacheWarmed:
case <-p.cacheReady:
return p.cache, nil
case <-ctx.Done():
return nil, ctx.Err()
Expand All @@ -94,8 +116,6 @@ func (p *blobPruner) waitForCache(ctx context.Context) (*blobStorageCache, error
// It deletes blobs older than currentEpoch - (retentionEpochs+bufferEpochs).
// This is so that we keep a slight buffer and blobs are deleted after n+2 epochs.
func (p *blobPruner) prune(pruneBefore primitives.Slot) error {
p.Lock()
defer p.Unlock()
start := time.Now()
totalPruned, totalErr := 0, 0
// Customize logging/metrics behavior for the initial cache warmup when slot=0.
Expand Down
3 changes: 1 addition & 2 deletions beacon-chain/sync/initial-sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ go_library(
"//beacon-chain/verification:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/params:go_default_library",
"//consensus-types:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
Expand Down Expand Up @@ -83,10 +82,10 @@ go_test(
"//beacon-chain/p2p/types:go_default_library",
"//beacon-chain/startup:go_default_library",
"//beacon-chain/sync:go_default_library",
"//beacon-chain/sync/verify:go_default_library",
"//beacon-chain/verification:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/features:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
Expand Down
Loading
Loading