Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

call fsync between part file write and rename #13652

Merged
merged 1 commit into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 45 additions & 21 deletions beacon-chain/db/filesystem/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ var (
errIndexOutOfBounds = errors.New("blob index in file name >= MaxBlobsPerBlock")
errEmptyBlobWritten = errors.New("zero bytes written to disk when saving blob sidecar")
errSidecarEmptySSZData = errors.New("sidecar marshalled to an empty ssz byte slice")
errNoBasePath = errors.New("BlobStorage base path not specified in init")
)

const (
Expand All @@ -36,45 +37,63 @@ const (
// BlobStorageOption is a functional option for configuring a BlobStorage.
// An option receives the BlobStorage being built and may mutate it; returning
// a non-nil error signals that the option could not be applied.
type BlobStorageOption func(*BlobStorage) error

// WithBasePath returns the option that records base as the base path of blob
// storage. The option is required: it only stores the value and never fails.
func WithBasePath(base string) BlobStorageOption {
	setBase := func(bs *BlobStorage) error {
		bs.base = base
		return nil
	}
	return setBase
}

// WithBlobRetentionEpochs is an option that changes the number of epochs blobs will be persisted.
func WithBlobRetentionEpochs(e primitives.Epoch) BlobStorageOption {
return func(b *BlobStorage) error {
pruner, err := newBlobPruner(b.fs, e)
if err != nil {
return err
}
b.pruner = pruner
b.retentionEpochs = e
return nil
}
}

// WithSaveFsync returns an option that stores the fsync preference on the
// BlobStorage. When fsync is true, Save calls fsync on a part file before
// renaming it, for improved durability.
func WithSaveFsync(fsync bool) BlobStorageOption {
	return BlobStorageOption(func(storage *BlobStorage) error {
		storage.fsync = fsync
		return nil
	})
}

// NewBlobStorage creates a new instance of the BlobStorage object. Note that the implementation of BlobStorage may
// attempt to hold a file lock to guarantee exclusive control of the blob storage directory, so this should only be
// initialized once per beacon node.
func NewBlobStorage(base string, opts ...BlobStorageOption) (*BlobStorage, error) {
base = path.Clean(base)
if err := file.MkdirAll(base); err != nil {
return nil, fmt.Errorf("failed to create blob storage at %s: %w", base, err)
}
fs := afero.NewBasePathFs(afero.NewOsFs(), base)
b := &BlobStorage{
fs: fs,
}
func NewBlobStorage(opts ...BlobStorageOption) (*BlobStorage, error) {
b := &BlobStorage{}
for _, o := range opts {
if err := o(b); err != nil {
return nil, fmt.Errorf("failed to create blob storage at %s: %w", base, err)
return nil, errors.Wrap(err, "failed to create blob storage")
}
}
if b.pruner == nil {
log.Warn("Initializing blob filesystem storage with pruning disabled")
if b.base == "" {
return nil, errNoBasePath
}
b.base = path.Clean(b.base)
if err := file.MkdirAll(b.base); err != nil {
return nil, errors.Wrapf(err, "failed to create blob storage at %s", b.base)
}
b.fs = afero.NewBasePathFs(afero.NewOsFs(), b.base)
pruner, err := newBlobPruner(b.fs, b.retentionEpochs)
if err != nil {
return nil, err
}
b.pruner = pruner
return b, nil
}

// BlobStorage is the concrete implementation of the filesystem backend for saving and retrieving BlobSidecars.
type BlobStorage struct {
fs afero.Fs
pruner *blobPruner
base string
retentionEpochs primitives.Epoch
fsync bool
fs afero.Fs
pruner *blobPruner
}

// WarmCache runs the prune routine with an expiration slot of 0, so nothing will be pruned, but the pruner's cache
Expand Down Expand Up @@ -151,8 +170,13 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error {
}
return errors.Wrap(err, "failed to write to partial file")
}
err = partialFile.Close()
if err != nil {
if bs.fsync {
if err := partialFile.Sync(); err != nil {
return err
}
}

if err := partialFile.Close(); err != nil {
return err
}

Expand Down
6 changes: 4 additions & 2 deletions beacon-chain/db/filesystem/blob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestBlobStorage_SaveBlobData(t *testing.T) {
// to be empty. This test ensures that several routines can safely save the same blob at the
// same time. This isn't ideal behavior from the caller, but should be handled safely anyway.
// See https://github.com/prysmaticlabs/prysm/pull/13648
b, err := NewBlobStorage(t.TempDir())
b, err := NewBlobStorage(WithBasePath(t.TempDir()))
require.NoError(t, err)
blob := testSidecars[0]

Expand Down Expand Up @@ -268,6 +268,8 @@ func BenchmarkPruning(b *testing.B) {
}

func TestNewBlobStorage(t *testing.T) {
_, err := NewBlobStorage(path.Join(t.TempDir(), "good"))
_, err := NewBlobStorage()
require.ErrorIs(t, err, errNoBasePath)
_, err = NewBlobStorage(WithBasePath(path.Join(t.TempDir(), "good")))
require.NoError(t, err)
}
11 changes: 11 additions & 0 deletions beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ type BeaconNode struct {
BackfillOpts []backfill.ServiceOption
initialSyncComplete chan struct{}
BlobStorage *filesystem.BlobStorage
BlobStorageOptions []filesystem.BlobStorageOption
blobRetentionEpochs primitives.Epoch
verifyInitWaiter *verification.InitializerWaiter
syncChecker *initialsync.SyncChecker
Expand Down Expand Up @@ -209,6 +210,16 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
return nil, err
}

// Tests may inject BlobStorage as an option; only construct it here when none was provided.
if beacon.BlobStorage == nil {
beacon.BlobStorageOptions = append(beacon.BlobStorageOptions, filesystem.WithSaveFsync(features.Get().BlobSaveFsync))
blobs, err := filesystem.NewBlobStorage(beacon.BlobStorageOptions...)
if err != nil {
return nil, err
}
beacon.BlobStorage = blobs
}

log.Debugln("Starting DB")
if err := beacon.startDB(cliCtx, depositAddress); err != nil {
return nil, err
Expand Down
9 changes: 9 additions & 0 deletions beacon-chain/node/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ func WithBlobStorage(bs *filesystem.BlobStorage) Option {
}
}

// WithBlobStorageOptions returns an Option that appends one or more
// filesystem.BlobStorageOption values to the beacon node's BlobStorageOptions,
// to be used when initializing blob storage.
func WithBlobStorageOptions(opt ...filesystem.BlobStorageOption) Option {
	appendOpts := func(beacon *BeaconNode) error {
		beacon.BlobStorageOptions = append(beacon.BlobStorageOptions, opt...)
		return nil
	}
	return appendOpts
}

// WithBlobRetentionEpochs sets the blobRetentionEpochs value, used in kv store initialization.
func WithBlobRetentionEpochs(e primitives.Epoch) Option {
return func(bn *BeaconNode) error {
Expand Down
9 changes: 4 additions & 5 deletions cmd/beacon-chain/storage/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,10 @@ func BeaconNodeOptions(c *cli.Context) ([]node.Option, error) {
if err != nil {
return nil, err
}
bs, err := filesystem.NewBlobStorage(blobStoragePath(c), filesystem.WithBlobRetentionEpochs(e))
if err != nil {
return nil, err
}
return []node.Option{node.WithBlobStorage(bs), node.WithBlobRetentionEpochs(e)}, nil
opts := []node.Option{node.WithBlobStorageOptions(
filesystem.WithBlobRetentionEpochs(e), filesystem.WithBasePath(blobStoragePath(c)),
)}
return opts, nil
}

func blobStoragePath(c *cli.Context) string {
Expand Down
8 changes: 7 additions & 1 deletion config/features/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ type Flags struct {
EnableEIP4881 bool // EnableEIP4881 specifies whether to use the deposit tree from EIP4881

PrepareAllPayloads bool // PrepareAllPayloads informs the engine to prepare a block on every slot.

// BlobSaveFsync requires blob saving to block on fsync to ensure blobs are durably persisted before passing DA.
BlobSaveFsync bool
// KeystoreImportDebounceInterval specifies the time duration the validator waits to reload new keys if they have
// changed on disk. This feature is for advanced use cases only.
KeystoreImportDebounceInterval time.Duration
Expand Down Expand Up @@ -245,6 +246,11 @@ func ConfigureBeaconChain(ctx *cli.Context) error {
logEnabled(EnableLightClient)
cfg.EnableLightClient = true
}
if ctx.IsSet(BlobSaveFsync.Name) {
logEnabled(BlobSaveFsync)
cfg.BlobSaveFsync = true
}

cfg.AggregateIntervals = [3]time.Duration{aggregateFirstInterval.Value, aggregateSecondInterval.Value, aggregateThirdInterval.Value}
Init(cfg)
return nil
Expand Down
7 changes: 6 additions & 1 deletion config/features/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,16 @@ var (
Name: "disable-resource-manager",
Usage: "Disables running the libp2p resource manager.",
}

// DisableRegistrationCache is a flag that disables the validator registration cache so that the DB is used instead.
DisableRegistrationCache = &cli.BoolFlag{
	Name:  "disable-registration-cache",
	Usage: "Temporary flag for disabling the validator registration cache and using the DB instead. Note: registrations do not clear on restart while using the DB.",
}
// BlobSaveFsync enforces durable filesystem writes for use cases where blob availability is critical.
// It is a boolean CLI flag named "blob-save-fsync"; the usage text is shown to operators in the help output.
BlobSaveFsync = &cli.BoolFlag{
	Name:  "blob-save-fsync",
	Usage: "Forces new blob files to be fsync'd before continuing, ensuring durable blob writes.",
}
)

// devModeFlags holds list of flags that are set when development mode is on.
Expand Down Expand Up @@ -209,6 +213,7 @@ var BeaconChainFlags = append(deprecatedBeaconFlags, append(deprecatedFlags, []c
disableResourceManager,
DisableRegistrationCache,
EnableLightClient,
BlobSaveFsync,
}...)...)

// E2EBeaconChainFlags contains a list of the beacon chain feature flags to be tested in E2E.
Expand Down
Loading