Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Amend Blobs Database to Use a Rotating Buffer for Keys #3

Merged
merged 13 commits into from
Nov 4, 2022
3 changes: 1 addition & 2 deletions beacon-chain/db/iface/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,8 @@ type NoHeadAccessDatabase interface {
SaveBlocks(ctx context.Context, blocks []interfaces.SignedBeaconBlock) error
SaveGenesisBlockRoot(ctx context.Context, blockRoot [32]byte) error
// Blob related methods.
DeleteBlobsSidecar(ctx context.Context, root [32]byte) error
SaveBlobsSidecar(ctx context.Context, blob *ethpb.BlobsSidecar) error
CleanupBlobs(ctx context.Context) error
DeleteBlobsSidecar(ctx context.Context, blockRoot [32]byte) error
// State related methods.
SaveState(ctx context.Context, state state.ReadOnlyBeaconState, blockRoot [32]byte) error
SaveStates(ctx context.Context, states []state.ReadOnlyBeaconState, blockRoots [][32]byte) error
Expand Down
207 changes: 113 additions & 94 deletions beacon-chain/db/kv/blobs.go
Original file line number Diff line number Diff line change
@@ -1,58 +1,93 @@
package kv

import (
"bytes"
"context"
"time"
"fmt"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v3/config/params"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
)

// DeleteBlobsSidecar removes the blobs from the db.
func (s *Store) DeleteBlobsSidecar(ctx context.Context, root [32]byte) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.DeleteBlobsSidecar")
defer span.End()
return s.db.Update(func(tx *bolt.Tx) error {
if err := tx.Bucket(blobsBucket).Delete(root[:]); err != nil {
return err
}
return tx.Bucket(blobsAgesBucket).Delete(root[:])
})
}

