Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics for count and size of blob files #13614

Merged
merged 3 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion beacon-chain/db/filesystem/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error {
return nil
}
if bs.pruner != nil {
bs.pruner.notify(sidecar.BlockRoot(), sidecar.Slot())
if err := bs.pruner.notify(sidecar.BlockRoot(), sidecar.Slot(), sidecar.Index); err != nil {
return errors.Wrapf(err, "problem maintaining pruning cache/metrics for sidecar with root=%#x", sidecar.BlockRoot())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be the downstream effect if this returns an error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We wouldn't write the sidecar. The only error condition is if the index is out of bounds, which would be a panic. I was on the fence about returning an error here, because we should never have a VerifiedROBlob with an index out of bounds, so this should be logically unreachable code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I meant to say upstream. I think block processing will fail, but that feels sane to me. If we can't write a blob to disk, then we shouldn't be allowed to write a block anyway.

}
}

// Serialize the ethpb.BlobSidecar to binary data using SSZ.
Expand Down
10 changes: 9 additions & 1 deletion beacon-chain/db/filesystem/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,15 @@ var (
Help: "Number of BlobSidecar files pruned.",
})
blobsWrittenCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "blobs_written",
Name: "blob_written",
Help: "Number of BlobSidecar files written.",
kasey marked this conversation as resolved.
Show resolved Hide resolved
})
blobDiskCount = promauto.NewGauge(prometheus.GaugeOpts{
Name: "blob_disk_count",
Help: "Approximate number of blob files in storage",
})
blobDiskSize = promauto.NewGauge(prometheus.GaugeOpts{
Name: "blob_disk_bytes",
Help: "Approximate number of bytes occupied by blobs in storage",
})
)
80 changes: 70 additions & 10 deletions beacon-chain/db/filesystem/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand All @@ -20,6 +21,7 @@ import (
)

const retentionBuffer primitives.Epoch = 2
const bytesPerSidecar = 131928

