Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proposer head v5 #12075

Merged
merged 14 commits into from
Mar 4, 2023
2 changes: 1 addition & 1 deletion beacon-chain/blockchain/chain_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type ChainInfoFetcher interface {
// HeadUpdater defines a common interface for methods in blockchain service
// which allow to update the head info
type HeadUpdater interface {
UpdateHead(context.Context) error
UpdateHead(context.Context, primitives.Slot)
}

// TimeFetcher retrieves the Ethereum consensus data that's related to time.
Expand Down
54 changes: 43 additions & 11 deletions beacon-chain/blockchain/forkchoice_update_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v3/config/features"
"github.com/prysmaticlabs/prysm/v3/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
)

func (s *Service) isNewProposer() bool {
_, _, ok := s.cfg.ProposerSlotIndexCache.GetProposerPayloadIDs(s.CurrentSlot()+1, [32]byte{} /* root */)
func (s *Service) isNewProposer(slot primitives.Slot) bool {
_, _, ok := s.cfg.ProposerSlotIndexCache.GetProposerPayloadIDs(slot, [32]byte{} /* root */)
return ok
}

Expand Down Expand Up @@ -40,28 +42,58 @@ func (s *Service) getStateAndBlock(ctx context.Context, r [32]byte) (state.Beaco
return headState, newHeadBlock, nil
}

func (s *Service) forkchoiceUpdateWithExecution(ctx context.Context, newHeadRoot [32]byte) error {
func (s *Service) forkchoiceUpdateWithExecution(ctx context.Context, newHeadRoot [32]byte, proposingSlot primitives.Slot) error {
isNewHead := s.isNewHead(newHeadRoot)
if !isNewHead && !s.isNewProposer() {
isNewProposer := s.isNewProposer(proposingSlot)
if !isNewHead && !isNewProposer {
return nil
}

headState, headBlock, err := s.getStateAndBlock(ctx, newHeadRoot)
if err != nil {
log.WithError(err).Error("Could not get forkchoice update argument")
return nil
var headState state.BeaconState
var headBlock interfaces.ReadOnlySignedBeaconBlock
var headRoot [32]byte
var err error

shouldUpdate := isNewHead
if isNewHead && isNewProposer && !features.Get().DisableReorgLateBlocks {
if proposingSlot == s.CurrentSlot() {
proposerHead := s.ForkChoicer().GetProposerHead()
if proposerHead != newHeadRoot {
shouldUpdate = false
}
} else if s.ForkChoicer().ShouldOverrideFCU() {
shouldUpdate = false
}
}
if shouldUpdate {
headRoot = newHeadRoot
headState, headBlock, err = s.getStateAndBlock(ctx, newHeadRoot)
if err != nil {
log.WithError(err).Error("Could not get forkchoice update argument")
return nil
}
} else {
// We are guaranteed that the head block is the parent
// of the incoming block. We do not process the slot
// because it will be processed anyway in notifyForkchoiceUpdate
headState = s.headState(ctx)
headRoot = s.headRoot()
headBlock, err = s.headBlock()
if err != nil {
return errors.Wrap(err, "could not get head block")
}
}

_, err = s.notifyForkchoiceUpdate(ctx, &notifyForkchoiceUpdateArg{
headState: headState,
headRoot: newHeadRoot,
headRoot: headRoot,
headBlock: headBlock.Block(),
})
if err != nil {
return err
return errors.Wrap(err, "could not notify forkchoice update")
}

if isNewHead {
if shouldUpdate {
if err := s.saveHead(ctx, newHeadRoot, headBlock, headState); err != nil {
log.WithError(err).Error("could not save head")
}
Expand Down
18 changes: 10 additions & 8 deletions beacon-chain/blockchain/forkchoice_update_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
func TestService_isNewProposer(t *testing.T) {
beaconDB := testDB.SetupDB(t)
service := setupBeaconChain(t, beaconDB)
require.Equal(t, false, service.isNewProposer())
require.Equal(t, false, service.isNewProposer(service.CurrentSlot()+1))

service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(service.CurrentSlot()+1, 0, [8]byte{}, [32]byte{} /* root */)
require.Equal(t, true, service.isNewProposer())
require.Equal(t, true, service.isNewProposer(service.CurrentSlot()+1))
}

func TestService_isNewHead(t *testing.T) {
Expand Down Expand Up @@ -75,15 +75,15 @@ func TestService_forkchoiceUpdateWithExecution_exceptionalCases(t *testing.T) {
service, err := NewService(ctx, opts...)
require.NoError(t, err)
service.cfg.ProposerSlotIndexCache = cache.NewProposerPayloadIDsCache()
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, service.headRoot()))
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, service.headRoot(), service.CurrentSlot()+1))
hookErr := "could not notify forkchoice update"
invalidStateErr := "could not get state summary: could not find block in DB"
require.LogsDoNotContain(t, hook, invalidStateErr)
require.LogsDoNotContain(t, hook, hookErr)
gb, err := blocks.NewSignedBeaconBlock(util.NewBeaconBlock())
require.NoError(t, err)
require.NoError(t, service.saveInitSyncBlock(ctx, [32]byte{'a'}, gb))
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, [32]byte{'a'}))
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, [32]byte{'a'}, service.CurrentSlot()+1))
require.LogsContain(t, hook, invalidStateErr)

hook.Reset()
Expand All @@ -107,7 +107,7 @@ func TestService_forkchoiceUpdateWithExecution_exceptionalCases(t *testing.T) {
state: st,
}
service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(2, 1, [8]byte{1}, [32]byte{2})
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, r1))
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, r1, service.CurrentSlot()))
require.LogsDoNotContain(t, hook, invalidStateErr)
require.LogsDoNotContain(t, hook, hookErr)

