From 723ad4bfa42d7bd3e99cbb763da4c7f144047149 Mon Sep 17 00:00:00 2001 From: james-prysm Date: Wed, 10 Jan 2024 14:22:54 -0600 Subject: [PATCH 01/20] removing timeout on wait for activation, instead switched to an event driven approach --- validator/client/key_reload.go | 3 - validator/client/validator.go | 18 ++--- validator/client/wait_for_activation.go | 88 +++++++++---------------- 3 files changed, 38 insertions(+), 71 deletions(-) diff --git a/validator/client/key_reload.go b/validator/client/key_reload.go index cbfc67a74eed..3a768b40d871 100644 --- a/validator/client/key_reload.go +++ b/validator/client/key_reload.go @@ -49,9 +49,6 @@ func (v *validator) HandleKeyReload(ctx context.Context, currentKeys [][fieldpar } anyActive = v.checkAndLogValidatorStatus(statuses, valCount) - if anyActive { - logActiveValidatorStatus(statuses) - } return anyActive, nil } diff --git a/validator/client/validator.go b/validator/client/validator.go index 9678992320b9..47c1fb1261c2 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -387,6 +387,12 @@ func (v *validator) checkAndLogValidatorStatus(statuses []*validatorStatus, acti } case ethpb.ValidatorStatus_ACTIVE, ethpb.ValidatorStatus_EXITING: validatorActivated = true + if status.status.Status == ethpb.ValidatorStatus_ACTIVE { + log.WithFields(logrus.Fields{ + "publicKey": fmt.Sprintf("%#x", bytesutil.Trunc(status.publicKey)), + "index": status.index, + }).Info("Validator activated") + } case ethpb.ValidatorStatus_EXITED: log.Info("Validator exited") case ethpb.ValidatorStatus_INVALID: @@ -400,18 +406,6 @@ func (v *validator) checkAndLogValidatorStatus(statuses []*validatorStatus, acti return validatorActivated } -func logActiveValidatorStatus(statuses []*validatorStatus) { - for _, s := range statuses { - if s.status.Status != ethpb.ValidatorStatus_ACTIVE { - continue - } - log.WithFields(logrus.Fields{ - "publicKey": fmt.Sprintf("%#x", bytesutil.Trunc(s.publicKey)), - "index": s.index, - }).Info("Validator activated") - } -} - // CanonicalHeadSlot returns the slot of canonical block currently found in the // beacon chain via RPC. func (v *validator) CanonicalHeadSlot(ctx context.Context) (primitives.Slot, error) { diff --git a/validator/client/wait_for_activation.go b/validator/client/wait_for_activation.go index c2c3d10fab34..f79d73df1be3 100644 --- a/validator/client/wait_for_activation.go +++ b/validator/client/wait_for_activation.go @@ -5,17 +5,16 @@ import ( "io" "time" - validator2 "github.com/prysmaticlabs/prysm/v4/consensus-types/validator" - "github.com/prysmaticlabs/prysm/v4/validator/client/iface" - "github.com/pkg/errors" fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" "github.com/prysmaticlabs/prysm/v4/config/params" + validator2 "github.com/prysmaticlabs/prysm/v4/consensus-types/validator" "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" "github.com/prysmaticlabs/prysm/v4/math" "github.com/prysmaticlabs/prysm/v4/monitoring/tracing" ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v4/time/slots" + "github.com/prysmaticlabs/prysm/v4/validator/client/iface" "go.opencensus.io/trace" ) @@ -33,18 +32,18 @@ func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan c if err != nil { return err } + // subscribe to the channel if it's the first time sub := km.SubscribeAccountChanges(accountsChangedChan) defer func() { sub.Unsubscribe() close(accountsChangedChan) }() } - return v.internalWaitForActivation(ctx, accountsChangedChan) } // internalWaitForActivation performs the following: -// 1) While the key manager is empty, poll the key manager until some validator keys exist. +// 1) While the key manager is empty, subscribe to keymanager changes until some validator keys exist. // 2) Open a server side stream for activation events against the given keys. // 3) In another go routine, the key manager is monitored for updates and emits an update event on // the accountsChangedChan. When an event signal is received, restart the internalWaitForActivation routine. @@ -53,65 +52,41 @@ func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan c func (v *validator) internalWaitForActivation(ctx context.Context, accountsChangedChan <-chan [][fieldparams.BLSPubkeyLength]byte) error { ctx, span := trace.StartSpan(ctx, "validator.WaitForActivation") defer span.End() - validatingKeys, err := v.keyManager.FetchValidatingPublicKeys(ctx) if err != nil { return errors.Wrap(err, "could not fetch validating keys") } if len(validatingKeys) == 0 { log.Warn(msgNoKeysFetched) - - ticker := time.NewTicker(keyRefetchPeriod) - defer ticker.Stop() - for { - select { - case <-ticker.C: - validatingKeys, err = v.keyManager.FetchValidatingPublicKeys(ctx) - if err != nil { - return errors.Wrap(err, msgCouldNotFetchKeys) - } - if len(validatingKeys) == 0 { - log.Warn(msgNoKeysFetched) - continue - } - case <-ctx.Done(): - log.Debug("Context closed, exiting fetching validating keys") - return ctx.Err() - } - break - } } - - req := ðpb.ValidatorActivationRequest{ - PublicKeys: bytesutil.FromBytes48Array(validatingKeys), - } - stream, err := v.validatorClient.WaitForActivation(ctx, req) - if err != nil { - tracing.AnnotateError(span, err) - attempts := streamAttempts(ctx) - log.WithError(err).WithField("attempts", attempts). - Error("Stream broken while waiting for activation. Reconnecting...") - // Reconnection attempt backoff, up to 60s. - time.Sleep(time.Second * time.Duration(math.Min(uint64(attempts), 60))) - return v.internalWaitForActivation(incrementRetries(ctx), accountsChangedChan) - } - - if err = v.handleAccountsChanged(ctx, accountsChangedChan, &stream, span); err != nil { - return err - } - - v.ticker = slots.NewSlotTicker(time.Unix(int64(v.genesisTime), 0), params.BeaconConfig().SecondsPerSlot) - return nil -} - -func (v *validator) handleAccountsChanged(ctx context.Context, accountsChangedChan <-chan [][fieldparams.BLSPubkeyLength]byte, stream *ethpb.BeaconNodeValidator_WaitForActivationClient, span *trace.Span) error { + // loop while there are no validator keys ... for { select { + case <-ctx.Done(): + log.Debug("Context closed, exiting fetching validating keys") + return ctx.Err() case <-accountsChangedChan: - // Accounts (keys) changed, restart the process. + // if the accounts changed try it again return v.internalWaitForActivation(ctx, accountsChangedChan) default: - res, err := (*stream).Recv() + if len(validatingKeys) == 0 { + continue + } + stream, err := v.validatorClient.WaitForActivation(ctx, ðpb.ValidatorActivationRequest{ + PublicKeys: bytesutil.FromBytes48Array(validatingKeys), + }) + if err != nil { + tracing.AnnotateError(span, err) + attempts := streamAttempts(ctx) + log.WithError(err).WithField("attempts", attempts). + Error("Stream broken while waiting for activation. Reconnecting...") + // Reconnection attempt backoff, up to 60s. + time.Sleep(time.Second * time.Duration(math.Min(uint64(attempts), 60))) + return v.internalWaitForActivation(incrementRetries(ctx), accountsChangedChan) + } + + // Recv polls for validator statuses + res, err := stream.Recv() // If the stream is closed, we stop the loop. if errors.Is(err, io.EOF) { break @@ -150,15 +125,16 @@ func (v *validator) handleAccountsChanged(ctx context.Context, accountsChangedCh valCount = int64(valCounts[0].Count) } - valActivated := v.checkAndLogValidatorStatus(statuses, valCount) - if valActivated { - logActiveValidatorStatus(statuses) - } else { + if !v.checkAndLogValidatorStatus(statuses, valCount) { continue } } + // If a validator is active, break out of this loop break } + + // reset the ticker when they are all active + v.ticker = slots.NewSlotTicker(time.Unix(int64(v.genesisTime), 0), params.BeaconConfig().SecondsPerSlot) return nil } From c81fb14a0c7b17a9ad5011b2ad0a57412f0f9021 Mon Sep 17 00:00:00 2001 From: james-prysm Date: Wed, 10 Jan 2024 16:24:23 -0600 Subject: [PATCH 02/20] fixing unit tests --- validator/client/validator.go | 1 - validator/client/wait_for_activation_test.go | 41 +++++++++++++------- 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/validator/client/validator.go b/validator/client/validator.go index 47c1fb1261c2..0cb02ddee6d6 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -53,7 +53,6 @@ import ( // keyFetchPeriod is the frequency that we try to refetch validating keys // in case no keys were fetched previously. var ( - keyRefetchPeriod = 30 * time.Second ErrBuilderValidatorRegistration = errors.New("Builder API validator registration unsuccessful") ErrValidatorsAllExited = errors.New("All validators are exited, no more work to perform...") ) diff --git a/validator/client/wait_for_activation_test.go b/validator/client/wait_for_activation_test.go index e28aa2451481..c6f3fa616c65 100644 --- a/validator/client/wait_for_activation_test.go +++ b/validator/client/wait_for_activation_test.go @@ -39,7 +39,7 @@ func TestWaitActivation_ContextCanceled(t *testing.T) { beaconClient: beaconClient, } clientStream := mock.NewMockBeaconNodeValidator_WaitForActivationClient(ctrl) - + ctx, cancel := context.WithCancel(context.Background()) validatorClient.EXPECT().WaitForActivation( gomock.Any(), ðpb.ValidatorActivationRequest{ @@ -49,9 +49,7 @@ func TestWaitActivation_ContextCanceled(t *testing.T) { clientStream.EXPECT().Recv().Return( ðpb.ValidatorActivationResponse{}, nil, - ) - ctx, cancel := context.WithCancel(context.Background()) - cancel() + ).Do(func() { cancel() }) assert.ErrorContains(t, cancelledCtx, v.WaitForActivation(ctx, nil)) } @@ -193,12 +191,6 @@ func TestWaitForActivation_Exiting(t *testing.T) { } func TestWaitForActivation_RefetchKeys(t *testing.T) { - originalPeriod := keyRefetchPeriod - defer func() { - keyRefetchPeriod = originalPeriod - }() - keyRefetchPeriod = 1 * time.Second - hook := logTest.NewGlobal() ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -207,8 +199,7 @@ func TestWaitForActivation_RefetchKeys(t *testing.T) { prysmBeaconClient := validatormock.NewMockPrysmBeaconChainClient(ctrl) kp := randKeypair(t) - km := newMockKeymanager(t, kp) - km.fetchNoKeys = true + km := newMockKeymanager(t) v := validator{ validatorClient: validatorClient, @@ -233,7 +224,19 @@ func TestWaitForActivation_RefetchKeys(t *testing.T) { clientStream.EXPECT().Recv().Return( resp, nil) - assert.NoError(t, v.internalWaitForActivation(context.Background(), make(chan [][fieldparams.BLSPubkeyLength]byte)), "Could not wait for activation") + accountChan := make(chan [][fieldparams.BLSPubkeyLength]byte) + sub := km.SubscribeAccountChanges(accountChan) + defer func() { + sub.Unsubscribe() + close(accountChan) + }() + // update the accounts after a delay + go func() { + time.Sleep(2 * time.Second) + require.NoError(t, km.add(kp)) + km.SimulateAccountChanges([][48]byte{kp.pub}) + }() + assert.NoError(t, v.internalWaitForActivation(context.Background(), accountChan), "Could not wait for activation") assert.LogsContain(t, hook, msgNoKeysFetched) assert.LogsContain(t, hook, "Validator activated") } @@ -265,7 +268,11 @@ func TestWaitForActivation_AccountsChanged(t *testing.T) { ðpb.ValidatorActivationRequest{ PublicKeys: [][]byte{inactive.pub[:]}, }, - ).Return(inactiveClientStream, nil) + ).DoAndReturn(func(ctx context.Context, in *ethpb.ValidatorActivationRequest) (*mock.MockBeaconNodeValidator_WaitForActivationClient, error) { + //delay a bit so that other key can be added + time.Sleep(time.Second * 2) + return inactiveClientStream, nil + }) prysmBeaconClient.EXPECT().GetValidatorCount( gomock.Any(), "head", @@ -353,7 +360,11 @@ func TestWaitForActivation_AccountsChanged(t *testing.T) { ðpb.ValidatorActivationRequest{ PublicKeys: [][]byte{inactivePubKey[:]}, }, - ).Return(inactiveClientStream, nil) + ).DoAndReturn(func(ctx context.Context, in *ethpb.ValidatorActivationRequest) (*mock.MockBeaconNodeValidator_WaitForActivationClient, error) { + //delay a bit so that other key can be added + time.Sleep(time.Second * 2) + return inactiveClientStream, nil + }) prysmBeaconClient.EXPECT().GetValidatorCount( gomock.Any(), "head", From c0a4afdba16cb67ecf06bd0ebfd43a205b54a077 Mon Sep 17 00:00:00 2001 From: james-prysm Date: Wed, 10 Jan 2024 16:34:12 -0600 Subject: [PATCH 03/20] linting --- validator/client/wait_for_activation.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validator/client/wait_for_activation.go b/validator/client/wait_for_activation.go index f79d73df1be3..a2c52dad8e4b 100644 --- a/validator/client/wait_for_activation.go +++ b/validator/client/wait_for_activation.go @@ -54,7 +54,7 @@ func (v *validator) internalWaitForActivation(ctx context.Context, accountsChang defer span.End() validatingKeys, err := v.keyManager.FetchValidatingPublicKeys(ctx) if err != nil { - return errors.Wrap(err, "could not fetch validating keys") + return errors.Wrap(err, msgCouldNotFetchKeys) } if len(validatingKeys) == 0 { log.Warn(msgNoKeysFetched) From f79b41109f5dffc88d58a2d92c6c0340263f121d Mon Sep 17 00:00:00 2001 From: james-prysm Date: Wed, 10 Jan 2024 21:26:35 -0600 Subject: [PATCH 04/20] simplifying return --- validator/client/key_reload.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/validator/client/key_reload.go b/validator/client/key_reload.go index 3a768b40d871..bb8add0fde3f 100644 --- a/validator/client/key_reload.go +++ b/validator/client/key_reload.go @@ -48,7 +48,5 @@ func (v *validator) HandleKeyReload(ctx context.Context, currentKeys [][fieldpar valCount = int64(valCounts[0].Count) } - anyActive = v.checkAndLogValidatorStatus(statuses, valCount) - - return anyActive, nil + return v.checkAndLogValidatorStatus(statuses, valCount), nil } From d8b3be4ac2fcd0e43bed3afe305f9507bfe1508b Mon Sep 17 00:00:00 2001 From: james-prysm Date: Thu, 11 Jan 2024 10:09:32 -0600 Subject: [PATCH 05/20] adding sleep for the remaining slot to avoid cpu spikes --- validator/client/wait_for_activation.go | 1 + validator/client/wait_for_activation_test.go | 6 ++++++ 2 files changed, 7 insertions(+) diff --git a/validator/client/wait_for_activation.go b/validator/client/wait_for_activation.go index a2c52dad8e4b..0ef9e7fc1adf 100644 --- a/validator/client/wait_for_activation.go +++ b/validator/client/wait_for_activation.go @@ -70,6 +70,7 @@ func (v *validator) internalWaitForActivation(ctx context.Context, accountsChang return v.internalWaitForActivation(ctx, accountsChangedChan) default: if len(validatingKeys) == 0 { + time.Sleep((time.Duration(params.BeaconConfig().SecondsPerSlot) - slots.TimeIntoSlot(v.genesisTime)) * time.Second) // sleep for the rest of the slot continue } stream, err := v.validatorClient.WaitForActivation(ctx, ðpb.ValidatorActivationRequest{ diff --git a/validator/client/wait_for_activation_test.go b/validator/client/wait_for_activation_test.go index c6f3fa616c65..a637e289b231 100644 --- a/validator/client/wait_for_activation_test.go +++ b/validator/client/wait_for_activation_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/prysmaticlabs/prysm/v4/config/params" validatorType "github.com/prysmaticlabs/prysm/v4/consensus-types/validator" "github.com/prysmaticlabs/prysm/v4/validator/client/iface" @@ -191,6 +192,11 @@ func TestWaitForActivation_Exiting(t *testing.T) { } func TestWaitForActivation_RefetchKeys(t *testing.T) { + params.SetupTestConfigCleanup(t) + cfg := params.MainnetConfig().Copy() + cfg.ConfigName = "test" + cfg.SecondsPerSlot = 1 + params.OverrideBeaconConfig(cfg) hook := logTest.NewGlobal() ctrl := gomock.NewController(t) defer ctrl.Finish() From 5d141df979d3b07a7cb85d5ccac7b92b0cae50bd Mon Sep 17 00:00:00 2001 From: james-prysm Date: Thu, 11 Jan 2024 10:57:53 -0600 Subject: [PATCH 06/20] removing ifstatement on log --- validator/client/validator.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/validator/client/validator.go b/validator/client/validator.go index a73ca24f8a1e..b1a618b43aa8 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -386,12 +386,10 @@ func (v *validator) checkAndLogValidatorStatus(statuses []*validatorStatus, acti } case ethpb.ValidatorStatus_ACTIVE, ethpb.ValidatorStatus_EXITING: validatorActivated = true - if status.status.Status == ethpb.ValidatorStatus_ACTIVE { - log.WithFields(logrus.Fields{ - "publicKey": fmt.Sprintf("%#x", bytesutil.Trunc(status.publicKey)), - "index": status.index, - }).Info("Validator activated") - } + log.WithFields(logrus.Fields{ + "publicKey": fmt.Sprintf("%#x", bytesutil.Trunc(status.publicKey)), + "index": status.index, + }).Info("Validator activated") case ethpb.ValidatorStatus_EXITED: log.Info("Validator exited") case ethpb.ValidatorStatus_INVALID: From 9435b40f3f97dd2f71baf65d79fa2a18b905c804 Mon Sep 17 00:00:00 2001 From: james-prysm Date: Thu, 11 Jan 2024 14:53:14 -0600 Subject: [PATCH 07/20] removing ifstatement on log --- validator/client/wait_for_activation.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/validator/client/wait_for_activation.go b/validator/client/wait_for_activation.go index 0ef9e7fc1adf..6aa08f5be6d1 100644 --- a/validator/client/wait_for_activation.go +++ b/validator/client/wait_for_activation.go @@ -70,7 +70,9 @@ func (v *validator) internalWaitForActivation(ctx context.Context, accountsChang return v.internalWaitForActivation(ctx, accountsChangedChan) default: if len(validatingKeys) == 0 { - time.Sleep((time.Duration(params.BeaconConfig().SecondsPerSlot) - slots.TimeIntoSlot(v.genesisTime)) * time.Second) // sleep for the rest of the slot + remainingTime := params.BeaconConfig().SecondsPerSlot + log.Debugf("Waiting for active validator keys... secs: %d", remainingTime) + time.Sleep(time.Duration(remainingTime) * time.Second) // sleep for the rest of the slot continue } stream, err := v.validatorClient.WaitForActivation(ctx, ðpb.ValidatorActivationRequest{ From 58eab7bbb7aae61e3ba361448f59900d90c2379d Mon Sep 17 00:00:00 2001 From: james-prysm Date: Thu, 11 Jan 2024 16:00:01 -0600 Subject: [PATCH 08/20] improving switch statement --- validator/client/validator.go | 2 +- validator/client/wait_for_activation.go | 126 +++++++++++------------- 2 files changed, 58 insertions(+), 70 deletions(-) diff --git a/validator/client/validator.go b/validator/client/validator.go index b1a618b43aa8..25679dcecc58 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -59,7 +59,7 @@ var ( var ( msgCouldNotFetchKeys = "could not fetch validating keys" - msgNoKeysFetched = "No validating keys fetched. Trying again" + msgNoKeysFetched = "No validating keys fetched. Waiting for keys..." ) type validator struct { diff --git a/validator/client/wait_for_activation.go b/validator/client/wait_for_activation.go index 6aa08f5be6d1..0fab2682708a 100644 --- a/validator/client/wait_for_activation.go +++ b/validator/client/wait_for_activation.go @@ -7,13 +7,11 @@ import ( "github.com/pkg/errors" fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" - "github.com/prysmaticlabs/prysm/v4/config/params" validator2 "github.com/prysmaticlabs/prysm/v4/consensus-types/validator" "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" "github.com/prysmaticlabs/prysm/v4/math" "github.com/prysmaticlabs/prysm/v4/monitoring/tracing" ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" - "github.com/prysmaticlabs/prysm/v4/time/slots" "github.com/prysmaticlabs/prysm/v4/validator/client/iface" "go.opencensus.io/trace" ) @@ -56,11 +54,9 @@ func (v *validator) internalWaitForActivation(ctx context.Context, accountsChang if err != nil { return errors.Wrap(err, msgCouldNotFetchKeys) } - if len(validatingKeys) == 0 { - log.Warn(msgNoKeysFetched) - } // loop while there are no validator keys ... - for { + for len(validatingKeys) == 0 { + log.Warn(msgNoKeysFetched) select { case <-ctx.Done(): log.Debug("Context closed, exiting fetching validating keys") @@ -68,76 +64,68 @@ func (v *validator) internalWaitForActivation(ctx context.Context, accountsChang case <-accountsChangedChan: // if the accounts changed try it again return v.internalWaitForActivation(ctx, accountsChangedChan) - default: - if len(validatingKeys) == 0 { - remainingTime := params.BeaconConfig().SecondsPerSlot - log.Debugf("Waiting for active validator keys... secs: %d", remainingTime) - time.Sleep(time.Duration(remainingTime) * time.Second) // sleep for the rest of the slot - continue - } - stream, err := v.validatorClient.WaitForActivation(ctx, ðpb.ValidatorActivationRequest{ - PublicKeys: bytesutil.FromBytes48Array(validatingKeys), - }) - if err != nil { - tracing.AnnotateError(span, err) - attempts := streamAttempts(ctx) - log.WithError(err).WithField("attempts", attempts). - Error("Stream broken while waiting for activation. Reconnecting...") - // Reconnection attempt backoff, up to 60s. - time.Sleep(time.Second * time.Duration(math.Min(uint64(attempts), 60))) - return v.internalWaitForActivation(incrementRetries(ctx), accountsChangedChan) - } + } + } - // Recv polls for validator statuses - res, err := stream.Recv() - // If the stream is closed, we stop the loop. - if errors.Is(err, io.EOF) { - break - } - // If context is canceled we return from the function. - if ctx.Err() == context.Canceled { - return errors.Wrap(ctx.Err(), "context has been canceled so shutting down the loop") - } - if err != nil { - tracing.AnnotateError(span, err) - attempts := streamAttempts(ctx) - log.WithError(err).WithField("attempts", attempts). - Error("Stream broken while waiting for activation. Reconnecting...") - // Reconnection attempt backoff, up to 60s. - time.Sleep(time.Second * time.Duration(math.Min(uint64(attempts), 60))) - return v.internalWaitForActivation(incrementRetries(ctx), accountsChangedChan) - } + stream, err := v.validatorClient.WaitForActivation(ctx, ðpb.ValidatorActivationRequest{ + PublicKeys: bytesutil.FromBytes48Array(validatingKeys), + }) + if err != nil { + tracing.AnnotateError(span, err) + attempts := streamAttempts(ctx) + log.WithError(err).WithField("attempts", attempts). + Error("Stream broken while waiting for activation. Reconnecting...") + // Reconnection attempt backoff, up to 60s. + time.Sleep(time.Second * time.Duration(math.Min(uint64(attempts), 60))) + return v.internalWaitForActivation(incrementRetries(ctx), accountsChangedChan) + } - statuses := make([]*validatorStatus, len(res.Statuses)) - for i, s := range res.Statuses { - statuses[i] = &validatorStatus{ - publicKey: s.PublicKey, - status: s.Status, - index: s.Index, - } - } + // Recv polls for validator statuses + res, err := stream.Recv() + // If the stream is closed, we stop the loop. + if errors.Is(err, io.EOF) { + log.Warn("validator wait for activation stream closed...") + return nil + } + // If context is canceled we return from the function. + if ctx.Err() == context.Canceled { + return errors.Wrap(ctx.Err(), "context has been canceled so shutting down the loop") + } + if err != nil { + tracing.AnnotateError(span, err) + attempts := streamAttempts(ctx) + log.WithError(err).WithField("attempts", attempts). + Error("Stream broken while waiting for activation. Reconnecting...") + // Reconnection attempt backoff, up to 60s. + time.Sleep(time.Second * time.Duration(math.Min(uint64(attempts), 60))) + return v.internalWaitForActivation(incrementRetries(ctx), accountsChangedChan) + } + + statuses := make([]*validatorStatus, len(res.Statuses)) + for i, s := range res.Statuses { + statuses[i] = &validatorStatus{ + publicKey: s.PublicKey, + status: s.Status, + index: s.Index, + } + } - // "-1" indicates that validator count endpoint is not supported by the beacon node. - var valCount int64 = -1 - valCounts, err := v.prysmBeaconClient.GetValidatorCount(ctx, "head", []validator2.Status{validator2.Active}) - if err != nil && !errors.Is(err, iface.ErrNotSupported) { - return errors.Wrap(err, "could not get active validator count") - } + // "-1" indicates that validator count endpoint is not supported by the beacon node. + var valCount int64 = -1 + valCounts, err := v.prysmBeaconClient.GetValidatorCount(ctx, "head", []validator2.Status{validator2.Active}) + if err != nil && !errors.Is(err, iface.ErrNotSupported) { + return errors.Wrap(err, "could not get active validator count") + } - if len(valCounts) > 0 { - valCount = int64(valCounts[0].Count) - } + if len(valCounts) > 0 { + valCount = int64(valCounts[0].Count) + } - if !v.checkAndLogValidatorStatus(statuses, valCount) { - continue - } - } - // If a validator is active, break out of this loop - break + if !v.checkAndLogValidatorStatus(statuses, valCount) { + // if it's not activated try this process again + return v.internalWaitForActivation(ctx, accountsChangedChan) } - // reset the ticker when they are all active - v.ticker = slots.NewSlotTicker(time.Unix(int64(v.genesisTime), 0), params.BeaconConfig().SecondsPerSlot) return nil } From e9d6615ebea4732cb264a88bf13cf6042992a841 Mon Sep 17 00:00:00 2001 From: james-prysm Date: Thu, 11 Jan 2024 16:16:41 -0600 Subject: [PATCH 09/20] removing the loop entirely --- validator/client/wait_for_activation.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/validator/client/wait_for_activation.go b/validator/client/wait_for_activation.go index 0fab2682708a..06c0f4cfcb6f 100644 --- a/validator/client/wait_for_activation.go +++ b/validator/client/wait_for_activation.go @@ -54,8 +54,8 @@ func (v *validator) internalWaitForActivation(ctx context.Context, accountsChang if err != nil { return errors.Wrap(err, msgCouldNotFetchKeys) } - // loop while there are no validator keys ... - for len(validatingKeys) == 0 { + // if there are no validating keys, wait for some + if len(validatingKeys) == 0 { log.Warn(msgNoKeysFetched) select { case <-ctx.Done(): From d5326f507c8c930b12db0eae1e73581c1c415a5f Mon Sep 17 00:00:00 2001 From: james-prysm Date: Thu, 11 Jan 2024 16:40:19 -0600 Subject: [PATCH 10/20] fixing unit test --- validator/client/validator_test.go | 37 -------------------- validator/client/wait_for_activation_test.go | 37 ++++++++++++++++++++ 2 files changed, 37 insertions(+), 37 deletions(-) diff --git a/validator/client/validator_test.go b/validator/client/validator_test.go index 8ea977848006..55aea89d7c3c 100644 --- a/validator/client/validator_test.go +++ b/validator/client/validator_test.go @@ -388,43 +388,6 @@ func TestWaitMultipleActivation_LogsActivationEpochOK(t *testing.T) { require.LogsContain(t, hook, "Validator activated") } -func TestWaitActivation_NotAllValidatorsActivatedOK(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - validatorClient := validatormock.NewMockValidatorClient(ctrl) - beaconClient := validatormock.NewMockBeaconChainClient(ctrl) - prysmBeaconClient := validatormock.NewMockPrysmBeaconChainClient(ctrl) - - kp := randKeypair(t) - v := validator{ - validatorClient: validatorClient, - keyManager: newMockKeymanager(t, kp), - beaconClient: beaconClient, - prysmBeaconClient: prysmBeaconClient, - } - resp := generateMockStatusResponse([][]byte{kp.pub[:]}) - resp.Statuses[0].Status.Status = ethpb.ValidatorStatus_ACTIVE - clientStream := mock2.NewMockBeaconNodeValidator_WaitForActivationClient(ctrl) - validatorClient.EXPECT().WaitForActivation( - gomock.Any(), - gomock.Any(), - ).Return(clientStream, nil) - prysmBeaconClient.EXPECT().GetValidatorCount( - gomock.Any(), - "head", - []validatorType.Status{validatorType.Active}, - ).Return([]iface.ValidatorCount{}, nil).Times(2) - clientStream.EXPECT().Recv().Return( - ðpb.ValidatorActivationResponse{}, - nil, - ) - clientStream.EXPECT().Recv().Return( - resp, - nil, - ) - assert.NoError(t, v.WaitForActivation(context.Background(), nil), "Could not wait for activation") -} - func TestWaitSync_ContextCanceled(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/validator/client/wait_for_activation_test.go b/validator/client/wait_for_activation_test.go index a637e289b231..199c5de22fb2 100644 --- a/validator/client/wait_for_activation_test.go +++ b/validator/client/wait_for_activation_test.go @@ -410,3 +410,40 @@ func TestWaitForActivation_AccountsChanged(t *testing.T) { assert.LogsContain(t, hook, "Validator activated") }) } + +func TestWaitActivation_NotAllValidatorsActivatedOK(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + validatorClient := validatormock.NewMockValidatorClient(ctrl) + beaconClient := validatormock.NewMockBeaconChainClient(ctrl) + prysmBeaconClient := validatormock.NewMockPrysmBeaconChainClient(ctrl) + + kp := randKeypair(t) + v := validator{ + validatorClient: validatorClient, + keyManager: newMockKeymanager(t, kp), + beaconClient: beaconClient, + prysmBeaconClient: prysmBeaconClient, + } + resp := generateMockStatusResponse([][]byte{kp.pub[:]}) + resp.Statuses[0].Status.Status = ethpb.ValidatorStatus_ACTIVE + clientStream := mock.NewMockBeaconNodeValidator_WaitForActivationClient(ctrl) + validatorClient.EXPECT().WaitForActivation( + gomock.Any(), + gomock.Any(), + ).Return(clientStream, nil).Times(2) + prysmBeaconClient.EXPECT().GetValidatorCount( + gomock.Any(), + "head", + []validatorType.Status{validatorType.Active}, + ).Return([]iface.ValidatorCount{}, nil).Times(2) + clientStream.EXPECT().Recv().Return( + ðpb.ValidatorActivationResponse{}, + nil, + ) + clientStream.EXPECT().Recv().Return( + resp, + nil, + ) + assert.NoError(t, v.WaitForActivation(context.Background(), nil), "Could not wait for activation") +} From a991302c66ea2ca1649afc839944be95884c333b Mon Sep 17 00:00:00 2001 From: james-prysm Date: Fri, 12 Jan 2024 15:59:37 -0600 Subject: [PATCH 11/20] fixing manu's reported issue with deletion of json file --- validator/accounts/iface/wallet.go | 2 +- validator/accounts/testing/mock.go | 6 +-- validator/accounts/wallet_create.go | 3 +- validator/keymanager/local/keymanager.go | 26 +++++++++---- validator/keymanager/local/refresh.go | 48 +++++++++++++----------- 5 files changed, 51 insertions(+), 34 deletions(-) diff --git a/validator/accounts/iface/wallet.go b/validator/accounts/iface/wallet.go index 9eb8299fa5b6..b5afbc90aa25 100644 --- a/validator/accounts/iface/wallet.go +++ b/validator/accounts/iface/wallet.go @@ -23,7 +23,7 @@ type Wallet interface { // Read methods for important wallet and accounts-related files. ReadFileAtPath(ctx context.Context, filePath string, fileName string) ([]byte, error) // Write methods to persist important wallet and accounts-related files to disk. - WriteFileAtPath(ctx context.Context, pathName string, fileName string, data []byte) error + WriteFileAtPath(ctx context.Context, pathName string, fileName string, data []byte) (bool, error) // Method for initializing a new keymanager. InitializeKeymanager(ctx context.Context, cfg InitKeymanagerConfig) (keymanager.IKeymanager, error) } diff --git a/validator/accounts/testing/mock.go b/validator/accounts/testing/mock.go index cb903c65ff46..f43ed5130e89 100644 --- a/validator/accounts/testing/mock.go +++ b/validator/accounts/testing/mock.go @@ -55,19 +55,19 @@ func (w *Wallet) Password() string { } // WriteFileAtPath -- -func (w *Wallet) WriteFileAtPath(_ context.Context, pathName, fileName string, data []byte) error { +func (w *Wallet) WriteFileAtPath(_ context.Context, pathName, fileName string, data []byte) (bool, error) { w.lock.Lock() defer w.lock.Unlock() if w.HasWriteFileError { // reset the flag to not contaminate other tests w.HasWriteFileError = false - return errors.New("could not write keystore file for accounts") + return false, errors.New("could not write keystore file for accounts") } if w.Files[pathName] == nil { w.Files[pathName] = make(map[string][]byte) } w.Files[pathName][fileName] = data - return nil + return true, nil } // ReadFileAtPath -- diff --git a/validator/accounts/wallet_create.go b/validator/accounts/wallet_create.go index b1b081bf4f2b..a45393562d68 100644 --- a/validator/accounts/wallet_create.go +++ b/validator/accounts/wallet_create.go @@ -32,7 +32,8 @@ func (acm *CLIManager) WalletCreate(ctx context.Context) (*wallet.Wallet, error) if err != nil { return nil, err } - if err = w.WriteFileAtPath(ctx, local.AccountsPath, local.AccountsKeystoreFileName, encodedAccounts); err != nil { + _, err = w.WriteFileAtPath(ctx, local.AccountsPath, local.AccountsKeystoreFileName, encodedAccounts); + if err != nil { return nil, err } log.WithField("--wallet-dir", acm.walletDir).Info( diff --git a/validator/keymanager/local/keymanager.go b/validator/keymanager/local/keymanager.go index 3b4a2c933f98..ce80fa812244 100644 --- a/validator/keymanager/local/keymanager.go +++ b/validator/keymanager/local/keymanager.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "path/filepath" "strings" "sync" @@ -282,18 +283,27 @@ func (km *Keymanager) SaveStoreAndReInitialize(ctx context.Context, store *accou if err != nil { return err } - if err := km.wallet.WriteFileAtPath(ctx, AccountsPath, AccountsKeystoreFileName, encodedAccounts); err != nil { + + existedPreviously, err := km.wallet.WriteFileAtPath(ctx, AccountsPath, AccountsKeystoreFileName, encodedAccounts) + if err != nil { return err } - // Reinitialize account store and cache - // This will update the in-memory information instead of reading from the file itself for safety concerns - km.accountsStore = store - err = km.initializeKeysCachesFromKeystore() - if err != nil { - return errors.Wrap(err, "failed to initialize keys caches") + if !existedPreviously { + // manually reload the account from the keystore the first time + km.reloadAccountsFromKeystoreFile(filepath.Join(km.wallet.AccountsDir(), AccountsPath, AccountsKeystoreFileName)) + // listen to account changes of the new file + go km.listenForAccountChanges(ctx) + } else { + // Reinitialize account store and cache + // This will update the in-memory information instead of reading from the file itself for safety concerns + km.accountsStore = store + err = km.initializeKeysCachesFromKeystore() + if err != nil { + return errors.Wrap(err, "failed to initialize keys caches") + } } - return err + return nil } // CreateAccountsKeystoreRepresentation is a pure function that takes an accountStore and wallet password and returns the encrypted formatted json version for local writing. diff --git a/validator/keymanager/local/refresh.go b/validator/keymanager/local/refresh.go index 472d42831dd8..7761a184e48f 100644 --- a/validator/keymanager/local/refresh.go +++ b/validator/keymanager/local/refresh.go @@ -26,6 +26,7 @@ func (km *Keymanager) listenForAccountChanges(ctx context.Context) { debounceFileChangesInterval := features.Get().KeystoreImportDebounceInterval accountsFilePath := filepath.Join(km.wallet.AccountsDir(), AccountsPath, AccountsKeystoreFileName) if !file.Exists(accountsFilePath) { + log.Warnf("Starting without accounts located in wallet at %s", accountsFilePath) return } watcher, err := fsnotify.NewWatcher() @@ -56,27 +57,7 @@ func (km *Keymanager) listenForAccountChanges(ctx context.Context) { log.Errorf("Type %T is not a valid file system event", event) return } - fileBytes, err := os.ReadFile(ev.Name) - if err != nil { - log.WithError(err).Errorf("Could not read file at path: %s", ev.Name) - return - } - if fileBytes == nil { - log.WithError(err).Errorf("Loaded in an empty file: %s", ev.Name) - return - } - accountsKeystore := &AccountsKeystoreRepresentation{} - if err := json.Unmarshal(fileBytes, accountsKeystore); err != nil { - log.WithError( - err, - ).Errorf("Could not read valid, EIP-2335 keystore json file at path: %s", ev.Name) - return - } - if err := km.reloadAccountsFromKeystore(accountsKeystore); err != nil { - log.WithError( - err, - ).Error("Could not replace the accounts store from keystore file") - } + km.reloadAccountsFromKeystoreFile(ev.Name) }) for { select { @@ -92,6 +73,30 @@ func (km *Keymanager) listenForAccountChanges(ctx context.Context) { } } +func (km *Keymanager) reloadAccountsFromKeystoreFile(accountsFilePath string) { + fileBytes, err := os.ReadFile(accountsFilePath) + if err != nil { + log.WithError(err).Errorf("Could not read file at path: %s", accountsFilePath) + return + } + if fileBytes == nil { + log.WithError(err).Errorf("Loaded in an empty file: %s", accountsFilePath) + return + } + accountsKeystore := &AccountsKeystoreRepresentation{} + if err := json.Unmarshal(fileBytes, accountsKeystore); err != nil { + log.WithError( + err, + ).Errorf("Could not read valid, EIP-2335 keystore json file at path: %s", accountsFilePath) + return + } + if err := km.reloadAccountsFromKeystore(accountsKeystore); err != nil { + log.WithError( + err, + ).Error("Could not replace the accounts store from keystore file") + } +} + // Replaces the accounts store struct in the local keymanager with // the contents of a keystore file by decrypting it with the accounts password. func (km *Keymanager) reloadAccountsFromKeystore(keystore *AccountsKeystoreRepresentation) error { @@ -107,6 +112,7 @@ func (km *Keymanager) reloadAccountsFromKeystore(keystore *AccountsKeystoreRepre if len(newAccountsStore.PublicKeys) != len(newAccountsStore.PrivateKeys) { return errors.New("number of public and private keys in keystore do not match") } + pubKeys := make([][fieldparams.BLSPubkeyLength]byte, len(newAccountsStore.PublicKeys)) for i := 0; i < len(newAccountsStore.PrivateKeys); i++ { privKey, err := bls.SecretKeyFromBytes(newAccountsStore.PrivateKeys[i]) From fc5180b5d6fcc04d5b387e6f04a158162c1ae2d9 Mon Sep 17 00:00:00 2001 From: james-prysm Date: Fri, 12 Jan 2024 16:07:41 -0600 Subject: [PATCH 12/20] missed change around writefile at path --- validator/accounts/wallet/wallet.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/validator/accounts/wallet/wallet.go b/validator/accounts/wallet/wallet.go index acbc07a1e998..fc97d577f29d 100644 --- a/validator/accounts/wallet/wallet.go +++ b/validator/accounts/wallet/wallet.go @@ -366,26 +366,27 @@ func (w *Wallet) InitializeKeymanager(ctx context.Context, cfg iface.InitKeymana } // WriteFileAtPath within the wallet directory given the desired path, filename, and raw data. -func (w *Wallet) WriteFileAtPath(_ context.Context, filePath, fileName string, data []byte) error { +func (w *Wallet) WriteFileAtPath(_ context.Context, filePath, fileName string, data []byte) (bool /* exited previously */, error) { accountPath := filepath.Join(w.accountsPath, filePath) hasDir, err := file.HasDir(accountPath) if err != nil { - return err + return false, err } if !hasDir { if err := file.MkdirAll(accountPath); err != nil { - return errors.Wrapf(err, "could not create path: %s", accountPath) + return false, errors.Wrapf(err, "could not create path: %s", accountPath) } } fullPath := filepath.Join(accountPath, fileName) + existedPreviously := file.Exists(fullPath) if err := file.WriteFile(fullPath, data); err != nil { - return errors.Wrapf(err, "could not write %s", filePath) + return false, errors.Wrapf(err, "could not write %s", filePath) } log.WithFields(logrus.Fields{ "path": fullPath, "fileName": fileName, }).Debug("Wrote new file at path") - return nil + return existedPreviously, nil } // ReadFileAtPath within the wallet directory given the desired path and filename. From 02672d5bcec9e3f3a4599dd2fb726bff67eb504b Mon Sep 17 00:00:00 2001 From: james-prysm Date: Fri, 12 Jan 2024 16:12:06 -0600 Subject: [PATCH 13/20] gofmt --- validator/accounts/wallet_create.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validator/accounts/wallet_create.go b/validator/accounts/wallet_create.go index a45393562d68..1e2852863228 100644 --- a/validator/accounts/wallet_create.go +++ b/validator/accounts/wallet_create.go @@ -32,7 +32,7 @@ func (acm *CLIManager) WalletCreate(ctx context.Context) (*wallet.Wallet, error) if err != nil { return nil, err } - _, err = w.WriteFileAtPath(ctx, local.AccountsPath, local.AccountsKeystoreFileName, encodedAccounts); + _, err = w.WriteFileAtPath(ctx, local.AccountsPath, local.AccountsKeystoreFileName, encodedAccounts) if err != nil { return nil, err } From e3e63c65d1633c9407493aa44ccb704aad5fb46f Mon Sep 17 00:00:00 2001 From: james-prysm Date: Fri, 12 Jan 2024 16:31:06 -0600 Subject: [PATCH 14/20] fixing deepsource issue with reading file --- validator/keymanager/local/keymanager.go | 3 +-- validator/keymanager/local/refresh.go | 29 ++++++++++++++++++++++-- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/validator/keymanager/local/keymanager.go b/validator/keymanager/local/keymanager.go index ce80fa812244..e810f62b0f3d 100644 --- a/validator/keymanager/local/keymanager.go +++ b/validator/keymanager/local/keymanager.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "path/filepath" "strings" "sync" @@ -291,7 +290,7 @@ func (km *Keymanager) SaveStoreAndReInitialize(ctx context.Context, store *accou if !existedPreviously { // manually reload the account from the keystore the first time - km.reloadAccountsFromKeystoreFile(filepath.Join(km.wallet.AccountsDir(), AccountsPath, AccountsKeystoreFileName)) + km.reloadAccountsFromKeystoreFile() // listen to account changes of the new file go km.listenForAccountChanges(ctx) } else { diff --git a/validator/keymanager/local/refresh.go b/validator/keymanager/local/refresh.go index 7761a184e48f..8f8bc0a237f2 100644 --- a/validator/keymanager/local/refresh.go +++ b/validator/keymanager/local/refresh.go @@ -57,7 +57,27 @@ func (km *Keymanager) listenForAccountChanges(ctx context.Context) { log.Errorf("Type %T is not a valid file system event", event) return } - km.reloadAccountsFromKeystoreFile(ev.Name) + fileBytes, err := os.ReadFile(ev.Name) + if err != nil { + log.WithError(err).Errorf("Could not read file at path: %s", ev.Name) + return + } + if fileBytes == nil { + log.WithError(err).Errorf("Loaded in an empty file: %s", ev.Name) + return + } + accountsKeystore := &AccountsKeystoreRepresentation{} + if err := json.Unmarshal(fileBytes, accountsKeystore); err != nil { + log.WithError( + err, + ).Errorf("Could not read valid, EIP-2335 keystore json file at path: %s", ev.Name) + return + } + if err := km.reloadAccountsFromKeystore(accountsKeystore); err != nil { + log.WithError( + err, + ).Error("Could not replace the accounts store from keystore file") + } }) for { select { @@ -73,7 +93,12 @@ func (km *Keymanager) listenForAccountChanges(ctx context.Context) { } } -func (km *Keymanager) reloadAccountsFromKeystoreFile(accountsFilePath string) { +func (km *Keymanager) reloadAccountsFromKeystoreFile() { + if km.wallet == nil { + log.Error("Could not reload accounts because wallet was undefined") + return + } + accountsFilePath := filepath.Join(km.wallet.AccountsDir(), AccountsPath, AccountsKeystoreFileName) fileBytes, err := os.ReadFile(accountsFilePath) if err != nil { log.WithError(err).Errorf("Could not read file at path: %s", accountsFilePath) From 580cd9b2655784dc118fe4f6f8524a8d6964cdec Mon Sep 17 00:00:00 2001 From: james-prysm Date: Fri, 12 Jan 2024 16:38:56 -0600 Subject: [PATCH 15/20] trying to clean file to avoid deepsource issue --- validator/keymanager/local/refresh.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validator/keymanager/local/refresh.go b/validator/keymanager/local/refresh.go index 8f8bc0a237f2..84017f3a37f5 100644 --- a/validator/keymanager/local/refresh.go +++ b/validator/keymanager/local/refresh.go @@ -98,7 +98,7 @@ func (km *Keymanager) reloadAccountsFromKeystoreFile() { log.Error("Could not reload accounts because wallet was undefined") return } - accountsFilePath := filepath.Join(km.wallet.AccountsDir(), AccountsPath, AccountsKeystoreFileName) + accountsFilePath := filepath.Join(km.wallet.AccountsDir(), AccountsPath, filepath.Clean(AccountsKeystoreFileName)) fileBytes, err := os.ReadFile(accountsFilePath) if err != nil { log.WithError(err).Errorf("Could not read file at path: %s", accountsFilePath) From 12186e3fb1980ef560b791751f60d89eaf873315 Mon Sep 17 00:00:00 2001 From: james-prysm Date: Fri, 12 Jan 2024 16:45:34 -0600 Subject: [PATCH 16/20] still getting error trying a different approach --- validator/keymanager/local/keymanager.go | 3 ++- validator/keymanager/local/refresh.go | 27 +++--------------------- 2 files changed, 5 insertions(+), 25 deletions(-) diff --git a/validator/keymanager/local/keymanager.go b/validator/keymanager/local/keymanager.go index e810f62b0f3d..ce80fa812244 100644 --- a/validator/keymanager/local/keymanager.go +++ b/validator/keymanager/local/keymanager.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "path/filepath" "strings" "sync" @@ -290,7 +291,7 @@ func (km *Keymanager) SaveStoreAndReInitialize(ctx context.Context, store *accou if !existedPreviously { // manually reload the account from the keystore the first time - km.reloadAccountsFromKeystoreFile() + km.reloadAccountsFromKeystoreFile(filepath.Join(km.wallet.AccountsDir(), AccountsPath, AccountsKeystoreFileName)) // listen to account changes of the new file go km.listenForAccountChanges(ctx) } else { diff --git a/validator/keymanager/local/refresh.go b/validator/keymanager/local/refresh.go index 84017f3a37f5..eeaa9e766e4a 100644 --- a/validator/keymanager/local/refresh.go +++ b/validator/keymanager/local/refresh.go @@ -57,27 +57,7 @@ func (km *Keymanager) listenForAccountChanges(ctx context.Context) { log.Errorf("Type %T is not a valid file system event", event) return } - fileBytes, err := os.ReadFile(ev.Name) - if err != nil { - log.WithError(err).Errorf("Could not read file at path: %s", ev.Name) - return - } - if fileBytes == nil { - log.WithError(err).Errorf("Loaded in an empty file: %s", ev.Name) - return - } - accountsKeystore := &AccountsKeystoreRepresentation{} - if err := json.Unmarshal(fileBytes, accountsKeystore); err != nil { - log.WithError( - err, - ).Errorf("Could not read valid, EIP-2335 keystore json file at path: %s", ev.Name) - return - } - if err := km.reloadAccountsFromKeystore(accountsKeystore); err != nil { - log.WithError( - err, - ).Error("Could not replace the accounts store from keystore file") - } + km.reloadAccountsFromKeystoreFile(ev.Name) }) for { select { @@ -93,13 +73,12 @@ func (km *Keymanager) listenForAccountChanges(ctx context.Context) { } } -func (km *Keymanager) reloadAccountsFromKeystoreFile() { +func (km *Keymanager) reloadAccountsFromKeystoreFile(accountsFilePath string) { if km.wallet == nil { log.Error("Could not reload accounts because wallet was undefined") return } - accountsFilePath := filepath.Join(km.wallet.AccountsDir(), AccountsPath, filepath.Clean(AccountsKeystoreFileName)) - fileBytes, err := os.ReadFile(accountsFilePath) + fileBytes, err := os.ReadFile(filepath.Clean(accountsFilePath)) if err != nil { log.WithError(err).Errorf("Could not read file at path: %s", accountsFilePath) return From 1def8b881609b335a469b81f296b59c5dacb40de Mon Sep 17 00:00:00 2001 From: james-prysm Date: Fri, 12 Jan 2024 22:49:15 -0600 Subject: [PATCH 17/20] fixing stream loop --- validator/client/wait_for_activation.go | 87 +++++++++++++------------ 1 file changed, 47 insertions(+), 40 deletions(-) diff --git a/validator/client/wait_for_activation.go b/validator/client/wait_for_activation.go index 06c0f4cfcb6f..61bbd6810c1b 100644 --- a/validator/client/wait_for_activation.go +++ b/validator/client/wait_for_activation.go @@ -80,50 +80,57 @@ func (v *validator) internalWaitForActivation(ctx context.Context, accountsChang return v.internalWaitForActivation(incrementRetries(ctx), accountsChangedChan) } - // Recv polls for validator statuses - res, err := stream.Recv() - // If the stream is closed, we stop the loop. - if errors.Is(err, io.EOF) { - log.Warn("validator wait for activation stream closed...") - return nil - } - // If context is canceled we return from the function. - if ctx.Err() == context.Canceled { - return errors.Wrap(ctx.Err(), "context has been canceled so shutting down the loop") - } - if err != nil { - tracing.AnnotateError(span, err) - attempts := streamAttempts(ctx) - log.WithError(err).WithField("attempts", attempts). - Error("Stream broken while waiting for activation. Reconnecting...") - // Reconnection attempt backoff, up to 60s. - time.Sleep(time.Second * time.Duration(math.Min(uint64(attempts), 60))) - return v.internalWaitForActivation(incrementRetries(ctx), accountsChangedChan) - } + someAreActive := false + for !someAreActive { + select { + case <-ctx.Done(): + log.Debug("Context closed, exiting fetching validating keys") + return ctx.Err() + case <-accountsChangedChan: + // Accounts (keys) changed, restart the process. + return v.internalWaitForActivation(ctx, accountsChangedChan) + default: + res, err := (stream).Recv() // retrieve from stream one loop at a time + // If the stream is closed, we stop the loop. + if errors.Is(err, io.EOF) { + break + } + // If context is canceled we return from the function. + if ctx.Err() == context.Canceled { + return errors.Wrap(ctx.Err(), "context has been canceled so shutting down the loop") + } + if err != nil { + tracing.AnnotateError(span, err) + attempts := streamAttempts(ctx) + log.WithError(err).WithField("attempts", attempts). + Error("Stream broken while waiting for activation. Reconnecting...") + // Reconnection attempt backoff, up to 60s. + time.Sleep(time.Second * time.Duration(math.Min(uint64(attempts), 60))) + return v.internalWaitForActivation(incrementRetries(ctx), accountsChangedChan) + } - statuses := make([]*validatorStatus, len(res.Statuses)) - for i, s := range res.Statuses { - statuses[i] = &validatorStatus{ - publicKey: s.PublicKey, - status: s.Status, - index: s.Index, - } - } + statuses := make([]*validatorStatus, len(res.Statuses)) + for i, s := range res.Statuses { + statuses[i] = &validatorStatus{ + publicKey: s.PublicKey, + status: s.Status, + index: s.Index, + } + } - // "-1" indicates that validator count endpoint is not supported by the beacon node. - var valCount int64 = -1 - valCounts, err := v.prysmBeaconClient.GetValidatorCount(ctx, "head", []validator2.Status{validator2.Active}) - if err != nil && !errors.Is(err, iface.ErrNotSupported) { - return errors.Wrap(err, "could not get active validator count") - } + // "-1" indicates that validator count endpoint is not supported by the beacon node. + var valCount int64 = -1 + valCounts, err := v.prysmBeaconClient.GetValidatorCount(ctx, "head", []validator2.Status{validator2.Active}) + if err != nil && !errors.Is(err, iface.ErrNotSupported) { + return errors.Wrap(err, "could not get active validator count") + } - if len(valCounts) > 0 { - valCount = int64(valCounts[0].Count) - } + if len(valCounts) > 0 { + valCount = int64(valCounts[0].Count) + } - if !v.checkAndLogValidatorStatus(statuses, valCount) { - // if it's not activated try this process again - return v.internalWaitForActivation(ctx, accountsChangedChan) + someAreActive = v.checkAndLogValidatorStatus(statuses, valCount) + } } return nil From 959e4a43bc099cdbd74ac48e98a12e73fba85029 Mon Sep 17 00:00:00 2001 From: james-prysm Date: Fri, 12 Jan 2024 23:01:41 -0600 Subject: [PATCH 18/20] fixing unit test --- validator/client/wait_for_activation_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validator/client/wait_for_activation_test.go b/validator/client/wait_for_activation_test.go index 199c5de22fb2..25c8ca39096f 100644 --- a/validator/client/wait_for_activation_test.go +++ b/validator/client/wait_for_activation_test.go @@ -431,7 +431,7 @@ func TestWaitActivation_NotAllValidatorsActivatedOK(t *testing.T) { validatorClient.EXPECT().WaitForActivation( gomock.Any(), gomock.Any(), - ).Return(clientStream, nil).Times(2) + ).Return(clientStream, nil) prysmBeaconClient.EXPECT().GetValidatorCount( gomock.Any(), "head", From 90eec051396052a409cd3d43adacfc9ead634f42 Mon Sep 17 00:00:00 2001 From: james-prysm <90280386+james-prysm@users.noreply.github.com> Date: Tue, 16 Jan 2024 09:14:32 -0600 Subject: [PATCH 19/20] Update validator/keymanager/local/keymanager.go Co-authored-by: Manu NALEPA --- validator/keymanager/local/keymanager.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/validator/keymanager/local/keymanager.go b/validator/keymanager/local/keymanager.go index ce80fa812244..039884e44597 100644 --- a/validator/keymanager/local/keymanager.go +++ b/validator/keymanager/local/keymanager.go @@ -289,12 +289,7 @@ func (km *Keymanager) SaveStoreAndReInitialize(ctx context.Context, store *accou return err } - if !existedPreviously { - // manually reload the account from the keystore the first time - km.reloadAccountsFromKeystoreFile(filepath.Join(km.wallet.AccountsDir(), AccountsPath, AccountsKeystoreFileName)) - // listen to account changes of the new file - go km.listenForAccountChanges(ctx) - } else { + if existedPreviously { // Reinitialize account store and cache // This will update the in-memory information instead of reading from the file itself for safety concerns km.accountsStore = store @@ -302,6 +297,14 @@ func (km *Keymanager) SaveStoreAndReInitialize(ctx context.Context, store *accou if err != nil { return errors.Wrap(err, "failed to initialize keys caches") } + + return nil + } + + // manually reload the account from the keystore the first time + km.reloadAccountsFromKeystoreFile(filepath.Join(km.wallet.AccountsDir(), AccountsPath, AccountsKeystoreFileName)) + // listen to account changes of the new file + go km.listenForAccountChanges(ctx) } return nil } From 3ac8251cc1d8425e0f982e8a855da3c439b40b84 Mon Sep 17 00:00:00 2001 From: james-prysm Date: Tue, 16 Jan 2024 09:24:00 -0600 Subject: [PATCH 20/20] fixing linting --- validator/keymanager/local/keymanager.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/validator/keymanager/local/keymanager.go b/validator/keymanager/local/keymanager.go index 039884e44597..4cae2ed8f299 100644 --- a/validator/keymanager/local/keymanager.go +++ b/validator/keymanager/local/keymanager.go @@ -289,7 +289,7 @@ func (km *Keymanager) SaveStoreAndReInitialize(ctx context.Context, store *accou return err } - if existedPreviously { + if existedPreviously { // Reinitialize account store and cache // This will update the in-memory information instead of reading from the file itself for safety concerns km.accountsStore = store @@ -297,15 +297,14 @@ func (km *Keymanager) SaveStoreAndReInitialize(ctx context.Context, store *accou if err != nil { return errors.Wrap(err, "failed to initialize keys caches") } - + return nil } - - // manually reload the account from the keystore the first time - km.reloadAccountsFromKeystoreFile(filepath.Join(km.wallet.AccountsDir(), AccountsPath, AccountsKeystoreFileName)) - // listen to account changes of the new file - go km.listenForAccountChanges(ctx) - } + + // manually reload the account from the keystore the first time + km.reloadAccountsFromKeystoreFile(filepath.Join(km.wallet.AccountsDir(), AccountsPath, AccountsKeystoreFileName)) + // listen to account changes of the new file + go km.listenForAccountChanges(ctx) return nil }