Skip to content

Commit

Permalink
validator startup deadline bug (#12049)
Browse files Browse the repository at this point in the history
* trying fix for validator startup deadline

* updating deadline duration to be set by params

* adding a runner test

* trying nishant's suggestion

* editing based on review feedback

* reverting a change

* fixing epoch deadline

* reverting aliasing
  • Loading branch information
james-prysm authored Mar 14, 2023
1 parent f6eb42b commit 7d5d30a
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 33 deletions.
2 changes: 1 addition & 1 deletion validator/accounts/testing/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (MockValidator) HasProposerSettings() bool {
}

// PushProposerSettings for mocking
func (_ MockValidator) PushProposerSettings(_ context.Context, _ keymanager.IKeymanager) error {
func (_ MockValidator) PushProposerSettings(_ context.Context, _ keymanager.IKeymanager, _ time.Time) error {
panic("implement me")
}

Expand Down
2 changes: 1 addition & 1 deletion validator/client/iface/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type Validator interface {
ReceiveBlocks(ctx context.Context, connectionErrorChannel chan<- error)
HandleKeyReload(ctx context.Context, currentKeys [][fieldparams.BLSPubkeyLength]byte) (bool, error)
CheckDoppelGanger(ctx context.Context) error
PushProposerSettings(ctx context.Context, km keymanager.IKeymanager) error
PushProposerSettings(ctx context.Context, km keymanager.IKeymanager, deadline time.Time) error
SignValidatorRegistrationRequest(ctx context.Context, signer SigningFunc, newValidatorRegistration *ethpb.ValidatorRegistrationV1) (*ethpb.SignedValidatorRegistrationV1, error)
ProposerSettings() *validatorserviceconfig.ProposerSettings
SetProposerSettings(*validatorserviceconfig.ProposerSettings)
Expand Down
18 changes: 10 additions & 8 deletions validator/client/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/prysmaticlabs/prysm/v3/cmd/validator/flags"
fieldparams "github.com/prysmaticlabs/prysm/v3/config/fieldparams"
"github.com/prysmaticlabs/prysm/v3/config/params"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v3/time/slots"
"github.com/prysmaticlabs/prysm/v3/validator/client/iface"
Expand Down Expand Up @@ -57,7 +57,8 @@ func run(ctx context.Context, v iface.Validator) {
if v.ProposerSettings() != nil {
log.Infof("Validator client started with provided proposer settings that sets options such as fee recipient"+
" and will periodically update the beacon node and custom builder (if --%s)", flags.EnableBuilderFlag.Name)
if err := v.PushProposerSettings(ctx, km); err != nil {
deadline := time.Now().Add(time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second)
if err := v.PushProposerSettings(ctx, km, deadline); err != nil {
if errors.Is(err, ErrBuilderValidatorRegistration) {
log.WithError(err).Warn("Push proposer settings error")
} else {
Expand Down Expand Up @@ -126,8 +127,9 @@ func run(ctx context.Context, v iface.Validator) {

if slots.IsEpochStart(slot) && v.ProposerSettings() != nil {
go func() {
//deadline set for next epoch rounded up
if err := v.PushProposerSettings(ctx, km); err != nil {
//deadline set for end of epoch
epochDeadline := v.SlotDeadline(slot + params.BeaconConfig().SlotsPerEpoch - 1)
if err := v.PushProposerSettings(ctx, km, epochDeadline); err != nil {
log.WithError(err).Warn("Failed to update proposer settings")
}
}()
Expand All @@ -152,11 +154,11 @@ func run(ctx context.Context, v iface.Validator) {
}
}

func initializeValidatorAndGetHeadSlot(ctx context.Context, v iface.Validator) (types.Slot, error) {
func initializeValidatorAndGetHeadSlot(ctx context.Context, v iface.Validator) (primitives.Slot, error) {
ticker := time.NewTicker(backOffPeriod)
defer ticker.Stop()

var headSlot types.Slot
var headSlot primitives.Slot
firstTime := true
for {
if !firstTime {
Expand Down Expand Up @@ -218,7 +220,7 @@ func initializeValidatorAndGetHeadSlot(ctx context.Context, v iface.Validator) (
return headSlot, nil
}

func performRoles(slotCtx context.Context, allRoles map[[48]byte][]iface.ValidatorRole, v iface.Validator, slot types.Slot, wg *sync.WaitGroup, span *trace.Span) {
func performRoles(slotCtx context.Context, allRoles map[[48]byte][]iface.ValidatorRole, v iface.Validator, slot primitives.Slot, wg *sync.WaitGroup, span *trace.Span) {
for pubKey, roles := range allRoles {
wg.Add(len(roles))
for _, role := range roles {
Expand Down Expand Up @@ -268,7 +270,7 @@ func isConnectionError(err error) bool {
return err != nil && errors.Is(err, iface.ErrConnectionIssue)
}

func handleAssignmentError(err error, slot types.Slot) {
func handleAssignmentError(err error, slot primitives.Slot) {
if errCode, ok := status.FromError(err); ok && errCode.Code() == codes.NotFound {
log.WithField(
"epoch", slot/params.BeaconConfig().SlotsPerEpoch,
Expand Down
82 changes: 65 additions & 17 deletions validator/client/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
fieldparams "github.com/prysmaticlabs/prysm/v3/config/fieldparams"
"github.com/prysmaticlabs/prysm/v3/config/params"
validatorserviceconfig "github.com/prysmaticlabs/prysm/v3/config/validator/service"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v3/testing/assert"
"github.com/prysmaticlabs/prysm/v3/testing/require"
"github.com/prysmaticlabs/prysm/v3/validator/client/iface"
Expand Down Expand Up @@ -68,8 +68,8 @@ func TestUpdateDuties_NextSlot(t *testing.T) {
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
ctx, cancel := context.WithCancel(context.Background())

slot := types.Slot(55)
ticker := make(chan types.Slot)
slot := primitives.Slot(55)
ticker := make(chan primitives.Slot)
v.NextSlotRet = ticker
go func() {
ticker <- slot
Expand All @@ -88,8 +88,8 @@ func TestUpdateDuties_HandlesError(t *testing.T) {
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
ctx, cancel := context.WithCancel(context.Background())

slot := types.Slot(55)
ticker := make(chan types.Slot)
slot := primitives.Slot(55)
ticker := make(chan primitives.Slot)
v.NextSlotRet = ticker
go func() {
ticker <- slot
Expand All @@ -107,8 +107,8 @@ func TestRoleAt_NextSlot(t *testing.T) {
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
ctx, cancel := context.WithCancel(context.Background())

slot := types.Slot(55)
ticker := make(chan types.Slot)
slot := primitives.Slot(55)
ticker := make(chan primitives.Slot)
v.NextSlotRet = ticker
go func() {
ticker <- slot
Expand All @@ -126,8 +126,8 @@ func TestAttests_NextSlot(t *testing.T) {
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
ctx, cancel := context.WithCancel(context.Background())

slot := types.Slot(55)
ticker := make(chan types.Slot)
slot := primitives.Slot(55)
ticker := make(chan primitives.Slot)
v.NextSlotRet = ticker
v.RolesAtRet = []iface.ValidatorRole{iface.RoleAttester}
go func() {
Expand All @@ -146,8 +146,8 @@ func TestProposes_NextSlot(t *testing.T) {
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
ctx, cancel := context.WithCancel(context.Background())

slot := types.Slot(55)
ticker := make(chan types.Slot)
slot := primitives.Slot(55)
ticker := make(chan primitives.Slot)
v.NextSlotRet = ticker
v.RolesAtRet = []iface.ValidatorRole{iface.RoleProposer}
go func() {
Expand All @@ -166,8 +166,8 @@ func TestBothProposesAndAttests_NextSlot(t *testing.T) {
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
ctx, cancel := context.WithCancel(context.Background())

slot := types.Slot(55)
ticker := make(chan types.Slot)
slot := primitives.Slot(55)
ticker := make(chan primitives.Slot)
v.NextSlotRet = ticker
v.RolesAtRet = []iface.ValidatorRole{iface.RoleAttester, iface.RoleProposer}
go func() {
Expand All @@ -189,8 +189,8 @@ func TestAllValidatorsAreExited_NextSlot(t *testing.T) {
ctx, cancel := context.WithCancel(context.WithValue(context.Background(), testutil.AllValidatorsAreExitedCtxKey, true))
hook := logTest.NewGlobal()

slot := types.Slot(55)
ticker := make(chan types.Slot)
slot := primitives.Slot(55)
ticker := make(chan primitives.Slot)
v.NextSlotRet = ticker
go func() {
ticker <- slot
Expand Down Expand Up @@ -243,7 +243,7 @@ func TestUpdateProposerSettingsAt_EpochStart(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
hook := logTest.NewGlobal()
slot := params.BeaconConfig().SlotsPerEpoch
ticker := make(chan types.Slot)
ticker := make(chan primitives.Slot)
v.NextSlotRet = ticker
go func() {
ticker <- slot
Expand All @@ -255,6 +255,54 @@ func TestUpdateProposerSettingsAt_EpochStart(t *testing.T) {
assert.LogsContain(t, hook, "updated proposer settings")
}

func TestUpdateProposerSettingsAt_EpochEndExceeded(t *testing.T) {
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, ProposerSettingWait: time.Duration(params.BeaconConfig().SecondsPerSlot+1) * time.Second}
v.SetProposerSettings(&validatorserviceconfig.ProposerSettings{
DefaultConfig: &validatorserviceconfig.ProposerOption{
FeeRecipientConfig: &validatorserviceconfig.FeeRecipientConfig{
FeeRecipient: common.HexToAddress("0x046Fb65722E7b2455012BFEBf6177F1D2e9738D9"),
},
},
})
ctx, cancel := context.WithCancel(context.Background())
hook := logTest.NewGlobal()
slot := params.BeaconConfig().SlotsPerEpoch - 1 //have it set close to the end of epoch
ticker := make(chan primitives.Slot)
v.NextSlotRet = ticker
go func() {
ticker <- slot
cancel()
}()

run(ctx, v)
// can't test "Failed to update proposer settings" because of log.fatal
assert.LogsContain(t, hook, "deadline exceeded")
}

func TestUpdateProposerSettingsAt_EpochEndOk(t *testing.T) {
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, ProposerSettingWait: time.Duration(params.BeaconConfig().SecondsPerSlot-1) * time.Second}
v.SetProposerSettings(&validatorserviceconfig.ProposerSettings{
DefaultConfig: &validatorserviceconfig.ProposerOption{
FeeRecipientConfig: &validatorserviceconfig.FeeRecipientConfig{
FeeRecipient: common.HexToAddress("0x046Fb65722E7b2455012BFEBf6177F1D2e9738D9"),
},
},
})
ctx, cancel := context.WithCancel(context.Background())
hook := logTest.NewGlobal()
slot := params.BeaconConfig().SlotsPerEpoch - 1 //have it set close to the end of epoch
ticker := make(chan primitives.Slot)
v.NextSlotRet = ticker
go func() {
ticker <- slot
cancel()
}()

run(ctx, v)
// can't test "Failed to update proposer settings" because of log.fatal
assert.LogsContain(t, hook, "Mock updated proposer settings")
}

func TestUpdateProposerSettings_ContinuesAfterValidatorRegistrationFails(t *testing.T) {
errSomeotherError := errors.New("some internal error")
v := &testutil.FakeValidator{
Expand All @@ -271,7 +319,7 @@ func TestUpdateProposerSettings_ContinuesAfterValidatorRegistrationFails(t *test
ctx, cancel := context.WithCancel(context.Background())
hook := logTest.NewGlobal()
slot := params.BeaconConfig().SlotsPerEpoch
ticker := make(chan types.Slot)
ticker := make(chan primitives.Slot)
v.NextSlotRet = ticker
go func() {
ticker <- slot
Expand Down
14 changes: 13 additions & 1 deletion validator/client/testutil/mock_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type FakeValidator struct {
PubkeyToIndexMap map[[fieldparams.BLSPubkeyLength]byte]uint64
PubkeysToStatusesMap map[[fieldparams.BLSPubkeyLength]byte]ethpb.ValidatorStatus
proposerSettings *validatorserviceconfig.ProposerSettings
ProposerSettingWait time.Duration
Km keymanager.IKeymanager
}

Expand Down Expand Up @@ -258,10 +259,21 @@ func (*FakeValidator) HasProposerSettings() bool {
}

// PushProposerSettings for mocking
func (fv *FakeValidator) PushProposerSettings(_ context.Context, _ keymanager.IKeymanager) error {
func (fv *FakeValidator) PushProposerSettings(ctx context.Context, _ keymanager.IKeymanager, deadline time.Time) error {
nctx, cancel := context.WithDeadline(ctx, deadline)
ctx = nctx
defer cancel()
time.Sleep(fv.ProposerSettingWait)
if ctx.Err() == context.DeadlineExceeded {
log.Error("deadline exceeded")
// can't return error or it will trigger a log.fatal
return nil
}

if fv.ProposerSettingsErr != nil {
return fv.ProposerSettingsErr
}

log.Infoln("Mock updated proposer settings")
return nil
}
Expand Down
7 changes: 3 additions & 4 deletions validator/client/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -985,13 +985,12 @@ func (v *validator) SetProposerSettings(settings *validatorserviceconfig.Propose
}

// PushProposerSettings calls the prepareBeaconProposer RPC to set the fee recipient and also the register validator API if using a custom builder.
func (v *validator) PushProposerSettings(ctx context.Context, km keymanager.IKeymanager) error {
func (v *validator) PushProposerSettings(ctx context.Context, km keymanager.IKeymanager, deadline time.Time) error {
if km == nil {
return errors.New("keymanager is nil when calling PrepareBeaconProposer")
}

deadline := v.SlotDeadline(slots.RoundUpToNearestEpoch(slots.CurrentSlot(v.genesisTime)))
ctx, cancel := context.WithDeadline(ctx, deadline)
nctx, cancel := context.WithDeadline(ctx, deadline)
ctx = nctx
defer cancel()

pubkeys, err := km.FetchValidatingPublicKeys(ctx)
Expand Down
3 changes: 2 additions & 1 deletion validator/client/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1958,7 +1958,8 @@ func TestValidator_PushProposerSettings(t *testing.T) {
require.Equal(t, len(tt.mockExpectedRequests), len(signedRegisterValidatorRequests))
require.Equal(t, len(signedRegisterValidatorRequests), len(v.signedValidatorRegistrations))
}
if err := v.PushProposerSettings(ctx, km); tt.err != "" {
deadline := time.Now().Add(time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second)
if err := v.PushProposerSettings(ctx, km, deadline); tt.err != "" {
assert.ErrorContains(t, tt.err, err)
}
if len(tt.logMessages) > 0 {
Expand Down

0 comments on commit 7d5d30a

Please sign in to comment.