Expand All @@ -124,7 +124,7 @@ func TestService_forkchoiceUpdateWithExecution_exceptionalCases(t *testing.T) {
state: st,
}
service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(2, 1, [8]byte{1}, [32]byte{2})
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, r1))
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, r1, service.CurrentSlot()+1))
require.LogsDoNotContain(t, hook, invalidStateErr)
require.LogsDoNotContain(t, hook, hookErr)
vId, payloadID, has := service.cfg.ProposerSlotIndexCache.GetProposerPayloadIDs(2, [32]byte{2})
Expand All @@ -134,7 +134,7 @@ func TestService_forkchoiceUpdateWithExecution_exceptionalCases(t *testing.T) {

// Test zero headRoot returns immediately.
headRoot := service.headRoot()
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, [32]byte{}))
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, [32]byte{}, service.CurrentSlot()+1))
require.Equal(t, service.headRoot(), headRoot)
}

Expand Down Expand Up @@ -184,7 +184,9 @@ func TestService_forkchoiceUpdateWithExecution_SameHeadRootNewProposer(t *testin

// Set head to be the same but proposing next slot
service.head.root = r
service.head.block = sb
service.head.state = st
service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(service.CurrentSlot()+1, 0, [8]byte{}, [32]byte{} /* root */)
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, r))
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, r, service.CurrentSlot()+1))

}
9 changes: 4 additions & 5 deletions beacon-chain/blockchain/process_attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v3/config/params"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1/attestation"
Expand Down Expand Up @@ -37,7 +36,7 @@ import (
//
// # Update latest messages for attesting indices
// update_latest_messages(store, indexed_attestation.attesting_indices, attestation)
func (s *Service) OnAttestation(ctx context.Context, a *ethpb.Attestation) error {
func (s *Service) OnAttestation(ctx context.Context, a *ethpb.Attestation, disparity time.Duration) error {
ctx, span := trace.StartSpan(ctx, "blockChain.onAttestation")
defer span.End()

Expand All @@ -63,7 +62,7 @@ func (s *Service) OnAttestation(ctx context.Context, a *ethpb.Attestation) error
genesisTime := uint64(s.genesisTime.Unix())

// Verify attestation target is from current epoch or previous epoch.
if err := verifyAttTargetEpoch(ctx, genesisTime, uint64(time.Now().Unix()), tgt); err != nil {
if err := verifyAttTargetEpoch(ctx, genesisTime, uint64(time.Now().Add(disparity).Unix()), tgt); err != nil {
return err
}

Expand All @@ -72,11 +71,11 @@ func (s *Service) OnAttestation(ctx context.Context, a *ethpb.Attestation) error
return errors.Wrap(err, "could not verify attestation beacon block")
}

// Note that LMG GHOST and FFG consistency check is ignored because it was performed in sync's validation pipeline:
// Note that LMD GHOST and FFG consistency check is ignored because it was performed in sync's validation pipeline:
// validate_aggregate_proof.go and validate_beacon_attestation.go

// Verify attestations can only affect the fork choice of subsequent slots.
if err := slots.VerifyTime(genesisTime, a.Data.Slot+1, params.BeaconNetworkConfig().MaximumGossipClockDisparity); err != nil {
if err := slots.VerifyTime(genesisTime, a.Data.Slot+1, disparity); err != nil {
return err
}

Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/blockchain/process_attestation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestStore_OnAttestation_ErrorConditions(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := service.OnAttestation(ctx, tt.a)
err := service.OnAttestation(ctx, tt.a, 0)
if tt.wantedErr != "" {
assert.ErrorContains(t, tt.wantedErr, err)
} else {
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestStore_OnAttestation_Ok_DoublyLinkedTree(t *testing.T) {
state, blkRoot, err := prepareForkchoiceState(ctx, 0, tRoot, tRoot, params.BeaconConfig().ZeroHash, ojc, ofc)
require.NoError(t, err)
require.NoError(t, service.cfg.ForkChoiceStore.InsertNode(ctx, state, blkRoot))
require.NoError(t, service.OnAttestation(ctx, att[0]))
require.NoError(t, service.OnAttestation(ctx, att[0], 0))
}

func TestStore_SaveCheckpointState(t *testing.T) {
Expand Down
23 changes: 7 additions & 16 deletions beacon-chain/blockchain/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (s *Service) onBlock(ctx context.Context, signed interfaces.ReadOnlySignedB
}
newBlockHeadElapsedTime.Observe(float64(time.Since(start).Milliseconds()))

if err := s.forkchoiceUpdateWithExecution(ctx, headRoot); err != nil {
if err := s.forkchoiceUpdateWithExecution(ctx, headRoot, s.CurrentSlot()+1); err != nil {
return err
}

Expand Down Expand Up @@ -667,32 +667,23 @@ func (s *Service) fillMissingPayloadIDRoutine(ctx context.Context, stateFeed *ev
break
}

ticker := time.NewTicker(time.Second)
defer ticker.Stop()
attThreshold := params.BeaconConfig().SecondsPerSlot / 3
ticker := slots.NewSlotTickerWithOffset(s.genesisTime, time.Duration(attThreshold)*time.Second, params.BeaconConfig().SecondsPerSlot)
for {
select {
case ti := <-ticker.C:
if err := s.fillMissingBlockPayloadId(ctx, ti); err != nil {
case <-ticker.C():
if err := s.fillMissingBlockPayloadId(ctx); err != nil {
log.WithError(err).Error("Could not fill missing payload ID")
}
case <-s.ctx.Done():
case <-ctx.Done():
log.Debug("Context closed, exiting routine")
return
}
}
}()
}

// Returns true if time `t` is halfway through the slot in sec.
func atHalfSlot(t time.Time) bool {
s := params.BeaconConfig().SecondsPerSlot
return uint64(t.Second())%s == s/2
}

func (s *Service) fillMissingBlockPayloadId(ctx context.Context, ti time.Time) error {
if !atHalfSlot(ti) {
return nil
}
func (s *Service) fillMissingBlockPayloadId(ctx context.Context) error {
s.ForkChoicer().RLock()
highestReceivedSlot := s.cfg.ForkChoiceStore.HighestReceivedBlockSlot()
s.ForkChoicer().RUnlock()
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/blockchain/process_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2164,7 +2164,7 @@ func TestFillMissingBlockPayloadId_DiffSlotExitEarly(t *testing.T) {

service, err := NewService(ctx, opts...)
require.NoError(t, err)
require.NoError(t, service.fillMissingBlockPayloadId(ctx, time.Unix(int64(params.BeaconConfig().SecondsPerSlot/2), 0)))
require.NoError(t, service.fillMissingBlockPayloadId(ctx), 0)
}

// Helper function to simulate the block being on time or delayed for proposer
Expand Down
49 changes: 32 additions & 17 deletions beacon-chain/blockchain/receive_attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,20 @@ import (
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/feed"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v3/config/features"
"github.com/prysmaticlabs/prysm/v3/config/params"
"github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/time/slots"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)

// reorgLateBlockCountAttestations is the time until the end of the slot in which we count
// attestations to see if we will reorg the incoming block
const reorgLateBlockCountAttestations = 2 * time.Second
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The naming is a little weird, I also prefer 2 more than 10 than having 12-2...


// AttestationStateFetcher allows for retrieving a beacon state corresponding to the block
// root of an attestation's target checkpoint.
type AttestationStateFetcher interface {
Expand Down Expand Up @@ -88,30 +94,40 @@ func (s *Service) spawnProcessAttestationsRoutine(stateFeed *event.Feed) {
}

st := slots.NewSlotTicker(s.genesisTime, params.BeaconConfig().SecondsPerSlot)
pat := slots.NewSlotTickerWithOffset(s.genesisTime, -reorgLateBlockCountAttestations, params.BeaconConfig().SecondsPerSlot)
for {
select {
case <-s.ctx.Done():
return
case <-pat.C():
s.ForkChoicer().Lock()
s.UpdateHead(s.ctx, s.CurrentSlot()+1)
s.ForkChoicer().Unlock()
case <-st.C():
s.ForkChoicer().Lock()
if err := s.ForkChoicer().NewSlot(s.ctx, s.CurrentSlot()); err != nil {
log.WithError(err).Error("Could not process new slot")
log.WithError(err).Error("could not process new slot")
}

if err := s.UpdateHead(s.ctx); err != nil {
log.WithError(err).Error("Could not process attestations and update head")
}
s.UpdateHead(s.ctx, s.CurrentSlot())
s.ForkChoicer().Unlock()
}
}
}()
}

// UpdateHead updates the canonical head of the chain based on information from fork-choice attestations and votes.
// It requires no external inputs.
func (s *Service) UpdateHead(ctx context.Context) error {
// The caller of this function MUST hold a lock in forkchoice
func (s *Service) UpdateHead(ctx context.Context, proposingSlot primitives.Slot) {
start := time.Now()
s.ForkChoicer().Lock()
defer s.ForkChoicer().Unlock()
s.processAttestations(ctx)

// This function is only called at 10 seconds or 0 seconds into the slot
disparity := params.BeaconNetworkConfig().MaximumGossipClockDisparity
if !features.Get().DisableReorgLateBlocks {
disparity += reorgLateBlockCountAttestations
}
s.processAttestations(ctx, disparity)

processAttsElapsedTime.Observe(float64(time.Since(start).Milliseconds()))

start = time.Now()
Expand All @@ -129,21 +145,20 @@ func (s *Service) UpdateHead(ctx context.Context) error {
}).Debug("Head changed due to attestations")
}
s.headLock.RUnlock()
if err := s.forkchoiceUpdateWithExecution(ctx, newHeadRoot); err != nil {
return err
if err := s.forkchoiceUpdateWithExecution(s.ctx, newHeadRoot, proposingSlot); err != nil {
log.WithError(err).Error("could not update forkchoice")
}
return nil
}

// This processes fork choice attestations from the pool to account for validator votes and fork choice.
func (s *Service) processAttestations(ctx context.Context) {
func (s *Service) processAttestations(ctx context.Context, disparity time.Duration) {
atts := s.cfg.AttPool.ForkchoiceAttestations()
for _, a := range atts {
// Based on the spec, don't process the attestation until the subsequent slot.
// This delays consideration in the fork choice until their slot is in the past.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/fork-choice.md#validate_on_attestation
nextSlot := a.Data.Slot + 1
if err := slots.VerifyTime(uint64(s.genesisTime.Unix()), nextSlot, params.BeaconNetworkConfig().MaximumGossipClockDisparity); err != nil {
if err := slots.VerifyTime(uint64(s.genesisTime.Unix()), nextSlot, disparity); err != nil {
continue
}

Expand All @@ -161,7 +176,7 @@ func (s *Service) processAttestations(ctx context.Context) {
continue
}

if err := s.receiveAttestationNoPubsub(ctx, a); err != nil {
if err := s.receiveAttestationNoPubsub(ctx, a, disparity); err != nil {
log.WithFields(logrus.Fields{
"slot": a.Data.Slot,
"committeeIndex": a.Data.CommitteeIndex,
Expand All @@ -178,11 +193,11 @@ func (s *Service) processAttestations(ctx context.Context) {
// 1. Validate attestation, update validator's latest vote
// 2. Apply fork choice to the processed attestation
// 3. Save latest head info
func (s *Service) receiveAttestationNoPubsub(ctx context.Context, att *ethpb.Attestation) error {
func (s *Service) receiveAttestationNoPubsub(ctx context.Context, att *ethpb.Attestation, disparity time.Duration) error {
ctx, span := trace.StartSpan(ctx, "beacon-chain.blockchain.receiveAttestationNoPubsub")
defer span.End()

if err := s.OnAttestation(ctx, att); err != nil {
if err := s.OnAttestation(ctx, att, disparity); err != nil {
return errors.Wrap(err, "could not process attestation")
}

Expand Down
Loading