Skip to content

Commit

Permalink
dataColumnSidecarByRootRPCHandler: Remove ugly `time.Sleep(100 * ti…
Browse files Browse the repository at this point in the history
…me.Millisecond)`.
  • Loading branch information
nalepae committed May 30, 2024
1 parent 6cac46c commit a14700e
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 26 deletions.
1 change: 1 addition & 0 deletions beacon-chain/db/filesystem/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func (dcn *dataColumnNotifier) ForRoot(root [fieldparams.RootLength]byte) chan u
return channel
}

// Delete removes the channel for the given root.
func (dcn *dataColumnNotifier) Delete(root [fieldparams.RootLength]byte) {
dcn.mut.Lock()
defer dcn.mut.Unlock()
Expand Down
69 changes: 43 additions & 26 deletions beacon-chain/sync/rpc_data_column_sidecars_by_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing"
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
Expand Down Expand Up @@ -71,12 +70,13 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
return errors.Wrapf(err, "unexpected error computing min valid blob request slot, current_slot=%d", cs)
}

// Compute all custodied columns.
// Compute all custodied subnets.
custodiedSubnets := params.BeaconConfig().CustodyRequirement
if flags.Get().SubscribeToAllSubnets {
custodiedSubnets = params.BeaconConfig().DataColumnSidecarSubnetCount
}

// Compute all custodied columns.
custodiedColumns, err := peerdas.CustodyColumns(s.cfg.p2p.NodeID(), custodiedSubnets)
if err != nil {
log.WithError(err).Errorf("unexpected error retrieving the node id")
Expand Down Expand Up @@ -121,9 +121,10 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
}

s.rateLimiter.add(stream, 1)
root, idx := bytesutil.ToBytes32(requestedColumnIdents[i].BlockRoot), requestedColumnIdents[i].Index
requestedRoot, requestedIndex := bytesutil.ToBytes32(requestedColumnIdents[i].BlockRoot), requestedColumnIdents[i].Index

isCustodied := custodiedColumns[idx]
// Decrease the peer's score if it requests a column that is not custodied.
isCustodied := custodiedColumns[requestedIndex]
if !isCustodied {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.writeErrorResponseToStream(responseCodeInvalidRequest, types.ErrInvalidColumnIndex.Error(), stream)
Expand All @@ -133,50 +134,66 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
// TODO: Differentiate between blobs and columns for our storage engine
// If the data column is nil, it means it is not yet available in the db.
// We wait for it to be available.
// TODO: Use a real feed like `nc := s.blobNotifiers.forRoot(root)` instead of this for/sleep loop looking in the DB.
var sc *eth.DataColumnSidecar

for {
sc, err = s.cfg.blobStorage.GetColumn(root, idx)
if err != nil {
if ctxErr := ctx.Err(); ctxErr != nil {
closeStream(stream, log)
return ctxErr
}
// Subscribe to the data column channel for this root.
rootChannel := s.cfg.blobStorage.DataColumnNotifier.ForRoot(requestedRoot)
defer s.cfg.blobStorage.DataColumnNotifier.Delete(requestedRoot)

// Retrieve the data column from the database.
dataColumnSidecar, err := s.cfg.blobStorage.GetColumn(requestedRoot, requestedIndex)

if err != nil && !db.IsNotFound(err) {
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
return errors.Wrap(err, "get column")
}

if err != nil && db.IsNotFound(err) {
fields := logrus.Fields{
"root": fmt.Sprintf("%#x", requestedRoot),
"index": requestedIndex,
}

if db.IsNotFound(err) {
fields := logrus.Fields{
"root": fmt.Sprintf("%#x", root),
"index": idx,
log.WithFields(fields).Debug("Peer requested data column sidecar by root not found in db, waiting for it to be available")

loop:
for {
select {
case receivedIndex := <-rootChannel:
if receivedIndex == requestedIndex {
// This is the data column we are looking for.
log.WithFields(fields).Debug("Data column sidecar by root is now available in the db")

break loop
}

log.WithFields(fields).Debugf("Peer requested data column sidecar by root not found in db, waiting for it to be available")
time.Sleep(100 * time.Millisecond) // My heart is crying
continue
case <-ctx.Done():
closeStream(stream, log)
return errors.Errorf("context closed while waiting for data column with root %#x and index %d", requestedRoot, requestedIndex)
}
}

log.WithError(err).Errorf("unexpected db error retrieving data column, root=%x, index=%d", root, idx)
// Retrieve the data column from the db.
dataColumnSidecar, err = s.cfg.blobStorage.GetColumn(requestedRoot, requestedIndex)
if err != nil {
// This time, no error (even not found error) should be returned.
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)

return errors.Wrap(err, "get column")
}

break
}

// If any root in the request content references a block earlier than minimum_request_epoch,
// peers MAY respond with error code 3: ResourceUnavailable or not include the data column in the response.
// note: we are deviating from the spec to allow requests for data column that are before minimum_request_epoch,
// up to the beginning of the retention period.
if sc.SignedBlockHeader.Header.Slot < minReqSlot {
if dataColumnSidecar.SignedBlockHeader.Header.Slot < minReqSlot {
s.writeErrorResponseToStream(responseCodeResourceUnavailable, types.ErrDataColumnLTMinRequest.Error(), stream)
log.WithError(types.ErrDataColumnLTMinRequest).
Debugf("requested data column for block %#x before minimum_request_epoch", requestedColumnIdents[i].BlockRoot)
return types.ErrDataColumnLTMinRequest
}

SetStreamWriteDeadline(stream, defaultWriteDuration)
if chunkErr := WriteDataColumnSidecarChunk(stream, s.cfg.chain, s.cfg.p2p.Encoding(), sc); chunkErr != nil {
if chunkErr := WriteDataColumnSidecarChunk(stream, s.cfg.chain, s.cfg.p2p.Encoding(), dataColumnSidecar); chunkErr != nil {
log.WithError(chunkErr).Debug("Could not send a chunked response")
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
tracing.AnnotateError(span, chunkErr)
Expand Down

0 comments on commit a14700e

Please sign in to comment.