Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PeerDAS: Implement / use data column feed from database. #14062

Merged
merged 4 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions beacon-chain/db/filesystem/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem",
visibility = ["//visibility:public"],
deps = [
"//async/event:go_default_library",
"//beacon-chain/verification:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
Expand Down
25 changes: 22 additions & 3 deletions beacon-chain/db/filesystem/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/async/event"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
Expand Down Expand Up @@ -39,8 +40,15 @@ const (
directoryPermissions = 0700
)

// BlobStorageOption is a functional option for configuring a BlobStorage.
type BlobStorageOption func(*BlobStorage) error
type (
// BlobStorageOption is a functional option for configuring a BlobStorage.
BlobStorageOption func(*BlobStorage) error

RootIndexPair struct {
Root [fieldparams.RootLength]byte
Index uint64
}
)

// WithBasePath is a required option that sets the base path of blob storage.
func WithBasePath(base string) BlobStorageOption {
Expand Down Expand Up @@ -70,7 +78,10 @@ func WithSaveFsync(fsync bool) BlobStorageOption {
// attempt to hold a file lock to guarantee exclusive control of the blob storage directory, so this should only be
// initialized once per beacon node.
func NewBlobStorage(opts ...BlobStorageOption) (*BlobStorage, error) {
b := &BlobStorage{}
b := &BlobStorage{
DataColumnFeed: new(event.Feed),
}

for _, o := range opts {
if err := o(b); err != nil {
return nil, errors.Wrap(err, "failed to create blob storage")
Expand Down Expand Up @@ -99,6 +110,7 @@ type BlobStorage struct {
fsync bool
fs afero.Fs
pruner *blobPruner
DataColumnFeed *event.Feed
}

// WarmCache runs the prune routine with an expiration of slot of 0, so nothing will be pruned, but the pruner's cache
Expand Down Expand Up @@ -312,6 +324,13 @@ func (bs *BlobStorage) SaveDataColumn(column blocks.VerifiedRODataColumn) error
return errors.Wrap(err, "failed to rename partial file to final name")
}
partialMoved = true

// Notify the data column notifier that a new data column has been saved.
bs.DataColumnFeed.Send(RootIndexPair{
Root: column.BlockRoot(),
Index: column.ColumnIndex,
})

// TODO: Use new metrics for data columns
blobsWrittenCounter.Inc()
blobSaveLatency.Observe(float64(time.Since(startTime).Milliseconds()))
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/decode_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (s *Service) decodePubsubMessage(msg *pubsub.Message) (ssz.Unmarshaler, err
}

// Replaces our fork digest with the formatter.
func (_ *Service) replaceForkDigest(topic string) (string, error) {
func (*Service) replaceForkDigest(topic string) (string, error) {
subStrings := strings.Split(topic, "/")
if len(subStrings) != 4 {
return "", errInvalidTopic
Expand Down
71 changes: 45 additions & 26 deletions beacon-chain/sync/rpc_data_column_sidecars_by_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
Expand All @@ -19,7 +20,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing"
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
Expand Down Expand Up @@ -71,12 +71,13 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
return errors.Wrapf(err, "unexpected error computing min valid blob request slot, current_slot=%d", cs)
}

// Compute all custodied columns.
// Compute all custodied subnets.
custodiedSubnets := params.BeaconConfig().CustodyRequirement
if flags.Get().SubscribeToAllSubnets {
custodiedSubnets = params.BeaconConfig().DataColumnSidecarSubnetCount
}

// Compute all custodied columns.
custodiedColumns, err := peerdas.CustodyColumns(s.cfg.p2p.NodeID(), custodiedSubnets)
if err != nil {
log.WithError(err).Errorf("unexpected error retrieving the node id")
Expand All @@ -101,6 +102,11 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
"requestedCount": len(requestedColumnsList),
}).Debug("Received data column sidecar by root request")

// Subscribe to the data column feed.
rootIndexChan := make(chan filesystem.RootIndexPair)
subscription := s.cfg.blobStorage.DataColumnFeed.Subscribe(rootIndexChan)
defer subscription.Unsubscribe()

for i := range requestedColumnIdents {
if err := ctx.Err(); err != nil {
closeStream(stream, log)
Expand All @@ -121,9 +127,10 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
}

s.rateLimiter.add(stream, 1)
root, idx := bytesutil.ToBytes32(requestedColumnIdents[i].BlockRoot), requestedColumnIdents[i].Index
requestedRoot, requestedIndex := bytesutil.ToBytes32(requestedColumnIdents[i].BlockRoot), requestedColumnIdents[i].Index

isCustodied := custodiedColumns[idx]
// Decrease the peer's score if it requests a column that is not custodied.
isCustodied := custodiedColumns[requestedIndex]
if !isCustodied {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.writeErrorResponseToStream(responseCodeInvalidRequest, types.ErrInvalidColumnIndex.Error(), stream)
Expand All @@ -133,50 +140,62 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
// TODO: Differentiate between blobs and columns for our storage engine
// If the data column is nil, it means it is not yet available in the db.
// We wait for it to be available.
// TODO: Use a real feed like `nc := s.blobNotifiers.forRoot(root)` instead of this for/sleep loop looking in the DB.
var sc *eth.DataColumnSidecar

for {
sc, err = s.cfg.blobStorage.GetColumn(root, idx)
if err != nil {
if ctxErr := ctx.Err(); ctxErr != nil {
closeStream(stream, log)
return ctxErr
}
// Retrieve the data column from the database.
dataColumnSidecar, err := s.cfg.blobStorage.GetColumn(requestedRoot, requestedIndex)

if err != nil && !db.IsNotFound(err) {
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
return errors.Wrap(err, "get column")
}

if err != nil && db.IsNotFound(err) {
fields := logrus.Fields{
"root": fmt.Sprintf("%#x", requestedRoot),
"index": requestedIndex,
}

if db.IsNotFound(err) {
fields := logrus.Fields{
"root": fmt.Sprintf("%#x", root),
"index": idx,
log.WithFields(fields).Debug("Peer requested data column sidecar by root not found in db, waiting for it to be available")

loop:
for {
select {
case receivedRootIndex := <-rootIndexChan:
if receivedRootIndex.Root == requestedRoot && receivedRootIndex.Index == requestedIndex {
// This is the data column we are looking for.
log.WithFields(fields).Debug("Data column sidecar by root is now available in the db")

break loop
}

log.WithFields(fields).Debugf("Peer requested data column sidecar by root not found in db, waiting for it to be available")
time.Sleep(100 * time.Millisecond) // My heart is crying
continue
case <-ctx.Done():
closeStream(stream, log)
return errors.Errorf("context closed while waiting for data column with root %#x and index %d", requestedRoot, requestedIndex)
}
}

log.WithError(err).Errorf("unexpected db error retrieving data column, root=%x, index=%d", root, idx)
// Retrieve the data column from the db.
dataColumnSidecar, err = s.cfg.blobStorage.GetColumn(requestedRoot, requestedIndex)
if err != nil {
// This time, no error (even not found error) should be returned.
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)

return errors.Wrap(err, "get column")
}

break
}

// If any root in the request content references a block earlier than minimum_request_epoch,
// peers MAY respond with error code 3: ResourceUnavailable or not include the data column in the response.
// note: we are deviating from the spec to allow requests for data column that are before minimum_request_epoch,
// up to the beginning of the retention period.
if sc.SignedBlockHeader.Header.Slot < minReqSlot {
if dataColumnSidecar.SignedBlockHeader.Header.Slot < minReqSlot {
s.writeErrorResponseToStream(responseCodeResourceUnavailable, types.ErrDataColumnLTMinRequest.Error(), stream)
log.WithError(types.ErrDataColumnLTMinRequest).
Debugf("requested data column for block %#x before minimum_request_epoch", requestedColumnIdents[i].BlockRoot)
return types.ErrDataColumnLTMinRequest
}

SetStreamWriteDeadline(stream, defaultWriteDuration)
if chunkErr := WriteDataColumnSidecarChunk(stream, s.cfg.chain, s.cfg.p2p.Encoding(), sc); chunkErr != nil {
if chunkErr := WriteDataColumnSidecarChunk(stream, s.cfg.chain, s.cfg.p2p.Encoding(), dataColumnSidecar); chunkErr != nil {
log.WithError(chunkErr).Debug("Could not send a chunked response")
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
tracing.AnnotateError(span, chunkErr)
Expand Down
6 changes: 3 additions & 3 deletions beacon-chain/sync/subscriber_beacon_attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ func (s *Service) committeeIndexBeaconAttestationSubscriber(_ context.Context, m
return s.cfg.attPool.SaveUnaggregatedAttestation(a)
}

func (_ *Service) persistentSubnetIndices() []uint64 {
func (*Service) persistentSubnetIndices() []uint64 {
return cache.SubnetIDs.GetAllSubnets()
}

func (_ *Service) aggregatorSubnetIndices(currentSlot primitives.Slot) []uint64 {
func (*Service) aggregatorSubnetIndices(currentSlot primitives.Slot) []uint64 {
endEpoch := slots.ToEpoch(currentSlot) + 1
endSlot := params.BeaconConfig().SlotsPerEpoch.Mul(uint64(endEpoch))
var commIds []uint64
Expand All @@ -50,7 +50,7 @@ func (_ *Service) aggregatorSubnetIndices(currentSlot primitives.Slot) []uint64
return slice.SetUint64(commIds)
}

func (_ *Service) attesterSubnetIndices(currentSlot primitives.Slot) []uint64 {
func (*Service) attesterSubnetIndices(currentSlot primitives.Slot) []uint64 {
endEpoch := slots.ToEpoch(currentSlot) + 1
endSlot := params.BeaconConfig().SlotsPerEpoch.Mul(uint64(endEpoch))
var commIds []uint64
Expand Down
Loading