diff --git a/beacon-chain/rpc/BUILD.bazel b/beacon-chain/rpc/BUILD.bazel index 0d63ef84f782..cc056de83d33 100644 --- a/beacon-chain/rpc/BUILD.bazel +++ b/beacon-chain/rpc/BUILD.bazel @@ -24,6 +24,7 @@ go_library( "//beacon-chain/operations/synccommittee:go_default_library", "//beacon-chain/operations/voluntaryexits:go_default_library", "//beacon-chain/p2p:go_default_library", + "//beacon-chain/rpc/core:go_default_library", "//beacon-chain/rpc/eth/beacon:go_default_library", "//beacon-chain/rpc/eth/builder:go_default_library", "//beacon-chain/rpc/eth/debug:go_default_library", diff --git a/beacon-chain/rpc/core/BUILD.bazel b/beacon-chain/rpc/core/BUILD.bazel index 9d156c648c85..6c382efa00ec 100644 --- a/beacon-chain/rpc/core/BUILD.bazel +++ b/beacon-chain/rpc/core/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = [ "errors.go", "log.go", + "service.go", "validator.go", ], importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core", @@ -20,6 +21,7 @@ go_library( "//beacon-chain/core/transition:go_default_library", "//beacon-chain/operations/synccommittee:go_default_library", "//beacon-chain/p2p:go_default_library", + "//beacon-chain/sync:go_default_library", "//config/fieldparams:go_default_library", "//config/params:go_default_library", "//consensus-types/primitives:go_default_library", diff --git a/beacon-chain/rpc/core/service.go b/beacon-chain/rpc/core/service.go new file mode 100644 index 000000000000..bfea185e827c --- /dev/null +++ b/beacon-chain/rpc/core/service.go @@ -0,0 +1,18 @@ +package core + +import ( + "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain" + opfeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/operation" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/synccommittee" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync" +) + +type Service struct { + HeadFetcher blockchain.HeadFetcher + GenesisTimeFetcher blockchain.TimeFetcher + SyncChecker sync.Checker + Broadcaster p2p.Broadcaster + SyncCommitteePool synccommittee.Pool + OperationNotifier opfeed.Notifier +} diff --git a/beacon-chain/rpc/core/validator.go b/beacon-chain/rpc/core/validator.go index a9df7c665ec7..c91b653bce1c 100644 --- a/beacon-chain/rpc/core/validator.go +++ b/beacon-chain/rpc/core/validator.go @@ -5,10 +5,8 @@ import ( "context" "fmt" "sort" - "time" "github.com/pkg/errors" - "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/altair" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/epoch/precompute" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed" @@ -16,8 +14,6 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers" coreTime "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition" - "github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/synccommittee" - "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" @@ -48,18 +44,21 @@ func (e *AggregateBroadcastFailedError) Error() string { // ComputeValidatorPerformance reports the validator's latest balance along with other important metrics on // rewards and penalties throughout its lifecycle in the beacon chain. -func ComputeValidatorPerformance( +func (s *Service) ComputeValidatorPerformance( ctx context.Context, req *ethpb.ValidatorPerformanceRequest, - headFetcher blockchain.HeadFetcher, - currSlot primitives.Slot, ) (*ethpb.ValidatorPerformanceResponse, *RpcError) { - headState, err := headFetcher.HeadState(ctx) + if s.SyncChecker.Syncing() { + return nil, &RpcError{Reason: Unavailable, Err: errors.New("Syncing to latest head, not ready to respond")} + } + + headState, err := s.HeadFetcher.HeadState(ctx) if err != nil { return nil, &RpcError{Err: errors.Wrap(err, "could not get head state"), Reason: Internal} } + currSlot := s.GenesisTimeFetcher.CurrentSlot() if currSlot > headState.Slot() { - headRoot, err := headFetcher.HeadRoot(ctx) + headRoot, err := s.HeadFetcher.HeadRoot(ctx) if err != nil { return nil, &RpcError{Err: errors.Wrap(err, "could not get head root"), Reason: Internal} } @@ -200,21 +199,18 @@ func ComputeValidatorPerformance( // SubmitSignedContributionAndProof is called by a sync committee aggregator // to submit signed contribution and proof object. -func SubmitSignedContributionAndProof( +func (s *Service) SubmitSignedContributionAndProof( ctx context.Context, - s *ethpb.SignedContributionAndProof, - broadcaster p2p.Broadcaster, - pool synccommittee.Pool, - notifier opfeed.Notifier, + req *ethpb.SignedContributionAndProof, ) *RpcError { errs, ctx := errgroup.WithContext(ctx) // Broadcasting and saving contribution into the pool in parallel. As one fail should not affect another. errs.Go(func() error { - return broadcaster.Broadcast(ctx, s) + return s.Broadcaster.Broadcast(ctx, req) }) - if err := pool.SaveSyncCommitteeContribution(s.Message.Contribution); err != nil { + if err := s.SyncCommitteePool.SaveSyncCommitteeContribution(req.Message.Contribution); err != nil { return &RpcError{Err: err, Reason: Internal} } @@ -224,10 +220,10 @@ func SubmitSignedContributionAndProof( return &RpcError{Err: err, Reason: Internal} } - notifier.OperationFeed().Send(&feed.Event{ + s.OperationNotifier.OperationFeed().Send(&feed.Event{ Type: opfeed.SyncCommitteeContributionReceived, Data: &opfeed.SyncCommitteeContributionReceivedData{ - Contribution: s, + Contribution: req, }, }) @@ -235,11 +231,9 @@ func SubmitSignedContributionAndProof( } // SubmitSignedAggregateSelectionProof verifies given aggregate and proofs and publishes them on appropriate gossipsub topic. -func SubmitSignedAggregateSelectionProof( +func (s *Service) SubmitSignedAggregateSelectionProof( ctx context.Context, req *ethpb.SignedAggregateSubmitRequest, - genesisTime time.Time, - broadcaster p2p.Broadcaster, ) *RpcError { if req.SignedAggregateAndProof == nil || req.SignedAggregateAndProof.Message == nil || req.SignedAggregateAndProof.Message.Aggregate == nil || req.SignedAggregateAndProof.Message.Aggregate.Data == nil { @@ -253,11 +247,11 @@ func SubmitSignedAggregateSelectionProof( // As a preventive measure, a beacon node shouldn't broadcast an attestation whose slot is out of range. if err := helpers.ValidateAttestationTime(req.SignedAggregateAndProof.Message.Aggregate.Data.Slot, - genesisTime, params.BeaconNetworkConfig().MaximumGossipClockDisparity); err != nil { + s.GenesisTimeFetcher.GenesisTime(), params.BeaconNetworkConfig().MaximumGossipClockDisparity); err != nil { return &RpcError{Err: errors.New("attestation slot is no longer valid from current time"), Reason: BadRequest} } - if err := broadcaster.Broadcast(ctx, req.SignedAggregateAndProof); err != nil { + if err := s.Broadcaster.Broadcast(ctx, req.SignedAggregateAndProof); err != nil { return &RpcError{Err: &AggregateBroadcastFailedError{err: err}, Reason: Internal} } diff --git a/beacon-chain/rpc/eth/validator/BUILD.bazel b/beacon-chain/rpc/eth/validator/BUILD.bazel index 5e23cf1a44b3..915cb4a49888 100644 --- a/beacon-chain/rpc/eth/validator/BUILD.bazel +++ b/beacon-chain/rpc/eth/validator/BUILD.bazel @@ -67,6 +67,7 @@ go_test( "//beacon-chain/operations/attestations:go_default_library", "//beacon-chain/operations/synccommittee:go_default_library", "//beacon-chain/p2p/testing:go_default_library", + "//beacon-chain/rpc/core:go_default_library", "//beacon-chain/rpc/prysm/v1alpha1/validator:go_default_library", "//beacon-chain/rpc/testutil:go_default_library", "//beacon-chain/state:go_default_library", diff --git a/beacon-chain/rpc/eth/validator/handlers.go b/beacon-chain/rpc/eth/validator/handlers.go index 65c50c3f198f..d9fbbd8636e7 100644 --- a/beacon-chain/rpc/eth/validator/handlers.go +++ b/beacon-chain/rpc/eth/validator/handlers.go @@ -105,7 +105,7 @@ func (s *Server) SubmitContributionAndProofs(w http.ResponseWriter, r *http.Requ http2.HandleError(w, "Could not convert request contribution to consensus contribution: "+err.Error(), http.StatusBadRequest) return } - rpcError := core.SubmitSignedContributionAndProof(r.Context(), consensusItem, s.Broadcaster, s.SyncCommitteePool, s.OperationNotifier) + rpcError := s.CoreService.SubmitSignedContributionAndProof(r.Context(), consensusItem) if rpcError != nil { http2.HandleError(w, rpcError.Err.Error(), core.ErrorReasonToHTTP(rpcError.Reason)) } @@ -131,8 +131,6 @@ func (s *Server) SubmitAggregateAndProofs(w http.ResponseWriter, r *http.Request return } - genesisTime := s.TimeFetcher.GenesisTime() - broadcastFailed := false for _, item := range req.Data { consensusItem, err := item.ToConsensus() @@ -140,11 +138,9 @@ func (s *Server) SubmitAggregateAndProofs(w http.ResponseWriter, r *http.Request http2.HandleError(w, "Could not convert request aggregate to consensus aggregate: "+err.Error(), http.StatusBadRequest) return } - rpcError := core.SubmitSignedAggregateSelectionProof( + rpcError := s.CoreService.SubmitSignedAggregateSelectionProof( r.Context(), ðpbalpha.SignedAggregateSubmitRequest{SignedAggregateAndProof: consensusItem}, - genesisTime, - s.Broadcaster, ) if rpcError != nil { _, ok := rpcError.Err.(*core.AggregateBroadcastFailedError) diff --git a/beacon-chain/rpc/eth/validator/handlers_test.go b/beacon-chain/rpc/eth/validator/handlers_test.go index 2c0f65a83178..72b835ee8feb 100644 --- a/beacon-chain/rpc/eth/validator/handlers_test.go +++ b/beacon-chain/rpc/eth/validator/handlers_test.go @@ -13,6 +13,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/attestations" "github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/synccommittee" p2pmock "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core" fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" "github.com/prysmaticlabs/prysm/v4/crypto/bls" "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" @@ -287,14 +288,16 @@ func TestGetAggregateAttestation_SameSlotAndRoot_ReturnMostAggregationBits(t *te } func TestSubmitContributionAndProofs(t *testing.T) { - s := &Server{ + c := &core.Service{ OperationNotifier: (&mockChain.ChainService{}).OperationNotifier(), } + s := &Server{CoreService: c} + t.Run("single", func(t *testing.T) { broadcaster := &p2pmock.MockBroadcaster{} - s.Broadcaster = broadcaster - s.SyncCommitteePool = synccommittee.NewStore() + c.Broadcaster = broadcaster + c.SyncCommitteePool = synccommittee.NewStore() var body bytes.Buffer _, err := body.WriteString(singleContribution) @@ -306,15 +309,15 @@ func TestSubmitContributionAndProofs(t *testing.T) { s.SubmitContributionAndProofs(writer, request) assert.Equal(t, http.StatusOK, writer.Code) assert.Equal(t, 1, len(broadcaster.BroadcastMessages)) - contributions, err := s.SyncCommitteePool.SyncCommitteeContributions(1) + contributions, err := c.SyncCommitteePool.SyncCommitteeContributions(1) require.NoError(t, err) assert.Equal(t, 1, len(contributions)) }) t.Run("multiple", func(t *testing.T) { broadcaster := &p2pmock.MockBroadcaster{} - s.Broadcaster = broadcaster - s.SyncCommitteePool = synccommittee.NewStore() + c.Broadcaster = broadcaster + c.SyncCommitteePool = synccommittee.NewStore() var body bytes.Buffer _, err := body.WriteString(multipleContributions) @@ -326,13 +329,13 @@ func TestSubmitContributionAndProofs(t *testing.T) { s.SubmitContributionAndProofs(writer, request) assert.Equal(t, http.StatusOK, writer.Code) assert.Equal(t, 2, len(broadcaster.BroadcastMessages)) - contributions, err := s.SyncCommitteePool.SyncCommitteeContributions(1) + contributions, err := c.SyncCommitteePool.SyncCommitteeContributions(1) require.NoError(t, err) assert.Equal(t, 2, len(contributions)) }) t.Run("invalid", func(t *testing.T) { - s.SyncCommitteePool = synccommittee.NewStore() + c.SyncCommitteePool = synccommittee.NewStore() var body bytes.Buffer _, err := body.WriteString(invalidContribution) @@ -349,7 +352,7 @@ func TestSubmitContributionAndProofs(t *testing.T) { }) t.Run("no body", func(t *testing.T) { - s.SyncCommitteePool = synccommittee.NewStore() + c.SyncCommitteePool = synccommittee.NewStore() request := httptest.NewRequest(http.MethodPost, "http://example.com", nil) writer := httptest.NewRecorder() @@ -365,13 +368,17 @@ func TestSubmitContributionAndProofs(t *testing.T) { } func TestSubmitAggregateAndProofs(t *testing.T) { + c := &core.Service{ + GenesisTimeFetcher: &mockChain.ChainService{}, + } + s := &Server{ - TimeFetcher: &mockChain.ChainService{}, + CoreService: c, } t.Run("single", func(t *testing.T) { broadcaster := &p2pmock.MockBroadcaster{} - s.Broadcaster = broadcaster + c.Broadcaster = broadcaster var body bytes.Buffer _, err := body.WriteString(singleAggregate) @@ -387,8 +394,8 @@ func TestSubmitAggregateAndProofs(t *testing.T) { t.Run("multiple", func(t *testing.T) { broadcaster := &p2pmock.MockBroadcaster{} - s.Broadcaster = broadcaster - s.SyncCommitteePool = synccommittee.NewStore() + c.Broadcaster = broadcaster + c.SyncCommitteePool = synccommittee.NewStore() var body bytes.Buffer _, err := body.WriteString(multipleAggregates) diff --git a/beacon-chain/rpc/eth/validator/server.go b/beacon-chain/rpc/eth/validator/server.go index 03bcfe894b24..780387104d1f 100644 --- a/beacon-chain/rpc/eth/validator/server.go +++ b/beacon-chain/rpc/eth/validator/server.go @@ -9,6 +9,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/attestations" "github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/synccommittee" "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core" "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/lookup" "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync" eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" @@ -32,4 +33,5 @@ type Server struct { BeaconDB db.HeadAccessDatabase BlockBuilder builder.BlockBuilder OperationNotifier operation.Notifier + CoreService *core.Service } diff --git a/beacon-chain/rpc/prysm/v1alpha1/beacon/BUILD.bazel b/beacon-chain/rpc/prysm/v1alpha1/beacon/BUILD.bazel index c524c6193da9..0d78673fde0b 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/beacon/BUILD.bazel +++ b/beacon-chain/rpc/prysm/v1alpha1/beacon/BUILD.bazel @@ -103,6 +103,7 @@ go_test( "//beacon-chain/operations/attestations:go_default_library", "//beacon-chain/operations/slashings:go_default_library", "//beacon-chain/p2p/testing:go_default_library", + "//beacon-chain/rpc/core:go_default_library", "//beacon-chain/state:go_default_library", "//beacon-chain/state/state-native:go_default_library", "//beacon-chain/state/stategen:go_default_library", diff --git a/beacon-chain/rpc/prysm/v1alpha1/beacon/server.go b/beacon-chain/rpc/prysm/v1alpha1/beacon/server.go index 7d782e5549cd..f64a9a0b9605 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/beacon/server.go +++ b/beacon-chain/rpc/prysm/v1alpha1/beacon/server.go @@ -17,6 +17,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/attestations" "github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/slashings" "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core" "github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen" "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync" ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" @@ -47,4 +48,5 @@ type Server struct { SyncChecker sync.Checker ReplayerBuilder stategen.ReplayerBuilder OptimisticModeFetcher blockchain.OptimisticModeFetcher + CoreService *core.Service } diff --git a/beacon-chain/rpc/prysm/v1alpha1/beacon/validators.go b/beacon-chain/rpc/prysm/v1alpha1/beacon/validators.go index a30225d45072..6659e20f055d 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/beacon/validators.go +++ b/beacon-chain/rpc/prysm/v1alpha1/beacon/validators.go @@ -659,11 +659,7 @@ func (bs *Server) GetValidatorQueue( func (bs *Server) GetValidatorPerformance( ctx context.Context, req *ethpb.ValidatorPerformanceRequest, ) (*ethpb.ValidatorPerformanceResponse, error) { - if bs.SyncChecker.Syncing() { - return nil, status.Error(codes.Unavailable, "Syncing to latest head, not ready to respond") - } - currSlot := bs.GenesisTimeFetcher.CurrentSlot() - response, err := core.ComputeValidatorPerformance(ctx, req, bs.HeadFetcher, currSlot) + response, err := bs.CoreService.ComputeValidatorPerformance(ctx, req) if err != nil { return nil, status.Errorf(core.ErrorReasonToGRPC(err.Reason), "Could not compute validator performance: %v", err.Err) } diff --git a/beacon-chain/rpc/prysm/v1alpha1/beacon/validators_test.go b/beacon-chain/rpc/prysm/v1alpha1/beacon/validators_test.go index 0f4e40acf3bf..a59b2272fe6e 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/beacon/validators_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/beacon/validators_test.go @@ -19,6 +19,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/db" dbTest "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/testing" doublylinkedtree "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/doubly-linked-tree" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core" "github.com/prysmaticlabs/prysm/v4/beacon-chain/state" state_native "github.com/prysmaticlabs/prysm/v4/beacon-chain/state/state-native" "github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen" @@ -1797,7 +1798,9 @@ func TestGetValidatorPerformance_Syncing(t *testing.T) { ctx := context.Background() bs := &Server{ - SyncChecker: &mockSync.Sync{IsSyncing: true}, + CoreService: &core.Service{ + SyncChecker: &mockSync.Sync{IsSyncing: true}, + }, } wanted := "Syncing to latest head, not ready to respond" @@ -1857,11 +1860,13 @@ func TestGetValidatorPerformance_OK(t *testing.T) { require.NoError(t, headState.SetBalances([]uint64{100, 101, 102})) offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot)) bs := &Server{ - HeadFetcher: &mock.ChainService{ - State: headState, + CoreService: &core.Service{ + HeadFetcher: &mock.ChainService{ + State: headState, + }, + GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, }, - GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, } want := ðpb.ValidatorPerformanceResponse{ PublicKeys: [][]byte{publicKey2[:], publicKey3[:]}, @@ -1918,12 +1923,14 @@ func TestGetValidatorPerformance_Indices(t *testing.T) { require.NoError(t, headState.SetValidators(validators)) offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot)) bs := &Server{ - HeadFetcher: &mock.ChainService{ - // 10 epochs into the future. - State: headState, + CoreService: &core.Service{ + HeadFetcher: &mock.ChainService{ + // 10 epochs into the future. + State: headState, + }, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, }, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, } c := headState.Copy() vp, bp, err := precompute.New(ctx, c) @@ -1988,12 +1995,14 @@ func TestGetValidatorPerformance_IndicesPubkeys(t *testing.T) { offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot)) bs := &Server{ - HeadFetcher: &mock.ChainService{ - // 10 epochs into the future. - State: headState, + CoreService: &core.Service{ + HeadFetcher: &mock.ChainService{ + // 10 epochs into the future. + State: headState, + }, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, }, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, } c := headState.Copy() vp, bp, err := precompute.New(ctx, c) @@ -2064,11 +2073,13 @@ func TestGetValidatorPerformanceAltair_OK(t *testing.T) { require.NoError(t, headState.SetBalances([]uint64{100, 101, 102})) offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot)) bs := &Server{ - HeadFetcher: &mock.ChainService{ - State: headState, + CoreService: &core.Service{ + HeadFetcher: &mock.ChainService{ + State: headState, + }, + GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, }, - GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, } want := ðpb.ValidatorPerformanceResponse{ PublicKeys: [][]byte{publicKey2[:], publicKey3[:]}, @@ -2132,11 +2143,13 @@ func TestGetValidatorPerformanceBellatrix_OK(t *testing.T) { require.NoError(t, headState.SetBalances([]uint64{100, 101, 102})) offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot)) bs := &Server{ - HeadFetcher: &mock.ChainService{ - State: headState, + CoreService: &core.Service{ + HeadFetcher: &mock.ChainService{ + State: headState, + }, + GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, }, - GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, } want := ðpb.ValidatorPerformanceResponse{ PublicKeys: [][]byte{publicKey2[:], publicKey3[:]}, @@ -2200,11 +2213,13 @@ func TestGetValidatorPerformanceCapella_OK(t *testing.T) { require.NoError(t, headState.SetBalances([]uint64{100, 101, 102})) offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot)) bs := &Server{ - HeadFetcher: &mock.ChainService{ - State: headState, + CoreService: &core.Service{ + HeadFetcher: &mock.ChainService{ + State: headState, + }, + GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, }, - GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, } want := ðpb.ValidatorPerformanceResponse{ PublicKeys: [][]byte{publicKey2[:], publicKey3[:]}, diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/aggregator.go b/beacon-chain/rpc/prysm/v1alpha1/validator/aggregator.go index 0b972b2d2db4..b02ac3734e66 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/aggregator.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/aggregator.go @@ -113,7 +113,7 @@ func (vs *Server) SubmitSignedAggregateSelectionProof( ctx context.Context, req *ethpb.SignedAggregateSubmitRequest, ) (*ethpb.SignedAggregateSubmitResponse, error) { - if err := core.SubmitSignedAggregateSelectionProof(ctx, req, vs.TimeFetcher.GenesisTime(), vs.P2P); err != nil { + if err := vs.CoreService.SubmitSignedAggregateSelectionProof(ctx, req); err != nil { return nil, status.Errorf(core.ErrorReasonToGRPC(err.Reason), "Could not submit aggregate: %v", err.Err) } return ðpb.SignedAggregateSubmitResponse{}, nil diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/aggregator_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/aggregator_test.go index 18597d735ba4..b897deac43bc 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/aggregator_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/aggregator_test.go @@ -12,6 +12,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/signing" "github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/attestations" mockp2p "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core" "github.com/prysmaticlabs/prysm/v4/beacon-chain/state" state_native "github.com/prysmaticlabs/prysm/v4/beacon-chain/state/state-native" mockSync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/initial-sync/testing" @@ -431,7 +432,11 @@ func TestSubmitSignedAggregateSelectionProof_ZeroHashesSignatures(t *testing.T) func TestSubmitSignedAggregateSelectionProof_InvalidSlot(t *testing.T) { c := &mock.ChainService{Genesis: time.Now()} - aggregatorServer := &Server{TimeFetcher: c} + aggregatorServer := &Server{ + CoreService: &core.Service{ + GenesisTimeFetcher: c, + }, + } req := ðpb.SignedAggregateSubmitRequest{ SignedAggregateAndProof: ðpb.SignedAggregateAttestationAndProof{ Signature: []byte{'a'}, diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/server.go b/beacon-chain/rpc/prysm/v1alpha1/validator/server.go index 3cc14207e469..bffa6d4a2c96 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/server.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/server.go @@ -23,6 +23,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/synccommittee" "github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/voluntaryexits" "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core" "github.com/prysmaticlabs/prysm/v4/beacon-chain/startup" "github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen" "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync" @@ -74,6 +75,7 @@ type Server struct { BlockBuilder builder.BlockBuilder BLSChangesPool blstoexec.PoolManager ClockWaiter startup.ClockWaiter + CoreService *core.Service } // WaitForActivation checks if a validator public key exists in the active validator registry of the current diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/sync_committee.go b/beacon-chain/rpc/prysm/v1alpha1/validator/sync_committee.go index 5fecd4f90d6b..07a0726543fd 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/sync_committee.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/sync_committee.go @@ -120,7 +120,7 @@ func (vs *Server) GetSyncCommitteeContribution( func (vs *Server) SubmitSignedContributionAndProof( ctx context.Context, s *ethpb.SignedContributionAndProof, ) (*emptypb.Empty, error) { - err := core.SubmitSignedContributionAndProof(ctx, s, vs.P2P, vs.SyncCommitteePool, vs.OperationNotifier) + err := vs.CoreService.SubmitSignedContributionAndProof(ctx, s) if err != nil { return &emptypb.Empty{}, status.Errorf(core.ErrorReasonToGRPC(err.Reason), err.Err.Error()) } diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/sync_committee_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/sync_committee_test.go index 61f342fb108f..28ccd2fb0051 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/sync_committee_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/sync_committee_test.go @@ -11,6 +11,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition" "github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/synccommittee" mockp2p "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core" fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" @@ -137,9 +138,11 @@ func TestGetSyncCommitteeContribution_FiltersDuplicates(t *testing.T) { func TestSubmitSignedContributionAndProof_OK(t *testing.T) { server := &Server{ - SyncCommitteePool: synccommittee.NewStore(), - P2P: &mockp2p.MockBroadcaster{}, - OperationNotifier: (&mock.ChainService{}).OperationNotifier(), + CoreService: &core.Service{ + SyncCommitteePool: synccommittee.NewStore(), + Broadcaster: &mockp2p.MockBroadcaster{}, + OperationNotifier: (&mock.ChainService{}).OperationNotifier(), + }, } contribution := ðpb.SignedContributionAndProof{ Message: ðpb.ContributionAndProof{ @@ -151,21 +154,23 @@ func TestSubmitSignedContributionAndProof_OK(t *testing.T) { } _, err := server.SubmitSignedContributionAndProof(context.Background(), contribution) require.NoError(t, err) - savedMsgs, err := server.SyncCommitteePool.SyncCommitteeContributions(1) + savedMsgs, err := server.CoreService.SyncCommitteePool.SyncCommitteeContributions(1) require.NoError(t, err) require.DeepEqual(t, []*ethpb.SyncCommitteeContribution{contribution.Message.Contribution}, savedMsgs) } func TestSubmitSignedContributionAndProof_Notification(t *testing.T) { server := &Server{ - SyncCommitteePool: synccommittee.NewStore(), - P2P: &mockp2p.MockBroadcaster{}, - OperationNotifier: (&mock.ChainService{}).OperationNotifier(), + CoreService: &core.Service{ + SyncCommitteePool: synccommittee.NewStore(), + Broadcaster: &mockp2p.MockBroadcaster{}, + OperationNotifier: (&mock.ChainService{}).OperationNotifier(), + }, } // Subscribe to operation notifications. opChannel := make(chan *feed.Event, 1024) - opSub := server.OperationNotifier.OperationFeed().Subscribe(opChannel) + opSub := server.CoreService.OperationNotifier.OperationFeed().Subscribe(opChannel) defer opSub.Unsubscribe() contribution := ðpb.SignedContributionAndProof{ diff --git a/beacon-chain/rpc/prysm/validator/BUILD.bazel b/beacon-chain/rpc/prysm/validator/BUILD.bazel index 66fd2e1e1f12..580fc2c71a96 100644 --- a/beacon-chain/rpc/prysm/validator/BUILD.bazel +++ b/beacon-chain/rpc/prysm/validator/BUILD.bazel @@ -26,6 +26,7 @@ go_test( "//beacon-chain/blockchain/testing:go_default_library", "//beacon-chain/core/epoch/precompute:go_default_library", "//beacon-chain/core/helpers:go_default_library", + "//beacon-chain/rpc/core:go_default_library", "//beacon-chain/state:go_default_library", "//beacon-chain/sync/initial-sync/testing:go_default_library", "//config/params:go_default_library", diff --git a/beacon-chain/rpc/prysm/validator/server.go b/beacon-chain/rpc/prysm/validator/server.go index add5d0b575c6..59af3cea550c 100644 --- a/beacon-chain/rpc/prysm/validator/server.go +++ b/beacon-chain/rpc/prysm/validator/server.go @@ -2,6 +2,7 @@ package validator import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core" "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync" ) @@ -11,4 +12,5 @@ type Server struct { GenesisTimeFetcher blockchain.TimeFetcher SyncChecker sync.Checker HeadFetcher blockchain.HeadFetcher + CoreService *core.Service } diff --git a/beacon-chain/rpc/prysm/validator/validator_performance.go b/beacon-chain/rpc/prysm/validator/validator_performance.go index 6e1c39b00f2a..b16d82518686 100644 --- a/beacon-chain/rpc/prysm/validator/validator_performance.go +++ b/beacon-chain/rpc/prysm/validator/validator_performance.go @@ -29,12 +29,6 @@ type ValidatorPerformanceResponse struct { // GetValidatorPerformance is an HTTP handler for GetValidatorPerformance. func (vs *Server) GetValidatorPerformance(w http.ResponseWriter, r *http.Request) { - if vs.SyncChecker.Syncing() { - handleHTTPError(w, "Syncing", http.StatusServiceUnavailable) - return - } - ctx := r.Context() - currSlot := vs.GenesisTimeFetcher.CurrentSlot() var req ValidatorPerformanceRequest if r.Body != http.NoBody { if err := json.NewDecoder(r.Body).Decode(&req); err != nil { @@ -42,14 +36,12 @@ func (vs *Server) GetValidatorPerformance(w http.ResponseWriter, r *http.Request return } } - computed, err := core.ComputeValidatorPerformance( - ctx, + computed, err := vs.CoreService.ComputeValidatorPerformance( + r.Context(), ðpb.ValidatorPerformanceRequest{ PublicKeys: req.PublicKeys, Indices: req.Indices, }, - vs.HeadFetcher, - currSlot, ) if err != nil { handleHTTPError(w, "Could not compute validator performance: "+err.Err.Error(), core.ErrorReasonToHTTP(err.Reason)) diff --git a/beacon-chain/rpc/prysm/validator/validator_performance_test.go b/beacon-chain/rpc/prysm/validator/validator_performance_test.go index 55e99c0f071e..e18790913db3 100644 --- a/beacon-chain/rpc/prysm/validator/validator_performance_test.go +++ b/beacon-chain/rpc/prysm/validator/validator_performance_test.go @@ -14,6 +14,7 @@ import ( mock "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain/testing" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/epoch/precompute" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core" "github.com/prysmaticlabs/prysm/v4/beacon-chain/state" mockSync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/initial-sync/testing" "github.com/prysmaticlabs/prysm/v4/config/params" @@ -28,12 +29,13 @@ import ( func TestServer_GetValidatorPerformance(t *testing.T) { t.Run("Syncing", func(t *testing.T) { vs := &Server{ - SyncChecker: &mockSync.Sync{IsSyncing: true}, + CoreService: &core.Service{ + SyncChecker: &mockSync.Sync{IsSyncing: true}, + }, } - var buf bytes.Buffer srv := httptest.NewServer(http.HandlerFunc(vs.GetValidatorPerformance)) - req := httptest.NewRequest("POST", "/foo", &buf) + req := httptest.NewRequest("POST", "/foo", nil) client := &http.Client{} rawResp, err := client.Post(srv.URL, "application/json", req.Body) @@ -57,11 +59,13 @@ func TestServer_GetValidatorPerformance(t *testing.T) { offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot)) vs := &Server{ - HeadFetcher: &mock.ChainService{ - State: headState, + CoreService: &core.Service{ + HeadFetcher: &mock.ChainService{ + State: headState, + }, + GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, }, - GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, } want := &ValidatorPerformanceResponse{ PublicKeys: [][]byte{publicKeys[1][:], publicKeys[2][:]}, @@ -111,12 +115,14 @@ func TestServer_GetValidatorPerformance(t *testing.T) { offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot)) vs := &Server{ - HeadFetcher: &mock.ChainService{ - // 10 epochs into the future. - State: headState, + CoreService: &core.Service{ + HeadFetcher: &mock.ChainService{ + // 10 epochs into the future. + State: headState, + }, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, }, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, } c := headState.Copy() vp, bp, err := precompute.New(ctx, c) @@ -174,12 +180,14 @@ func TestServer_GetValidatorPerformance(t *testing.T) { offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot)) vs := &Server{ - HeadFetcher: &mock.ChainService{ - // 10 epochs into the future. - State: headState, + CoreService: &core.Service{ + HeadFetcher: &mock.ChainService{ + // 10 epochs into the future. + State: headState, + }, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, }, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, } c := headState.Copy() vp, bp, err := precompute.New(ctx, c) @@ -243,11 +251,13 @@ func TestServer_GetValidatorPerformance(t *testing.T) { require.NoError(t, headState.SetBalances([]uint64{100, 101, 102})) offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot)) vs := &Server{ - HeadFetcher: &mock.ChainService{ - State: headState, + CoreService: &core.Service{ + HeadFetcher: &mock.ChainService{ + State: headState, + }, + GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, }, - GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, } want := &ValidatorPerformanceResponse{ PublicKeys: [][]byte{publicKeys[1][:], publicKeys[2][:]}, @@ -303,11 +313,13 @@ func TestServer_GetValidatorPerformance(t *testing.T) { require.NoError(t, headState.SetBalances([]uint64{100, 101, 102})) offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot)) vs := &Server{ - HeadFetcher: &mock.ChainService{ - State: headState, + CoreService: &core.Service{ + HeadFetcher: &mock.ChainService{ + State: headState, + }, + GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, }, - GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, } want := &ValidatorPerformanceResponse{ PublicKeys: [][]byte{publicKeys[1][:], publicKeys[2][:]}, @@ -363,11 +375,13 @@ func TestServer_GetValidatorPerformance(t *testing.T) { require.NoError(t, headState.SetBalances([]uint64{100, 101, 102})) offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot)) vs := &Server{ - HeadFetcher: &mock.ChainService{ - State: headState, + CoreService: &core.Service{ + HeadFetcher: &mock.ChainService{ + State: headState, + }, + GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, }, - GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, } want := &ValidatorPerformanceResponse{ PublicKeys: [][]byte{publicKeys[1][:], publicKeys[2][:]}, diff --git a/beacon-chain/rpc/service.go b/beacon-chain/rpc/service.go index 3ba7c0300fb0..39029552bf58 100644 --- a/beacon-chain/rpc/service.go +++ b/beacon-chain/rpc/service.go @@ -30,6 +30,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/synccommittee" "github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/voluntaryexits" "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core" "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/beacon" rpcBuilder "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/builder" "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/debug" @@ -231,6 +232,15 @@ func (s *Service) Start() { } s.cfg.Router.HandleFunc("/eth/v1/builder/states/{state_id}/expected_withdrawals", builderServer.ExpectedWithdrawals).Methods(http.MethodGet) + coreService := &core.Service{ + HeadFetcher: s.cfg.HeadFetcher, + GenesisTimeFetcher: s.cfg.GenesisTimeFetcher, + SyncChecker: s.cfg.SyncService, + Broadcaster: s.cfg.Broadcaster, + SyncCommitteePool: s.cfg.SyncCommitteeObjectPool, + OperationNotifier: s.cfg.OperationNotifier, + } + validatorServer := &validatorv1alpha1.Server{ Ctx: s.ctx, AttestationCache: cache.NewAttestationCache(), @@ -266,6 +276,7 @@ func (s *Service) Start() { BlockBuilder: s.cfg.BlockBuilder, BLSChangesPool: s.cfg.BLSChangesPool, ClockWaiter: s.cfg.ClockWaiter, + CoreService: coreService, } validatorServerV1 := &validator.Server{ HeadFetcher: s.cfg.HeadFetcher, @@ -283,6 +294,7 @@ func (s *Service) Start() { BeaconDB: s.cfg.BeaconDB, BlockBuilder: s.cfg.BlockBuilder, OperationNotifier: s.cfg.OperationNotifier, + CoreService: coreService, } s.cfg.Router.HandleFunc("/eth/v1/validator/aggregate_attestation", validatorServerV1.GetAggregateAttestation).Methods(http.MethodGet) @@ -384,6 +396,7 @@ func (s *Service) Start() { GenesisTimeFetcher: s.cfg.GenesisTimeFetcher, HeadFetcher: s.cfg.HeadFetcher, SyncChecker: s.cfg.SyncService, + CoreService: coreService, } s.cfg.Router.HandleFunc("/prysm/validators/performance", httpServer.GetValidatorPerformance).Methods(http.MethodPost) s.cfg.Router.HandleFunc("/eth/v2/beacon/blocks", beaconChainServerV1.PublishBlockV2).Methods(http.MethodPost)