diff --git a/beacon-chain/blockchain/BUILD.bazel b/beacon-chain/blockchain/BUILD.bazel index 336ead350cba..b56e13856dfb 100644 --- a/beacon-chain/blockchain/BUILD.bazel +++ b/beacon-chain/blockchain/BUILD.bazel @@ -25,7 +25,7 @@ go_library( "receive_attestation.go", "receive_blob.go", "receive_block.go", - "receive_sidecar.go", + "receive_data_column.go", "service.go", "tracked_proposer.go", "weak_subjectivity_checks.go", @@ -49,6 +49,7 @@ go_library( "//beacon-chain/core/feed/state:go_default_library", "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/core/light-client:go_default_library", + "//beacon-chain/core/peerdas:go_default_library", "//beacon-chain/core/signing:go_default_library", "//beacon-chain/core/time:go_default_library", "//beacon-chain/core/transition:go_default_library", @@ -158,6 +159,7 @@ go_test( "//beacon-chain/operations/slashings:go_default_library", "//beacon-chain/operations/voluntaryexits:go_default_library", "//beacon-chain/p2p:go_default_library", + "//beacon-chain/p2p/testing:go_default_library", "//beacon-chain/startup:go_default_library", "//beacon-chain/state:go_default_library", "//beacon-chain/state/state-native:go_default_library", diff --git a/beacon-chain/blockchain/error.go b/beacon-chain/blockchain/error.go index 87ed0d2416db..04b13012fcc6 100644 --- a/beacon-chain/blockchain/error.go +++ b/beacon-chain/blockchain/error.go @@ -33,6 +33,7 @@ var ( ) var errMaxBlobsExceeded = errors.New("Expected commitments in block exceeds MAX_BLOBS_PER_BLOCK") +var errMaxDataColumnsExceeded = errors.New("Expected data columns for node exceeds NUMBER_OF_COLUMNS") // An invalid block is the block that fails state transition based on the core protocol rules. // The beacon node shall not be accepting nor building blocks that branch off from an invalid block. diff --git a/beacon-chain/blockchain/options.go b/beacon-chain/blockchain/options.go index 38492502a1f9..f215c470c62d 100644 --- a/beacon-chain/blockchain/options.go +++ b/beacon-chain/blockchain/options.go @@ -118,9 +118,9 @@ func WithBLSToExecPool(p blstoexec.PoolManager) Option { } // WithP2PBroadcaster to broadcast messages after appropriate processing. -func WithP2PBroadcaster(p p2p.Broadcaster) Option { +func WithP2PBroadcaster(p p2p.Acceser) Option { return func(s *Service) error { - s.cfg.P2p = p + s.cfg.P2P = p return nil } } diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index 425da163ecd0..0c4811b54701 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -10,6 +10,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed" statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas" coreTime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition" "github.com/prysmaticlabs/prysm/v5/beacon-chain/das" @@ -513,12 +514,35 @@ func missingIndices(bs *filesystem.BlobStorage, root [32]byte, expected [][]byte return missing, nil } +func missingDataColumns(bs *filesystem.BlobStorage, root [32]byte, expected map[uint64]bool) (map[uint64]bool, error) { + if len(expected) == 0 { + return nil, nil + } + if len(expected) > int(params.BeaconConfig().NumberOfColumns) { + return nil, errMaxDataColumnsExceeded + } + indices, err := bs.ColumnIndices(root) + if err != nil { + return nil, err + } + missing := make(map[uint64]bool, len(expected)) + for col := range expected { + if !indices[col] { + missing[col] = true + } + } + return missing, nil +} + // isDataAvailable blocks until all BlobSidecars committed to in the block are available, // or an error or context cancellation occurs. A nil result means that the data availability check is successful. // The function will first check the database to see if all sidecars have been persisted. If any // sidecars are missing, it will then read from the blobNotifier channel for the given root until the channel is // closed, the context hits cancellation/timeout, or notifications have been received for all the missing sidecars. func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed interfaces.ReadOnlySignedBeaconBlock) error { + if features.Get().EnablePeerDAS { + return s.isDataAvailableDataColumns(ctx, root, signed) + } if signed.Version() < version.Deneb { return nil } @@ -590,6 +614,86 @@ func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed int } } +func (s *Service) isDataAvailableDataColumns(ctx context.Context, root [32]byte, signed interfaces.ReadOnlySignedBeaconBlock) error { + if signed.Version() < version.Deneb { + return nil + } + + block := signed.Block() + if block == nil { + return errors.New("invalid nil beacon block") + } + // We are only required to check within MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS + if !params.WithinDAPeriod(slots.ToEpoch(block.Slot()), slots.ToEpoch(s.CurrentSlot())) { + return nil + } + body := block.Body() + if body == nil { + return errors.New("invalid nil beacon block body") + } + kzgCommitments, err := body.BlobKzgCommitments() + if err != nil { + return errors.Wrap(err, "could not get KZG commitments") + } + // If block has not commitments there is nothing to wait for. + if len(kzgCommitments) == 0 { + return nil + } + + colMap, err := peerdas.CustodyColumns(s.cfg.P2P.NodeID(), params.BeaconConfig().CustodyRequirement) + if err != nil { + return err + } + // expected is the number of custodied data columnns a node is expected to have. + expected := len(colMap) + if expected == 0 { + return nil + } + // get a map of data column indices that are not currently available. + missing, err := missingDataColumns(s.blobStorage, root, colMap) + if err != nil { + return err + } + // If there are no missing indices, all data column sidecars are available. + if len(missing) == 0 { + return nil + } + + // The gossip handler for data columns writes the index of each verified data column referencing the given + // root to the channel returned by blobNotifiers.forRoot. + nc := s.blobNotifiers.forRoot(root) + + // Log for DA checks that cross over into the next slot; helpful for debugging. + nextSlot := slots.BeginsAt(signed.Block().Slot()+1, s.genesisTime) + // Avoid logging if DA check is called after next slot start. + if nextSlot.After(time.Now()) { + nst := time.AfterFunc(time.Until(nextSlot), func() { + if len(missing) == 0 { + return + } + log.WithFields(daCheckLogFields(root, signed.Block().Slot(), expected, len(missing))). + Error("Still waiting for DA check at slot end.") + }) + defer nst.Stop() + } + for { + select { + case idx := <-nc: + // Delete each index seen in the notification channel. + delete(missing, idx) + // Read from the channel until there are no more missing sidecars. + if len(missing) > 0 { + continue + } + // Once all sidecars have been observed, clean up the notification channel. + s.blobNotifiers.delete(root) + return nil + case <-ctx.Done(): + return errors.Wrapf(ctx.Err(), "context deadline waiting for blob sidecars slot: %d, BlockRoot: %#x", block.Slot(), root) + } + } +} + func daCheckLogFields(root [32]byte, slot primitives.Slot, expected, missing int) logrus.Fields { return logrus.Fields{ "slot": slot, diff --git a/beacon-chain/blockchain/receive_data_column.go b/beacon-chain/blockchain/receive_data_column.go new file mode 100644 index 000000000000..40973dc7b085 --- /dev/null +++ b/beacon-chain/blockchain/receive_data_column.go @@ -0,0 +1,18 @@ +package blockchain + +import ( + "context" + + "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" +) + +func (s *Service) ReceiveDataColumn(ctx context.Context, ds blocks.VerifiedRODataColumn) error { + if err := s.blobStorage.SaveDataColumn(ds); err != nil { + return err + } + + // TODO use a custom event or new method of for data columns. For speed + // we are simply reusing blob paths here. + s.sendNewBlobEvent(ds.BlockRoot(), uint64(ds.SignedBlockHeader.Header.Slot)) + return nil +} diff --git a/beacon-chain/blockchain/receive_sidecar.go b/beacon-chain/blockchain/receive_sidecar.go deleted file mode 100644 index 8bdeaf4705ec..000000000000 --- a/beacon-chain/blockchain/receive_sidecar.go +++ /dev/null @@ -1,12 +0,0 @@ -package blockchain - -import ( - "context" - - "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" -) - -func (s *Service) ReceiveDataColumn(ctx context.Context, verifiedRODataColumn blocks.VerifiedRODataColumn) error { - // TODO - return nil -} diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index 23106e032a79..b4658440e698 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -81,7 +81,7 @@ type config struct { ExitPool voluntaryexits.PoolManager SlashingPool slashings.PoolManager BLSToExecPool blstoexec.PoolManager - P2p p2p.Broadcaster + P2P p2p.Acceser MaxRoutines int StateNotifier statefeed.Notifier ForkChoiceStore f.ForkChoicer diff --git a/beacon-chain/blockchain/service_test.go b/beacon-chain/blockchain/service_test.go index a2bf18ac1485..fd6b6afc607f 100644 --- a/beacon-chain/blockchain/service_test.go +++ b/beacon-chain/blockchain/service_test.go @@ -97,7 +97,7 @@ func setupBeaconChain(t *testing.T, beaconDB db.Database) *Service { WithAttestationPool(attestations.NewPool()), WithSlashingPool(slashings.NewPool()), WithExitPool(voluntaryexits.NewPool()), - WithP2PBroadcaster(&mockBroadcaster{}), + WithP2PBroadcaster(&mockAccesser{}), WithStateNotifier(&mockBeaconNode{}), WithForkChoiceStore(fc), WithAttestationService(attService), diff --git a/beacon-chain/blockchain/setup_test.go b/beacon-chain/blockchain/setup_test.go index 3069029a4d6d..8728b8bdfc32 100644 --- a/beacon-chain/blockchain/setup_test.go +++ b/beacon-chain/blockchain/setup_test.go @@ -19,6 +19,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/attestations" "github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/blstoexec" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p" + p2pTesting "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/testing" "github.com/prysmaticlabs/prysm/v5/beacon-chain/startup" "github.com/prysmaticlabs/prysm/v5/beacon-chain/state/stategen" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" @@ -45,6 +46,11 @@ type mockBroadcaster struct { broadcastCalled bool } +type mockAccesser struct { + mockBroadcaster + p2pTesting.MockPeerManager +} + func (mb *mockBroadcaster) Broadcast(_ context.Context, _ proto.Message) error { mb.broadcastCalled = true return nil diff --git a/beacon-chain/db/filesystem/blob.go b/beacon-chain/db/filesystem/blob.go index f5ed34626918..b1ca3cffd4ea 100644 --- a/beacon-chain/db/filesystem/blob.go +++ b/beacon-chain/db/filesystem/blob.go @@ -221,6 +221,101 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error { return nil } +// SaveDataColumn saves a data column to our local filesystem. +func (bs *BlobStorage) SaveDataColumn(column blocks.VerifiedRODataColumn) error { + startTime := time.Now() + fname := namerForDataColumn(column) + sszPath := fname.path() + exists, err := afero.Exists(bs.fs, sszPath) + if err != nil { + return err + } + if exists { + log.Debug("Ignoring a duplicate data column sidecar save attempt") + return nil + } + if bs.pruner != nil { + hRoot, err := column.SignedBlockHeader.Header.HashTreeRoot() + if err != nil { + return err + } + if err := bs.pruner.notify(hRoot, column.SignedBlockHeader.Header.Slot, column.ColumnIndex); err != nil { + return errors.Wrapf(err, "problem maintaining pruning cache/metrics for sidecar with root=%#x", hRoot) + } + } + + // Serialize the ethpb.DataColumnSidecar to binary data using SSZ. + sidecarData, err := column.MarshalSSZ() + if err != nil { + return errors.Wrap(err, "failed to serialize sidecar data") + } else if len(sidecarData) == 0 { + return errSidecarEmptySSZData + } + + if err := bs.fs.MkdirAll(fname.dir(), directoryPermissions); err != nil { + return err + } + partPath := fname.partPath(fmt.Sprintf("%p", sidecarData)) + + partialMoved := false + // Ensure the partial file is deleted. + defer func() { + if partialMoved { + return + } + // It's expected to error if the save is successful. + err = bs.fs.Remove(partPath) + if err == nil { + log.WithFields(logrus.Fields{ + "partPath": partPath, + }).Debugf("Removed partial file") + } + }() + + // Create a partial file and write the serialized data to it. + partialFile, err := bs.fs.Create(partPath) + if err != nil { + return errors.Wrap(err, "failed to create partial file") + } + + n, err := partialFile.Write(sidecarData) + if err != nil { + closeErr := partialFile.Close() + if closeErr != nil { + return closeErr + } + return errors.Wrap(err, "failed to write to partial file") + } + if bs.fsync { + if err := partialFile.Sync(); err != nil { + return err + } + } + + if err := partialFile.Close(); err != nil { + return err + } + + if n != len(sidecarData) { + return fmt.Errorf("failed to write the full bytes of sidecarData, wrote only %d of %d bytes", n, len(sidecarData)) + } + + if n == 0 { + return errEmptyBlobWritten + } + + // Atomically rename the partial file to its final name. + err = bs.fs.Rename(partPath, sszPath) + if err != nil { + return errors.Wrap(err, "failed to rename partial file to final name") + } + partialMoved = true + // TODO: Use new metrics for data columns + blobsWrittenCounter.Inc() + blobSaveLatency.Observe(float64(time.Since(startTime).Milliseconds())) + return nil +} + // Get retrieves a single BlobSidecar by its root and index. // Since BlobStorage only writes blobs that have undergone full verification, the return // value is always a VerifiedROBlob. @@ -303,6 +398,41 @@ func (bs *BlobStorage) Indices(root [32]byte) ([fieldparams.MaxBlobsPerBlock]boo return mask, nil } +// ColumnIndices retrieve the stored column indexes from our filesystem. +func (bs *BlobStorage) ColumnIndices(root [32]byte) ([fieldparams.NumberOfColumns]bool, error) { + var mask [fieldparams.NumberOfColumns]bool + rootDir := blobNamer{root: root}.dir() + entries, err := afero.ReadDir(bs.fs, rootDir) + if err != nil { + if os.IsNotExist(err) { + return mask, nil + } + return mask, err + } + for i := range entries { + if entries[i].IsDir() { + continue + } + name := entries[i].Name() + if !strings.HasSuffix(name, sszExt) { + continue + } + parts := strings.Split(name, ".") + if len(parts) != 2 { + continue + } + u, err := strconv.ParseUint(parts[0], 10, 64) + if err != nil { + return mask, errors.Wrapf(err, "unexpected directory entry breaks listing, %s", parts[0]) + } + if u >= fieldparams.NumberOfColumns { + return mask, errIndexOutOfBounds + } + mask[u] = true + } + return mask, nil +} + // Clear deletes all files on the filesystem. func (bs *BlobStorage) Clear() error { dirs, err := listDir(bs.fs, ".") @@ -335,6 +465,10 @@ func namerForSidecar(sc blocks.VerifiedROBlob) blobNamer { return blobNamer{root: sc.BlockRoot(), index: sc.Index} } +func namerForDataColumn(col blocks.VerifiedRODataColumn) blobNamer { + return blobNamer{root: col.BlockRoot(), index: col.ColumnIndex} +} + func (p blobNamer) dir() string { return rootString(p.root) } diff --git a/beacon-chain/p2p/interfaces.go b/beacon-chain/p2p/interfaces.go index c298df063f88..901073c535b1 100644 --- a/beacon-chain/p2p/interfaces.go +++ b/beacon-chain/p2p/interfaces.go @@ -31,6 +31,11 @@ type P2P interface { MetadataProvider } +type Acceser interface { + Broadcaster + PeerManager +} + // Broadcaster broadcasts messages to peers over the p2p pubsub protocol. type Broadcaster interface { Broadcast(context.Context, proto.Message) error diff --git a/config/fieldparams/mainnet.go b/config/fieldparams/mainnet.go index 17aeed23f8ba..b87b7ee2feb4 100644 --- a/config/fieldparams/mainnet.go +++ b/config/fieldparams/mainnet.go @@ -48,4 +48,5 @@ const ( MaxDeposits = 16 // Maximum number of deposits in a block. MaxVoluntaryExits = 16 // Maximum number of voluntary exits in a block. MaxBlsToExecutionChanges = 16 // Maximum number of bls to execution changes in a block. + NumberOfColumns = 128 // NumberOfColumns refers to the specified number of data columns that can exist in a network. ) diff --git a/config/fieldparams/minimal.go b/config/fieldparams/minimal.go index ac022e5823ea..fe18d7ac4614 100644 --- a/config/fieldparams/minimal.go +++ b/config/fieldparams/minimal.go @@ -48,4 +48,5 @@ const ( MaxDeposits = 16 // Maximum number of deposits in a block. MaxVoluntaryExits = 16 // Maximum number of voluntary exits in a block. MaxBlsToExecutionChanges = 16 // Maximum number of bls to execution changes in a block. + NumberOfColumns = 128 // NumberOfColumns refers to the specified number of data columns that can exist in a network. )