From 4c2caa9bf9c5569a21ef5a899e6e8a57835ba2c1 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Tue, 4 Jun 2024 09:52:25 +0200 Subject: [PATCH] PeerDAS: Implement / use data column feed from database. (#14062) * Remove some `_` identifiers. * Blob storage: Implement a notifier system for data columns. * `dataColumnSidecarByRootRPCHandler`: Remove ugly `time.Sleep(100 * time.Millisecond)`. * Address Nishant's comment. --- beacon-chain/db/filesystem/BUILD.bazel | 1 + beacon-chain/db/filesystem/blob.go | 25 ++++++- .../sync/rpc_data_column_sidecars_by_root.go | 71 ++++++++++++------- 3 files changed, 68 insertions(+), 29 deletions(-) diff --git a/beacon-chain/db/filesystem/BUILD.bazel b/beacon-chain/db/filesystem/BUILD.bazel index e4008d70072e..5de9dd084482 100644 --- a/beacon-chain/db/filesystem/BUILD.bazel +++ b/beacon-chain/db/filesystem/BUILD.bazel @@ -13,6 +13,7 @@ go_library( importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem", visibility = ["//visibility:public"], deps = [ + "//async/event:go_default_library", "//beacon-chain/verification:go_default_library", "//config/fieldparams:go_default_library", "//config/params:go_default_library", diff --git a/beacon-chain/db/filesystem/blob.go b/beacon-chain/db/filesystem/blob.go index 6b6e0664d3da..63dbf323d725 100644 --- a/beacon-chain/db/filesystem/blob.go +++ b/beacon-chain/db/filesystem/blob.go @@ -12,6 +12,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v5/async/event" "github.com/prysmaticlabs/prysm/v5/beacon-chain/verification" fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams" "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" @@ -39,8 +40,15 @@ const ( directoryPermissions = 0700 ) -// BlobStorageOption is a functional option for configuring a BlobStorage. -type BlobStorageOption func(*BlobStorage) error +type ( + // BlobStorageOption is a functional option for configuring a BlobStorage. + BlobStorageOption func(*BlobStorage) error + + RootIndexPair struct { + Root [fieldparams.RootLength]byte + Index uint64 + } +) // WithBasePath is a required option that sets the base path of blob storage. func WithBasePath(base string) BlobStorageOption { @@ -70,7 +78,10 @@ func WithSaveFsync(fsync bool) BlobStorageOption { // attempt to hold a file lock to guarantee exclusive control of the blob storage directory, so this should only be // initialized once per beacon node. func NewBlobStorage(opts ...BlobStorageOption) (*BlobStorage, error) { - b := &BlobStorage{} + b := &BlobStorage{ + DataColumnFeed: new(event.Feed), + } + for _, o := range opts { if err := o(b); err != nil { return nil, errors.Wrap(err, "failed to create blob storage") @@ -99,6 +110,7 @@ type BlobStorage struct { fsync bool fs afero.Fs pruner *blobPruner + DataColumnFeed *event.Feed } // WarmCache runs the prune routine with an expiration of slot of 0, so nothing will be pruned, but the pruner's cache @@ -312,6 +324,13 @@ func (bs *BlobStorage) SaveDataColumn(column blocks.VerifiedRODataColumn) error return errors.Wrap(err, "failed to rename partial file to final name") } partialMoved = true + + // Notify the data column notifier that a new data column has been saved. + bs.DataColumnFeed.Send(RootIndexPair{ + Root: column.BlockRoot(), + Index: column.ColumnIndex, + }) + // TODO: Use new metrics for data columns blobsWrittenCounter.Inc() blobSaveLatency.Observe(float64(time.Since(startTime).Milliseconds())) diff --git a/beacon-chain/sync/rpc_data_column_sidecars_by_root.go b/beacon-chain/sync/rpc_data_column_sidecars_by_root.go index c149a629556d..241fd8185f29 100644 --- a/beacon-chain/sync/rpc_data_column_sidecars_by_root.go +++ b/beacon-chain/sync/rpc_data_column_sidecars_by_root.go @@ -11,6 +11,7 @@ import ( "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas" "github.com/prysmaticlabs/prysm/v5/beacon-chain/db" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types" "github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags" @@ -19,7 +20,6 @@ import ( "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" "github.com/prysmaticlabs/prysm/v5/monitoring/tracing" - eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v5/time/slots" "github.com/sirupsen/logrus" "go.opencensus.io/trace" @@ -71,12 +71,13 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int return errors.Wrapf(err, "unexpected error computing min valid blob request slot, current_slot=%d", cs) } - // Compute all custodied columns. + // Compute all custodied subnets. custodiedSubnets := params.BeaconConfig().CustodyRequirement if flags.Get().SubscribeToAllSubnets { custodiedSubnets = params.BeaconConfig().DataColumnSidecarSubnetCount } + // Compute all custodied columns. custodiedColumns, err := peerdas.CustodyColumns(s.cfg.p2p.NodeID(), custodiedSubnets) if err != nil { log.WithError(err).Errorf("unexpected error retrieving the node id") @@ -101,6 +102,11 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int "requestedCount": len(requestedColumnsList), }).Debug("Received data column sidecar by root request") + // Subscribe to the data column feed. + rootIndexChan := make(chan filesystem.RootIndexPair) + subscription := s.cfg.blobStorage.DataColumnFeed.Subscribe(rootIndexChan) + defer subscription.Unsubscribe() + for i := range requestedColumnIdents { if err := ctx.Err(); err != nil { closeStream(stream, log) @@ -121,9 +127,10 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int } s.rateLimiter.add(stream, 1) - root, idx := bytesutil.ToBytes32(requestedColumnIdents[i].BlockRoot), requestedColumnIdents[i].Index + requestedRoot, requestedIndex := bytesutil.ToBytes32(requestedColumnIdents[i].BlockRoot), requestedColumnIdents[i].Index - isCustodied := custodiedColumns[idx] + // Decrease the peer's score if it requests a column that is not custodied. + isCustodied := custodiedColumns[requestedIndex] if !isCustodied { s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer()) s.writeErrorResponseToStream(responseCodeInvalidRequest, types.ErrInvalidColumnIndex.Error(), stream) @@ -133,42 +140,54 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int // TODO: Differentiate between blobs and columns for our storage engine // If the data column is nil, it means it is not yet available in the db. // We wait for it to be available. - // TODO: Use a real feed like `nc := s.blobNotifiers.forRoot(root)` instead of this for/sleep loop looking in the DB. - var sc *eth.DataColumnSidecar - for { - sc, err = s.cfg.blobStorage.GetColumn(root, idx) - if err != nil { - if ctxErr := ctx.Err(); ctxErr != nil { - closeStream(stream, log) - return ctxErr - } + // Retrieve the data column from the database. + dataColumnSidecar, err := s.cfg.blobStorage.GetColumn(requestedRoot, requestedIndex) + + if err != nil && !db.IsNotFound(err) { + s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream) + return errors.Wrap(err, "get column") + } + + if err != nil && db.IsNotFound(err) { + fields := logrus.Fields{ + "root": fmt.Sprintf("%#x", requestedRoot), + "index": requestedIndex, + } - if db.IsNotFound(err) { - fields := logrus.Fields{ - "root": fmt.Sprintf("%#x", root), - "index": idx, + log.WithFields(fields).Debug("Peer requested data column sidecar by root not found in db, waiting for it to be available") + + loop: + for { + select { + case receivedRootIndex := <-rootIndexChan: + if receivedRootIndex.Root == requestedRoot && receivedRootIndex.Index == requestedIndex { + // This is the data column we are looking for. + log.WithFields(fields).Debug("Data column sidecar by root is now available in the db") + + break loop } - log.WithFields(fields).Debugf("Peer requested data column sidecar by root not found in db, waiting for it to be available") - time.Sleep(100 * time.Millisecond) // My heart is crying - continue + case <-ctx.Done(): + closeStream(stream, log) + return errors.Errorf("context closed while waiting for data column with root %#x and index %d", requestedRoot, requestedIndex) } + } - log.WithError(err).Errorf("unexpected db error retrieving data column, root=%x, index=%d", root, idx) + // Retrieve the data column from the db. + dataColumnSidecar, err = s.cfg.blobStorage.GetColumn(requestedRoot, requestedIndex) + if err != nil { + // This time, no error (even not found error) should be returned. s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream) - return errors.Wrap(err, "get column") } - - break } // If any root in the request content references a block earlier than minimum_request_epoch, // peers MAY respond with error code 3: ResourceUnavailable or not include the data column in the response. // note: we are deviating from the spec to allow requests for data column that are before minimum_request_epoch, // up to the beginning of the retention period. - if sc.SignedBlockHeader.Header.Slot < minReqSlot { + if dataColumnSidecar.SignedBlockHeader.Header.Slot < minReqSlot { s.writeErrorResponseToStream(responseCodeResourceUnavailable, types.ErrDataColumnLTMinRequest.Error(), stream) log.WithError(types.ErrDataColumnLTMinRequest). Debugf("requested data column for block %#x before minimum_request_epoch", requestedColumnIdents[i].BlockRoot) @@ -176,7 +195,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int } SetStreamWriteDeadline(stream, defaultWriteDuration) - if chunkErr := WriteDataColumnSidecarChunk(stream, s.cfg.chain, s.cfg.p2p.Encoding(), sc); chunkErr != nil { + if chunkErr := WriteDataColumnSidecarChunk(stream, s.cfg.chain, s.cfg.p2p.Encoding(), dataColumnSidecar); chunkErr != nil { log.WithError(chunkErr).Debug("Could not send a chunked response") s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream) tracing.AnnotateError(span, chunkErr)