From 117037ee8f6012570f21a87ccceeda9d74b857b4 Mon Sep 17 00:00:00 2001 From: nisdas Date: Tue, 30 Apr 2024 14:17:53 +0800 Subject: [PATCH 1/8] Add new DA check --- beacon-chain/blockchain/BUILD.bazel | 4 +- beacon-chain/blockchain/error.go | 1 + beacon-chain/blockchain/options.go | 4 +- beacon-chain/blockchain/process_block.go | 92 ++++++++++++++++ .../blockchain/receive_data_column.go | 22 ++++ beacon-chain/blockchain/receive_sidecar.go | 12 -- beacon-chain/blockchain/service.go | 2 +- beacon-chain/db/filesystem/blob.go | 103 ++++++++++++++++++ beacon-chain/p2p/interfaces.go | 5 + 9 files changed, 229 insertions(+), 16 deletions(-) create mode 100644 beacon-chain/blockchain/receive_data_column.go delete mode 100644 beacon-chain/blockchain/receive_sidecar.go diff --git a/beacon-chain/blockchain/BUILD.bazel b/beacon-chain/blockchain/BUILD.bazel index d8ae4789c7f3..94c85fd1ff29 100644 --- a/beacon-chain/blockchain/BUILD.bazel +++ b/beacon-chain/blockchain/BUILD.bazel @@ -26,7 +26,7 @@ go_library( "receive_attestation.go", "receive_blob.go", "receive_block.go", - "receive_sidecar.go", + "receive_data_column.go", "service.go", "tracked_proposer.go", "weak_subjectivity_checks.go", @@ -49,6 +49,7 @@ go_library( "//beacon-chain/core/feed:go_default_library", "//beacon-chain/core/feed/state:go_default_library", "//beacon-chain/core/helpers:go_default_library", + "//beacon-chain/core/peerdas:go_default_library", "//beacon-chain/core/signing:go_default_library", "//beacon-chain/core/time:go_default_library", "//beacon-chain/core/transition:go_default_library", @@ -92,6 +93,7 @@ go_library( "//time/slots:go_default_library", "@com_github_ethereum_go_ethereum//common:go_default_library", "@com_github_ethereum_go_ethereum//common/hexutil:go_default_library", + "@com_github_ethereum_go_ethereum//p2p/enode:go_default_library", "@com_github_holiman_uint256//:go_default_library", "@com_github_pkg_errors//:go_default_library", "@com_github_prometheus_client_golang//prometheus:go_default_library", diff --git a/beacon-chain/blockchain/error.go b/beacon-chain/blockchain/error.go index 87ed0d2416db..04b13012fcc6 100644 --- a/beacon-chain/blockchain/error.go +++ b/beacon-chain/blockchain/error.go @@ -33,6 +33,7 @@ var ( ) var errMaxBlobsExceeded = errors.New("Expected commitments in block exceeds MAX_BLOBS_PER_BLOCK") +var errMaxDataColumnsExceeded = errors.New("Expected data columns for node exceeds NUMBER_OF_COLUMNS") // An invalid block is the block that fails state transition based on the core protocol rules. // The beacon node shall not be accepting nor building blocks that branch off from an invalid block. diff --git a/beacon-chain/blockchain/options.go b/beacon-chain/blockchain/options.go index 38492502a1f9..f215c470c62d 100644 --- a/beacon-chain/blockchain/options.go +++ b/beacon-chain/blockchain/options.go @@ -118,9 +118,9 @@ func WithBLSToExecPool(p blstoexec.PoolManager) Option { } // WithP2PBroadcaster to broadcast messages after appropriate processing. -func WithP2PBroadcaster(p p2p.Broadcaster) Option { +func WithP2PBroadcaster(p p2p.Acceser) Option { return func(s *Service) error { - s.cfg.P2p = p + s.cfg.P2P = p return nil } } diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index 34a16e39e446..d87f138f9412 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -6,6 +6,7 @@ import ( "time" "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas" "github.com/sirupsen/logrus" "go.opencensus.io/trace" @@ -514,12 +515,35 @@ func missingIndices(bs *filesystem.BlobStorage, root [32]byte, expected [][]byte return missing, nil } +func missingDataColumns(bs *filesystem.BlobStorage, root [32]byte, expected map[uint64]bool) (map[uint64]struct{}, error) { + if len(expected) == 0 { + return nil, nil + } + if len(expected) > int(params.BeaconConfig().NumberOfColumns) { + return nil, errMaxDataColumnsExceeded + } + indices, err := bs.Indices(root) + if err != nil { + return nil, err + } + missing := make(map[uint64]struct{}, len(expected)) + for col := range expected { + if !indices[col] { + missing[col] = struct{}{} + } + } + return missing, nil +} + // isDataAvailable blocks until all BlobSidecars committed to in the block are available, // or an error or context cancellation occurs. A nil result means that the data availability check is successful. // The function will first check the database to see if all sidecars have been persisted. If any // sidecars are missing, it will then read from the blobNotifier channel for the given root until the channel is // closed, the context hits cancellation/timeout, or notifications have been received for all the missing sidecars. func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed interfaces.ReadOnlySignedBeaconBlock) error { + if features.Get().EnablePeerDAS { + return s.isDataAvailableDataColumns(ctx, root, signed) + } if signed.Version() < version.Deneb { return nil } @@ -591,6 +615,74 @@ func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed int } } +func (s *Service) isDataAvailableDataColumns(ctx context.Context, root [32]byte, signed interfaces.ReadOnlySignedBeaconBlock) error { + if signed.Version() < version.Deneb { + return nil + } + + block := signed.Block() + if block == nil { + return errors.New("invalid nil beacon block") + } + // We are only required to check within MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS + if !params.WithinDAPeriod(slots.ToEpoch(block.Slot()), slots.ToEpoch(s.CurrentSlot())) { + return nil + } + + colMap, err := peerdas.CustodyColumns(s.cfg.P2P.NodeID(), params.BeaconConfig().CustodyRequirement) + if err != nil { + return err + } + // expected is the number of custodied data columnns a node is expected to have. + expected := len(colMap) + if expected == 0 { + return nil + } + // get a map of data column indices that are not currently available. + missing, err := missingDataColumns(s.blobStorage, root, colMap) + if err != nil { + return err + } + // If there are no missing indices, all data column sidecars are available. + if len(missing) == 0 { + return nil + } + + // The gossip handler for data columns writes the index of each verified data column referencing the given + // root to the channel returned by blobNotifiers.forRoot. + nc := s.blobNotifiers.forRoot(root) + + // Log for DA checks that cross over into the next slot; helpful for debugging. + nextSlot := slots.BeginsAt(signed.Block().Slot()+1, s.genesisTime) + // Avoid logging if DA check is called after next slot start. + if nextSlot.After(time.Now()) { + nst := time.AfterFunc(time.Until(nextSlot), func() { + if len(missing) == 0 { + return + } + log.WithFields(daCheckLogFields(root, signed.Block().Slot(), expected, len(missing))). + Error("Still waiting for DA check at slot end.") + }) + defer nst.Stop() + } + for { + select { + case idx := <-nc: + // Delete each index seen in the notification channel. + delete(missing, idx) + // Read from the channel until there are no more missing sidecars. + if len(missing) > 0 { + continue + } + // Once all sidecars have been observed, clean up the notification channel. + s.blobNotifiers.delete(root) + return nil + case <-ctx.Done(): + return errors.Wrapf(ctx.Err(), "context deadline waiting for blob sidecars slot: %d, BlockRoot: %#x", block.Slot(), root) + } + } +} + func daCheckLogFields(root [32]byte, slot primitives.Slot, expected, missing int) logrus.Fields { return logrus.Fields{ "slot": slot, diff --git a/beacon-chain/blockchain/receive_data_column.go b/beacon-chain/blockchain/receive_data_column.go new file mode 100644 index 000000000000..766bba6b57bc --- /dev/null +++ b/beacon-chain/blockchain/receive_data_column.go @@ -0,0 +1,22 @@ +package blockchain + +import ( + "context" + + ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" +) + +func (s *Service) ReceiveDataColumn(ctx context.Context, ds *ethpb.DataColumnSidecar) error { + if err := s.blobStorage.SaveDataColumn(ds); err != nil { + return err + } + hRoot, err := ds.SignedBlockHeader.Header.HashTreeRoot() + if err != nil { + return err + } + + // TODO use a custom event or new method of for data columns. For speed + // we are simply reusing blob paths here. + s.sendNewBlobEvent(hRoot, uint64(ds.SignedBlockHeader.Header.Slot)) + return nil +} diff --git a/beacon-chain/blockchain/receive_sidecar.go b/beacon-chain/blockchain/receive_sidecar.go deleted file mode 100644 index 8bdeaf4705ec..000000000000 --- a/beacon-chain/blockchain/receive_sidecar.go +++ /dev/null @@ -1,12 +0,0 @@ -package blockchain - -import ( - "context" - - "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" -) - -func (s *Service) ReceiveDataColumn(ctx context.Context, verifiedRODataColumn blocks.VerifiedRODataColumn) error { - // TODO - return nil -} diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index 09d0cce7906d..b603aeaf3453 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -82,7 +82,7 @@ type config struct { ExitPool voluntaryexits.PoolManager SlashingPool slashings.PoolManager BLSToExecPool blstoexec.PoolManager - P2p p2p.Broadcaster + P2P p2p.Acceser MaxRoutines int StateNotifier statefeed.Notifier ForkChoiceStore f.ForkChoicer diff --git a/beacon-chain/db/filesystem/blob.go b/beacon-chain/db/filesystem/blob.go index 0ce63b290e7f..5fea6f44c16b 100644 --- a/beacon-chain/db/filesystem/blob.go +++ b/beacon-chain/db/filesystem/blob.go @@ -218,6 +218,101 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error { return nil } +// SaveDataColumn saves a data column to our local filesystem. +func (bs *BlobStorage) SaveDataColumn(column *ethpb.DataColumnSidecar) error { + startTime := time.Now() + fname := namerForDataColumn(column) + sszPath := fname.path() + exists, err := afero.Exists(bs.fs, sszPath) + if err != nil { + return err + } + if exists { + log.Debug("Ignoring a duplicate data column sidecar save attempt") + return nil + } + if bs.pruner != nil { + hRoot, err := column.SignedBlockHeader.Header.HashTreeRoot() + if err != nil { + return err + } + if err := bs.pruner.notify(hRoot, column.SignedBlockHeader.Header.Slot, column.ColumnIndex); err != nil { + return errors.Wrapf(err, "problem maintaining pruning cache/metrics for sidecar with root=%#x", hRoot) + } + } + + // Serialize the ethpb.DataColumnSidecar to binary data using SSZ. + sidecarData, err := column.MarshalSSZ() + if err != nil { + return errors.Wrap(err, "failed to serialize sidecar data") + } else if len(sidecarData) == 0 { + return errSidecarEmptySSZData + } + + if err := bs.fs.MkdirAll(fname.dir(), directoryPermissions); err != nil { + return err + } + partPath := fname.partPath(fmt.Sprintf("%p", sidecarData)) + + partialMoved := false + // Ensure the partial file is deleted. + defer func() { + if partialMoved { + return + } + // It's expected to error if the save is successful. + err = bs.fs.Remove(partPath) + if err == nil { + log.WithFields(logrus.Fields{ + "partPath": partPath, + }).Debugf("Removed partial file") + } + }() + + // Create a partial file and write the serialized data to it. + partialFile, err := bs.fs.Create(partPath) + if err != nil { + return errors.Wrap(err, "failed to create partial file") + } + + n, err := partialFile.Write(sidecarData) + if err != nil { + closeErr := partialFile.Close() + if closeErr != nil { + return closeErr + } + return errors.Wrap(err, "failed to write to partial file") + } + if bs.fsync { + if err := partialFile.Sync(); err != nil { + return err + } + } + + if err := partialFile.Close(); err != nil { + return err + } + + if n != len(sidecarData) { + return fmt.Errorf("failed to write the full bytes of sidecarData, wrote only %d of %d bytes", n, len(sidecarData)) + } + + if n == 0 { + return errEmptyBlobWritten + } + + // Atomically rename the partial file to its final name. + err = bs.fs.Rename(partPath, sszPath) + if err != nil { + return errors.Wrap(err, "failed to rename partial file to final name") + } + partialMoved = true + // TODO: Use new metrics for data columns + blobsWrittenCounter.Inc() + blobSaveLatency.Observe(float64(time.Since(startTime).Milliseconds())) + return nil +} + // Get retrieves a single BlobSidecar by its root and index. // Since BlobStorage only writes blobs that have undergone full verification, the return // value is always a VerifiedROBlob. @@ -332,6 +427,14 @@ func namerForSidecar(sc blocks.VerifiedROBlob) blobNamer { return blobNamer{root: sc.BlockRoot(), index: sc.Index} } +func namerForDataColumn(col *ethpb.DataColumnSidecar) blobNamer { + bRoot, err := col.SignedBlockHeader.Header.HashTreeRoot() + if err != nil { + panic(err) + } + return blobNamer{root: bRoot, index: col.ColumnIndex} +} + func (p blobNamer) dir() string { return rootString(p.root) } diff --git a/beacon-chain/p2p/interfaces.go b/beacon-chain/p2p/interfaces.go index 8f37977c5ee5..f83efcff0669 100644 --- a/beacon-chain/p2p/interfaces.go +++ b/beacon-chain/p2p/interfaces.go @@ -31,6 +31,11 @@ type P2P interface { MetadataProvider } +type Acceser interface { + Broadcaster + PeerManager +} + // Broadcaster broadcasts messages to peers over the p2p pubsub protocol. type Broadcaster interface { Broadcast(context.Context, proto.Message) error From cc939fa741dbb764903a1662b03bf8c12e9a8b67 Mon Sep 17 00:00:00 2001 From: nisdas Date: Tue, 30 Apr 2024 14:26:56 +0800 Subject: [PATCH 2/8] Exit early in the event no commitments exist. --- beacon-chain/blockchain/process_block.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index d87f138f9412..046ed64d6abc 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -628,6 +628,18 @@ func (s *Service) isDataAvailableDataColumns(ctx context.Context, root [32]byte, if !params.WithinDAPeriod(slots.ToEpoch(block.Slot()), slots.ToEpoch(s.CurrentSlot())) { return nil } + body := block.Body() + if body == nil { + return errors.New("invalid nil beacon block body") + } + kzgCommitments, err := body.BlobKzgCommitments() + if err != nil { + return errors.Wrap(err, "could not get KZG commitments") + } + // If block has not commitments there is nothing to wait for. + if len(kzgCommitments) == 0 { + return nil + } colMap, err := peerdas.CustodyColumns(s.cfg.P2P.NodeID(), params.BeaconConfig().CustodyRequirement) if err != nil { From 41ddd0dab2b62308f8211fdd78e3a643265b9d77 Mon Sep 17 00:00:00 2001 From: nisdas Date: Tue, 30 Apr 2024 14:29:25 +0800 Subject: [PATCH 3/8] Gazelle --- beacon-chain/blockchain/BUILD.bazel | 1 - 1 file changed, 1 deletion(-) diff --git a/beacon-chain/blockchain/BUILD.bazel b/beacon-chain/blockchain/BUILD.bazel index 94c85fd1ff29..dc771ce79e00 100644 --- a/beacon-chain/blockchain/BUILD.bazel +++ b/beacon-chain/blockchain/BUILD.bazel @@ -93,7 +93,6 @@ go_library( "//time/slots:go_default_library", "@com_github_ethereum_go_ethereum//common:go_default_library", "@com_github_ethereum_go_ethereum//common/hexutil:go_default_library", - "@com_github_ethereum_go_ethereum//p2p/enode:go_default_library", "@com_github_holiman_uint256//:go_default_library", "@com_github_pkg_errors//:go_default_library", "@com_github_prometheus_client_golang//prometheus:go_default_library", From 2ccf258993b20b640ad19e5143157ed3bb0e54d0 Mon Sep 17 00:00:00 2001 From: nisdas Date: Tue, 30 Apr 2024 15:48:51 +0800 Subject: [PATCH 4/8] Fix Mock Broadcaster --- beacon-chain/p2p/testing/mock_broadcaster.go | 49 ++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/beacon-chain/p2p/testing/mock_broadcaster.go b/beacon-chain/p2p/testing/mock_broadcaster.go index 0e03dc69907f..a26b07ae5595 100644 --- a/beacon-chain/p2p/testing/mock_broadcaster.go +++ b/beacon-chain/p2p/testing/mock_broadcaster.go @@ -5,6 +5,11 @@ import ( "sync" "sync/atomic" + "github.com/ethereum/go-ethereum/p2p/enr" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" + "github.com/prysmaticlabs/bazel-go-ethereum/p2p/enode" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "google.golang.org/protobuf/proto" ) @@ -67,3 +72,47 @@ func (m *MockBroadcaster) NumAttestations() int { defer m.attLock.Unlock() return len(m.BroadcastAttestations) } +func (_ *MockBroadcaster) Disconnect(id peer.ID) error { + //TODO implement me + panic("implement me") +} + +func (_ *MockBroadcaster) PeerID() peer.ID { + //TODO implement me + panic("implement me") +} + +func (_ *MockBroadcaster) Host() host.Host { + //TODO implement me + panic("implement me") +} + +func (_ *MockBroadcaster) ENR() *enr.Record { + //TODO implement me + panic("implement me") +} + +func (_ *MockBroadcaster) NodeID() enode.ID { + //TODO implement me + panic("implement me") +} + +func (_ *MockBroadcaster) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { + //TODO implement me + panic("implement me") +} + +func (_ *MockBroadcaster) RefreshPersistentSubnets() { + //TODO implement me + panic("implement me") +} + +func (_ *MockBroadcaster) FindPeersWithSubnet(ctx context.Context, topic string, subIndex uint64, threshold int) (bool, error) { + //TODO implement me + panic("implement me") +} + +func (_ *MockBroadcaster) AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error) { + //TODO implement me + panic("implement me") +} From 9453e9ec0bbf02263196bb593340335478b1d377 Mon Sep 17 00:00:00 2001 From: nisdas Date: Tue, 30 Apr 2024 15:55:43 +0800 Subject: [PATCH 5/8] Fix Test Setup --- beacon-chain/blockchain/BUILD.bazel | 1 + beacon-chain/blockchain/service_test.go | 2 +- beacon-chain/blockchain/setup_test.go | 6 +++ beacon-chain/p2p/testing/mock_broadcaster.go | 49 -------------------- 4 files changed, 8 insertions(+), 50 deletions(-) diff --git a/beacon-chain/blockchain/BUILD.bazel b/beacon-chain/blockchain/BUILD.bazel index dc771ce79e00..071e2cc13e1a 100644 --- a/beacon-chain/blockchain/BUILD.bazel +++ b/beacon-chain/blockchain/BUILD.bazel @@ -159,6 +159,7 @@ go_test( "//beacon-chain/operations/slashings:go_default_library", "//beacon-chain/operations/voluntaryexits:go_default_library", "//beacon-chain/p2p:go_default_library", + "//beacon-chain/p2p/testing:go_default_library", "//beacon-chain/startup:go_default_library", "//beacon-chain/state:go_default_library", "//beacon-chain/state/state-native:go_default_library", diff --git a/beacon-chain/blockchain/service_test.go b/beacon-chain/blockchain/service_test.go index e2447d036183..52ae4592f0e2 100644 --- a/beacon-chain/blockchain/service_test.go +++ b/beacon-chain/blockchain/service_test.go @@ -95,7 +95,7 @@ func setupBeaconChain(t *testing.T, beaconDB db.Database) *Service { WithAttestationPool(attestations.NewPool()), WithSlashingPool(slashings.NewPool()), WithExitPool(voluntaryexits.NewPool()), - WithP2PBroadcaster(&mockBroadcaster{}), + WithP2PBroadcaster(&mockAccesser{}), WithStateNotifier(&mockBeaconNode{}), WithForkChoiceStore(fc), WithAttestationService(attService), diff --git a/beacon-chain/blockchain/setup_test.go b/beacon-chain/blockchain/setup_test.go index ed7e3af5d854..f17d844a32ba 100644 --- a/beacon-chain/blockchain/setup_test.go +++ b/beacon-chain/blockchain/setup_test.go @@ -18,6 +18,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/attestations" "github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/blstoexec" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p" + p2pTesting "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/testing" "github.com/prysmaticlabs/prysm/v5/beacon-chain/startup" "github.com/prysmaticlabs/prysm/v5/beacon-chain/state/stategen" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" @@ -44,6 +45,11 @@ type mockBroadcaster struct { broadcastCalled bool } +type mockAccesser struct { + mockBroadcaster + p2pTesting.MockPeerManager +} + func (mb *mockBroadcaster) Broadcast(_ context.Context, _ proto.Message) error { mb.broadcastCalled = true return nil diff --git a/beacon-chain/p2p/testing/mock_broadcaster.go b/beacon-chain/p2p/testing/mock_broadcaster.go index a26b07ae5595..0e03dc69907f 100644 --- a/beacon-chain/p2p/testing/mock_broadcaster.go +++ b/beacon-chain/p2p/testing/mock_broadcaster.go @@ -5,11 +5,6 @@ import ( "sync" "sync/atomic" - "github.com/ethereum/go-ethereum/p2p/enr" - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/multiformats/go-multiaddr" - "github.com/prysmaticlabs/bazel-go-ethereum/p2p/enode" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "google.golang.org/protobuf/proto" ) @@ -72,47 +67,3 @@ func (m *MockBroadcaster) NumAttestations() int { defer m.attLock.Unlock() return len(m.BroadcastAttestations) } -func (_ *MockBroadcaster) Disconnect(id peer.ID) error { - //TODO implement me - panic("implement me") -} - -func (_ *MockBroadcaster) PeerID() peer.ID { - //TODO implement me - panic("implement me") -} - -func (_ *MockBroadcaster) Host() host.Host { - //TODO implement me - panic("implement me") -} - -func (_ *MockBroadcaster) ENR() *enr.Record { - //TODO implement me - panic("implement me") -} - -func (_ *MockBroadcaster) NodeID() enode.ID { - //TODO implement me - panic("implement me") -} - -func (_ *MockBroadcaster) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { - //TODO implement me - panic("implement me") -} - -func (_ *MockBroadcaster) RefreshPersistentSubnets() { - //TODO implement me - panic("implement me") -} - -func (_ *MockBroadcaster) FindPeersWithSubnet(ctx context.Context, topic string, subIndex uint64, threshold int) (bool, error) { - //TODO implement me - panic("implement me") -} - -func (_ *MockBroadcaster) AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error) { - //TODO implement me - panic("implement me") -} From bf429723c50af9462ddc1ff1835af9cad41faed8 Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Tue, 30 Apr 2024 20:25:27 +0800 Subject: [PATCH 6/8] Update beacon-chain/blockchain/process_block.go Co-authored-by: Manu NALEPA --- beacon-chain/blockchain/process_block.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index 046ed64d6abc..019789f2cbb1 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -515,7 +515,7 @@ func missingIndices(bs *filesystem.BlobStorage, root [32]byte, expected [][]byte return missing, nil } -func missingDataColumns(bs *filesystem.BlobStorage, root [32]byte, expected map[uint64]bool) (map[uint64]struct{}, error) { +func missingDataColumns(bs *filesystem.BlobStorage, root [32]byte, expected map[uint64]bool) (map[uint64]bool, error) { if len(expected) == 0 { return nil, nil } @@ -526,10 +526,10 @@ func missingDataColumns(bs *filesystem.BlobStorage, root [32]byte, expected map[ if err != nil { return nil, err } - missing := make(map[uint64]struct{}, len(expected)) + missing := make(map[uint64]bool, len(expected)) for col := range expected { if !indices[col] { - missing[col] = struct{}{} + missing[col] = true } } return missing, nil From d5501acce37e7c157ef270adc51f566a9aa161e6 Mon Sep 17 00:00:00 2001 From: nisdas Date: Tue, 30 Apr 2024 20:35:37 +0800 Subject: [PATCH 7/8] Manu's Review --- beacon-chain/blockchain/process_block.go | 2 +- beacon-chain/db/filesystem/blob.go | 35 ++++++++++++++++++++++++ config/fieldparams/mainnet.go | 1 + config/fieldparams/minimal.go | 1 + 4 files changed, 38 insertions(+), 1 deletion(-) diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index 019789f2cbb1..96a3c260e240 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -522,7 +522,7 @@ func missingDataColumns(bs *filesystem.BlobStorage, root [32]byte, expected map[ if len(expected) > int(params.BeaconConfig().NumberOfColumns) { return nil, errMaxDataColumnsExceeded } - indices, err := bs.Indices(root) + indices, err := bs.ColumnIndices(root) if err != nil { return nil, err } diff --git a/beacon-chain/db/filesystem/blob.go b/beacon-chain/db/filesystem/blob.go index 5fea6f44c16b..6f004587b1c2 100644 --- a/beacon-chain/db/filesystem/blob.go +++ b/beacon-chain/db/filesystem/blob.go @@ -395,6 +395,41 @@ func (bs *BlobStorage) Indices(root [32]byte) ([fieldparams.MaxBlobsPerBlock]boo return mask, nil } +// ColumnIndices retrieve the stored column indexes from our filesystem. +func (bs *BlobStorage) ColumnIndices(root [32]byte) ([fieldparams.NumberOfColumns]bool, error) { + var mask [fieldparams.NumberOfColumns]bool + rootDir := blobNamer{root: root}.dir() + entries, err := afero.ReadDir(bs.fs, rootDir) + if err != nil { + if os.IsNotExist(err) { + return mask, nil + } + return mask, err + } + for i := range entries { + if entries[i].IsDir() { + continue + } + name := entries[i].Name() + if !strings.HasSuffix(name, sszExt) { + continue + } + parts := strings.Split(name, ".") + if len(parts) != 2 { + continue + } + u, err := strconv.ParseUint(parts[0], 10, 64) + if err != nil { + return mask, errors.Wrapf(err, "unexpected directory entry breaks listing, %s", parts[0]) + } + if u >= fieldparams.NumberOfColumns { + return mask, errIndexOutOfBounds + } + mask[u] = true + } + return mask, nil +} + // Clear deletes all files on the filesystem. func (bs *BlobStorage) Clear() error { dirs, err := listDir(bs.fs, ".") diff --git a/config/fieldparams/mainnet.go b/config/fieldparams/mainnet.go index 706334328313..2f5b0d601364 100644 --- a/config/fieldparams/mainnet.go +++ b/config/fieldparams/mainnet.go @@ -33,4 +33,5 @@ const ( BlobSize = 131072 // defined to match blob.size in bazel ssz codegen KzgCommitmentInclusionProofDepth = 17 // Merkle proof depth for blob_kzg_commitments list item NextSyncCommitteeBranchDepth = 5 // NextSyncCommitteeBranchDepth defines the depth of the next sync committee branch. + NumberOfColumns = 128 // NumberOfColumns refers to the specified number of data columns that can exist in a network. ) diff --git a/config/fieldparams/minimal.go b/config/fieldparams/minimal.go index 96ec9e28f875..13cdac3c0864 100644 --- a/config/fieldparams/minimal.go +++ b/config/fieldparams/minimal.go @@ -33,4 +33,5 @@ const ( BlobSize = 131072 // defined to match blob.size in bazel ssz codegen KzgCommitmentInclusionProofDepth = 17 // Merkle proof depth for blob_kzg_commitments list item NextSyncCommitteeBranchDepth = 5 // NextSyncCommitteeBranchDepth defines the depth of the next sync committee branch. + NumberOfColumns = 128 // NumberOfColumns refers to the specified number of data columns that can exist in a network. ) From 983c87429fdcbe2b93fabf3904f65c79734754ac Mon Sep 17 00:00:00 2001 From: nisdas Date: Tue, 30 Apr 2024 21:13:25 +0800 Subject: [PATCH 8/8] Fix Build --- beacon-chain/blockchain/receive_data_column.go | 10 +++------- beacon-chain/db/filesystem/blob.go | 10 +++------- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/beacon-chain/blockchain/receive_data_column.go b/beacon-chain/blockchain/receive_data_column.go index 766bba6b57bc..40973dc7b085 100644 --- a/beacon-chain/blockchain/receive_data_column.go +++ b/beacon-chain/blockchain/receive_data_column.go @@ -3,20 +3,16 @@ package blockchain import ( "context" - ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" ) -func (s *Service) ReceiveDataColumn(ctx context.Context, ds *ethpb.DataColumnSidecar) error { +func (s *Service) ReceiveDataColumn(ctx context.Context, ds blocks.VerifiedRODataColumn) error { if err := s.blobStorage.SaveDataColumn(ds); err != nil { return err } - hRoot, err := ds.SignedBlockHeader.Header.HashTreeRoot() - if err != nil { - return err - } // TODO use a custom event or new method of for data columns. For speed // we are simply reusing blob paths here. - s.sendNewBlobEvent(hRoot, uint64(ds.SignedBlockHeader.Header.Slot)) + s.sendNewBlobEvent(ds.BlockRoot(), uint64(ds.SignedBlockHeader.Header.Slot)) return nil } diff --git a/beacon-chain/db/filesystem/blob.go b/beacon-chain/db/filesystem/blob.go index 6f004587b1c2..f13fe90608a0 100644 --- a/beacon-chain/db/filesystem/blob.go +++ b/beacon-chain/db/filesystem/blob.go @@ -219,7 +219,7 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error { } // SaveDataColumn saves a data column to our local filesystem. -func (bs *BlobStorage) SaveDataColumn(column *ethpb.DataColumnSidecar) error { +func (bs *BlobStorage) SaveDataColumn(column blocks.VerifiedRODataColumn) error { startTime := time.Now() fname := namerForDataColumn(column) sszPath := fname.path() @@ -462,12 +462,8 @@ func namerForSidecar(sc blocks.VerifiedROBlob) blobNamer { return blobNamer{root: sc.BlockRoot(), index: sc.Index} } -func namerForDataColumn(col *ethpb.DataColumnSidecar) blobNamer { - bRoot, err := col.SignedBlockHeader.Header.HashTreeRoot() - if err != nil { - panic(err) - } - return blobNamer{root: bRoot, index: col.ColumnIndex} +func namerForDataColumn(col blocks.VerifiedRODataColumn) blobNamer { + return blobNamer{root: col.BlockRoot(), index: col.ColumnIndex} } func (p blobNamer) dir() string {