Skip to content

Commit

Permalink
PeerDAS: Implement / use data column feed from database. (#14062)
Browse files Browse the repository at this point in the history
* Remove some `_` identifiers.

* Blob storage: Implement a notifier system for data columns.

* `dataColumnSidecarByRootRPCHandler`: Remove ugly `time.Sleep(100 * time.Millisecond)`.

* Address Nishant's comment.
  • Loading branch information
nalepae authored Jun 4, 2024
1 parent b692050 commit 54f2d91
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 33 deletions.
1 change: 1 addition & 0 deletions beacon-chain/db/filesystem/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem",
visibility = ["//visibility:public"],
deps = [
"//async/event:go_default_library",
"//beacon-chain/verification:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
Expand Down
25 changes: 22 additions & 3 deletions beacon-chain/db/filesystem/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/async/event"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
Expand Down Expand Up @@ -39,8 +40,15 @@ const (
directoryPermissions = 0700
)

// BlobStorageOption is a functional option for configuring a BlobStorage.
type BlobStorageOption func(*BlobStorage) error
type (
// BlobStorageOption is a functional option for configuring a BlobStorage.
BlobStorageOption func(*BlobStorage) error

RootIndexPair struct {
Root [fieldparams.RootLength]byte
Index uint64
}
)

// WithBasePath is a required option that sets the base path of blob storage.
func WithBasePath(base string) BlobStorageOption {
Expand Down Expand Up @@ -70,7 +78,10 @@ func WithSaveFsync(fsync bool) BlobStorageOption {
// attempt to hold a file lock to guarantee exclusive control of the blob storage directory, so this should only be
// initialized once per beacon node.
func NewBlobStorage(opts ...BlobStorageOption) (*BlobStorage, error) {
b := &BlobStorage{}
b := &BlobStorage{
DataColumnFeed: new(event.Feed),
}

for _, o := range opts {
if err := o(b); err != nil {
return nil, errors.Wrap(err, "failed to create blob storage")
Expand Down Expand Up @@ -99,6 +110,7 @@ type BlobStorage struct {
fsync bool
fs afero.Fs
pruner *blobPruner
DataColumnFeed *event.Feed
}

// WarmCache runs the prune routine with an expiration of slot of 0, so nothing will be pruned, but the pruner's cache
Expand Down Expand Up @@ -312,6 +324,13 @@ func (bs *BlobStorage) SaveDataColumn(column blocks.VerifiedRODataColumn) error
return errors.Wrap(err, "failed to rename partial file to final name")
}
partialMoved = true

// Notify the data column notifier that a new data column has been saved.
bs.DataColumnFeed.Send(RootIndexPair{
Root: column.BlockRoot(),
Index: column.ColumnIndex,
})

// TODO: Use new metrics for data columns
blobsWrittenCounter.Inc()
blobSaveLatency.Observe(float64(time.Since(startTime).Milliseconds()))
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/decode_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (s *Service) decodePubsubMessage(msg *pubsub.Message) (ssz.Unmarshaler, err
}

// Replaces our fork digest with the formatter.
func (_ *Service) replaceForkDigest(topic string) (string, error) {
func (*Service) replaceForkDigest(topic string) (string, error) {
subStrings := strings.Split(topic, "/")
if len(subStrings) != 4 {
return "", errInvalidTopic
Expand Down
71 changes: 45 additions & 26 deletions beacon-chain/sync/rpc_data_column_sidecars_by_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
Expand All @@ -19,7 +20,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing"
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
Expand Down Expand Up @@ -71,12 +71,13 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
return errors.Wrapf(err, "unexpected error computing min valid blob request slot, current_slot=%d", cs)
}

// Compute all custodied columns.
// Compute all custodied subnets.
custodiedSubnets := params.BeaconConfig().CustodyRequirement
if flags.Get().SubscribeToAllSubnets {
custodiedSubnets = params.BeaconConfig().DataColumnSidecarSubnetCount
}

// Compute all custodied columns.
custodiedColumns, err := peerdas.CustodyColumns(s.cfg.p2p.NodeID(), custodiedSubnets)
if err != nil {
log.WithError(err).Errorf("unexpected error retrieving the node id")
Expand All @@ -101,6 +102,11 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
"requestedCount": len(requestedColumnsList),
}).Debug("Received data column sidecar by root request")

// Subscribe to the data column feed.
rootIndexChan := make(chan filesystem.RootIndexPair)
subscription := s.cfg.blobStorage.DataColumnFeed.Subscribe(rootIndexChan)
defer subscription.Unsubscribe()

for i := range requestedColumnIdents {
if err := ctx.Err(); err != nil {
closeStream(stream, log)
Expand All @@ -121,9 +127,10 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
}

s.rateLimiter.add(stream, 1)
root, idx := bytesutil.ToBytes32(requestedColumnIdents[i].BlockRoot), requestedColumnIdents[i].Index
requestedRoot, requestedIndex := bytesutil.ToBytes32(requestedColumnIdents[i].BlockRoot), requestedColumnIdents[i].Index

isCustodied := custodiedColumns[idx]
// Decrease the peer's score if it requests a column that is not custodied.
isCustodied := custodiedColumns[requestedIndex]
if !isCustodied {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.writeErrorResponseToStream(responseCodeInvalidRequest, types.ErrInvalidColumnIndex.Error(), stream)
Expand All @@ -133,50 +140,62 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
// TODO: Differentiate between blobs and columns for our storage engine
// If the data column is nil, it means it is not yet available in the db.
// We wait for it to be available.
// TODO: Use a real feed like `nc := s.blobNotifiers.forRoot(root)` instead of this for/sleep loop looking in the DB.
var sc *eth.DataColumnSidecar

for {
sc, err = s.cfg.blobStorage.GetColumn(root, idx)
if err != nil {
if ctxErr := ctx.Err(); ctxErr != nil {
closeStream(stream, log)
return ctxErr
}
// Retrieve the data column from the database.
dataColumnSidecar, err := s.cfg.blobStorage.GetColumn(requestedRoot, requestedIndex)

if err != nil && !db.IsNotFound(err) {
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
return errors.Wrap(err, "get column")
}

if err != nil && db.IsNotFound(err) {
fields := logrus.Fields{
"root": fmt.Sprintf("%#x", requestedRoot),
"index": requestedIndex,
}

if db.IsNotFound(err) {
fields := logrus.Fields{
"root": fmt.Sprintf("%#x", root),
"index": idx,
log.WithFields(fields).Debug("Peer requested data column sidecar by root not found in db, waiting for it to be available")

loop:
for {
select {
case receivedRootIndex := <-rootIndexChan:
if receivedRootIndex.Root == requestedRoot && receivedRootIndex.Index == requestedIndex {
// This is the data column we are looking for.
log.WithFields(fields).Debug("Data column sidecar by root is now available in the db")

break loop
}

log.WithFields(fields).Debugf("Peer requested data column sidecar by root not found in db, waiting for it to be available")
time.Sleep(100 * time.Millisecond) // My heart is crying
continue
case <-ctx.Done():
closeStream(stream, log)
return errors.Errorf("context closed while waiting for data column with root %#x and index %d", requestedRoot, requestedIndex)
}
}

log.WithError(err).Errorf("unexpected db error retrieving data column, root=%x, index=%d", root, idx)
// Retrieve the data column from the db.
dataColumnSidecar, err = s.cfg.blobStorage.GetColumn(requestedRoot, requestedIndex)
if err != nil {
// This time, no error (even not found error) should be returned.
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)

return errors.Wrap(err, "get column")
}

break
}

// If any root in the request content references a block earlier than minimum_request_epoch,
// peers MAY respond with error code 3: ResourceUnavailable or not include the data column in the response.
// note: we are deviating from the spec to allow requests for data column that are before minimum_request_epoch,
// up to the beginning of the retention period.
if sc.SignedBlockHeader.Header.Slot < minReqSlot {
if dataColumnSidecar.SignedBlockHeader.Header.Slot < minReqSlot {
s.writeErrorResponseToStream(responseCodeResourceUnavailable, types.ErrDataColumnLTMinRequest.Error(), stream)
log.WithError(types.ErrDataColumnLTMinRequest).
Debugf("requested data column for block %#x before minimum_request_epoch", requestedColumnIdents[i].BlockRoot)
return types.ErrDataColumnLTMinRequest
}

SetStreamWriteDeadline(stream, defaultWriteDuration)
if chunkErr := WriteDataColumnSidecarChunk(stream, s.cfg.chain, s.cfg.p2p.Encoding(), sc); chunkErr != nil {
if chunkErr := WriteDataColumnSidecarChunk(stream, s.cfg.chain, s.cfg.p2p.Encoding(), dataColumnSidecar); chunkErr != nil {
log.WithError(chunkErr).Debug("Could not send a chunked response")
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
tracing.AnnotateError(span, chunkErr)
Expand Down
6 changes: 3 additions & 3 deletions beacon-chain/sync/subscriber_beacon_attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ func (s *Service) committeeIndexBeaconAttestationSubscriber(_ context.Context, m
return s.cfg.attPool.SaveUnaggregatedAttestation(a)
}

func (_ *Service) persistentSubnetIndices() []uint64 {
func (*Service) persistentSubnetIndices() []uint64 {
return cache.SubnetIDs.GetAllSubnets()
}

func (_ *Service) aggregatorSubnetIndices(currentSlot primitives.Slot) []uint64 {
func (*Service) aggregatorSubnetIndices(currentSlot primitives.Slot) []uint64 {
endEpoch := slots.ToEpoch(currentSlot) + 1
endSlot := params.BeaconConfig().SlotsPerEpoch.Mul(uint64(endEpoch))
var commIds []uint64
Expand All @@ -50,7 +50,7 @@ func (_ *Service) aggregatorSubnetIndices(currentSlot primitives.Slot) []uint64
return slice.SetUint64(commIds)
}

func (_ *Service) attesterSubnetIndices(currentSlot primitives.Slot) []uint64 {
func (*Service) attesterSubnetIndices(currentSlot primitives.Slot) []uint64 {
endEpoch := slots.ToEpoch(currentSlot) + 1
endSlot := params.BeaconConfig().SlotsPerEpoch.Mul(uint64(endEpoch))
var commIds []uint64
Expand Down

0 comments on commit 54f2d91

Please sign in to comment.