Skip to content

Commit

Permalink
Address Nishant's comment.
Browse files Browse the repository at this point in the history
  • Loading branch information
nalepae committed May 31, 2024
1 parent a14700e commit 7d25e08
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 88 deletions.
1 change: 1 addition & 0 deletions beacon-chain/db/filesystem/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem",
visibility = ["//visibility:public"],
deps = [
"//async/event:go_default_library",
"//beacon-chain/verification:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
Expand Down
103 changes: 21 additions & 82 deletions beacon-chain/db/filesystem/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
"path"
"strconv"
"strings"
"sync"
"time"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/async/event"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
Expand Down Expand Up @@ -40,8 +40,15 @@ const (
directoryPermissions = 0700
)

// BlobStorageOption is a functional option for configuring a BlobStorage.
type BlobStorageOption func(*BlobStorage) error
type (
// BlobStorageOption is a functional option for configuring a BlobStorage.
BlobStorageOption func(*BlobStorage) error

RootIndexPair struct {
Root [fieldparams.RootLength]byte
Index uint64
}
)

// WithBasePath is a required option that sets the base path of blob storage.
func WithBasePath(base string) BlobStorageOption {
Expand Down Expand Up @@ -72,7 +79,7 @@ func WithSaveFsync(fsync bool) BlobStorageOption {
// initialized once per beacon node.
func NewBlobStorage(opts ...BlobStorageOption) (*BlobStorage, error) {
b := &BlobStorage{
DataColumnNotifier: newDataColumnNotifier(),
DataColumnFeed: new(event.Feed),
}

for _, o := range opts {
Expand All @@ -96,85 +103,14 @@ func NewBlobStorage(opts ...BlobStorageOption) (*BlobStorage, error) {
return b, nil
}

// dataColumnNotifier is a thread-safe map of notifiers for notifying when a data column is saved into the dababase.
type dataColumnNotifier struct {
mut sync.Mutex
notifiers map[[fieldparams.RootLength]byte]chan uint64
seenIndex map[[fieldparams.RootLength]byte][fieldparams.NumberOfColumns]bool
}

// notifyIndex must be called when a data column is saved into the database.
// It will send a notification to the channel for the given root and index.
func (dcn *dataColumnNotifier) notifyRootIndex(root [fieldparams.RootLength]byte, idx uint64) {
dcn.mut.Lock()

// Retrieve the channel for the given root.
channel, ok := dcn.notifiers[root]

// If no notifiers exist, it means nobody is listening for this root/index pair.
// Exit early.
if !ok {
dcn.mut.Unlock()
return
}

seen, ok := dcn.seenIndex[root]
if ok && seen[idx] {
// If this root/index pair has already been seen, do not notify again.
dcn.mut.Unlock()
return
}

// Mark the index as seen.
seen[idx] = true
dcn.seenIndex[root] = seen

dcn.mut.Unlock()

// Send the index to the channel.
channel <- idx
}

// ForRoot returns a channel for notifying when a data colum is saved for a given root.
// When finished, the caller must call Delete to clean up the channel, else it will leak.
func (dcn *dataColumnNotifier) ForRoot(root [fieldparams.RootLength]byte) chan uint64 {
dcn.mut.Lock()
defer dcn.mut.Unlock()

channel, ok := dcn.notifiers[root]
if !ok {
// If the channel does not exist, create a new one.
channel = make(chan uint64, fieldparams.NumberOfColumns)
dcn.notifiers[root] = channel
}

return channel
}

// Delete removes the channel for the given root.
func (dcn *dataColumnNotifier) Delete(root [fieldparams.RootLength]byte) {
dcn.mut.Lock()
defer dcn.mut.Unlock()

delete(dcn.seenIndex, root)
delete(dcn.notifiers, root)
}

func newDataColumnNotifier() *dataColumnNotifier {
return &dataColumnNotifier{
notifiers: make(map[[fieldparams.RootLength]byte]chan uint64),
seenIndex: make(map[[fieldparams.RootLength]byte][fieldparams.NumberOfColumns]bool),
}
}

// BlobStorage is the concrete implementation of the filesystem backend for saving and retrieving BlobSidecars.
type BlobStorage struct {
base string
retentionEpochs primitives.Epoch
fsync bool
fs afero.Fs
pruner *blobPruner
DataColumnNotifier *dataColumnNotifier
base string
retentionEpochs primitives.Epoch
fsync bool
fs afero.Fs
pruner *blobPruner
DataColumnFeed *event.Feed
}

// WarmCache runs the prune routine with an expiration of slot of 0, so nothing will be pruned, but the pruner's cache
Expand Down Expand Up @@ -390,7 +326,10 @@ func (bs *BlobStorage) SaveDataColumn(column blocks.VerifiedRODataColumn) error
partialMoved = true

// Notify the data column notifier that a new data column has been saved.
bs.DataColumnNotifier.notifyRootIndex(column.BlockRoot(), column.ColumnIndex)
bs.DataColumnFeed.Send(RootIndexPair{
Root: column.BlockRoot(),
Index: column.ColumnIndex,
})

// TODO: Use new metrics for data columns
blobsWrittenCounter.Inc()
Expand Down
14 changes: 8 additions & 6 deletions beacon-chain/sync/rpc_data_column_sidecars_by_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
Expand Down Expand Up @@ -101,6 +102,11 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
"requestedCount": len(requestedColumnsList),
}).Debug("Received data column sidecar by root request")

// Subscribe to the data column feed.
rootIndexChan := make(chan filesystem.RootIndexPair)
subscription := s.cfg.blobStorage.DataColumnFeed.Subscribe(rootIndexChan)
defer subscription.Unsubscribe()

for i := range requestedColumnIdents {
if err := ctx.Err(); err != nil {
closeStream(stream, log)
Expand Down Expand Up @@ -135,10 +141,6 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
// If the data column is nil, it means it is not yet available in the db.
// We wait for it to be available.

// Subscribe to the data column channel for this root.
rootChannel := s.cfg.blobStorage.DataColumnNotifier.ForRoot(requestedRoot)
defer s.cfg.blobStorage.DataColumnNotifier.Delete(requestedRoot)

// Retrieve the data column from the database.
dataColumnSidecar, err := s.cfg.blobStorage.GetColumn(requestedRoot, requestedIndex)

Expand All @@ -158,8 +160,8 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
loop:
for {
select {
case receivedIndex := <-rootChannel:
if receivedIndex == requestedIndex {
case receivedRootIndex := <-rootIndexChan:
if receivedRootIndex.Root == requestedRoot && receivedRootIndex.Index == requestedIndex {
// This is the data column we are looking for.
log.WithFields(fields).Debug("Data column sidecar by root is now available in the db")

Expand Down

0 comments on commit 7d25e08

Please sign in to comment.