// SaveBlobsSidecar saves the blobs for a given epoch in the sidecar bucket.
func (s *Store) SaveBlobsSidecar(ctx context.Context, blob *ethpb.BlobsSidecar) error {
const blobSidecarKeyLength = 48 // slot_to_rotating_buffer(blob.slot) ++ blob.slot ++ blob.block_root

// SaveBlobsSidecar saves the blobs for a given epoch in the sidecar bucket. When we receive a blob:
// 1. Convert slot using a modulo operator to [0, maxSlots] where maxSlots = MAX_BLOB_EPOCHS*SLOTS_PER_EPOCH
// 2. Compute key for blob as bytes(slot_to_rotating_buffer(blob.slot)) ++ bytes(blob.slot) ++ blob.block_root
// 3. Begin the save algorithm: If the incoming blob has a slot bigger than the saved slot at the spot
// in the rotating keys buffer, we overwrite all elements for that slot.
//
// firstElemKey = getFirstElement(bucket)
// shouldOverwrite = blob.slot > bytes_to_slot(firstElemKey[8:16])
// if shouldOverwrite:
// for existingKey := seek prefix bytes(slot_to_rotating_buffer(blob.slot))
// bucket.delete(existingKey)
// bucket.put(key, blob)
func (s *Store) SaveBlobsSidecar(ctx context.Context, blobSidecar *ethpb.BlobsSidecar) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveBlobsSidecar")
defer span.End()
return s.db.Update(func(tx *bolt.Tx) error {
blobKey := blob.BeaconBlockRoot
insertTime := time.Now().Format(time.RFC3339)
ageBkt := tx.Bucket(blobsAgesBucket)
if err := ageBkt.Put(blobKey, []byte(insertTime)); err != nil {
encodedBlobSidecar, err := encode(ctx, blobSidecar)
if err != nil {
return err
}

bkt := tx.Bucket(blobsBucket)
enc, err := encode(ctx, blob)
if err != nil {
return err
c := bkt.Cursor()
key := blobSidecarKey(blobSidecar)
rotatingBufferPrefix := key[0:8]
var firstElementKey []byte
for k, _ := c.Seek(rotatingBufferPrefix); bytes.HasPrefix(k, rotatingBufferPrefix); k, _ = c.Next() {
if len(k) != 0 {
firstElementKey = k
break
}
}
// If there is no element stored at blob.slot % MAX_SLOTS_TO_PERSIST_BLOBS, then we simply
// store the blob by key and exit early.
if len(firstElementKey) == 0 {
return bkt.Put(key, encodedBlobSidecar)
} else if len(firstElementKey) != len(key) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this even possible?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah maybe someone changes the schema. It's just a defensive check to prevent panics and instead error gracefully because otherwise we'll be accessing slice indices out of range

return fmt.Errorf(
"key length %d (%#x) != existing key length %d (%#x)",
len(key),
key,
len(firstElementKey),
firstElementKey,
)
}
return bkt.Put(blobKey, enc)
slotOfFirstElement := firstElementKey[8:16]
// If we should overwrite old blobs at the spot in the rotating buffer, we clear data at that spot.
shouldOverwrite := blobSidecar.BeaconBlockSlot > bytesutil.BytesToSlotBigEndian(slotOfFirstElement)
if shouldOverwrite {
for k, _ := c.Seek(rotatingBufferPrefix); bytes.HasPrefix(k, rotatingBufferPrefix); k, _ = c.Next() {
if err := bkt.Delete(k); err != nil {
log.WithError(err).Warnf("Could not delete blob with key %#x", k)
}
}
}
return bkt.Put(key, encodedBlobSidecar)
})
}

// BlobsSidecar retrieves the blobs given a block root.
func (s *Store) BlobsSidecar(ctx context.Context, blockRoot [32]byte) (*ethpb.BlobsSidecar, error) {
// BlobsSidecar retrieves the blobs given a beacon block root.
func (s *Store) BlobsSidecar(ctx context.Context, beaconBlockRoot [32]byte) (*ethpb.BlobsSidecar, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.BlobsSidecar")
defer span.End()

var enc []byte
if err := s.db.View(func(tx *bolt.Tx) error {
enc = tx.Bucket(blobsBucket).Get(blockRoot[:])
c := tx.Bucket(blobsBucket).Cursor()
// Bucket size is bounded and bolt cursors are fast. Moreover, a thin caching layer can be added.
for k, v := c.First(); k != nil; k, v = c.Next() {
if bytes.HasSuffix(k, beaconBlockRoot[:]) {
enc = v
break
}
}
return nil
}); err != nil {
return nil, err
Expand All @@ -71,91 +106,75 @@ func (s *Store) BlobsSidecar(ctx context.Context, blockRoot [32]byte) (*ethpb.Bl
func (s *Store) BlobsSidecarsBySlot(ctx context.Context, slot types.Slot) ([]*ethpb.BlobsSidecar, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.BlobsSidecarsBySlot")
defer span.End()

var blobsSidecars []*ethpb.BlobsSidecar
err := s.db.View(func(tx *bolt.Tx) error {
blockRoots, err := blockRootsBySlot(ctx, tx, slot)
if err != nil {
return err
}

for _, blockRoot := range blockRoots {
enc := tx.Bucket(blobsBucket).Get(blockRoot[:])
if len(enc) == 0 {
encodedItems := make([][]byte, 0)
if err := s.db.View(func(tx *bolt.Tx) error {
c := tx.Bucket(blobsBucket).Cursor()
// Bucket size is bounded and bolt cursors are fast. Moreover, a thin caching layer can be added.
for k, v := c.First(); k != nil; k, v = c.Next() {
if len(k) != blobSidecarKeyLength {
continue
}
Comment on lines +114 to 116
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this even possible?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above

blobs := &ethpb.BlobsSidecar{}
if err := decode(ctx, enc, blobs); err != nil {
return err
slotInKey := bytesutil.BytesToSlotBigEndian(k[8:16])
if slotInKey == slot {
encodedItems = append(encodedItems, v)
}
blobsSidecars = append(blobsSidecars, blobs)
}

return nil
})
if err != nil {
return nil, errors.Wrap(err, "could not retrieve blobs")
}); err != nil {
return nil, err
}
sidecars := make([]*ethpb.BlobsSidecar, len(encodedItems))
if len(encodedItems) == 0 {
return sidecars, nil
}
return blobsSidecars, nil
for i, enc := range encodedItems {
blob := &ethpb.BlobsSidecar{}
if err := decode(ctx, enc, blob); err != nil {
return nil, err
}
sidecars[i] = blob
}
return sidecars, nil
}

// HasBlobsSidecar returns true if the blobs are in the db.
func (s *Store) HasBlobsSidecar(ctx context.Context, root [32]byte) bool {
func (s *Store) HasBlobsSidecar(ctx context.Context, beaconBlockRoot [32]byte) bool {
ctx, span := trace.StartSpan(ctx, "BeaconDB.HasBlobsSidecar")
defer span.End()

exists := false
if err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(blobsBucket)
exists = bkt.Get(root[:]) != nil
return nil
}); err != nil { // This view never returns an error, but we'll handle anyway for sanity.
panic(err)
blobSidecar, err := s.BlobsSidecar(ctx, beaconBlockRoot)
if err != nil {
return false
}
return exists
return blobSidecar != nil
}

// CleanupBlobs removes blobs that are older than the cutoff time.
func (s *Store) CleanupBlobs(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.CleanupBlobs")
// DeleteBlobsSidecar returns true if the blobs are in the db.
func (s *Store) DeleteBlobsSidecar(ctx context.Context, beaconBlockRoot [32]byte) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.DeleteBlobsSidecar")
defer span.End()

secsInEpoch := time.Duration(params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot)) * time.Second
ttl := secsInEpoch * (time.Duration(params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest) + 1) // add one more epoch as slack

var expiredBlobs [][]byte
now := time.Now()
err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(blobsAgesBucket)
c := bkt.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
insertTime, err := time.Parse(time.RFC3339, string(v))
if err != nil {
return err
}
if now.Sub(insertTime) > ttl {
expiredBlobs = append(expiredBlobs, k)
}
}
return nil
})
if err != nil {
return err
}

log.WithField("count", len(expiredBlobs)).Info("Cleaning up blobs")

return s.db.Update(func(tx *bolt.Tx) error {
agesBkt := tx.Bucket(blobsAgesBucket)
bkt := tx.Bucket(blobsBucket)
for _, root := range expiredBlobs {
if err := bkt.Delete(root); err != nil {
return err
}
if err := agesBkt.Delete(root); err != nil {
return err
c := bkt.Cursor()
for k, _ := c.First(); k != nil; k, _ = c.Next() {
if bytes.HasSuffix(k, beaconBlockRoot[:]) {
if err := bkt.Delete(k); err != nil {
return err
}
}
}
return nil
})
}

// We define a blob sidecar key as: bytes(slot_to_rotating_buffer(blob.slot)) ++ bytes(blob.slot) ++ blob.block_root
// where slot_to_rotating_buffer(slot) = slot % MAX_SLOTS_TO_PERSIST_BLOBS.
func blobSidecarKey(blob *ethpb.BlobsSidecar) []byte {
slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch
maxEpochsToPersistBlobs := params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be configurable via a flag? I can imagine users wanting to save blobs for longer.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep it should be done via a feature config IMO...will reserve that for a second PR

maxSlotsToPersistBlobs := types.Slot(maxEpochsToPersistBlobs.Mul(uint64(slotsPerEpoch)))
slotInRotatingBuffer := blob.BeaconBlockSlot.ModSlot(maxSlotsToPersistBlobs)
key := bytesutil.SlotToBytesBigEndian(slotInRotatingBuffer)
key = append(key, bytesutil.SlotToBytesBigEndian(blob.BeaconBlockSlot)...)
key = append(key, blob.BeaconBlockRoot...)
return key
}
101 changes: 101 additions & 0 deletions beacon-chain/db/kv/blobs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package kv

import (
"bytes"
"context"
"fmt"
"testing"

"github.com/prysmaticlabs/prysm/v3/config/params"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
enginev1 "github.com/prysmaticlabs/prysm/v3/proto/engine/v1"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/testing/require"
bolt "go.etcd.io/bbolt"
)

func TestBlobsSidecar_Overwriting(t *testing.T) {
params.SetupTestConfigCleanup(t)
cfg := params.BeaconNetworkConfig()
// For purposes of testing, we only keep blob sidecars around for 2 epochs. At third epoch, we will
// wrap around and overwrite the oldest epoch's elements as the keys for blobs work as a rotating buffer.
cfg.MinEpochsForBlobsSidecarsRequest = 2
params.OverrideBeaconNetworkConfig(cfg)
db := setupDB(t)

sidecars := make([]*ethpb.BlobsSidecar, 0)
numSlots := uint64(cfg.MinEpochsForBlobsSidecarsRequest) * uint64(params.BeaconConfig().SlotsPerEpoch)
for i := uint64(0); i < numSlots; i++ {
// There can be multiple blobs per slot with different block roots, so we create some
// in order to have a thorough test.
root1 := bytesutil.ToBytes32([]byte(fmt.Sprintf("foo-%d", i)))
root2 := bytesutil.ToBytes32([]byte(fmt.Sprintf("bar-%d", i)))
sidecars = append(sidecars, &ethpb.BlobsSidecar{
BeaconBlockRoot: root1[:],
BeaconBlockSlot: types.Slot(i),
Blobs: make([]*enginev1.Blob, 0),
AggregatedProof: make([]byte, 48),
})
sidecars = append(sidecars, &ethpb.BlobsSidecar{
BeaconBlockRoot: root2[:],
BeaconBlockSlot: types.Slot(i),
Blobs: make([]*enginev1.Blob, 0),
AggregatedProof: make([]byte, 48),
})
}
ctx := context.Background()
for _, blobSidecar := range sidecars {
require.NoError(t, db.SaveBlobsSidecar(ctx, blobSidecar))
require.Equal(t, true, db.HasBlobsSidecar(ctx, bytesutil.ToBytes32(blobSidecar.BeaconBlockRoot)))
}

// We check there are only two blob sidecars stored at slot 0, as an example.
keyPrefix := append(bytesutil.SlotToBytesBigEndian(0), bytesutil.SlotToBytesBigEndian(0)...)
numBlobs := countBlobsWithPrefix(t, db, keyPrefix)
require.Equal(t, 2, numBlobs)

// Attempting to save another blob sidecar with slot 0 and a new block root should result
// in three blob sidecars stored at slot 0. This means we are NOT overwriting old data.
root := bytesutil.ToBytes32([]byte("baz-0"))
sidecar := &ethpb.BlobsSidecar{
BeaconBlockRoot: root[:],
BeaconBlockSlot: types.Slot(0),
Blobs: make([]*enginev1.Blob, 0),
AggregatedProof: make([]byte, 48),
}
require.NoError(t, db.SaveBlobsSidecar(ctx, sidecar))
require.Equal(t, true, db.HasBlobsSidecar(ctx, bytesutil.ToBytes32(sidecar.BeaconBlockRoot)))

numBlobs = countBlobsWithPrefix(t, db, keyPrefix)
require.Equal(t, 3, numBlobs)

// Now, we attempt to save a blob sidecar with slot = MAX_SLOTS_TO_PERSIST_BLOBS. This SHOULD cause us to
// overwrite ALL old data at slot 0, as slot % MAX_SLOTS_TO_PERSIST_BLOBS will be equal to 0.
// We should expect a single blob sidecar to exist at slot 0 after this operation.
root = bytesutil.ToBytes32([]byte(fmt.Sprintf("foo-%d", numSlots)))
sidecar = &ethpb.BlobsSidecar{
BeaconBlockRoot: root[:],
BeaconBlockSlot: types.Slot(numSlots),
Blobs: make([]*enginev1.Blob, 0),
AggregatedProof: make([]byte, 48),
}
require.NoError(t, db.SaveBlobsSidecar(ctx, sidecar))
require.Equal(t, true, db.HasBlobsSidecar(ctx, bytesutil.ToBytes32(sidecar.BeaconBlockRoot)))

keyPrefix = append(bytesutil.SlotToBytesBigEndian(0), bytesutil.SlotToBytesBigEndian(64)...)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should not append 64 because the original sidecar at round buffer's 0 value had a different slot.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this example, the key we are expecting is 0 ++ 64 ++ bytes32(foo-64). A key contains the round buffer, the slot, and the block root. The new value we are storing at round buffer 0 will be under this new key, so this is the correct value to expect

numBlobs = countBlobsWithPrefix(t, db, keyPrefix)
require.Equal(t, 1, numBlobs)
}

func countBlobsWithPrefix(t *testing.T, db *Store, prefix []byte) int {
numBlobSidecars := 0
require.NoError(t, db.db.View(func(tx *bolt.Tx) error {
c := tx.Bucket(blobsBucket).Cursor()
for k, _ := c.Seek(prefix); bytes.HasPrefix(k, prefix); k, _ = c.Next() {
numBlobSidecars++
}
return nil
}))
return numBlobSidecars
}
Loading