var (
errPruningFailures = errors.New("blobs could not be pruned for some roots")
Expand All @@ -43,17 +45,20 @@ func newBlobPruner(fs afero.Fs, retain primitives.Epoch) (*blobPruner, error) {

// notify updates the pruner's view of root->blob mappings. This allows the pruner to build a cache
// of root->slot mappings and decide when to evict old blobs based on the age of present blobs.
func (p *blobPruner) notify(root [32]byte, latest primitives.Slot) {
p.slotMap.ensure(rootString(root), latest)
func (p *blobPruner) notify(root [32]byte, latest primitives.Slot, idx uint64) error {
if err := p.slotMap.ensure(rootString(root), latest, idx); err != nil {
return err
}
pruned := uint64(windowMin(latest, p.windowSize))
if p.prunedBefore.Swap(pruned) == pruned {
return
return nil
}
go func() {
if err := p.prune(primitives.Slot(pruned)); err != nil {
log.WithError(err).Errorf("Failed to prune blobs from slot %d", latest)
}
}()
return nil
}

func windowMin(latest primitives.Slot, offset primitives.Slot) primitives.Slot {
Expand Down Expand Up @@ -140,7 +145,15 @@ func (p *blobPruner) tryPruneDir(dir string, pruneBefore primitives.Slot) (int,
if err != nil {
return 0, errors.Wrapf(err, "slot could not be read from blob file %s", scFiles[0])
}
p.slotMap.ensure(root, slot)
for i := range scFiles {
idx, err := idxFromPath(scFiles[i])
if err != nil {
return 0, errors.Wrapf(err, "index could not be determined for blob file %s", scFiles[i])
}
if err := p.slotMap.ensure(root, slot, idx); err != nil {
return 0, errors.Wrapf(err, "could not update prune cache for blob file %s", scFiles[i])
}
}
if shouldRetain(slot, pruneBefore) {
return 0, nil
}
Expand Down Expand Up @@ -169,6 +182,15 @@ func (p *blobPruner) tryPruneDir(dir string, pruneBefore primitives.Slot) (int,
return len(scFiles), nil
}

// idxFromPath parses the sidecar index from a blob file name, which is
// expected to have the form "<index>.ssz" (e.g. "3.ssz").
// It returns an error when the name does not match that shape.
func idxFromPath(fname string) (uint64, error) {
	fname = path.Base(fname)
	// Callers filter directory entries for ssz files before calling this,
	// but double-check the extension here so an unrelated dotted file name
	// (e.g. "foo.txt") is rejected rather than misparsed.
	parts := strings.Split(fname, ".")
	if len(parts) != 2 || parts[1] != "ssz" {
		return 0, errors.New("not a blob ssz file")
	}
	return strconv.ParseUint(parts[0], 10, 64)
}

// rootFromDir extracts the blob directory name from its path; the final path
// element is the directory named by the hex encoding of the block root.
func rootFromDir(dir string) string {
	base := filepath.Base(dir)
	return base
}
Expand Down Expand Up @@ -245,30 +267,68 @@ func filterPart(s string) bool {

// newSlotForRoot constructs an empty root->entry cache, pre-sized to the
// number of slots within the blob retention window so the map does not need
// to grow under normal operation.
func newSlotForRoot() *slotForRoot {
	hint := params.BeaconConfig().MinEpochsForBlobsSidecarsRequest * fieldparams.SlotsPerEpoch
	return &slotForRoot{
		cache: make(map[string]*slotCacheEntry, hint),
	}
}

// slotCacheEntry records the observed slot for a block root together with
// which sidecar indices have been seen for that root.
type slotCacheEntry struct {
	slot primitives.Slot
	// mask[i] is set once the sidecar at index i has been observed.
	mask [fieldparams.MaxBlobsPerBlock]bool
}

// slotForRoot is a mutex-guarded cache mapping a root key (hex-encoded block
// root) to its slotCacheEntry. It also tracks an approximate count of blob
// files on disk, which it mirrors into prometheus gauges.
type slotForRoot struct {
	sync.RWMutex
	// nBlobs is the running count of observed sidecar files; it is guarded by
	// the embedded mutex and published via updateMetrics.
	nBlobs float64
	cache map[string]*slotCacheEntry
}

func (s *slotForRoot) ensure(key string, slot primitives.Slot) {
// updateMetrics adjusts the running blob-file count by delta and refreshes the
// blob_disk_count and blob_disk_bytes gauges. The size gauge is approximate:
// every sidecar is assumed to occupy bytesPerSidecar bytes.
// Callers must hold s.Lock — nBlobs has no other synchronization.
func (s *slotForRoot) updateMetrics(delta float64) {
	s.nBlobs += delta
	blobDiskCount.Set(s.nBlobs)
	blobDiskSize.Set(s.nBlobs * bytesPerSidecar)
}

// ensure records that the sidecar at index idx for the root named by key was
// observed at the given slot. The first observation of a given (root, index)
// pair increments the disk-usage metrics. Returns errIndexOutOfBounds when
// idx is not a valid sidecar index.
func (s *slotForRoot) ensure(key string, slot primitives.Slot, idx uint64) error {
	if idx >= fieldparams.MaxBlobsPerBlock {
		return errIndexOutOfBounds
	}
	s.Lock()
	defer s.Unlock()
	entry, seen := s.cache[key]
	if !seen {
		entry = &slotCacheEntry{}
		s.cache[key] = entry
	}
	entry.slot = slot
	if !entry.mask[idx] {
		entry.mask[idx] = true
		s.updateMetrics(1)
	}
	return nil
}

// slot returns the cached slot for the root named by key, along with whether
// an entry for that root exists.
func (s *slotForRoot) slot(key string) (primitives.Slot, bool) {
	s.RLock()
	defer s.RUnlock()
	if entry, ok := s.cache[key]; ok {
		return entry.slot, true
	}
	return 0, false
}

// evict removes the entry for the root named by key, decrementing the
// disk-usage metrics by the number of sidecar indices recorded for it.
// Evicting an unknown key is a no-op.
func (s *slotForRoot) evict(key string) {
	s.Lock()
	defer s.Unlock()
	if entry, ok := s.cache[key]; ok {
		var count float64
		for _, present := range entry.mask {
			if present {
				count++
			}
		}
		s.updateMetrics(-count)
	}
	delete(s.cache, key)
}
4 changes: 2 additions & 2 deletions beacon-chain/db/filesystem/pruner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestTryPruneDir_CachedNotExpired(t *testing.T) {
root := fmt.Sprintf("%#x", sc.BlockRoot())
// This slot is right on the edge of what would need to be pruned, so by adding it to the cache and
// skipping any other test setup, we can be certain the hot cache path never touches the filesystem.
pr.slotMap.ensure(root, sc.Slot())
require.NoError(t, pr.slotMap.ensure(root, sc.Slot(), 0))
pruned, err := pr.tryPruneDir(root, pr.windowSize)
require.NoError(t, err)
require.Equal(t, 0, pruned)
Expand All @@ -45,7 +45,7 @@ func TestTryPruneDir_CachedExpired(t *testing.T) {
require.NoError(t, err)
root := fmt.Sprintf("%#x", sc.BlockRoot())
require.NoError(t, fs.Mkdir(root, directoryPermissions)) // make empty directory
pr.slotMap.ensure(root, sc.Slot())
require.NoError(t, pr.slotMap.ensure(root, sc.Slot(), 0))
pruned, err := pr.tryPruneDir(root, slot+1)
require.NoError(t, err)
require.Equal(t, 0, pruned)
Expand Down
Loading