Skip to content

Commit

Permalink
blob save fsync feature flag
Browse files Browse the repository at this point in the history
  • Loading branch information
kasey committed Feb 23, 2024
1 parent e100fb0 commit 0f907da
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 40 deletions.
66 changes: 45 additions & 21 deletions beacon-chain/db/filesystem/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ var (
errIndexOutOfBounds = errors.New("blob index in file name >= MaxBlobsPerBlock")
errEmptyBlobWritten = errors.New("zero bytes written to disk when saving blob sidecar")
errSidecarEmptySSZData = errors.New("sidecar marshalled to an empty ssz byte slice")
errNoBasePath = errors.New("BlobStorage base path not specified in init")
)

const (
Expand All @@ -36,45 +37,63 @@ const (
// BlobStorageOption is a functional option for configuring a BlobStorage.
type BlobStorageOption func(*BlobStorage) error

// WithBasePath is a required option that sets the base path of blob storage.
func WithBasePath(base string) BlobStorageOption {
return func(b *BlobStorage) error {
b.base = base
return nil
}
}

// WithBlobRetentionEpochs is an option that changes the number of epochs blobs will be persisted.
func WithBlobRetentionEpochs(e primitives.Epoch) BlobStorageOption {
return func(b *BlobStorage) error {
pruner, err := newBlobPruner(b.fs, e)
if err != nil {
return err
}
b.pruner = pruner
b.retentionEpochs = e
return nil
}
}

// WithSaveFsync is an option that causes Save to call fsync before renaming part files for improved durability.
func WithSaveFsync(fsync bool) BlobStorageOption {
return func(b *BlobStorage) error {
b.fsync = fsync
return nil
}
}

// NewBlobStorage creates a new instance of the BlobStorage object. Note that the implementation of BlobStorage may
// attempt to hold a file lock to guarantee exclusive control of the blob storage directory, so this should only be
// initialized once per beacon node.
func NewBlobStorage(base string, opts ...BlobStorageOption) (*BlobStorage, error) {
base = path.Clean(base)
if err := file.MkdirAll(base); err != nil {
return nil, fmt.Errorf("failed to create blob storage at %s: %w", base, err)
}
fs := afero.NewBasePathFs(afero.NewOsFs(), base)
b := &BlobStorage{
fs: fs,
}
func NewBlobStorage(opts ...BlobStorageOption) (*BlobStorage, error) {
b := &BlobStorage{}
for _, o := range opts {
if err := o(b); err != nil {
return nil, fmt.Errorf("failed to create blob storage at %s: %w", base, err)
return nil, errors.Wrap(err, "failed to create blob storage")
}
}
if b.pruner == nil {
log.Warn("Initializing blob filesystem storage with pruning disabled")
if b.base == "" {
return nil, errNoBasePath
}
b.base = path.Clean(b.base)
if err := file.MkdirAll(b.base); err != nil {
return nil, errors.Wrapf(err, "failed to create blob storage at %s", b.base)
}
b.fs = afero.NewBasePathFs(afero.NewOsFs(), b.base)
pruner, err := newBlobPruner(b.fs, b.retentionEpochs)
if err != nil {
return nil, err
}
b.pruner = pruner
return b, nil
}

// BlobStorage is the concrete implementation of the filesystem backend for saving and retrieving BlobSidecars.
type BlobStorage struct {
fs afero.Fs
pruner *blobPruner
base string
retentionEpochs primitives.Epoch
fsync bool
fs afero.Fs
pruner *blobPruner
}

// WarmCache runs the prune routine with an expiration of slot of 0, so nothing will be pruned, but the pruner's cache
Expand Down Expand Up @@ -151,8 +170,13 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error {
}
return errors.Wrap(err, "failed to write to partial file")
}
err = partialFile.Close()
if err != nil {
if bs.fsync {
if err := partialFile.Sync(); err != nil {
return err
}
}

if err := partialFile.Close(); err != nil {
return err
}

Expand Down
6 changes: 4 additions & 2 deletions beacon-chain/db/filesystem/blob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestBlobStorage_SaveBlobData(t *testing.T) {
// to be empty. This test ensures that several routines can safely save the same blob at the
// same time. This isn't ideal behavior from the caller, but should be handled safely anyway.
// See https://github.com/prysmaticlabs/prysm/pull/13648
b, err := NewBlobStorage(t.TempDir())
b, err := NewBlobStorage(WithBasePath(t.TempDir()))
require.NoError(t, err)
blob := testSidecars[0]

Expand Down Expand Up @@ -268,6 +268,8 @@ func BenchmarkPruning(b *testing.B) {
}

func TestNewBlobStorage(t *testing.T) {
_, err := NewBlobStorage(path.Join(t.TempDir(), "good"))
_, err := NewBlobStorage()
require.ErrorIs(t, err, errNoBasePath)
_, err = NewBlobStorage(WithBasePath(path.Join(t.TempDir(), "good")))
require.NoError(t, err)
}
1 change: 0 additions & 1 deletion beacon-chain/node/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ go_test(
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/builder:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/db/filesystem:go_default_library",
"//beacon-chain/execution:go_default_library",
"//beacon-chain/execution/testing:go_default_library",
"//beacon-chain/monitor:go_default_library",
Expand Down
8 changes: 8 additions & 0 deletions beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ type BeaconNode struct {
BackfillOpts []backfill.ServiceOption
initialSyncComplete chan struct{}
BlobStorage *filesystem.BlobStorage
BlobStorageOptions []filesystem.BlobStorageOption
blobRetentionEpochs primitives.Epoch
verifyInitWaiter *verification.InitializerWaiter
syncChecker *initialsync.SyncChecker
Expand Down Expand Up @@ -209,6 +210,13 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
return nil, err
}

beacon.BlobStorageOptions = append(beacon.BlobStorageOptions, filesystem.WithSaveFsync(features.Get().BlobSaveFsync))
blobs, err := filesystem.NewBlobStorage(beacon.BlobStorageOptions...)
if err != nil {
return nil, err
}
beacon.BlobStorage = blobs

log.Debugln("Starting DB")
if err := beacon.startDB(cliCtx, depositAddress); err != nil {
return nil, err
Expand Down
13 changes: 4 additions & 9 deletions beacon-chain/node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/builder"
statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/execution"
mockExecution "github.com/prysmaticlabs/prysm/v5/beacon-chain/execution/testing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/monitor"
Expand Down Expand Up @@ -60,7 +59,7 @@ func TestNodeClose_OK(t *testing.T) {
cmd.ValidatorMonitorIndicesFlag.Value.SetInt(1)
ctx, cancel := newCliContextWithCancel(&app, set)

node, err := New(ctx, cancel, WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)))
node, err := New(ctx, cancel)
require.NoError(t, err)

node.Close()
Expand All @@ -80,8 +79,7 @@ func TestNodeStart_Ok(t *testing.T) {
ctx, cancel := newCliContextWithCancel(&app, set)
node, err := New(ctx, cancel, WithBlockchainFlagOptions([]blockchain.Option{}),
WithBuilderFlagOptions([]builder.Option{}),
WithExecutionChainOptions([]execution.Option{}),
WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)))
WithExecutionChainOptions([]execution.Option{}))
require.NoError(t, err)
node.services = &runtime.ServiceRegistry{}
go func() {
Expand All @@ -104,8 +102,7 @@ func TestNodeStart_SyncChecker(t *testing.T) {
ctx, cancel := newCliContextWithCancel(&app, set)
node, err := New(ctx, cancel, WithBlockchainFlagOptions([]blockchain.Option{}),
WithBuilderFlagOptions([]builder.Option{}),
WithExecutionChainOptions([]execution.Option{}),
WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)))
WithExecutionChainOptions([]execution.Option{}))
require.NoError(t, err)
go func() {
node.Start()
Expand Down Expand Up @@ -152,8 +149,7 @@ func TestNodeStart_Ok_registerDeterministicGenesisService(t *testing.T) {
ctx, cancel := newCliContextWithCancel(&app, set)
node, err := New(ctx, cancel, WithBlockchainFlagOptions([]blockchain.Option{}),
WithBuilderFlagOptions([]builder.Option{}),
WithExecutionChainOptions([]execution.Option{}),
WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)))
WithExecutionChainOptions([]execution.Option{}))
require.NoError(t, err)
node.services = &runtime.ServiceRegistry{}
go func() {
Expand Down Expand Up @@ -185,7 +181,6 @@ func TestClearDB(t *testing.T) {
context, cancel := newCliContextWithCancel(&app, set)
options := []Option{
WithExecutionChainOptions([]execution.Option{execution.WithHttpEndpoint(endpoint)}),
WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)),
}
_, err = New(context, cancel, options...)
require.NoError(t, err)
Expand Down
9 changes: 9 additions & 0 deletions beacon-chain/node/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ func WithBlobStorage(bs *filesystem.BlobStorage) Option {
}
}

// WithBlobStorageOptions appends 1 or more filesystem.BlobStorageOption on the beacon node,
// to be used when initializing blob storage.
func WithBlobStorageOptions(opt ...filesystem.BlobStorageOption) Option {
return func(bn *BeaconNode) error {
bn.BlobStorageOptions = append(bn.BlobStorageOptions, opt...)
return nil
}
}

// WithBlobRetentionEpochs sets the blobRetentionEpochs value, used in kv store initialization.
func WithBlobRetentionEpochs(e primitives.Epoch) Option {
return func(bn *BeaconNode) error {
Expand Down
9 changes: 4 additions & 5 deletions cmd/beacon-chain/storage/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,10 @@ func BeaconNodeOptions(c *cli.Context) ([]node.Option, error) {
if err != nil {
return nil, err
}
bs, err := filesystem.NewBlobStorage(blobStoragePath(c), filesystem.WithBlobRetentionEpochs(e))
if err != nil {
return nil, err
}
return []node.Option{node.WithBlobStorage(bs), node.WithBlobRetentionEpochs(e)}, nil
opts := []node.Option{node.WithBlobStorageOptions(
filesystem.WithBlobRetentionEpochs(e), filesystem.WithBasePath(blobStoragePath(c)),
)}
return opts, nil
}

func blobStoragePath(c *cli.Context) string {
Expand Down
8 changes: 7 additions & 1 deletion config/features/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ type Flags struct {
EnableEIP4881 bool // EnableEIP4881 specifies whether to use the deposit tree from EIP4881

PrepareAllPayloads bool // PrepareAllPayloads informs the engine to prepare a block on every slot.

// BlobSaveFsync requires blob saving to block on fsync to ensure blobs are durably persisted before passing DA.
BlobSaveFsync bool
// KeystoreImportDebounceInterval specifies the time duration the validator waits to reload new keys if they have
// changed on disk. This feature is for advanced use cases only.
KeystoreImportDebounceInterval time.Duration
Expand Down Expand Up @@ -245,6 +246,11 @@ func ConfigureBeaconChain(ctx *cli.Context) error {
logEnabled(EnableLightClient)
cfg.EnableLightClient = true
}
if ctx.IsSet(BlobSaveFsync.Name) {
logEnabled(BlobSaveFsync)
cfg.BlobSaveFsync = true
}

cfg.AggregateIntervals = [3]time.Duration{aggregateFirstInterval.Value, aggregateSecondInterval.Value, aggregateThirdInterval.Value}
Init(cfg)
return nil
Expand Down
7 changes: 6 additions & 1 deletion config/features/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,16 @@ var (
Name: "disable-resource-manager",
Usage: "Disables running the libp2p resource manager.",
}

// DisableRegistrationCache a flag for disabling the validator registration cache and use db instead.
DisableRegistrationCache = &cli.BoolFlag{
Name: "disable-registration-cache",
Usage: "Temporary flag for disabling the validator registration cache instead of using the DB. Note: registrations do not clear on restart while using the DB.",
}
// BlobSaveFsync enforces durable filesystem writes for use cases where blob availability is critical.
BlobSaveFsync = &cli.BoolFlag{
Name: "blob-save-fsync",
Usage: "Forces new blob files to be fysnc'd before continuing, ensuring durable blob writes.",
}
)

// devModeFlags holds list of flags that are set when development mode is on.
Expand Down Expand Up @@ -209,6 +213,7 @@ var BeaconChainFlags = append(deprecatedBeaconFlags, append(deprecatedFlags, []c
disableResourceManager,
DisableRegistrationCache,
EnableLightClient,
BlobSaveFsync,
}...)...)

// E2EBeaconChainFlags contains a list of the beacon chain feature flags to be tested in E2E.
Expand Down

0 comments on commit 0f907da

Please sign in to comment.