Activate peerDAS at electra. #14731

Draft · wants to merge 1 commit into base: peerDAS
2 changes: 1 addition & 1 deletion beacon-chain/core/time/slot_epoch.go
@@ -55,7 +55,7 @@ func HigherEqualThanAltairVersionAndEpoch(s state.BeaconState, e primitives.Epoc

// PeerDASIsActive checks whether peerDAS is active at the provided slot.
func PeerDASIsActive(slot primitives.Slot) bool {
-	return params.PeerDASEnabled() && slots.ToEpoch(slot) >= params.BeaconConfig().Eip7594ForkEpoch
+	return params.PeerDASEnabled() && slots.ToEpoch(slot) >= params.BeaconConfig().ElectraForkEpoch
}

// CanUpgradeToAltair returns true if the input `slot` can upgrade to Altair.
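For context, a minimal standalone sketch of how this gate behaves after the change. The function and constant names here are illustrative assumptions, not the Prysm API; the mainnet SLOTS_PER_EPOCH value of 32 is assumed.

```go
package main

import "fmt"

const slotsPerEpoch = 32 // mainnet SLOTS_PER_EPOCH, assumed for illustration

// peerDASIsActive mirrors the changed gate: peerDAS is active once the
// slot's epoch reaches the Electra fork epoch (previously the separate
// Eip7594ForkEpoch) and the feature flag is enabled.
func peerDASIsActive(enabled bool, slot, electraForkEpoch uint64) bool {
	return enabled && slot/slotsPerEpoch >= electraForkEpoch
}

func main() {
	fmt.Println(peerDASIsActive(true, 319, 10)) // false: slot 319 is in epoch 9
	fmt.Println(peerDASIsActive(true, 320, 10)) // true: slot 320 starts epoch 10
}
```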
2 changes: 1 addition & 1 deletion beacon-chain/das/availability_columns.go
@@ -113,7 +113,7 @@ func (s *LazilyPersistentStoreColumn) IsDataAvailable(
// ignore their response and decrease their peer score.
roDataColumns, err := entry.filterColumns(blockRoot, blockCommitments)
if err != nil {
return errors.Wrap(err, "incomplete BlobSidecar batch")
return errors.Wrap(err, "incomplete DataColumnSidecar batch")
}

// Create verified RO data columns from RO data columns.
6 changes: 3 additions & 3 deletions beacon-chain/das/cache.go
@@ -110,7 +110,7 @@ func (e *cacheEntry) filter(root [32]byte, kc safeCommitmentArray) ([]blocks.ROB
return nil, nil
}
scs := make([]blocks.ROBlob, 0, kc.count())
for i := uint64(0); i < fieldparams.MaxBlobsPerBlock; i++ {
for i := range uint64(fieldparams.MaxBlobsPerBlock) {
// We already have this blob, we don't need to write it or validate it.
if e.diskSummary.HasIndex(i) {
continue
@@ -143,8 +143,8 @@ func (e *cacheEntry) filterColumns(root [32]byte, commitmentsArray *safeCommitme
commitmentsCount := commitmentsArray.count()
sidecars := make([]blocks.RODataColumn, 0, commitmentsCount)

for i := uint64(0); i < fieldparams.NumberOfColumns; i++ {
// Skip if we arleady store this data column.
for i := range uint64(fieldparams.NumberOfColumns) {
// Skip if we already store this data column.
if e.diskSummary.HasIndex(i) {
continue
}
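The loop rewrites above use Go 1.22's range-over-integer form, which yields 0 through n-1 exactly like the three-clause loop it replaces. A minimal equivalence check (standalone, not Prysm code):

```go
package main

import "fmt"

func main() {
	const n = uint64(4)

	// Classic three-clause form, as in the removed lines.
	for i := uint64(0); i < n; i++ {
		fmt.Print(i, " ")
	}
	fmt.Println()

	// Go 1.22+ range-over-integer form, as in the added lines:
	// i takes the values 0, 1, ..., n-1 with the same type as n.
	for i := range n {
		fmt.Print(i, " ")
	}
	fmt.Println()
}
```

Both loops print `0 1 2 3`.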
6 changes: 3 additions & 3 deletions beacon-chain/p2p/discovery.go
@@ -228,10 +228,10 @@ func (s *Service) RefreshPersistentSubnets() {

isBitSUpToDate := bytes.Equal(bitS, inRecordBitS) && bytes.Equal(bitS, currentBitSInMetadata)

-	// Compare current epoch with EIP-7594 fork epoch.
-	eip7594ForkEpoch := params.BeaconConfig().Eip7594ForkEpoch
+	// Compare current epoch with the Electra fork epoch.
+	electraForkEpoch := params.BeaconConfig().ElectraForkEpoch

if currentEpoch < eip7594ForkEpoch {
if currentEpoch < electraForkEpoch {
// Altair behaviour.
if metadataVersion == version.Altair && isBitVUpToDate && isBitSUpToDate {
// Nothing to do, return early.
8 changes: 4 additions & 4 deletions beacon-chain/p2p/discovery_test.go
@@ -135,7 +135,7 @@ func TestStartDiscV5_DiscoverAllPeers(t *testing.T) {
func TestCreateLocalNode(t *testing.T) {
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
-	cfg.Eip7594ForkEpoch = 1
+	cfg.ElectraForkEpoch = 1
params.OverrideBeaconConfig(cfg)
testCases := []struct {
name string
@@ -626,7 +626,7 @@ func TestRefreshPersistentSubnets(t *testing.T) {

const (
altairForkEpoch = 5
-		eip7594ForkEpoch = 10
+		electraForkEpoch = 10
)

custodySubnetCount := params.BeaconConfig().CustodyRequirement
@@ -635,7 +635,7 @@ func TestRefreshPersistentSubnets(t *testing.T) {
defaultCfg := params.BeaconConfig()
cfg := defaultCfg.Copy()
cfg.AltairForkEpoch = altairForkEpoch
-	cfg.Eip7594ForkEpoch = eip7594ForkEpoch
+	cfg.ElectraForkEpoch = electraForkEpoch
params.OverrideBeaconConfig(cfg)

// Compute the number of seconds per epoch.
@@ -706,7 +706,7 @@ func TestRefreshPersistentSubnets(t *testing.T) {
},
{
name: "PeerDAS",
-			epochSinceGenesis: eip7594ForkEpoch,
+			epochSinceGenesis: electraForkEpoch,
checks: []check{
{
pingCount: 0,
2 changes: 1 addition & 1 deletion beacon-chain/p2p/rpc_topic_mappings.go
@@ -306,7 +306,7 @@ func TopicFromMessage(msg string, epoch primitives.Epoch) (string, error) {
}

// Check if the message is to be updated in peerDAS.
isPeerDAS := epoch >= params.BeaconConfig().Eip7594ForkEpoch
isPeerDAS := epoch >= params.BeaconConfig().ElectraForkEpoch
if isPeerDAS && peerDASMapping[msg] {
version = SchemaVersionV3
}
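A minimal sketch of the version-selection behavior after this change, with simplified types; the real logic lives in TopicFromMessage and the peerDASMapping set, and the map and epochs below are illustrative assumptions:

```go
package main

import "fmt"

// schemaVersion picks the RPC schema suffix for a message: topics listed in
// the peerDAS set move to v3 once the epoch reaches the Electra fork epoch.
func schemaVersion(msg string, epoch, electraForkEpoch uint64, peerDASTopics map[string]bool, base string) string {
	if epoch >= electraForkEpoch && peerDASTopics[msg] {
		return "/v3"
	}
	return base
}

func main() {
	peerDAS := map[string]bool{"metadata": true}
	fmt.Println(schemaVersion("metadata", 9, 10, peerDAS, "/v2"))  // /v2: before the fork
	fmt.Println(schemaVersion("metadata", 10, 10, peerDAS, "/v2")) // /v3: at the fork epoch
}
```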
5 changes: 1 addition & 4 deletions beacon-chain/rpc/eth/config/handlers_test.go
@@ -79,7 +79,6 @@ func TestGetSpec(t *testing.T) {
config.DenebForkEpoch = 105
config.ElectraForkVersion = []byte("ElectraForkVersion")
config.ElectraForkEpoch = 107
-	config.Eip7594ForkEpoch = 109
config.BLSWithdrawalPrefixByte = byte('b')
config.ETH1AddressWithdrawalPrefixByte = byte('c')
config.GenesisDelay = 24
@@ -190,7 +189,7 @@ func TestGetSpec(t *testing.T) {
data, ok := resp.Data.(map[string]interface{})
require.Equal(t, true, ok)

-	assert.Equal(t, 156, len(data))
+	assert.Equal(t, 155, len(data))
for k, v := range data {
t.Run(k, func(t *testing.T) {
switch k {
@@ -268,8 +267,6 @@ func TestGetSpec(t *testing.T) {
assert.Equal(t, "0x"+hex.EncodeToString([]byte("ElectraForkVersion")), v)
case "ELECTRA_FORK_EPOCH":
assert.Equal(t, "107", v)
case "EIP7594_FORK_EPOCH":
assert.Equal(t, "109", v)
case "MIN_ANCHOR_POW_BLOCK_DIFFICULTY":
assert.Equal(t, "1000", v)
case "BLS_WITHDRAWAL_PREFIX":
6 changes: 3 additions & 3 deletions beacon-chain/rpc/lookup/blocker.go
@@ -471,12 +471,12 @@ func (p *BeaconDbBlocker) Blobs(ctx context.Context, id string, indices map[uint
blockSlot := b.Block().Slot()

// Get the first peerDAS epoch.
-	eip7594ForkEpoch := params.BeaconConfig().Eip7594ForkEpoch
+	electraForkEpoch := params.BeaconConfig().ElectraForkEpoch

// Compute the first peerDAS slot.
peerDASStartSlot := primitives.Slot(math.MaxUint64)
-	if eip7594ForkEpoch != primitives.Epoch(math.MaxUint64) {
-		peerDASStartSlot, err = slots.EpochStart(eip7594ForkEpoch)
+	if electraForkEpoch != primitives.Epoch(math.MaxUint64) {
+		peerDASStartSlot, err = slots.EpochStart(electraForkEpoch)
if err != nil {
return nil, &core.RpcError{Err: errors.Wrap(err, "could not calculate peerDAS start slot"), Reason: core.Internal}
}
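The math.MaxUint64 comparison guards against an unset fork epoch (the FAR_FUTURE_EPOCH sentinel). A standalone sketch of the same guard, assuming the mainnet 32-slot epoch; the helper name is hypothetical:

```go
package main

import (
	"fmt"
	"math"
)

const slotsPerEpoch = 32 // mainnet value, assumed for illustration

// peerDASStartSlot returns the first slot of the fork, or the sentinel and
// false when the fork epoch is unset (FAR_FUTURE_EPOCH, i.e. math.MaxUint64),
// in which case the multiplication would overflow.
func peerDASStartSlot(forkEpoch uint64) (uint64, bool) {
	if forkEpoch == math.MaxUint64 {
		return math.MaxUint64, false
	}
	return forkEpoch * slotsPerEpoch, true
}

func main() {
	fmt.Println(peerDASStartSlot(math.MaxUint64)) // sentinel, false: fork not scheduled
	fmt.Println(peerDASStartSlot(10))             // 320, true
}
```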
2 changes: 1 addition & 1 deletion beacon-chain/sync/data_columns_sampling_test.go
@@ -199,7 +199,7 @@ func setupDataColumnSamplerTest(t *testing.T, blobCount uint64) (*dataSamplerTes
ctx: context.Background(),
p2pSvc: p2pSvc,
peers: []*p2ptest.TestP2P{},
-		ctxMap: map[[4]byte]int{{245, 165, 253, 66}: version.Deneb},
+		ctxMap: map[[4]byte]int{{245, 165, 253, 66}: version.Electra},
chainSvc: chainSvc,
blockProcessedData: blockProcessedData,
blobs: blobs,
146 changes: 89 additions & 57 deletions beacon-chain/sync/fork_watcher.go
@@ -1,6 +1,7 @@
package sync

import (
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v5/config/params"
@@ -39,80 +40,114 @@ func (s *Service) forkWatcher() {
}
}

-// Checks if there is a fork in the next epoch and if there is
-// it registers the appropriate gossip and rpc topics.
-func (s *Service) registerForUpcomingFork(currEpoch primitives.Epoch) error {
-	genRoot := s.cfg.clock.GenesisValidatorsRoot()
-	isNextForkEpoch, err := forks.IsForkNextEpoch(s.cfg.clock.GenesisTime(), genRoot[:])
+// Register appropriate gossip and RPC topics if there is a fork in the next epoch.
+func (s *Service) registerForUpcomingFork(currentEpoch primitives.Epoch) error {
+	// Get the genesis validators root.
+	genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()
+
+	// Check if there is a fork in the next epoch.
+	isForkNextEpoch, err := forks.IsForkNextEpoch(s.cfg.clock.GenesisTime(), genesisValidatorsRoot[:])
if err != nil {
return errors.Wrap(err, "Could not retrieve next fork epoch")
}
-	// In preparation for the upcoming fork
-	// in the following epoch, the node
-	// will subscribe the new topics in advance.
-	if isNextForkEpoch {
-		nextEpoch := currEpoch + 1
-		digest, err := forks.ForkDigestFromEpoch(nextEpoch, genRoot[:])
-		if err != nil {
-			return errors.Wrap(err, "could not retrieve fork digest")
-		}
-		if s.subHandler.digestExists(digest) {
-			return nil
-		}
-		s.registerSubscribers(nextEpoch, digest)
-		if nextEpoch == params.BeaconConfig().AltairForkEpoch {
-			s.registerRPCHandlersAltair()
-		}
-		if nextEpoch == params.BeaconConfig().DenebForkEpoch {
-			s.registerRPCHandlersDeneb()
-		}

+	// Exit early if there is no fork in the next epoch.
+	if !isForkNextEpoch {
+		return nil
+	}
+
+	// Compute the next epoch.
+	nextEpoch := currentEpoch + 1
+
+	// Get the fork digest for the next epoch.
+	digest, err := forks.ForkDigestFromEpoch(nextEpoch, genesisValidatorsRoot[:])
+	if err != nil {
+		return errors.Wrap(err, "could not retrieve fork digest")
+	}
+
+	// Exit early if the topics for the next epoch are already registered.
+	// This is likely to be the case for all slots of the epoch other than the first one.
+	if s.subHandler.digestExists(digest) {
+		return nil
+	}
+
+	// Register the subscribers (gossipsub) for the next epoch.
+	s.registerSubscribers(nextEpoch, digest)
+
+	// Get the handlers for the current and next fork.
+	currentHandlerByTopic, err := s.rpcHandlerByTopicFromEpoch(currentEpoch)
+	if err != nil {
+		return errors.Wrap(err, "RPC handler by topic")
+	}
-	// Specially handle peerDAS
-	if params.PeerDASEnabled() && currEpoch+1 == params.BeaconConfig().Eip7594ForkEpoch {
-		s.registerRPCHandlersPeerDAS()

+	nextHandlerByTopic, err := s.rpcHandlerByTopicFromEpoch(nextEpoch)
+	if err != nil {
+		return errors.Wrap(err, "RPC handler by topic")
+	}
+
+	// Compute the newly added topics.
+	newRPCHandlerByTopic := addedRPCHandlerByTopic(currentHandlerByTopic, nextHandlerByTopic)
+
+	// Register the new RPC handlers.
+	for topic, handler := range newRPCHandlerByTopic {
+		s.registerRPC(topic, handler)
+	}

return nil
}

-// Checks if there was a fork in the previous epoch, and if there
-// was then we deregister the topics from that particular fork.
-func (s *Service) deregisterFromPastFork(currEpoch primitives.Epoch) error {
-	genRoot := s.cfg.clock.GenesisValidatorsRoot()
-	// This method takes care of the de-registration of
-	// old gossip pubsub handlers. Once we are at the epoch
-	// after the fork, we de-register from all the outdated topics.
-	currFork, err := forks.Fork(currEpoch)
+// deregisterFromPastFork checks if there was a fork in the previous epoch,
+// and if there was then we deregister the gossipsub topics from that particular fork,
+// and the RPC handlers that are no longer relevant.
+func (s *Service) deregisterFromPastFork(currentEpoch primitives.Epoch) error {
+	// Extract the genesis validators root.
+	genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()
+
+	// Get the fork.
+	currentFork, err := forks.Fork(currentEpoch)
if err != nil {
-		return err
+		return errors.Wrap(err, "fork")
}
-	// If we are still in our genesis fork version then
-	// we simply exit early.
-	if currFork.Epoch == params.BeaconConfig().GenesisEpoch {

+	// If we are still in our genesis fork version then exit early.
+	if currentFork.Epoch == params.BeaconConfig().GenesisEpoch {
return nil
}
-	epochAfterFork := currFork.Epoch + 1
-	// If we are in the epoch after the fork, we start de-registering.
-	if epochAfterFork == currEpoch {

+	epochAfterFork := currentFork.Epoch + 1
+
+	// Start de-registering if the current epoch is the first epoch after the fork.
+	if epochAfterFork == currentEpoch {
// Look at the previous fork's digest.
-		epochBeforeFork := currFork.Epoch - 1
-		prevDigest, err := forks.ForkDigestFromEpoch(epochBeforeFork, genRoot[:])
+		epochBeforeFork := currentFork.Epoch - 1
+
+		previousDigest, err := forks.ForkDigestFromEpoch(epochBeforeFork, genesisValidatorsRoot[:])
if err != nil {
return errors.Wrap(err, "Failed to determine previous epoch fork digest")
return errors.Wrap(err, "fork digest from epoch")
}

-		// Exit early if there are no topics with that particular
-		// digest.
-		if !s.subHandler.digestExists(prevDigest) {
+		// Exit early if there are no topics with that particular digest.
+		if !s.subHandler.digestExists(previousDigest) {
return nil
}
-		prevFork, err := forks.Fork(epochBeforeFork)

+		// Compute the RPC handlers that are no longer needed.
+		currentHandlerByTopic, err := s.rpcHandlerByTopicFromEpoch(currentEpoch)
if err != nil {
return errors.Wrap(err, "failed to determine previous epoch fork data")
return errors.Wrap(err, "RPC handler by topic from epoch")
}
-		if prevFork.Epoch == params.BeaconConfig().GenesisEpoch {
-			s.unregisterPhase0Handlers()

+		nextHandlerByTopic, err := s.rpcHandlerByTopicFromEpoch(epochAfterFork)
+		if err != nil {
+			return errors.Wrap(err, "RPC handler by topic from epoch")
+		}
+
+		topicsToRemove := removedRPCTopics(currentHandlerByTopic, nextHandlerByTopic)
+		for topic := range topicsToRemove {
+			fullTopic := topic + s.cfg.p2p.Encoding().ProtocolSuffix()
+			s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullTopic))
+		}

// Run through all our current active topics and see
// if there are any subscriptions to be removed.
for _, t := range s.subHandler.allTopics() {
@@ -121,14 +156,11 @@ func (s *Service) deregisterFromPastFork(currEpoch primitives.Epoch) error {
log.WithError(err).Error("Could not retrieve digest")
continue
}
-			if retDigest == prevDigest {
+			if retDigest == previousDigest {
s.unSubscribeFromTopic(t)
}
}
}
-	// Handle PeerDAS as its a special case.
-	if params.PeerDASEnabled() && currEpoch > 0 && (currEpoch-1) == params.BeaconConfig().Eip7594ForkEpoch {
-		s.unregisterBlobHandlers()
-	}

return nil
}
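The implementations of rpcHandlerByTopicFromEpoch, addedRPCHandlerByTopic, and removedRPCTopics are not shown in this diff. A minimal sketch of the set difference the two helpers are assumed to compute over topic-to-handler maps; the handler type and topic strings below are placeholders, not Prysm's real types:

```go
package main

import "fmt"

// rpcHandler stands in for Prysm's real RPC stream handler type.
type rpcHandler func()

// addedRPCHandlerByTopic returns the topic/handler pairs present in next
// but absent from current, i.e. the handlers to register before a fork.
func addedRPCHandlerByTopic(current, next map[string]rpcHandler) map[string]rpcHandler {
	added := make(map[string]rpcHandler)
	for topic, h := range next {
		if _, ok := current[topic]; !ok {
			added[topic] = h
		}
	}
	return added
}

// removedRPCTopics returns the topics present in current but absent from
// next, i.e. the stream handlers to remove after a fork.
func removedRPCTopics(current, next map[string]rpcHandler) map[string]struct{} {
	removed := make(map[string]struct{})
	for topic := range current {
		if _, ok := next[topic]; !ok {
			removed[topic] = struct{}{}
		}
	}
	return removed
}

func main() {
	current := map[string]rpcHandler{"blob_sidecars_by_root/1": func() {}}
	next := map[string]rpcHandler{"data_column_sidecars_by_root/1": func() {}}
	fmt.Println(len(addedRPCHandlerByTopic(current, next))) // 1 handler to register
	fmt.Println(len(removedRPCTopics(current, next)))       // 1 topic to deregister
}
```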
19 changes: 16 additions & 3 deletions beacon-chain/sync/fork_watcher_test.go
@@ -14,6 +14,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/network/forks"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/prysmaticlabs/prysm/v5/testing/assert"
)

@@ -230,7 +231,8 @@ func TestService_CheckForPreviousEpochFork(t *testing.T) {
chainStarted: abool.New(),
subHandler: newSubTopicHandler(),
}
-	r.registerRPCHandlers()
+	err := r.registerRPCHandlers()
+	assert.NoError(t, err)
return r
},
currEpoch: 10,
@@ -278,10 +280,21 @@ func TestService_CheckForPreviousEpochFork(t *testing.T) {
prevGenesis := chainService.Genesis
// To allow registration of v1 handlers
chainService.Genesis = time.Now().Add(-1 * oneEpoch())
-	r.registerRPCHandlers()
+	err := r.registerRPCHandlers()
+	assert.NoError(t, err)

chainService.Genesis = prevGenesis
-	r.registerRPCHandlersAltair()
+	previous, err := r.rpcHandlerByTopicFromFork(version.Phase0)
+	assert.NoError(t, err)
+
+	next, err := r.rpcHandlerByTopicFromFork(version.Altair)
+	assert.NoError(t, err)
+
+	handlerByTopic := addedRPCHandlerByTopic(previous, next)
+
+	for topic, handler := range handlerByTopic {
+		r.registerRPC(topic, handler)
+	}

genRoot := r.cfg.clock.GenesisValidatorsRoot()
digest, err := forks.ForkDigestFromEpoch(0, genRoot[:])