From 9fa115125b570e86cbcec9c44bc287aa865829ca Mon Sep 17 00:00:00 2001 From: Kasey Kirkham Date: Mon, 1 Apr 2024 08:50:55 -0500 Subject: [PATCH] make calling WarmCache safe using withWarmedCache --- beacon-chain/db/filesystem/pruner.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/beacon-chain/db/filesystem/pruner.go b/beacon-chain/db/filesystem/pruner.go index 954b9f53d38d..e42855071c51 100644 --- a/beacon-chain/db/filesystem/pruner.go +++ b/beacon-chain/db/filesystem/pruner.go @@ -33,7 +33,8 @@ type blobPruner struct { prunedBefore atomic.Uint64 windowSize primitives.Slot cache *blobStorageCache - cacheWarmed chan struct{} + cacheReady chan struct{} + warmed bool fs afero.Fs } @@ -51,7 +52,7 @@ func newBlobPruner(fs afero.Fs, retain primitives.Epoch, opts ...prunerOpt) (*bl return nil, errors.Wrap(err, "could not set retentionSlots") } cw := make(chan struct{}) - p := &blobPruner{fs: fs, windowSize: r, cache: newBlobStorageCache(), cacheWarmed: cw} + p := &blobPruner{fs: fs, windowSize: r, cache: newBlobStorageCache(), cacheReady: cw} for _, o := range opts { if err := o(p); err != nil { return nil, err @@ -71,6 +72,8 @@ func (p *blobPruner) notify(root [32]byte, latest primitives.Slot, idx uint64) e return nil } go func() { + p.Lock() + defer p.Unlock() if err := p.prune(primitives.Slot(pruned)); err != nil { log.WithError(err).Errorf("Failed to prune blobs from slot %d", latest) } @@ -88,16 +91,21 @@ func windowMin(latest, offset primitives.Slot) primitives.Slot { } func (p *blobPruner) warmCache() error { + p.Lock() + defer p.Unlock() if err := p.prune(0); err != nil { return err } - close(p.cacheWarmed) + if !p.warmed { + p.warmed = true + close(p.cacheReady) + } return nil } func (p *blobPruner) waitForCache(ctx context.Context) (*blobStorageCache, error) { select { - case <-p.cacheWarmed: + case <-p.cacheReady: return p.cache, nil case <-ctx.Done(): return nil, ctx.Err() @@ -108,8 +116,6 @@ func (p *blobPruner) waitForCache(ctx context.Context) (*blobStorageCache, error // It deletes blobs older than currentEpoch - (retentionEpochs+bufferEpochs). // This is so that we keep a slight buffer and blobs are deleted after n+2 epochs. func (p *blobPruner) prune(pruneBefore primitives.Slot) error { - p.Lock() - defer p.Unlock() start := time.Now() totalPruned, totalErr := 0, 0 // Customize logging/metrics behavior for the initial cache warmup when slot=0.